# lot1-kickoff/airflow/dags/import_EOSC_graph.py
# (repository page listing: 309 lines, 13 KiB, Python)

2024-03-22 14:06:07 +01:00
from __future__ import annotations
2024-04-08 14:22:56 +02:00
import codecs
2024-03-22 14:06:07 +01:00
import gzip
import io
import json
import logging
import os
from datetime import timedelta
from kubernetes.client import models as k8s
import pendulum
from airflow.decorators import dag
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.helpers import chain
2024-03-25 21:19:50 +01:00
from airflow.hooks.base import BaseHook
2024-03-22 14:06:07 +01:00
from opensearchpy import OpenSearch, helpers
2024-03-25 17:54:23 +01:00
from EOSC_indexes import mappings
2024-06-27 14:07:59 +02:00
from EOSC_entity_trasform import filter_entities, transform_entities
2024-03-22 14:06:07 +01:00
# Per-task execution time limit, expressed in days; overridable through the
# EXECUTION_TIMEOUT environment variable.
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

# Defaults handed to every DAG declared in this module (see ``default_args=``
# in the ``@dag`` call). Retry count and retry delay are env-overridable too.
default_args = dict(
    execution_timeout=timedelta(days=EXECUTION_TIMEOUT),
    retries=int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    retry_delay=timedelta(
        seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)),
    ),
)
2024-03-26 22:25:02 +01:00
# Entity groups. "all" is the first group followed by the second one, in
# that order.
_SKG_IF_ENTITIES = [
    "datasource",
    "grants",
    "organizations",
    "persons",
    "products",
    "topics",
    "venues",
]
_CATALOGUE_ENTITIES = ["interoperability", "services", "training"]

