diff --git a/airflow/dags/import_EOSC_catalog.py b/airflow/dags/import_EOSC_catalog.py
index 24ed2b5..ea0697b 100644
--- a/airflow/dags/import_EOSC_catalog.py
+++ b/airflow/dags/import_EOSC_catalog.py
@@ -18,10 +18,8 @@ from opensearchpy import OpenSearch, helpers
 from EOSC_indexes import mappings
 
 S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
-EOSC_CATALOG_BUCKET = os.getenv("EOSC_CATALOG_BUCKET", "eosc-portal-import")
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
-
 ENTITIES = ["interoperability", "services", "training"]
 BULK_PARALLELISM = 2
 
 
@@ -40,9 +38,13 @@ default_args = {
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     default_args=default_args,
+    params={
+        "S3_CONN_ID": "s3_conn",
+        "EOSC_CATALOG_BUCKET": "eosc-portal-import"
+    },
     tags=["lot1"],
 )
-def import_EOSC_catalog():
+def import_EOSC_catalog(**context):
     @task
     def create_indexes():
         conn = BaseHook.get_connection('opensearch_default')
@@ -81,7 +83,7 @@ def import_EOSC_catalog():
         pieces = []
         for entity in ENTITIES:
             hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
-            keys = hook.list_keys(bucket_name=EOSC_CATALOG_BUCKET, prefix=f'{entity}/')
+            keys = hook.list_keys(bucket_name=context["params"]["EOSC_CATALOG_BUCKET"], prefix=f'{entity}/')
             for key in keys:
                 pieces.append((entity, key))
 
@@ -107,7 +109,7 @@ def import_EOSC_catalog():
         def _generate_data():
             for (entity, key) in files:
                 print(f'{entity}: {key}')
-                s3_obj = hook.get_key(key, bucket_name=EOSC_CATALOG_BUCKET)
+                s3_obj = hook.get_key(key, bucket_name=context["params"]["EOSC_CATALOG_BUCKET"])
                 with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
                     buff = io.BufferedReader(gzipfile)
                     for line in buff:
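
Context for reviewers: this change moves the bucket name from a module-level environment variable into DAG-level `params`, read at runtime through `context["params"]`, so the bucket (and connection ID) become per-run settings rather than per-deployment ones. Below is a minimal, self-contained sketch of the same pattern, assuming Airflow 2.x TaskFlow; the names `demo_params_dag` and `read_bucket` are illustrative and not part of this PR. Inside a `@task` body the run context can also be fetched with `get_current_context()` instead of closing over the DAG function's `**context`:

```python
# Minimal sketch (not from this PR): reading DAG-level params at runtime.
# Assumes Airflow 2.x TaskFlow; demo_params_dag/read_bucket are illustrative names.
import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


@dag(
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    params={"EOSC_CATALOG_BUCKET": "eosc-portal-import"},  # default, overridable per run
)
def demo_params_dag():
    @task
    def read_bucket():
        # Params resolve per DAG run, so a trigger-time override is visible here.
        context = get_current_context()
        bucket = context["params"]["EOSC_CATALOG_BUCKET"]
        print(f"using bucket: {bucket}")

    read_bucket()


demo_params_dag()
```

A practical upside of this design: a one-off import from a different bucket can be done at trigger time, e.g. `airflow dags trigger import_EOSC_catalog --conf '{"EOSC_CATALOG_BUCKET": "other-bucket"}'` (the conf key overrides the same-named param in `context["params"]`), with no environment change or redeploy.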