diff --git a/airflow/dags/import_EOSC_catalog.py b/airflow/dags/import_EOSC_catalog.py
index 331cd66..7c267a9 100644
--- a/airflow/dags/import_EOSC_catalog.py
+++ b/airflow/dags/import_EOSC_catalog.py
@@ -22,7 +22,6 @@
 EOSC_CATALOG_BUCKET = os.getenv("EOSC_CATALOG_BUCKET", "eosc-catalog")
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
 
 OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
-OPENSEARCH_URL = Variable.get("OPENSEARCH_URL", "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
 OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
 OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
diff --git a/airflow/dags/import_EOSC_graph.py b/airflow/dags/import_EOSC_graph.py
index 025bcf0..6cf2582 100644
--- a/airflow/dags/import_EOSC_graph.py
+++ b/airflow/dags/import_EOSC_graph.py
@@ -14,7 +14,7 @@ from airflow.decorators import task
 from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.utils.helpers import chain
-from airflow.models import Variable
+from airflow.hooks.base import BaseHook
 from opensearchpy import OpenSearch, helpers
 
 from EOSC_indexes import mappings
@@ -23,40 +23,32 @@
 from EOSC_entity_trasform import transform_entities
 
 S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skgif-eosc-eu")
 S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
-OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
-OPENSEARCH_URL = Variable.get("OPENSEARCH_URL",
-                              "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
-OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
-OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
-
 ENTITIES = ["datasource", "grants", "organizations", "persons", "products", "topics", "venues"]
 BULK_PARALLELISM = 10
-#
 
 default_args = {
     "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
     "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
     "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
 }
-
-
 @dag(
     schedule=None,
     dagrun_timeout=None,
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     default_args=default_args,
-    tags=["lot1"],
+    tags=["lot1"]
 )
 def import_EOSC_graph():
     @task
     def create_indexes():
+        conn = BaseHook.get_connection('opensearch_default')
         client = OpenSearch(
-            hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
-            http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
+            hosts=[{'host': conn.host, 'port': conn.port}],
+            http_auth=(conn.login, conn.password),
             use_ssl=True,
             verify_certs=False,
             ssl_show_warn=False,
@@ -127,9 +119,10 @@ def import_EOSC_graph():
         )
     })
     def bulk_load(files: list[(str, str)]):
+        conn = BaseHook.get_connection('opensearch_default')
         client = OpenSearch(
-            hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
-            http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
+            hosts=[{'host': conn.host, 'port': conn.port}],
+            http_auth=(conn.login, conn.password),
             use_ssl=True,
             verify_certs=False,
             ssl_show_warn=False,
@@ -186,8 +178,9 @@ def import_EOSC_graph():
 
     @task
     def close_indexes():
+        conn = BaseHook.get_connection('opensearch_default')
         client = OpenSearch(
-            hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
-            http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
+            hosts=[{'host': conn.host, 'port': conn.port}],
+            http_auth=(conn.login, conn.password),
             use_ssl=True,
             verify_certs=False,
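
Note (not part of the patch itself): after this change the DAG tasks assume an Airflow connection with conn_id 'opensearch_default' exists in the deployment, instead of the OPENSEARCH_* Variables. A minimal sketch of one way to provide it, using Airflow's standard AIRFLOW_CONN_<CONN_ID> environment-variable URI; the host and the admin/admin credentials below are placeholders, not the real cluster secrets:

# Sketch: supply the 'opensearch_default' connection via an environment
# variable so BaseHook.get_connection() can resolve host/port/login/password.
import os

# Airflow parses AIRFLOW_CONN_* URIs as scheme://login:password@host:port
os.environ["AIRFLOW_CONN_OPENSEARCH_DEFAULT"] = (
    "https://admin:admin@opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200"
)

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("opensearch_default")
# These are the four fields the patched tasks read when building the client.
print(conn.host, conn.port, conn.login, conn.password)

The same connection could equally be created through the Airflow UI or `airflow connections add`; the env-var form is just the most compact to show here.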