From 227ec44a2141b49a1470143fca179eb0bc8087c4 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Wed, 10 Jul 2024 21:43:02 +0200
Subject: [PATCH] initial stage

---
 airflow/dags/import_Catalogues.py | 176 ++++++++++++++++++++++++++++++
 1 file changed, 176 insertions(+)
 create mode 100644 airflow/dags/import_Catalogues.py

diff --git a/airflow/dags/import_Catalogues.py b/airflow/dags/import_Catalogues.py
new file mode 100644
index 0000000..1736d2b
--- /dev/null
+++ b/airflow/dags/import_Catalogues.py
@@ -0,0 +1,176 @@
+from __future__ import annotations
+
+import os
+from datetime import timedelta
+
+import pendulum
+import requests
+from airflow.decorators import dag
+from airflow.decorators import task
+from airflow.hooks.base import BaseHook
+from airflow.utils.helpers import chain
+from opensearchpy import OpenSearch
+
+from EOSC_indexes import mappings
+
+EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
+
+default_args = {
+    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
+    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
+    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
+}
+
+
+@dag(
+    dag_id="import_Catalogue",
+    schedule=None,
+    dagrun_timeout=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    default_args=default_args,
+    params={
+        "S3_CONN_ID": "s3_conn",
+        "OPENSEARCH_CONN_ID": "opensearch_default",
+        "EOSC_CATALOG_BUCKET": "eosc-portal-import",
+        "BATCH_LOADERS_NUM": 10,
+        "ENTITIES": ["datasources", "interoperability", "providers", "resource-interoperability", "services",
+                     "training"],
+        "SHARDS": 3,
+        "SUFFIX": pendulum.now().format('YYYYMMDDHHmmss')
+    },
+    tags=["lot1"]
+)
+def import_catalogue_entities():
+    @task
+    def create_indexes(**kwargs):
+        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
+        client = OpenSearch(
+            hosts=[{'host': conn.host, 'port': conn.port}],
+            http_auth=(conn.login, conn.password),
+            use_ssl=True,
+            verify_certs=False,
+            ssl_show_warn=False,
+            pool_maxsize=20,
+            timeout=180
+        )
+
+        for entity in kwargs["params"]["ENTITIES"]:
+            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
+            if client.indices.exists(indexname):
+                client.indices.delete(indexname)
+
+            client.indices.create(indexname, {
+                "settings": {
+                    "index": {
+                        "number_of_shards": kwargs["params"]["SHARDS"],
+                        "number_of_replicas": 0,
+                        "refresh_interval": -1,
+
+                        "translog.flush_threshold_size": "2048MB",
+
+                        "codec": "zstd_no_dict",
+                        "replication.type": "SEGMENT"
+                    }
+
+                }
+                #"mappings": mappings[entity]
+            })
+
+    @task
+    def harvest_indexes(**kwargs):
+        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
+        client = OpenSearch(
+            hosts=[{'host': conn.host, 'port': conn.port}],
+            http_auth=(conn.login, conn.password),
+            use_ssl=True,
+            verify_certs=False,
+            ssl_show_warn=False,
+            pool_maxsize=20,
+            timeout=180
+        )
+
+        session = requests.session()
+        for entity in kwargs["params"]["ENTITIES"]:
+            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
+            baseurl = "http://vereniki.athenarc.gr:8080/eic-registry"
+            callurl = f"{baseurl}/{entity}"
+            params = {"draft": "false", "active": "true", "suspended": "false"}
+
+            while True:
+                reply = session.get(url=callurl, params=params)
+                reply.raise_for_status()
+                content = reply.json()
+                if 'results' not in content:
+                    break
+                results = content['results']
+                if len(results) <= 0:
+                    break
+
+                for result in results:
+                    #TODO: mapping code
+                    body = {"doc": result, "doc_as_upsert": True}
+                    client.update(
+                        index=indexname,
+                        body=body,
+                        id=result['id'],
+                        refresh=True
+                    )
+
+                # end of stream conditions
+                if content['to'] >= content['total']:
+                    break
+                params['from'] = content['to']
+
+    @task
+    def close_indexes(**kwargs):
+        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
+        client = OpenSearch(
+            hosts=[{'host': conn.host, 'port': conn.port}],
+            http_auth=(conn.login, conn.password),
+            use_ssl=True,
+            verify_certs=False,
+            ssl_show_warn=False,
+            pool_maxsize=20,
+            timeout=180
+        )
+        for entity in kwargs["params"]["ENTITIES"]:
+            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
+            client.indices.refresh(indexname)
+            client.indices.put_settings(index=indexname, body={
+                "index": {
+                    "number_of_replicas": 1,
+                    "refresh_interval": "60s",
+                }
+
+            })
+
+        # update aliases
+        for entity in kwargs["params"]["ENTITIES"]:
+            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
+            client.indices.update_aliases(
+                body={"actions": [
+                    {"remove": {"index": f"{entity}_*", "alias": entity}},
+                    {"add": {"index": indexname, "alias": entity}},
+                ]}
+            )
+        # update "allresources" alias
+        actions = []
+        for entity in kwargs["params"]["ENTITIES"]:
+            if entity in ['products', 'services', 'training', 'interoperability']:
+                indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
+                actions.append({"remove": {"index": f"{entity}_*", "alias": "allresources"}})
+                actions.append({"add": {"index": indexname, "alias": "allresources"}})
+        if len(actions) > 0:
+            client.indices.update_aliases(
+                body={"actions": actions}
+            )
+
+    chain(
+        create_indexes.override(task_id="create_indexes")(),
+        harvest_indexes.override(task_id="harvest_indexes")(),
+        close_indexes.override(task_id="close_indexes")()
+    )
+
+
+import_catalogue_entities()