diff --git a/airflow/dags/import_EOSC_catalog.py b/airflow/dags/import_EOSC_catalog.py
index 6fb45f9..d238d89 100644
--- a/airflow/dags/import_EOSC_catalog.py
+++ b/airflow/dags/import_EOSC_catalog.py
@@ -92,8 +92,8 @@ def import_EOSC_catalog():
         return list(split_list(pieces, len(pieces)))
 
     @task
-    def bulk_load(files: list[(str, str)], **kwargs):
-        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
+    def bulk_load(files: list[(str, str)], params: dict):
+        conn = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
         client = OpenSearch(
             hosts=[{'host': conn.host, 'port': conn.port}],
             http_auth=(conn.login, conn.password),
@@ -102,12 +102,12 @@ def import_EOSC_catalog():
             ssl_show_warn=False,
             pool_maxsize=20
         )
-        hook = S3Hook(kwargs["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False})
+        hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})
 
         def _generate_data():
             for (entity, key) in files:
                 print(f'{entity}: {key}')
-                s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"])
+                s3_obj = hook.get_key(key, bucket_name=params["EOSC_CATALOG_BUCKET"])
                 with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
                     buff = io.BufferedReader(gzipfile)
                     for line in buff:
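
The change above swaps Airflow's catch-all **kwargs context for direct injection of params into the TaskFlow-decorated function. Below is a minimal sketch of that pattern, assuming Airflow 2.4+; the DAG name demo_params_injection and the s3_default connection id are illustrative, not part of the actual DAG, and only the S3_CONN_ID param mirrors one used above.

    # Sketch only: shows old-style vs. new-style access to DAG params in a
    # TaskFlow task. Assumes Airflow 2.4+ (for the `schedule` argument).
    from airflow.decorators import dag, task
    import pendulum


    @dag(
        schedule=None,
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        params={"S3_CONN_ID": "s3_default"},  # hypothetical default value
    )
    def demo_params_injection():
        @task
        def old_style(**kwargs):
            # Before: the whole task context arrives in **kwargs and the
            # params dict has to be dug out of it by key.
            return kwargs["params"]["S3_CONN_ID"]

        @task
        def new_style(params: dict):
            # After: Airflow matches the `params` argument name against the
            # task context and passes the resolved params dict in directly.
            return params["S3_CONN_ID"]

        old_style()
        new_style()


    demo_params_injection()

Declaring params: dict in the signature makes the task's configuration dependency explicit and drops the kwargs["params"][...] indirection, which is what the hunks above do for the OPENSEARCH_CONN_ID, S3_CONN_ID, and EOSC_CATALOG_BUCKET keys.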