From 52be021867f52af880028c5c64fd65b1caf91570 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Thu, 27 Jun 2024 14:07:59 +0200
Subject: [PATCH] initial stage

---
 airflow/dags/EOSC_entity_trasform.py | 48 ++++++++++++++++++++
 airflow/dags/EOSC_indexes.py         | 53 +++++++++++++---------
 airflow/dags/import_EOSC_graph.py    | 67 ++++++++++++++++++++++++++--
 3 files changed, 143 insertions(+), 25 deletions(-)

diff --git a/airflow/dags/EOSC_entity_trasform.py b/airflow/dags/EOSC_entity_trasform.py
index d4c9539..cfd72f7 100644
--- a/airflow/dags/EOSC_entity_trasform.py
+++ b/airflow/dags/EOSC_entity_trasform.py
@@ -1,3 +1,4 @@
+from typing import Dict, Any, List
 
 
 def map_access_right(ar: str) -> str:
@@ -86,3 +87,50 @@ transform_entities = {
     "services": trasform_catalog_entity,
     "training": trasform_catalog_entity,
 }
+
+def isEmpty(current_value: Dict[str, Any], labels: List[str]) -> bool:
+    if not labels:
+        return True
+    for label in labels:
+        if isinstance(current_value, list):
+            current_value = current_value[0] if current_value else None  # guard: an empty list counts as empty
+        if isinstance(current_value, dict) and label in current_value:
+            current_value = current_value[label]
+        else:
+            return True
+    if current_value is None:
+        return True
+    if isinstance(current_value, list):
+        if len(current_value) > 0:
+            return current_value[0] == ""
+        else:
+            return True
+
+    return str(current_value) == ""
+
+
+#
+# Filter out products that do not meet the inclusion policy
+#
+def filter_product(p: dict) -> bool:
+    if isEmpty(p, ["titles", "none"]):
+        return True
+
+    if isEmpty(p, ["firstPublishDate"]):
+        return True
+
+    if p['product_type'] == "literature":
+        if isEmpty(p, ["abstracts", "none"]):
+            return True
+        if isEmpty(p, ["contributions", "person", "local_identifier"]):
+            return True
+    elif p['product_type'] == "research data":
+        if isEmpty(p, ["contributions", "person", "local_identifier"]):
+            return True
+
+    return False
+
+
+filter_entities = {
+    "products": filter_product
+}
diff --git a/airflow/dags/EOSC_indexes.py b/airflow/dags/EOSC_indexes.py
index 9806b02..1e13570 100644
--- a/airflow/dags/EOSC_indexes.py
+++ b/airflow/dags/EOSC_indexes.py
@@ -6,9 +6,9 @@ mappings['datasource'] = {
         "data_source_classification": {
             "type": "keyword"
         },
-        "eoscId": {
-            "type": "keyword"
-        },
+        # "eoscId": {
+        #     "type": "keyword"
+        # },
         "identifiers": {
             "type": "object",
             "properties": {
@@ -320,6 +320,9 @@ mappings['grants'] = {
                 }
             }
         },
+        "keywords": {
+            "type": "keyword"
+        },
         "local_identifier": {
             "type": "keyword"
         },
@@ -391,9 +394,9 @@ mappings['products'] = {
         "contributions": {
             "type": "object",
             "properties": {
-                "declared_affiliations": {
-                    "type": "keyword"
-                },
+                # "declared_affiliations": {
+                #     "type": "keyword"
+                # },
                 "person": {
                     "type": "object",
                     "properties": {
@@ -414,11 +417,12 @@
                     }
                 },
                 "rank": {
+                    "index": False,
                     "type": "long"
                 },
-                "roles": {
-                    "type": "keyword"
-                }
+                # "roles": {
+                #     "type": "keyword"
+                # }
             }
         },
         "funding": {
@@ -455,23 +459,29 @@
             }
         },
         "indicator": {
+            "dynamic": False,
             "type": "object",
            "properties": {
                 "downloadsAndViews": {
+                    "dynamic": False,
                     "type": "object",
                     "properties": {
                         "downloads": {
+                            "index": False,
                             "type": "long"
                         },
                         "views": {
+                            "index": False,
                             "type": "long"
                         }
                     }
                 },
                 "impact": {
+                    "dynamic": False,
                     "type": "object",
                     "properties": {
                         "citationCount": {
+                            "index": False,
                             "type": "long"
                         }
                     }
@@ -488,6 +498,7 @@
             "type": "keyword"
         },
         "biblio": {
+            "index": False,
             "type": "object",
             "properties": {
                 "edition": {
                     "type": "text"
                 },
                 "end_page": {
"type": "text" }, - "hosting_data_source": { - "type": "text" - }, + # "hosting_data_source": { + # "type": "text" + # }, "issue": { "type": "text" }, - "number": { - "type": "text" - }, + # "number": { + # "type": "text" + # }, "publisher": { "type": "text" }, @@ -602,9 +613,9 @@ mappings['products'] = { "pmid": { "type": "keyword" }, - "title": { - "type": "text" - } + # "title": { + # "type": "text" + # } } }, "relation_type": { @@ -651,12 +662,12 @@ mappings['products'] = { "type": "object", "properties": { "trust": { - "type": "keyword", - "index": "false" + "type": "double", + "index": False }, "type": { "type": "keyword", - "index": "false" + "index": False } } }, diff --git a/airflow/dags/import_EOSC_graph.py b/airflow/dags/import_EOSC_graph.py index 74f4a97..5e03ea2 100644 --- a/airflow/dags/import_EOSC_graph.py +++ b/airflow/dags/import_EOSC_graph.py @@ -19,7 +19,7 @@ from airflow.hooks.base import BaseHook from opensearchpy import OpenSearch, helpers from EOSC_indexes import mappings -from EOSC_entity_trasform import transform_entities +from EOSC_entity_trasform import filter_entities, transform_entities EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) @@ -163,17 +163,22 @@ for config_name, config in configs.items(): with gzip.GzipFile(fileobj=s3_obj.get()["Body"], mode='rb') if key.endswith(".gz") else codecs.getreader('utf-8')(s3_obj.get()["Body"]) as s3file: def _generate_data(): for line in s3file: - data = json.loads(line) - data['_index'] = indexname + data: dict = json.loads(line) if entity in transform_entities: data = transform_entities[entity](data) - yield data + if entity in filter_entities: + if filter_entities[entity](data): + print(data["local_identifier"] + " does not meet inclusion policies") + continue + index = {"update": {"_index": indexname, "_id": data.pop("_id")}} + yield index, {"doc": data, "doc_as_upsert": True} # disable success post logging logging.getLogger("opensearch").setLevel(logging.WARN) succeeded = 0 failed = 0 for success, item in helpers.parallel_bulk(client, actions=_generate_data(), + expand_action_callback=None, raise_on_exception=False, raise_on_error=False, chunk_size=5000, @@ -198,6 +203,58 @@ for config_name, config in configs.items(): if succeeded > 0: print(f"Bulk-inserted {succeeded} items (streaming_bulk).") + @task + def merge_curation_db(**kwargs): + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) + client = OpenSearch( + hosts=[{'host': conn.host, 'port': conn.port}], + http_auth=(conn.login, conn.password), + use_ssl=True, + verify_certs=False, + ssl_show_warn=False, + pool_maxsize=20, + timeout=180 + ) + if "products" in kwargs["params"]["ENTITIES"]: + products_index = f'products_{kwargs["params"]["SUFFIX"]}' + curationdb_index = 'curation' + if client.indices.exists(curationdb_index): + client.reindex(body={ + "source": { + "index": curationdb_index, + "_source": ["status"] + }, + "dest": { + "index": products_index + } + }, + refresh=False, + requests_per_second=-1, + scroll="4h", + slices="auto", + timeout=60*60*4, + wait_for_completion=True) + + @task + def delete_missing_curated(**kwargs): + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) + client = OpenSearch( + hosts=[{'host': conn.host, 'port': conn.port}], + http_auth=(conn.login, conn.password), + use_ssl=True, + verify_certs=False, + ssl_show_warn=False, + pool_maxsize=20, + timeout=180 + ) + if "products" in kwargs["params"]["ENTITIES"]: + products_index = f'products_{kwargs["params"]["SUFFIX"]}' + 
+            client.delete_by_query(index=products_index,
+                                   body={"query": {"bool": {"must_not": {"exists": {"field": "local_identifier"}}}}},
+                                   refresh=False
+                                   )
+
+
     @task
     def close_indexes(**kwargs):
         conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
@@ -238,8 +295,10 @@
 
     chain(
         create_indexes.override(task_id="create_indexes")(),
+        merge_curation_db.override(task_id="merge_curation_db")(),
         parallel_batches,
         bulk_load.expand_kwargs(parallel_batches.output),
+        delete_missing_curated.override(task_id="delete_missing_curated_recs")(),
        close_indexes.override(task_id="close_indexes")()
     )
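
Note on the inclusion policy: filter_product walks nested paths through dicts and lists via isEmpty and returns True for records that must be skipped. A minimal sketch of the intended behavior, assuming the patched EOSC_entity_trasform module is importable; the field values are made up and only the path structure matters:

    from EOSC_entity_trasform import filter_product

    # A literature product with an empty abstract is excluded.
    record = {
        "product_type": "literature",
        "titles": {"none": "A title"},
        "firstPublishDate": "2024-01-01",
        "abstracts": {"none": ""},
        "contributions": [{"person": {"local_identifier": "person-1"}}],
    }
    assert filter_product(record) is True

    # The same record with a non-empty abstract passes the policy.
    record["abstracts"] = {"none": "An abstract"}
    assert filter_product(record) is False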
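
Note on the bulk-load change: _generate_data now yields pre-expanded (action_metadata, document) pairs instead of plain dicts, so the default expand_action step in opensearchpy.helpers must be bypassed with an identity callback. A self-contained sketch of the pattern, assuming opensearch-py is installed; the host, index name, and sample record are illustrative:

    from opensearchpy import OpenSearch, helpers

    client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])  # illustrative connection

    def generate_actions(records, indexname):
        for data in records:
            # Metadata line of the bulk update action; "_id" is removed from
            # the document body and reused as the document id.
            meta = {"update": {"_index": indexname, "_id": data.pop("_id")}}
            # doc_as_upsert creates the document when it does not exist yet,
            # so product fields merge onto previously copied curation records.
            yield meta, {"doc": data, "doc_as_upsert": True}

    records = [{"_id": "prod-1", "local_identifier": "prod-1", "product_type": "literature"}]
    for success, item in helpers.parallel_bulk(
            client,
            actions=generate_actions(records, "products_test"),
            # Identity callback: the generator output is already expanded.
            expand_action_callback=lambda action: action,
            raise_on_exception=False,
            raise_on_error=False):
        print(success, item)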
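
Note on the two new tasks, which bracket the bulk load: merge_curation_db seeds the fresh products index with curated status fields, the upserts then merge product records onto them, and delete_missing_curated drops curation leftovers whose products no longer exist (they never receive a local_identifier). A condensed sketch of that sequence against a standalone client; connection settings and index names are illustrative:

    from opensearchpy import OpenSearch

    client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])  # illustrative
    products_index = "products_test"

    # 1. Copy only the curated "status" field into the new products index.
    if client.indices.exists("curation"):
        client.reindex(body={
            "source": {"index": "curation", "_source": ["status"]},
            "dest": {"index": products_index}
        }, wait_for_completion=True)

    # 2. Bulk upsert of product records happens here (see the sketch above).

    # 3. Remove documents that only carry a curation status: after the upsert
    #    pass, any document without a local_identifier had no matching product.
    client.delete_by_query(
        index=products_index,
        body={"query": {"bool": {"must_not": {"exists": {"field": "local_identifier"}}}}})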