From 439854609598a6f308533c32c35a160cc0af18dc Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 25 Mar 2024 22:09:41 +0100 Subject: [PATCH] initial stage --- airflow/dags/import_EOSC_catalog.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/airflow/dags/import_EOSC_catalog.py b/airflow/dags/import_EOSC_catalog.py index cbeeea7..24ed2b5 100644 --- a/airflow/dags/import_EOSC_catalog.py +++ b/airflow/dags/import_EOSC_catalog.py @@ -12,7 +12,7 @@ from airflow.decorators import task from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.utils.helpers import chain -from airflow.models import Variable +from airflow.hooks.base import BaseHook from opensearchpy import OpenSearch, helpers from EOSC_indexes import mappings @@ -21,9 +21,6 @@ S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn") EOSC_CATALOG_BUCKET = os.getenv("EOSC_CATALOG_BUCKET", "eosc-portal-import") EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) -OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local") -OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin") -OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin") ENTITIES = ["interoperability", "services", "training"] @@ -48,9 +45,10 @@ default_args = { def import_EOSC_catalog(): @task def create_indexes(): + conn = BaseHook.get_connection('opensearch_default') client = OpenSearch( - hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], - http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), + hosts=[{'host': conn.host, 'port': conn.port}], + http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False, @@ -95,9 +93,10 @@ def import_EOSC_catalog(): @task def bulk_load(files: list[(str, str)]): + conn = BaseHook.get_connection('opensearch_default') client = OpenSearch( - hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], - http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), + hosts=[{'host': conn.host, 'port': conn.port}], + http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False, @@ -135,9 +134,10 @@ def import_EOSC_catalog(): @task def close_indexes(): + conn = BaseHook.get_connection('opensearch_default') client = OpenSearch( - hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], - http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), + hosts=[{'host': conn.host, 'port': conn.port}], + http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False,