310 lines
13 KiB
Python
310 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
import codecs
|
|
import gzip
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import timedelta
|
|
|
|
from kubernetes.client import models as k8s
|
|
import pendulum
|
|
from airflow.decorators import dag
|
|
from airflow.decorators import task
|
|
from airflow.operators.python import PythonOperator
|
|
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
|
from airflow.utils.helpers import chain
|
|
from airflow.hooks.base import BaseHook
|
|
|
|
from opensearchpy import OpenSearch, helpers
|
|
from EOSC_indexes import mappings
|
|
from EOSC_entity_trasform import filter_entities, transform_entities
|
|
|
|
# Per-task execution timeout, expressed in DAYS (see ``default_args`` below).
# Overridable through the EXECUTION_TIMEOUT environment variable.
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

# Defaults applied to every task of every DAG generated by this module.
default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

# Entity groups. "all" is derived from the two subsets so the three DAG
# flavours cannot drift apart when an entity is added or removed.
_SKG_IF_ENTITIES = (
    "datasource",
    "grants",
    "organizations",
    "persons",
    "products",
    "topics",
    "venues",
)
_CATALOGUE_ENTITIES = (
    "interoperability",
    "services",
    "training",
)

# One DAG is generated per entry; the key becomes the DAG id suffix and
# "ENTITIES" becomes the default value of the DAG's ENTITIES param.
configs = {
    "all": {"ENTITIES": [*_SKG_IF_ENTITIES, *_CATALOGUE_ENTITIES]},
    "skg-if": {"ENTITIES": list(_SKG_IF_ENTITIES)},
    "catalogue": {"ENTITIES": list(_CATALOGUE_ENTITIES)},
}
|
|
|
|
def _opensearch_client(conn_id):
    """Build an OpenSearch client from the Airflow connection *conn_id*.

    Host, port, login and password are read from the Airflow connection.
    TLS is enabled but certificate verification is turned off, exactly as
    every task in this module previously configured it inline.
    """
    conn = BaseHook.get_connection(conn_id)
    return OpenSearch(
        hosts=[{'host': conn.host, 'port': conn.port}],
        http_auth=(conn.login, conn.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180
    )


def _split_into_batches(pieces, loaders_num):
    """Split *pieces* into at most *loaders_num* contiguous batches.

    Returns a list of ``{"files": [...]}`` dicts, the shape consumed by
    ``bulk_load.expand_kwargs``. An empty *pieces* yields an empty list.
    """
    if not pieces:
        return []
    loaders_num = max(1, int(loaders_num))
    # Ceiling division: guarantees no more than ``loaders_num`` batches and
    # still spreads the work when there are fewer files than loaders.
    chunk_size = -(-len(pieces) // loaders_num)
    return [
        {"files": pieces[start:start + chunk_size]}
        for start in range(0, len(pieces), chunk_size)
    ]


# Generate one DAG per configuration entry (import_EOSC_all, import_EOSC_skg-if, ...).
for config_name, config in configs.items():
    dag_id = f"import_EOSC_{config_name}"

    @dag(
        dag_id=dag_id,
        schedule=None,
        dagrun_timeout=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        default_args=default_args,
        params={
            "S3_CONN_ID": "s3_conn",
            "OPENSEARCH_CONN_ID": "opensearch_default",
            "KEY_PREFIX": "/",
            "EOSC_CATALOG_BUCKET": "eosc-portal-import",
            "BATCH_LOADERS_NUM": 10,
            "ENTITIES": config["ENTITIES"],
            # NOTE(review): this default is computed when the module is
            # imported, so two imports of this file yield different values.
            # Confirm that every task of a run sees the same SUFFIX (e.g. it
            # is always supplied in the run configuration) -- TODO confirm.
            "SUFFIX": pendulum.now().format('YYYYMMDDHHmmss')
        },
        tags=["lot1"]
    )
    def import_EOSC_entities():
        """Load EOSC entity dumps from S3 into fresh, suffixed OpenSearch indexes.

        Pipeline: create indexes -> copy curation status into the products
        index -> compute file batches -> bulk load each batch -> drop
        curation-only records -> refresh indexes and repoint aliases.
        """

        @task
        def create_indexes(**kwargs):
            """Create one ``<entity>_<SUFFIX>`` index per entity, replacing any existing one."""
            params = kwargs["params"]
            client = _opensearch_client(params["OPENSEARCH_CONN_ID"])

            client.cluster.put_settings(body={
                "persistent": {
                    "cluster.routing.allocation.balance.prefer_primary": True,
                    "segrep.pressure.enabled": True
                }
            })

            for entity in params["ENTITIES"]:
                indexname = f'{entity}_{params["SUFFIX"]}'
                if client.indices.exists(indexname):
                    client.indices.delete(indexname)

                client.indices.create(indexname, {
                    "settings": {
                        "index": {
                            "number_of_shards": 40,
                            "number_of_replicas": 0,
                            # automatic refresh disabled; close_indexes refreshes explicitly
                            "refresh_interval": -1,

                            "translog.flush_threshold_size": "2048MB",

                            "codec": "zstd_no_dict",
                            "replication.type": "SEGMENT"
                        }
                    },
                    "mappings": mappings[entity]
                })

        def compute_batches(ds=None, **kwargs):
            """List the input files of every entity and group them into batches.

            Any ``*.PROCESSED`` marker found under an entity prefix is deleted
            first. Returns a list of ``{"files": [(entity, key), ...]}`` dicts
            (at most BATCH_LOADERS_NUM of them), or an empty list when no
            ``.json`` / ``.json.gz`` key is found.
            """
            params = kwargs["params"]
            bucket = params["EOSC_CATALOG_BUCKET"]
            hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})

            pieces = []
            for entity in params["ENTITIES"]:
                s3_path = os.path.normpath(params["KEY_PREFIX"] + "/" + entity + "/")
                keys = hook.list_keys(bucket_name=bucket, prefix=s3_path)
                # Drop the markers first so every listed file is loaded again.
                for marker in [key for key in keys if key.endswith('.PROCESSED')]:
                    hook.get_conn().delete_object(Bucket=bucket, Key=marker)
                pieces.extend(
                    (entity, key) for key in keys if key.endswith(('.json.gz', '.json'))
                )

            if not pieces:
                print("Nothing found in: " + params["KEY_PREFIX"])
                return []

            return _split_into_batches(pieces, params["BATCH_LOADERS_NUM"])

        @task(executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={
                                    "cpu": "550m",
                                    "memory": "256Mi"
                                }
                            )
                        )
                    ]
                )
            )
        })
        def bulk_load(files: list[tuple[str, str]], **kwargs):
            """Upsert every ``(entity, key)`` file of one batch into OpenSearch.

            Files that already have a ``<key>.PROCESSED`` marker are skipped.
            A marker is written only when a file loads without any failed
            item; failures are printed and the task carries on.
            """
            params = kwargs["params"]
            bucket = params["EOSC_CATALOG_BUCKET"]
            client = _opensearch_client(params["OPENSEARCH_CONN_ID"])
            hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})

            # disable success post logging
            logging.getLogger("opensearch").setLevel(logging.WARN)

            def _actions(lines, entity, indexname):
                """Yield (action, payload) bulk pairs for each JSON line that passes the filters."""
                for line in lines:
                    data: dict = json.loads(line)
                    if entity in transform_entities:
                        data = transform_entities[entity](data)
                    if entity in filter_entities:
                        if filter_entities[entity](data):
                            print(data["local_identifier"] + " does not meet inclusion policies")
                            continue
                    index = {"update": {"_index": indexname, "_id": data.pop("_id")}}
                    yield index, {"doc": data, "doc_as_upsert": True}

            for (entity, key) in files:
                indexname = f'{entity}_{params["SUFFIX"]}'
                if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=bucket):
                    print(f'Skipping {entity}: {key}')
                    continue
                print(f'Processing {indexname}: {key}')
                s3_obj = hook.get_key(key, bucket_name=bucket)
                body = s3_obj.get()["Body"]
                succeeded = 0
                failed = 0
                try:
                    if key.endswith(".gz"):
                        reader = gzip.GzipFile(fileobj=body, mode='rb')
                    else:
                        reader = codecs.getreader('utf-8')(body)
                    with reader as s3file:
                        for success, item in helpers.parallel_bulk(client, actions=_actions(s3file, entity, indexname),
                                                                   expand_action_callback=lambda arg: arg,
                                                                   raise_on_exception=False,
                                                                   raise_on_error=False,
                                                                   chunk_size=5000,
                                                                   max_chunk_bytes=50 * 1024 * 1024,
                                                                   timeout=5*60,
                                                                   request_timeout=5*60):
                            if success:
                                succeeded = succeeded + 1
                            else:
                                print("error: " + str(item))
                                failed = failed + 1
                finally:
                    # GzipFile does not close a caller-supplied fileobj, so
                    # release the S3 response stream explicitly.
                    body.close()

                if failed > 0:
                    print(f"There were {failed} errors:")
                else:
                    hook.load_string(
                        "",
                        f"{key}.PROCESSED",
                        bucket_name=bucket,
                        replace=False
                    )

                if succeeded > 0:
                    print(f"Bulk-inserted {succeeded} items (streaming_bulk).")

        @task
        def merge_curation_db(**kwargs):
            """Reindex the ``status`` field of the ``curation`` index into the new products index."""
            params = kwargs["params"]
            if "products" not in params["ENTITIES"]:
                return
            client = _opensearch_client(params["OPENSEARCH_CONN_ID"])
            products_index = f'products_{params["SUFFIX"]}'
            curationdb_index = 'curation'
            if client.indices.exists(curationdb_index):
                client.reindex(body={
                    "source": {
                        "index": curationdb_index,
                        "_source": ["status"]
                    },
                    "dest": {
                        "index": products_index
                    }
                },
                    refresh=False,
                    requests_per_second=-1,
                    scroll="4h",
                    slices="auto",
                    timeout=60*60*4,
                    wait_for_completion=True)

        @task
        def delete_missing_curated(**kwargs):
            """Delete products documents that have no ``local_identifier`` field."""
            params = kwargs["params"]
            if "products" not in params["ENTITIES"]:
                return
            client = _opensearch_client(params["OPENSEARCH_CONN_ID"])
            products_index = f'products_{params["SUFFIX"]}'
            # Refresh first: the index was created with automatic refresh disabled.
            client.indices.refresh(products_index)
            client.delete_by_query(index=products_index,
                                   body={"query": {"bool": {"must_not": {"exists": {"field": "local_identifier"}}}}},
                                   refresh=True
                                   )

        @task
        def close_indexes(**kwargs):
            """Refresh the new indexes and repoint the entity and ``allresources`` aliases to them."""
            params = kwargs["params"]
            entities = params["ENTITIES"]
            suffix = params["SUFFIX"]
            client = _opensearch_client(params["OPENSEARCH_CONN_ID"])

            for entity in entities:
                client.indices.refresh(f'{entity}_{suffix}')

            # update aliases
            for entity in entities:
                indexname = f'{entity}_{suffix}'
                client.indices.update_aliases(
                    body={"actions": [
                        {"remove": {"index": f"{entity}_*", "alias": entity}},
                        {"add": {"index": indexname, "alias": entity}},
                    ]}
                )

            # update "allresources" alias
            actions = []
            for entity in entities:
                if entity in ['products', 'services', 'training', 'interoperability']:
                    indexname = f'{entity}_{suffix}'
                    actions.append({"remove": {"index": f"{entity}_*", "alias": "allresources"}})
                    actions.append({"add": {"index": indexname, "alias": "allresources"}})
            if actions:
                client.indices.update_aliases(
                    body={"actions": actions}
                )

        parallel_batches = PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches)

        chain(
            create_indexes.override(task_id="create_indexes")(),
            merge_curation_db.override(task_id="merge_curation_db")(),
            parallel_batches,
            # one mapped bulk_load task per batch returned by compute_batches
            bulk_load.expand_kwargs(parallel_batches.output),
            delete_missing_curated.override(task_id="delete_missing_curated_recs")(),
            close_indexes.override(task_id="close_indexes")()
        )

    import_EOSC_entities()
|