From fc5f884f4d29ac73490a5748a9616d86563cc16f Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Tue, 26 Mar 2024 14:20:45 +0100 Subject: [PATCH] initial stage --- airflow/dags/import_EOSC_catalog.py | 11 ++++++----- airflow/dags/import_EOSC_graph.py | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/airflow/dags/import_EOSC_catalog.py b/airflow/dags/import_EOSC_catalog.py index afd624c..dc01ba6 100644 --- a/airflow/dags/import_EOSC_catalog.py +++ b/airflow/dags/import_EOSC_catalog.py @@ -40,14 +40,15 @@ default_args = { default_args=default_args, params={ "S3_CONN_ID": "s3_conn", + "OPENSEARCH_CONN_ID": "opensearch_default", "EOSC_CATALOG_BUCKET": "eosc-portal-import" }, tags=["lot1"], ) def import_EOSC_catalog(): @task - def create_indexes(): - conn = BaseHook.get_connection('opensearch_default') + def create_indexes(**kwargs): + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), @@ -98,7 +99,7 @@ def import_EOSC_catalog(): @task def bulk_load(files: list[(str, str)], **kwargs): - conn = BaseHook.get_connection('opensearch_default') + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), @@ -138,8 +139,8 @@ def import_EOSC_catalog(): print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).") @task - def close_indexes(): - conn = BaseHook.get_connection('opensearch_default') + def close_indexes(**kwargs): + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), diff --git a/airflow/dags/import_EOSC_graph.py b/airflow/dags/import_EOSC_graph.py index 418d022..c68d9e6 100644 --- a/airflow/dags/import_EOSC_graph.py +++ b/airflow/dags/import_EOSC_graph.py @@ -42,14 +42,15 @@ default_args = { default_args=default_args, params={ "S3_CONN_ID": "s3_conn", + "OPENSEARCH_CONN_ID": "opensearch_default", "EOSC_CATALOG_BUCKET": "eosc-portal-import" }, tags=["lot1"] ) def import_EOSC_graph(): @task - def create_indexes(): - conn = BaseHook.get_connection('opensearch_default') + def create_indexes(**kwargs): + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), @@ -123,7 +124,7 @@ def import_EOSC_graph(): ) }) def bulk_load(files: list[(str, str)], **kwargs): - conn = BaseHook.get_connection('opensearch_default') + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), @@ -182,8 +183,8 @@ def import_EOSC_graph(): print(f"Bulk-inserted {succeeded} items (streaming_bulk).") @task - def close_indexes(): - conn = BaseHook.get_connection('opensearch_default') + def close_indexes(**kwargs): + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password),