From e99002329edbb6e2e1d4a8f359037e11610ca1ee Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 6 Mar 2024 23:51:53 +0100 Subject: [PATCH] simple test DAG --- airflow/dags/populate_opensearch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/dags/populate_opensearch.py b/airflow/dags/populate_opensearch.py index 0495fa4..85fed69 100644 --- a/airflow/dags/populate_opensearch.py +++ b/airflow/dags/populate_opensearch.py @@ -48,9 +48,6 @@ DAG_ID = "populate_opensearch" @task def bulk_load(): - hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False}) - - keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix='/organization/') session = requests.Session() session.auth = ("admin", "admin") @@ -63,11 +60,14 @@ def bulk_load(): }), headers={"Content-Type": "application/json"}, verify=False) - + hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False}) + keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix='/organization/') for key in keys: + print(key) s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME) with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile: for line in gzipfile: + print(line) session.post("https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/organization_index/_doc", data=line, headers={"Content-Type": "application/json"},