# One DAG is generated per key; the value supplies the default ENTITIES param.
# Each entry gets its own list object so that no two DAGs share state.
configs = {
    "all": {"ENTITIES": _SKG_IF_ENTITIES + _CATALOGUE_ENTITIES},
    "skg-if": {"ENTITIES": list(_SKG_IF_ENTITIES)},
    "catalogue": {"ENTITIES": list(_CATALOGUE_ENTITIES)},
}
2024-03-22 14:06:07 +01:00
2024-03-26 22:25:02 +01:00
for config_name, config in configs.items():
dag_id = f"import_EOSC_{config_name}"
@dag(
dag_id=dag_id,
schedule=None,
dagrun_timeout=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
default_args=default_args,
params={
"S3_CONN_ID": "s3_conn",
"OPENSEARCH_CONN_ID": "opensearch_default",
2024-06-10 14:31:26 +02:00
"KEY_PREFIX": "/",
2024-03-26 22:25:02 +01:00
"EOSC_CATALOG_BUCKET": "eosc-portal-import",
"BATCH_LOADERS_NUM": 10,
"ENTITIES": config["ENTITIES"],
"SUFFIX": pendulum.now().format('YYYYMMDDHHmmss')
},
tags=["lot1"]
)
def import_EOSC_entities():
@task
def create_indexes(**kwargs):
    """Create an empty ``<entity>_<SUFFIX>`` index for every requested entity.

    Reads ``OPENSEARCH_CONN_ID``, ``ENTITIES`` and ``SUFFIX`` from
    ``kwargs["params"]``. Two persistent cluster settings are applied first;
    then, per entity, an index of that name is deleted if it already exists
    and created again with the mapping registered for the entity in
    ``EOSC_indexes.mappings``.
    """
    params = kwargs["params"]
    connection = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
    # TLS is used, but certificate verification is switched off.
    client = OpenSearch(
        hosts=[{'host': connection.host, 'port': connection.port}],
        http_auth=(connection.login, connection.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180,
    )

    cluster_settings = {
        "persistent": {
            "cluster.routing.allocation.balance.prefer_primary": True,
            "segrep.pressure.enabled": True,
        }
    }
    client.cluster.put_settings(body=cluster_settings)

    for entity in params["ENTITIES"]:
        index_name = f'{entity}_{params["SUFFIX"]}'

        # Start from a clean index when one with this name is already there.
        if client.indices.exists(index_name):
            client.indices.delete(index_name)

        # Created with number_of_replicas 0 and refresh_interval -1; later
        # tasks issue explicit refreshes.
        index_body = {
            "settings": {
                "index": {
                    "number_of_shards": 40,
                    "number_of_replicas": 0,
                    "refresh_interval": -1,
                    "translog.flush_threshold_size": "2048MB",
                    "codec": "zstd_no_dict",
                    "replication.type": "SEGMENT",
                }
            },
            "mappings": mappings[entity],
        }
        client.indices.create(index_name, index_body)
def compute_batches(ds=None, **kwargs):
    """List the S3 input files and split them into batches for ``bulk_load``.

    For every entity in ``params["ENTITIES"]`` the keys below
    ``normpath(KEY_PREFIX + "/" + entity + "/")`` in ``EOSC_CATALOG_BUCKET``
    are listed. ``*.PROCESSED`` marker objects found there are deleted, and
    every ``*.json`` / ``*.json.gz`` key is collected as an
    ``(entity, key)`` pair.

    The pairs are then cut into at most ``BATCH_LOADERS_NUM`` contiguous
    batches of (nearly) equal size. The previous floor division could produce
    almost twice that many batches, and a single batch whenever there were
    fewer files than loaders.

    Args:
        ds: Unused; kept for signature compatibility.
        **kwargs: Airflow context; ``kwargs["params"]`` supplies
            ``S3_CONN_ID``, ``EOSC_CATALOG_BUCKET``, ``KEY_PREFIX``,
            ``ENTITIES`` and ``BATCH_LOADERS_NUM``.

    Returns:
        A list of ``{"files": [(entity, key), ...]}`` dicts, one per batch,
        or an empty list when no input file was found.
    """
    params = kwargs["params"]
    bucket = params["EOSC_CATALOG_BUCKET"]
    hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})

    pieces = []
    for entity in params["ENTITIES"]:
        s3_path = os.path.normpath(params["KEY_PREFIX"] + "/" + entity + "/")
        # ``or []`` tolerates a hook that reports "no keys" as None.
        keys = hook.list_keys(bucket_name=bucket, prefix=s3_path) or []
        for key in keys:
            if key.endswith('.PROCESSED'):
                # Drop markers left by earlier runs so the data files next to
                # them are loaded again.
                hook.get_conn().delete_object(Bucket=bucket, Key=key)
            elif key.endswith(('.json.gz', '.json')):
                pieces.append((entity, key))

    if not pieces:
        print("Nothing found in: " + params["KEY_PREFIX"])
        return []

    # Ceiling division: the smallest chunk size that still keeps the number
    # of batches at or below the requested number of loaders.
    loaders = max(1, int(params["BATCH_LOADERS_NUM"]))
    chunk_size = -(-len(pieces) // loaders)
    return [
        {"files": pieces[start:start + chunk_size]}
        for start in range(0, len(pieces), chunk_size)
    ]
2024-03-26 22:25:02 +01:00
@task(executor_config={
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    resources=k8s.V1ResourceRequirements(
                        requests={
                            "cpu": "550m",
                            "memory": "256Mi"
                        }
                    )
                )
            ]
        )
    )
})
def bulk_load(files: list[tuple[str, str]], **kwargs):
    """Load one batch of S3 files into their ``<entity>_<SUFFIX>`` indexes.

    Each file is read line by line (gzip-decompressed when the key ends in
    ``.gz``); every non-blank line is parsed as one JSON document. Documents
    are passed through ``transform_entities[entity]`` when such an entry
    exists, skipped when ``filter_entities[entity]`` returns a truthy value
    for them, and otherwise sent as ``update`` actions with
    ``doc_as_upsert`` keyed by the document's ``_id``.

    A file whose ``<key>.PROCESSED`` marker exists is skipped. The marker is
    written only after a file was sent without any failed item.

    NOTE(review): failed items are only printed; the task itself still ends
    successfully. Confirm this best-effort behaviour is intended.

    Args:
        files: ``(entity, key)`` pairs, as produced by ``compute_batches``.
        **kwargs: Airflow context; ``kwargs["params"]`` supplies
            ``OPENSEARCH_CONN_ID``, ``S3_CONN_ID``, ``EOSC_CATALOG_BUCKET``
            and ``SUFFIX``.
    """
    params = kwargs["params"]
    bucket = params["EOSC_CATALOG_BUCKET"]
    conn = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
    # TLS is used, but certificate verification is switched off.
    client = OpenSearch(
        hosts=[{'host': conn.host, 'port': conn.port}],
        http_auth=(conn.login, conn.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180
    )
    hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})

    # disable success post logging (set once, not once per file)
    logging.getLogger("opensearch").setLevel(logging.WARN)

    def _generate_data(lines, entity, indexname):
        """Yield ``(action, payload)`` pairs for the documents in *lines*."""
        for line in lines:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            data: dict = json.loads(line)
            if entity in transform_entities:
                data = transform_entities[entity](data)
            if entity in filter_entities and filter_entities[entity](data):
                print(data["local_identifier"] + " does not meet inclusion policies")
                continue
            index = {"update": {"_index": indexname, "_id": data.pop("_id")}}
            yield index, {"doc": data, "doc_as_upsert": True}

    for (entity, key) in files:
        indexname = f'{entity}_{params["SUFFIX"]}'
        if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=bucket):
            print(f'Skipping {entity}: {key}')
            continue
        print(f'Processing {indexname}: {key}')

        s3_obj = hook.get_key(key, bucket_name=bucket)
        body = s3_obj.get()["Body"]
        try:
            if key.endswith(".gz"):
                reader = gzip.GzipFile(fileobj=body, mode='rb')
            else:
                reader = codecs.getreader('utf-8')(body)
            with reader as s3file:
                succeeded = 0
                failed = 0
                # The generator already yields (action, payload) pairs, hence
                # the identity expand_action_callback.
                for success, item in helpers.parallel_bulk(
                        client,
                        actions=_generate_data(s3file, entity, indexname),
                        expand_action_callback=lambda arg: arg,
                        raise_on_exception=False,
                        raise_on_error=False,
                        chunk_size=5000,
                        max_chunk_bytes=50 * 1024 * 1024,
                        timeout=180):
                    if success:
                        succeeded = succeeded + 1
                    else:
                        print("error: " + str(item))
                        failed = failed + 1
        finally:
            # GzipFile.close() leaves a caller-supplied fileobj open, so the
            # S3 response stream has to be closed explicitly.
            body.close()

        if failed > 0:
            print(f"There were {failed} errors:")
        else:
            hook.load_string(
                "",
                f"{key}.PROCESSED",
                bucket_name=bucket,
                replace=False
            )

        if succeeded > 0:
            # The message still names streaming_bulk although parallel_bulk
            # is used; the text is kept unchanged for log compatibility.
            print(f"Bulk-inserted {succeeded} items (streaming_bulk).")
