From 602fedc6cb63e2df811ce4046421cff0fad33f91 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 14 Mar 2024 22:27:51 +0100 Subject: [PATCH] simple test DAG --- airflow/dags/skg_if_pipeline.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py index 3b3a21b..561676a 100644 --- a/airflow/dags/skg_if_pipeline.py +++ b/airflow/dags/skg_if_pipeline.py @@ -73,11 +73,11 @@ def skg_if_pipeline(): session = requests.Session() session.auth = (OPENSEARCH_USER, OPENSEARCH_PASSWD) - response = session.delete(f'${OPENSEARCH_URL}/{entity}_index', + response = session.delete(f'{OPENSEARCH_URL}/{entity}_index', verify=False) response.raise_for_status() - response = session.put(f'${OPENSEARCH_URL}/{entity}_index', + response = session.put(f'{OPENSEARCH_URL}/{entity}_index', data=json.dumps({ "settings": { "index": { @@ -107,14 +107,14 @@ def skg_if_pipeline(): for line in buff: data = json.loads(line) response = session.post( - f'${OPENSEARCH_URL}/{entity}_index/_doc/' + requests.utils.quote( + f'{OPENSEARCH_URL}/{entity}_index/_doc/' + requests.utils.quote( data['local_identifier'], safe='') + "?refresh=false", data=json.dumps(data), headers={"Content-Type": "application/json"}, verify=False) response.raise_for_status() response = session.post( - f'${OPENSEARCH_URL}/{entity}_index/_refresh', + f'{OPENSEARCH_URL}/{entity}_index/_refresh', verify=False) response.raise_for_status()