from __future__ import annotations

import os
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import chain
from opensearchpy import OpenSearch

from EOSC_indexes import mappings  # used by the commented-out "mappings" entry below

EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


def opensearch_client(conn_id: str) -> OpenSearch:
    """Build an OpenSearch client from the named Airflow connection."""
    conn = BaseHook.get_connection(conn_id)
    return OpenSearch(
        hosts=[{"host": conn.host, "port": conn.port}],
        http_auth=(conn.login, conn.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180,
    )


@dag(
    dag_id="import_Catalogue",
    schedule=None,
    dagrun_timeout=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
    params={
        "S3_CONN_ID": "s3_conn",
        "OPENSEARCH_CONN_ID": "opensearch_default",
        "EOSC_CATALOG_BUCKET": "eosc-portal-import",
        "BATCH_LOADERS_NUM": 10,
        "ENTITIES": [
            "datasources",
            "interoperability-records",
            "providers",
            "resource-interoperability-records",
            "services",
            "training",
        ],
        "SHARDS": 3,
        # Resolved at DAG parse time; all runs triggered off the same parse share it.
        "SUFFIX": pendulum.now().format("YYYYMMDDHHmmss"),
    },
    tags=["lot1"],
)
def import_catalogue_entities():
    @task
    def create_indexes(**kwargs):
        """(Re)create one empty index per entity, tuned for bulk loading."""
        client = opensearch_client(kwargs["params"]["OPENSEARCH_CONN_ID"])
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            # Drop any leftover index from a previous failed run.
            if client.indices.exists(indexname):
                client.indices.delete(indexname)
            client.indices.create(
                indexname,
                {
                    "settings": {
                        "index": {
                            "number_of_shards": kwargs["params"]["SHARDS"],
                            "number_of_replicas": 0,
                            # Disable periodic refreshes for the duration of the load.
                            "refresh_interval": -1,
                            "translog.flush_threshold_size": "2048MB",
                            "codec": "zstd_no_dict",
                            "replication.type": "SEGMENT",
                        }
                    },
                    # "mappings": mappings[entity]
                },
            )

    @task
    def harvest_indexes(**kwargs):
        """Page through the EOSC registry and upsert each record into its index."""
        client = opensearch_client(kwargs["params"]["OPENSEARCH_CONN_ID"])
        session = requests.Session()
        baseurl = "http://vereniki.athenarc.gr:8080/eic-registry"
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            callurl = f"{baseurl}/{entity}"
            params = {"draft": "false", "active": "true", "suspended": "false"}
            while True:
                reply = session.get(url=callurl, params=params)
                reply.raise_for_status()
                content = reply.json()
                if "results" not in content:
                    break
                results = content["results"]
                if not results:
                    break
                for result in results:
                    # TODO: mapping code
                    body = {"doc": result, "doc_as_upsert": True}
                    # NOTE: refresh=True forces a refresh after every upsert,
                    # overriding the refresh_interval=-1 bulk-load setting above.
                    client.update(
                        index=indexname,
                        body=body,
                        id=result["id"],
                        refresh=True,
                    )
                # End-of-stream condition: the registry pages with from/to/total.
                if content["to"] >= content["total"]:
                    break
                params["from"] = content["to"]

    @task
    def close_indexes(**kwargs):
        """Restore serving settings, then flip the aliases to the new indexes."""
        client = opensearch_client(kwargs["params"]["OPENSEARCH_CONN_ID"])
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            client.indices.refresh(indexname)
            # put_settings takes the settings body first, then the index name.
            client.indices.put_settings(
                body={
                    "index": {
                        "number_of_replicas": 1,
                        "refresh_interval": "60s",
                    }
                },
                index=indexname,
            )

        # Point each per-entity alias at the freshly loaded index.
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            client.indices.update_aliases(
                body={
                    "actions": [
                        {"remove": {"index": f"{entity}_*", "alias": entity}},
                        {"add": {"index": indexname, "alias": entity}},
                    ]
                }
            )

        # Update the "allresources" alias. Of the names listed here, only
        # "services" and "training" appear in the default ENTITIES param;
        # "products" and "interoperability" match no default entity.
        actions = []
        for entity in kwargs["params"]["ENTITIES"]:
            if entity in ["products", "services", "training", "interoperability"]:
                indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
                actions.append({"remove": {"index": f"{entity}_*", "alias": "allresources"}})
                actions.append({"add": {"index": indexname, "alias": "allresources"}})
        if actions:
            client.indices.update_aliases(body={"actions": actions})

    chain(
        create_indexes.override(task_id="create_indexes")(),
        harvest_indexes.override(task_id="harvest_indexes")(),
        close_indexes.override(task_id="close_indexes")(),
    )


import_catalogue_entities()