initial stage

Giambattista Bloisi 2024-06-27 14:07:59 +02:00
parent df9cab15c3
commit 52be021867
3 changed files with 143 additions and 25 deletions

View File

@@ -1,3 +1,4 @@
from typing import Dict, Any, List
def map_access_right(ar: str) -> str:
@@ -86,3 +87,50 @@ transform_entities = {
"services": trasform_catalog_entity,
"training": trasform_catalog_entity,
}
def isEmpty(current_value: Dict[str, Any], labels: List[str]) -> bool:
    if len(labels) <= 0:
        return True
    for label in labels:
        if isinstance(current_value, list):
            current_value = current_value[0]
        if isinstance(current_value, dict) and label in current_value:
            current_value = current_value[label]
        else:
            return True
        if current_value is None:
            return True
    if isinstance(current_value, list):
        if len(current_value) > 0:
            return current_value[0] == ""
        else:
            return True
    return str(current_value) == ""
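For illustration only (not part of the commit), a minimal usage sketch of how isEmpty walks a nested record; the record shape and identifiers below are assumed:

# Hypothetical record; all field values are illustrative
record = {
    "titles": {"none": ["An example title"]},
    "contributions": [{"person": {"local_identifier": "per_123"}}],
}
assert not isEmpty(record, ["titles", "none"])                               # value present
assert isEmpty(record, ["abstracts", "none"])                                # key missing entirely
assert not isEmpty(record, ["contributions", "person", "local_identifier"])  # lists are unwrapped via their first element
assert isEmpty({"titles": {"none": [""]}}, ["titles", "none"])               # empty string counts as empty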
#
# Filter products that do not meet inclusion policy
#
def filter_product(p: dict) -> bool:
    if isEmpty(p, ["titles", "none"]):
        return True
    if isEmpty(p, ["firstPublishDate"]):
        return True
    if p['product_type'] == "literature":
        if isEmpty(p, ["abstracts", "none"]):
            return True
        if isEmpty(p, ["contributions", "person", "local_identifier"]):
            return True
    elif p['product_type'] == "research data":
        if isEmpty(p, ["contributions", "person", "local_identifier"]):
            return True
    return False

filter_entities = {
    "products": filter_product
}
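A hedged usage sketch of the filter table (values below are illustrative): a literature product with no abstract is rejected by the inclusion policy.

# Hypothetical literature product lacking an abstract
product = {
    "product_type": "literature",
    "titles": {"none": ["Some title"]},
    "firstPublishDate": "2020-01-01",
    "contributions": [{"person": {"local_identifier": "per_1"}}],
}
if filter_entities["products"](product):
    print("excluded by the inclusion policy")  # abstracts are empty, so filter_product returns True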

View File

@@ -6,9 +6,9 @@ mappings['datasource'] = {
"data_source_classification": {
"type": "keyword"
},
"eoscId": {
"type": "keyword"
},
# "eoscId": {
# "type": "keyword"
# },
"identifiers": {
"type": "object",
"properties": {
@@ -320,6 +320,9 @@ mappings['grants'] = {
}
}
},
"keywords": {
"type": "keyword"
},
"local_identifier": {
"type": "keyword"
},
@@ -391,9 +394,9 @@ mappings['products'] = {
"contributions": {
"type": "object",
"properties": {
"declared_affiliations": {
"type": "keyword"
},
# "declared_affiliations": {
# "type": "keyword"
# },
"person": {
"type": "object",
"properties": {
@@ -414,11 +417,12 @@ mappings['products'] = {
}
},
"rank": {
"index": False,
"type": "long"
},
"roles": {
"type": "keyword"
}
# "roles": {
# "type": "keyword"
# }
}
},
"funding": {
@@ -455,23 +459,29 @@ mappings['products'] = {
}
},
"indicator": {
"dynamic": False,
"type": "object",
"properties": {
"downloadsAndViews": {
"dynamic": False,
"type": "object",
"properties": {
"downloads": {
"index": False,
"type": "long"
},
"views": {
"index": False,
"type": "long"
}
}
},
"impact": {
"dynamic": False,
"type": "object",
"properties": {
"citationCount": {
"index": False,
"type": "long"
}
}
@@ -488,6 +498,7 @@ mappings['products'] = {
"type": "keyword"
},
"biblio": {
"index": False,
"type": "object",
"properties": {
"edition": {
@@ -496,15 +507,15 @@ mappings['products'] = {
"end_page": {
"type": "text"
},
"hosting_data_source": {
"type": "text"
},
# "hosting_data_source": {
# "type": "text"
# },
"issue": {
"type": "text"
},
"number": {
"type": "text"
},
# "number": {
# "type": "text"
# },
"publisher": {
"type": "text"
},
@@ -602,9 +613,9 @@ mappings['products'] = {
"pmid": {
"type": "keyword"
},
"title": {
"type": "text"
}
# "title": {
# "type": "text"
# }
}
},
"relation_type": {
@@ -651,12 +662,12 @@ mappings['products'] = {
"type": "object",
"properties": {
"trust": {
"type": "keyword",
"index": "false"
"type": "double",
"index": False
},
"type": {
"type": "keyword",
"index": "false"
"index": False
}
}
},
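To recap the new mapping flags, a minimal sketch under assumed names ("my_products", the local cluster, and treating mappings['products'] as a properties dict are assumptions): "index": False keeps a value in _source but makes the field non-searchable, and "dynamic": False stops unmapped sub-fields from being indexed.

from opensearchpy import OpenSearch
from EOSC_indexes import mappings  # same import the importer DAG below uses

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])  # assumed local cluster
# Assumes mappings['products'] holds the field definitions shown above
client.indices.create("my_products", body={"mappings": {"properties": mappings["products"]}})
# e.g. indicator.downloadsAndViews.downloads is still returned in _source,
# but it is no longer indexed, so queries cannot match on it.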

View File

@@ -19,7 +19,7 @@ from airflow.hooks.base import BaseHook
from opensearchpy import OpenSearch, helpers
from EOSC_indexes import mappings
from EOSC_entity_trasform import transform_entities
from EOSC_entity_trasform import filter_entities, transform_entities
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
@@ -163,17 +163,22 @@ for config_name, config in configs.items():
with gzip.GzipFile(fileobj=s3_obj.get()["Body"], mode='rb') if key.endswith(".gz") else codecs.getreader('utf-8')(s3_obj.get()["Body"]) as s3file:
    def _generate_data():
        for line in s3file:
            data = json.loads(line)
            data['_index'] = indexname
            data: dict = json.loads(line)
            if entity in transform_entities:
                data = transform_entities[entity](data)
            yield data
            if entity in filter_entities:
                if filter_entities[entity](data):
                    print(data["local_identifier"] + " does not meet inclusion policies")
                    continue
            index = {"update": {"_index": indexname, "_id": data.pop("_id")}}
            yield index, {"doc": data, "doc_as_upsert": True}

    # disable success post logging
    logging.getLogger("opensearch").setLevel(logging.WARN)
    succeeded = 0
    failed = 0
    for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
                                               expand_action_callback=None,
                                               raise_on_exception=False,
                                               raise_on_error=False,
                                               chunk_size=5000,
@@ -198,6 +203,58 @@ for config_name, config in configs.items():
        if succeeded > 0:
            print(f"Bulk-inserted {succeeded} items (streaming_bulk).")
@task
def merge_curation_db(**kwargs):
    conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
    client = OpenSearch(
        hosts=[{'host': conn.host, 'port': conn.port}],
        http_auth=(conn.login, conn.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180
    )
    if "products" in kwargs["params"]["ENTITIES"]:
        products_index = f'products_{kwargs["params"]["SUFFIX"]}'
        curationdb_index = 'curation'
        if client.indices.exists(curationdb_index):
            client.reindex(body={
                "source": {
                    "index": curationdb_index,
                    "_source": ["status"]
                },
                "dest": {
                    "index": products_index
                }
            },
                refresh=False,
                requests_per_second=-1,
                scroll="4h",
                slices="auto",
                timeout=60*60*4,
                wait_for_completion=True)
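Because the reindex source restricts _source to ["status"], only the curated status field is copied into the products index, keyed by the curation document's _id. A rough verification sketch (the document id is illustrative):

doc = client.get(index=products_index, id="prod_123")  # hypothetical curated id
print(doc["_source"])  # only the copied "status" field, until the bulk upsert adds the product body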
@task
def delete_missing_curated(**kwargs):
    conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
    client = OpenSearch(
        hosts=[{'host': conn.host, 'port': conn.port}],
        http_auth=(conn.login, conn.password),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False,
        pool_maxsize=20,
        timeout=180
    )
    if "products" in kwargs["params"]["ENTITIES"]:
        products_index = f'products_{kwargs["params"]["SUFFIX"]}'
        client.delete_by_query(index=products_index,
                               body={"query": {"bool": {"must_not": {"exists": {"field": "local_identifier"}}}}},
                               refresh=False
                               )
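The cleanup targets curation-only documents that never received a product body during the bulk upsert and therefore still lack local_identifier. A hedged sketch to count them before deleting:

stale = client.count(
    index=products_index,
    body={"query": {"bool": {"must_not": {"exists": {"field": "local_identifier"}}}}},
)
print(f'{stale["count"]} curation-only documents would be removed')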
@task
def close_indexes(**kwargs):
    conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
@@ -238,8 +295,10 @@ for config_name, config in configs.items():
chain(
    create_indexes.override(task_id="create_indexes")(),
    merge_curation_db.override(task_id="merge_curation_db")(),
    parallel_batches,
    bulk_load.expand_kwargs(parallel_batches.output),
    delete_missing_curated.override(task_id="delete_missing_curated_recs")(),
    close_indexes.override(task_id="close_indexes")()
)