from __future__ import annotations import codecs import gzip import io import json import logging import os from datetime import timedelta from kubernetes.client import models as k8s import pendulum from airflow.decorators import dag from airflow.decorators import task from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.utils.helpers import chain from airflow.hooks.base import BaseHook from opensearchpy import OpenSearch, helpers from EOSC_indexes import mappings from EOSC_entity_trasform import filter_entities, transform_entities EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) default_args = { "execution_timeout": timedelta(days=EXECUTION_TIMEOUT), "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)), "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))), } configs = { "all": {"ENTITIES": ["datasource", "grants", "organizations", "persons", "products", "topics", "venues", "interoperability", "services", "training"]}, "skg-if": {"ENTITIES": ["datasource", "grants", "organizations", "persons", "products", "topics", "venues"]}, "catalogue": {"ENTITIES": ["interoperability", "services", "training"]}, } for config_name, config in configs.items(): dag_id = f"import_EOSC_{config_name}" @dag( dag_id=dag_id, schedule=None, dagrun_timeout=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, default_args=default_args, params={ "S3_CONN_ID": "s3_conn", "OPENSEARCH_CONN_ID": "opensearch_default", "KEY_PREFIX": "/", "EOSC_CATALOG_BUCKET": "eosc-portal-import", "BATCH_LOADERS_NUM": 10, "ENTITIES": config["ENTITIES"], "SUFFIX": pendulum.now().format('YYYYMMDDHHmmss') }, tags=["lot1"] ) def import_EOSC_entities(): @task def create_indexes(**kwargs): conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False, pool_maxsize=20, timeout=180 ) client.cluster.put_settings(body={ "persistent": { "cluster.routing.allocation.balance.prefer_primary": True, "segrep.pressure.enabled": True } }) for entity in kwargs["params"]["ENTITIES"]: indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}' if client.indices.exists(indexname): client.indices.delete(indexname) client.indices.create(indexname, { "settings": { "index": { "number_of_shards": 40, "number_of_replicas": 0, "refresh_interval": -1, "translog.flush_threshold_size": "2048MB", "codec": "zstd_no_dict", "replication.type": "SEGMENT" } }, "mappings": mappings[entity] }) def compute_batches(ds=None, **kwargs): hook = S3Hook(kwargs["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False}) pieces = [] for entity in kwargs["params"]["ENTITIES"]: s3_path = os.path.normpath(kwargs["params"]["KEY_PREFIX"] + "/" + entity + "/") keys = hook.list_keys(bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"], prefix=s3_path) to_delete = list(filter(lambda key: key.endswith('.PROCESSED'), keys)) for obj in to_delete: hook.get_conn().delete_object(Bucket=kwargs["params"]["EOSC_CATALOG_BUCKET"], Key=obj) for key in keys: if key.endswith(('.json.gz', '.json')): pieces.append((entity, key)) def split_list(list_a, chunk_size): for i in range(0, len(list_a), chunk_size): yield {"files": list_a[i:i + chunk_size]} if len(pieces) <= 0: print("Nothing found in: " + kwargs["params"]["KEY_PREFIX"]) return list() num_batches = len(pieces)//kwargs["params"]["BATCH_LOADERS_NUM"] if num_batches > 0: return list(split_list(pieces, num_batches)) return list(split_list(pieces, len(pieces))) @task(executor_config={ "pod_override": k8s.V1Pod( spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", resources=k8s.V1ResourceRequirements( requests={ "cpu": "550m", "memory": "256Mi" } ) ) ] ) ) }) def bulk_load(files: list[(str, str)], **kwargs): conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False, pool_maxsize=20, timeout=180, request_timeout=5*60 ) hook = S3Hook(kwargs["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False}) for (entity, key) in files: indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}' if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"]): print(f'Skipping {entity}: {key}') continue print(f'Processing {indexname}: {key}') s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"]) with gzip.GzipFile(fileobj=s3_obj.get()["Body"], mode='rb') if key.endswith(".gz") else codecs.getreader('utf-8')(s3_obj.get()["Body"]) as s3file: def _generate_data(): for line in s3file: data: dict = json.loads(line) if entity in transform_entities: data = transform_entities[entity](data) if entity in filter_entities: if filter_entities[entity](data): print(data["local_identifier"] + " does not meet inclusion policies") continue index = {"update": {"_index": indexname, "_id": data.pop("_id")}} yield index, {"doc": data, "doc_as_upsert": True} # disable success post logging logging.getLogger("opensearch").setLevel(logging.WARN) succeeded = 0 failed = 0 for success, item in helpers.parallel_bulk(client, actions=_generate_data(), expand_action_callback=lambda arg: arg, raise_on_exception=False, raise_on_error=False, chunk_size=5000, max_chunk_bytes=50 * 1024 * 1024, timeout=5*60): if success: succeeded = succeeded + 1 else: print("error: " + str(item)) failed = failed + 1 if failed > 0: print(f"There were {failed} errors:") else: hook.load_string( "", f"{key}.PROCESSED", bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"], replace=False ) if succeeded > 0: print(f"Bulk-inserted {succeeded} items (streaming_bulk).") @task def merge_curation_db(**kwargs): conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False, pool_maxsize=20, timeout=180 ) if "products" in kwargs["params"]["ENTITIES"]: products_index = f'products_{kwargs["params"]["SUFFIX"]}' curationdb_index = 'curation' if client.indices.exists(curationdb_index): client.reindex(body={ "source": { "index": curationdb_index, "_source": ["status"] }, "dest": { "index": products_index } }, refresh=False, requests_per_second=-1, scroll="4h", slices="auto", timeout=60*60*4, wait_for_completion=True) @task def delete_missing_curated(**kwargs): conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False, pool_maxsize=20, timeout=180 ) if "products" in kwargs["params"]["ENTITIES"]: products_index = f'products_{kwargs["params"]["SUFFIX"]}' client.indices.refresh(products_index) client.delete_by_query(index=products_index, body={"query": {"bool": {"must_not": {"exists": {"field": "local_identifier"}}}}}, refresh=True ) @task def close_indexes(**kwargs): conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) client = OpenSearch( hosts=[{'host': conn.host, 'port': conn.port}], http_auth=(conn.login, conn.password), use_ssl=True, verify_certs=False, ssl_show_warn=False, pool_maxsize=20, timeout=180 ) for entity in kwargs["params"]["ENTITIES"]: indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}' client.indices.refresh(indexname) # update aliases for entity in kwargs["params"]["ENTITIES"]: indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}' client.indices.update_aliases( body={"actions": [ {"remove": {"index": f"{entity}_*", "alias": entity}}, {"add": {"index": indexname, "alias": entity}}, ]} ) # update "allresources" alias actions = [] for entity in kwargs["params"]["ENTITIES"]: if entity in ['products', 'services', 'training', 'interoperability']: indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}' actions.append({"remove": {"index": f"{entity}_*", "alias": "allresources"}}) actions.append({"add": {"index": indexname, "alias": "allresources"}}) if len(actions) > 0: client.indices.update_aliases( body={"actions": actions} ) parallel_batches = PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches) chain( create_indexes.override(task_id="create_indexes")(), merge_curation_db.override(task_id="merge_curation_db")(), parallel_batches, bulk_load.expand_kwargs(parallel_batches.output), delete_missing_curated.override(task_id="delete_missing_curated_recs")(), close_indexes.override(task_id="close_indexes")() ) import_EOSC_entities()