lot1-kickoff/airflow/dags/import_Catalogues.py
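"""Airflow DAG that imports catalogue entities into OpenSearch.

Each run builds a fresh set of indexes named with the run's SUFFIX, harvests
raw records from the eic-registry REST API, maps selected entities
(interoperability records, training resources, services) into their mapped
indexes, and finally repoints the public aliases to the new indexes.
"""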

from __future__ import annotations
import os
from datetime import timedelta
import opensearchpy
import pendulum
import requests
from airflow.decorators import dag
from airflow.decorators import task
from airflow.hooks.base import BaseHook
from airflow.utils.helpers import chain
from opensearchpy import OpenSearch, helpers
from catalogue.RawCatalogOpensearch import RawCatalogOpensearch

EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


@dag(
    dag_id="import_Catalogue",
    schedule=None,
    dagrun_timeout=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
    params={
        "OPENSEARCH_CONN_ID": "opensearch_default",
        "SHARDS": 3,
        "SUFFIX": pendulum.now().format('YYYYMMDDHHmmss')
    },
    tags=["lot1"]
)
def import_catalogue_entities():
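    # Note: despite its name, this task only deletes any leftover indexes that
    # already carry this run's SUFFIX, so the harvest starts from a clean slate.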
    @task
    def create_indexes(**kwargs):
        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
        for entity in RawCatalogOpensearch.entities:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            if client.indices.exists(indexname):
                client.indices.delete(indexname)
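
    # Harvest each entity from the registry API page by page and bulk-index the
    # raw documents into this run's index for that entity.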
    @task
    def harvest_indexes(**kwargs):
        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
        catalog = RawCatalogOpensearch(client, kwargs["params"]["SUFFIX"])
        session = requests.session()
        for entity in RawCatalogOpensearch.entities:
            indexname = catalog.get_index(entity)
            baseurl = "http://vereniki.athenarc.gr:8080/eic-registry"
            callurl = f"{baseurl}/{entity}"
            params = {"draft": "false", "active": "true", "suspended": "false"}
            if client.indices.exists(indexname):
                client.indices.delete(indexname)
            while True:
                reply = session.get(url=callurl, params=params)
                reply.raise_for_status()
                content = reply.json()
                if 'results' not in content:
                    break
                results = content['results']
                if len(results) <= 0:
                    break

                def streamed_results():
                    for r in results:
                        yield {"_index": indexname, "_id": r['id'], "_source": r}

                succeeded = 0
                failed = 0
                for success, item in helpers.parallel_bulk(client, actions=streamed_results(), timeout=5 * 60):
                    if success:
                        succeeded = succeeded + 1
                    else:
                        print("error: " + str(item))
                        failed = failed + 1

                # end of stream conditions
                if content['to'] >= content['total']:
                    break
                params['from'] = content['to']
            client.indices.refresh(indexname)
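
    # Transform the raw documents of the three resource-like entities into the
    # common "mapped" schema and bulk-index them into the mapped indexes.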
    @task
    def map_indexes(**kwargs):
        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
        catalog = RawCatalogOpensearch(client, kwargs["params"]["SUFFIX"])
        for entity in ["interoperability-records", "training-resources", "services"]:
            mapped_index = catalog.get_mapped_index(entity)

            def streamed_results():
                nonlocal mapped_index
                for hit in opensearchpy.helpers.scan(client, index=catalog.get_index(entity),
                                                     query={"query": {"match_all": {}}}):
                    r = hit['_source']
                    doc = None
                    match entity:
                        case "interoperability-records":
                            doc = catalog.map_interoperability(r)
                        case "training-resources":
                            doc = catalog.map_training(r)
                        case "services":
                            doc = catalog.map_service(r)
                    yield {"_index": mapped_index, "_id": doc['id'], "_source": doc}

            succeeded = 0
            failed = 0
            for success, item in helpers.parallel_bulk(client, actions=streamed_results(), timeout=5 * 60):
                if success:
                    succeeded = succeeded + 1
                else:
                    print("error: " + str(item))
                    failed = failed + 1
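
    # Refresh the new indexes, restore production index settings, and switch the
    # per-entity aliases (plus the combined "allresources" alias) over to this
    # run's indexes.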
    @task
    def close_indexes(**kwargs):
        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
        catalog = RawCatalogOpensearch(client, kwargs["params"]["SUFFIX"])

        def refresh_index(index_name):
            if index_name is not None:
                client.indices.refresh(index_name)
                client.indices.put_settings(index=index_name, body={
                    "index": {
                        "number_of_replicas": 1,
                        "refresh_interval": "60s",
                    }
                })

        def update_aliases(index_name, alias_name):
            if index_name is not None and alias_name is not None:
                client.indices.update_aliases(
                    body={"actions": [
                        {"remove": {"index": f"{alias_name}_*", "alias": alias_name}},
                        {"add": {"index": index_name, "alias": alias_name}},
                    ]}
                )

        for entity in RawCatalogOpensearch.entities:
            refresh_index(catalog.get_index(entity))
            refresh_index(catalog.get_mapped_index(entity))
            update_aliases(catalog.get_index(entity), catalog.get_alias(entity))
            update_aliases(catalog.get_mapped_index(entity), catalog.get_mapped_alias(entity))

        # update "allresources" alias with mapped indices
        actions = []
        for entity in RawCatalogOpensearch.mapped_entities:
            index_name = catalog.get_mapped_index(entity)
            entity_alias = catalog.get_mapped_alias(entity)
            actions.append({"remove": {"index": f"{entity_alias}_*", "alias": "allresources"}})
            actions.append({"add": {"index": index_name, "alias": "allresources"}})
        if len(actions) > 0:
            client.indices.update_aliases(
                body={"actions": actions}
            )
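
    # Run the tasks strictly in sequence: clean up, harvest raw records, map
    # them to the common schema, then publish the new indexes via aliases.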
    chain(
        create_indexes.override(task_id="create_indexes")(),
        harvest_indexes.override(task_id="harvest_indexes")(),
        map_indexes.override(task_id="map_indexes")(),
        close_indexes.override(task_id="close_indexes")()
    )


import_catalogue_entities()