2024-03-26 22:25:02 +01:00
2024-06-27 14:07:59 +02:00
@task
def merge_curation_db(**kwargs):
    """Copy the ``status`` field of the ``curation`` index into the new products index.

    Nothing is copied unless ``"products"`` is among ``params["ENTITIES"]``
    and an index named ``curation`` exists. The copy is one blocking reindex
    call from ``curation`` into ``products_<SUFFIX>``, restricted to the
    ``status`` source field.
    """
    params = kwargs["params"]
    connection = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
    # TLS is used, but certificate verification is switched off.
    client = OpenSearch(
        hosts=[{'host': connection.host, 'port': connection.port}],
        http_auth=(connection.login, connection.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180,
    )

    if "products" not in params["ENTITIES"]:
        return

    products_index = f'products_{params["SUFFIX"]}'
    curationdb_index = 'curation'
    if not client.indices.exists(curationdb_index):
        return

    reindex_body = {
        "source": {
            "index": curationdb_index,
            "_source": ["status"],
        },
        "dest": {
            "index": products_index,
        },
    }
    client.reindex(
        body=reindex_body,
        refresh=False,
        requests_per_second=-1,
        scroll="4h",
        slices="auto",
        timeout=60*60*4,
        wait_for_completion=True,
    )
@task
def delete_missing_curated(**kwargs):
    """Delete products documents that have no ``local_identifier`` field.

    Only acts when ``"products"`` is among ``params["ENTITIES"]``. The
    ``products_<SUFFIX>`` index is refreshed first, then a delete-by-query
    removes every document in which ``local_identifier`` does not exist.

    NOTE(review): presumably these are documents written by
    ``merge_curation_db`` for which ``bulk_load`` delivered no product;
    confirm against the data model.
    """
    params = kwargs["params"]
    connection = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
    # TLS is used, but certificate verification is switched off.
    client = OpenSearch(
        hosts=[{'host': connection.host, 'port': connection.port}],
        http_auth=(connection.login, connection.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180,
    )

    if "products" not in params["ENTITIES"]:
        return

    products_index = f'products_{params["SUFFIX"]}'
    # Explicit refresh before querying; the index was created with
    # refresh_interval -1.
    client.indices.refresh(products_index)

    without_identifier = {
        "query": {
            "bool": {
                "must_not": {"exists": {"field": "local_identifier"}}
            }
        }
    }
    client.delete_by_query(
        index=products_index,
        body=without_identifier,
        refresh=True,
    )
2024-03-26 22:25:02 +01:00
@task
def close_indexes(**kwargs):
    """Refresh the freshly loaded indexes and point the aliases at them.

    Three passes over ``params["ENTITIES"]``:

    1. refresh every ``<entity>_<SUFFIX>`` index;
    2. per entity, in one alias request, remove alias ``<entity>`` from all
       ``<entity>_*`` indexes and add it to the new index;
    3. for products, services, training and interoperability, do the same
       for the shared ``allresources`` alias, in one combined request.
    """
    params = kwargs["params"]
    connection = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
    # TLS is used, but certificate verification is switched off.
    client = OpenSearch(
        hosts=[{'host': connection.host, 'port': connection.port}],
        http_auth=(connection.login, connection.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180,
    )

    # (entity, index name) pairs, in the order given by the run parameters.
    targets = [
        (entity, f'{entity}_{params["SUFFIX"]}')
        for entity in params["ENTITIES"]
    ]

    for _, index_name in targets:
        client.indices.refresh(index_name)

    # update aliases
    for entity, index_name in targets:
        swap = [
            {"remove": {"index": f"{entity}_*", "alias": entity}},
            {"add": {"index": index_name, "alias": entity}},
        ]
        client.indices.update_aliases(body={"actions": swap})

    # update "allresources" alias
    shared_alias_members = ('products', 'services', 'training', 'interoperability')
    actions = []
    for entity, index_name in targets:
        if entity not in shared_alias_members:
            continue
        actions.append({"remove": {"index": f"{entity}_*", "alias": "allresources"}})
        actions.append({"add": {"index": index_name, "alias": "allresources"}})

    if actions:
        client.indices.update_aliases(body={"actions": actions})
2024-03-22 14:06:07 +01:00
2024-03-26 22:25:02 +01:00
# ``compute_batches`` runs as a plain PythonOperator; its output feeds the
# dynamically mapped ``bulk_load`` task below.
parallel_batches = PythonOperator(
    task_id="compute_parallel_batches",
    python_callable=compute_batches,
)

# Instantiate the tasks in pipeline order, then link them one after another.
create_step = create_indexes.override(task_id="create_indexes")()
merge_step = merge_curation_db.override(task_id="merge_curation_db")()
# One mapped ``bulk_load`` instance per dict returned by ``compute_batches``.
load_step = bulk_load.expand_kwargs(parallel_batches.output)
cleanup_step = delete_missing_curated.override(task_id="delete_missing_curated_recs")()
publish_step = close_indexes.override(task_id="close_indexes")()

chain(
    create_step,
    merge_step,
    parallel_batches,
    load_step,
    cleanup_step,
    publish_step,
)
2024-03-25 15:54:49 +01:00
2024-03-26 22:25:02 +01:00
# Call the @dag-decorated function to build the DAG for the current
# configuration. NOTE(review): indentation is lost in this view; presumably
# this call sits inside the ``for config_name, config`` loop so that one DAG
# is built per entry of ``configs`` -- confirm.
import_EOSC_entities()