initial stage
This commit is contained in:
parent 8018975863
commit 227ec44a21
@@ -0,0 +1,176 @@
from __future__ import annotations

import os
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag
from airflow.decorators import task
from airflow.hooks.base import BaseHook
from airflow.utils.helpers import chain
from opensearchpy import OpenSearch

from EOSC_indexes import mappings
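
# DAG pipeline: create_indexes -> harvest_indexes -> close_indexes.
# Each run loads entity records from the EOSC providers registry into
# fresh, timestamp-suffixed OpenSearch indexes, then repoints the public
# aliases at them.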

EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}
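

# The three tasks below each build an identical OpenSearch client; a shared
# helper would remove the duplication. A minimal sketch (hypothetical
# opensearch_client helper, not wired in):
#
#   def opensearch_client(conn_id: str) -> OpenSearch:
#       conn = BaseHook.get_connection(conn_id)
#       return OpenSearch(
#           hosts=[{'host': conn.host, 'port': conn.port}],
#           http_auth=(conn.login, conn.password),
#           use_ssl=True, verify_certs=False, ssl_show_warn=False,
#           pool_maxsize=20, timeout=180,
#       )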
@dag(
    dag_id="import_Catalogue",
    schedule=None,
    dagrun_timeout=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
    params={
        "S3_CONN_ID": "s3_conn",
        "OPENSEARCH_CONN_ID": "opensearch_default",
        "EOSC_CATALOG_BUCKET": "eosc-portal-import",
        "BATCH_LOADERS_NUM": 10,
        "ENTITIES": ["datasources", "interoperability", "providers",
                     "resource-interoperability", "services", "training"],
        "SHARDS": 3,
        # SUFFIX defaults to the timestamp at DAG-parse time; like any
        # param, it can be overridden per run.
        "SUFFIX": pendulum.now().format('YYYYMMDDHHmmss')
    },
    tags=["lot1"]
)
def import_catalogue_entities():
    @task
    def create_indexes(**kwargs):
        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
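
        # Write-optimized settings for the bulk load: no replicas and no
        # periodic refresh until close_indexes restores them. The
        # "zstd_no_dict" codec and segment replication are assumed to be
        # supported by the target OpenSearch (2.x) cluster.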
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            if client.indices.exists(indexname):
                client.indices.delete(indexname)

            client.indices.create(indexname, {
                "settings": {
                    "index": {
                        "number_of_shards": kwargs["params"]["SHARDS"],
                        "number_of_replicas": 0,
                        "refresh_interval": -1,
                        "translog.flush_threshold_size": "2048MB",
                        "codec": "zstd_no_dict",
                        "replication.type": "SEGMENT"
                    }
                }
                # "mappings": mappings[entity]
            })

    @task
    def harvest_indexes(**kwargs):
        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
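
        # The registry pages its results with "from"/"to"/"total" fields in
        # the response envelope; each pass of the loop below requests the
        # next window until "to" reaches "total".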
        session = requests.Session()
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            baseurl = "http://vereniki.athenarc.gr:8080/eic-registry"
            callurl = f"{baseurl}/{entity}"
            # the filter flags travel as query parameters on every page request
            params = {"draft": "false", "active": "true", "suspended": "false"}

            while True:
                reply = session.get(url=callurl, params=params)
                reply.raise_for_status()
                content = reply.json()
                if 'results' not in content:
                    break
                results = content['results']
                if not results:
                    break

                for result in results:
                    # TODO: mapping code
                    # doc_as_upsert inserts the document when the id is new
                    # and patches it otherwise
                    body = {"doc": result, "doc_as_upsert": True}
                    client.update(
                        index=indexname,
                        body=body,
                        id=result['id'],
                        refresh=True
                    )

                # end of stream conditions
                if content['to'] >= content['total']:
                    break
                params['from'] = content['to']

    @task
    def close_indexes(**kwargs):
        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
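        # Bulk load is finished: re-enable replication and periodic refresh
        # on every new index.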
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            client.indices.refresh(indexname)
            # pass index and body by keyword: put_settings takes the
            # settings body as its first positional argument
            client.indices.put_settings(index=indexname, body={
                "index": {
                    "number_of_replicas": 1,
                    "refresh_interval": "60s",
                }
            })
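
        # Each remove+add pair is submitted in a single update_aliases call,
        # so the alias flips from the old index to the new one atomically.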
        # update aliases
        for entity in kwargs["params"]["ENTITIES"]:
            indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
            client.indices.update_aliases(
                body={"actions": [
                    {"remove": {"index": f"{entity}_*", "alias": entity}},
                    {"add": {"index": indexname, "alias": entity}},
                ]}
            )
# update "allresources" alias
|
||||
actions = []
|
||||
for entity in kwargs["params"]["ENTITIES"]:
|
||||
if entity in ['products', 'services', 'training', 'interoperability']:
|
||||
indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
|
||||
actions.append({"remove": {"index": f"{entity}_*", "alias": "allresources"}})
|
||||
actions.append({"add": {"index": indexname, "alias": "allresources"}})
|
||||
if len(actions) > 0:
|
||||
client.indices.update_aliases(
|
||||
body={"actions": actions}
|
||||
)
|
||||
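
    # Wire the tasks to run strictly in sequence:
    # create_indexes >> harvest_indexes >> close_indexes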
    chain(
        create_indexes.override(task_id="create_indexes")(),
        harvest_indexes.override(task_id="harvest_indexes")(),
        close_indexes.override(task_id="close_indexes")()
    )


import_catalogue_entities()