initial stage

This commit is contained in:
Giambattista Bloisi 2024-07-26 17:52:37 +02:00
parent 1ad367efcc
commit 48d2f69bc6
5 changed files with 386 additions and 383 deletions

View File

@ -6,9 +6,9 @@ mappings['datasource'] = {
"data_source_classification": { "data_source_classification": {
"type": "keyword" "type": "keyword"
}, },
# "eoscId": { "entity_type": {
# "type": "keyword" "type": "keyword"
# }, },
"identifiers": { "identifiers": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -86,6 +86,9 @@ mappings['datasource'] = {
mappings['venues'] = { mappings['venues'] = {
"properties": { "properties": {
"entity_type": {
"type": "keyword"
},
"identifiers": { "identifiers": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -124,6 +127,9 @@ mappings['venues'] = {
mappings['topics'] = { mappings['topics'] = {
"properties": { "properties": {
"entity_type": {
"type": "keyword"
},
"identifiers": { "identifiers": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -151,6 +157,9 @@ mappings['topics'] = {
mappings['persons'] = { mappings['persons'] = {
"properties": { "properties": {
"entity_type": {
"type": "keyword"
},
"family_name": { "family_name": {
"fields": { "fields": {
"keyword": { "keyword": {
@ -197,6 +206,9 @@ mappings['organizations'] = {
"country": { "country": {
"type": "keyword" "type": "keyword"
}, },
"entity_type": {
"type": "keyword"
},
"identifiers": { "identifiers": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -394,9 +406,9 @@ mappings['products'] = {
"contributions": { "contributions": {
"type": "object", "type": "object",
"properties": { "properties": {
# "declared_affiliations": { "is_listed_author": {
# "type": "keyword" "type": "boolean"
# }, },
"person": { "person": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -420,11 +432,11 @@ mappings['products'] = {
"index": False, "index": False,
"type": "long" "type": "long"
}, },
# "roles": {
# "type": "keyword"
# }
} }
}, },
"entity_type": {
"type": "keyword"
},
"funding": { "funding": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -506,15 +518,9 @@ mappings['products'] = {
"end_page": { "end_page": {
"type": "text" "type": "text"
}, },
# "hosting_data_source": {
# "type": "text"
# },
"issue": { "issue": {
"type": "text" "type": "text"
}, },
# "number": {
# "type": "text"
# },
"publisher": { "publisher": {
"type": "text" "type": "text"
}, },
@ -537,9 +543,6 @@ mappings['products'] = {
} }
} }
}, },
"eoscId": {
"type": "keyword"
},
"hosting_datasource": { "hosting_datasource": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -612,9 +615,6 @@ mappings['products'] = {
"pmid": { "pmid": {
"type": "keyword" "type": "keyword"
}, },
# "title": {
# "type": "text"
# }
} }
}, },
"relation_type": { "relation_type": {
@ -686,6 +686,7 @@ mappings['products'] = {
} }
} }
}, },
# Aliases
"type": { "type": {
"path": "product_type", "path": "product_type",
"type": "alias" "type": "alias"

View File

@ -0,0 +1,283 @@
from datetime import datetime
from opensearchpy import OpenSearch
from catalogue.dictutils import extract_nested, extract_map_nested, delete_none
from catalogue.vocabulary import CATALOG_VOCABULARY
class RawCatalogOpensearch:
def __init__(self, os_client: OpenSearch, suffix: str | None):
self.os_client = os_client
self.suffix = suffix
def get_index(self, name: str):
return "catalog_" + name + ("" if self.suffix is None else f"_{self.suffix}")
def get_resource_interoperability_records(self, resource_id):
response = self.os_client.search(
body={
'query': {
'term': {
'resourceInteroperabilityRecord.resourceId.keyword': resource_id,
}
},
"fields": [
"resourceInteroperabilityRecord.interoperabilityRecordIds"
],
"_source": False
},
index=self.get_index('resource-interoperability-records')
)
interoperability_ids = []
interoperability_records = []
for hit in response['hits']['hits']:
interoperability_ids.extend(
extract_nested(hit, ['fields', 'resourceInteroperabilityRecord.interoperabilityRecordIds']) or [])
if len(interoperability_ids) > 0:
response = self.os_client.search(
body={
"query": {
"ids": {
"values": interoperability_ids,
}
},
},
index=self.get_index('interoperability-records')
)
for hit in response['hits']['hits']:
interoperability_records.append(extract_nested(hit, ['_source']))
return interoperability_records
def get_providers(self, provider_ids: list[str]) -> list:
provider_records = []
if provider_ids is not None and len(provider_ids) > 0:
response = self.os_client.search(
body={
"query": {
"ids": {
"values": provider_ids if isinstance(provider_ids, list) else [provider_ids],
}
},
},
index=self.get_index('providers')
)
for hit in response['hits']['hits']:
provider_records.append(extract_nested(hit, ['_source']))
return provider_records
def get_provider(self, provider_id: str):
if provider_id is not None:
providers = self.get_providers([provider_id])
if providers is not None and len(providers) > 0:
return providers[0]
return {}
def get_services(self, service_ids: list[str]) -> list:
service_records = []
if service_ids is not None and len(service_ids) > 0:
response = self.os_client.search(
body={
"query": {
"ids": {
"values": service_ids if isinstance(service_ids, list) else [
service_ids],
}
},
},
index=self.get_index('services')
)
for hit in response['hits']['hits']:
service_records.append(extract_nested(hit, ['_source']))
return service_records
def get_datasource_of_service(self, service_id: str):
response = self.os_client.search(
body={
'query': {
'term': {
'datasource.serviceId.keyword': service_id,
}
}
},
index=self.get_index('datasources')
)
for hit in response['hits']['hits']:
return extract_nested(hit, ['_source'])
return {}
def get_services_of_interoperability(self, interoperability_id: str):
svc_ids = []
response = self.os_client.search(
body={
'query': {
'term': {
'resourceInteroperabilityRecord.interoperabilityRecordIds.keyword': interoperability_id,
}
},
"fields": [
"resourceInteroperabilityRecord.resourceId"
],
"_source": False
},
index=self.get_index('resource-interoperability-records')
)
for hit in response['hits']['hits']:
svc_ids.extend(extract_nested(hit, ['fields', 'resourceInteroperabilityRecord.resourceId']) or [])
return svc_ids
def map_service(self, raw_svc: dict) -> dict:
interoperability_records = self.get_resource_interoperability_records(raw_svc['id'])
organization = self.get_provider(extract_nested(raw_svc, ['service', 'resourceOrganisation']))
provider_records = self.get_providers(list(
filter(lambda i: len(i) > 0, extract_nested(raw_svc, ['service', 'resourceProviders']) or [])))
related_resources_records = self.get_services(list(
filter(lambda i: len(i) > 0, extract_nested(raw_svc, ['service', 'relatedResources']) or [])))
datasource = self.get_datasource_of_service(raw_svc['id'])
res = {
"accessRestriction": extract_nested(raw_svc,
"service.geographicalAvailabilities".split(".")),
"accessTypes": extract_map_nested(raw_svc, 'access_type', "service.accessTypes".split(".")),
"access_modes": extract_map_nested(raw_svc, 'access_mode', "service.accessModes".split(".")),
"category": list(map(lambda c: {"category": CATALOG_VOCABULARY['categories'][c['category']],
"subcategory": CATALOG_VOCABULARY['subcategories'][c['subcategory']]},
extract_nested(raw_svc, "service.categories".split(".")))),
"description": extract_nested(raw_svc, "service.description".split(".")),
"domain": list(map(lambda c: {"domain": CATALOG_VOCABULARY['domains'][c['scientificDomain']],
"subdomain": CATALOG_VOCABULARY['subdomains'][c['scientificSubdomain']]},
extract_nested(raw_svc, "service.scientificDomains".split(".")))),
"grantProjectNames": extract_nested(raw_svc, "service.grantProjectNames".split(".")),
"helpdeskPage": extract_nested(raw_svc, "service.helpdeskPage".split(".")),
"horizontal": extract_nested(raw_svc, "service.horizontalService".split(".")) or False,
"id": extract_nested(raw_svc, "service.id".split(".")),
"interoperabilityGuidelines": list(
map(lambda ig: ig['interoperabilityRecord']['title'], interoperability_records)),
"language": extract_map_nested(raw_svc, 'languages', "service.languageAvailabilities".split(".")),
"name": extract_nested(raw_svc, "service.name".split(".")),
"orderType": extract_map_nested(raw_svc, 'order_type', "service.orderType".split(".")),
"organization": extract_nested(organization, "provider.name".split(".")),
"pricing": extract_nested(raw_svc, "service.pricing".split(".")),
"privacyPolicy": extract_nested(raw_svc, "service.privacyPolicy".split(".")),
"providers": list(map(lambda p: p['provider']['name'], provider_records)),
"relatedPlatforms": extract_map_nested(raw_svc, 'related_platform', "service.relatedPlatforms".split(".")),
"relatedResources": list(map(lambda p: p['service']['name'], related_resources_records)),
"tags": extract_nested(raw_svc, "service.tags".split(".")),
"targetUsers": extract_map_nested(raw_svc, 'target_user', "service.targetUsers".split(".")),
"termsOfUse": extract_nested(raw_svc, "service.termsOfUse".split(".")),
"thematic": extract_nested(datasource, "datasource.thematic".split(".")) or False,
"trl": extract_map_nested(raw_svc, 'trl', "service.trl".split(".")),
"type": 'datasource' if extract_nested(datasource, "datasource.id".split(".")) is not None else 'service',
"useCases": extract_nested(raw_svc, "service.useCases".split(".")),
"userManual": extract_nested(raw_svc, "service.userManual".split(".")),
"webpage": extract_nested(raw_svc, "service.webpage".split(".")),
"year": datetime.fromtimestamp(
int(extract_nested(raw_svc, "metadata.registeredAt".split("."))) / 1000).year,
}
return delete_none(res)
def map_training(self, raw_trn: dict) -> dict:
organization = self.get_provider(extract_nested(raw_trn, ['trainingResource', 'resourceOrganisation']))
res = {
"accessRight": extract_map_nested(raw_trn, 'tr_access', "trainingResource.accessRights".split(".")),
"alternativeIdentifiers": extract_nested(raw_trn,
"trainingResource.alternativeIdentifiers".split(".")),
"authors": extract_nested(raw_trn,
"trainingResource.authors".split(".")),
"contentResourceType": extract_map_nested(raw_trn, 'tr_content',
"trainingResource.contentResourceTypes".split(".")),
"description": extract_nested(raw_trn,
"trainingResource.description".split(".")),
"domain": list(map(lambda c: {"domain": CATALOG_VOCABULARY['domains'][c['scientificDomain']],
"subdomain": CATALOG_VOCABULARY['subdomains'][c['scientificSubdomain']]},
extract_nested(raw_trn, "trainingResource.scientificDomains".split(".")))),
"duration": extract_nested(raw_trn,
"trainingResource.duration".split(".")),
"expertiseLevel": extract_map_nested(raw_trn, 'expertise_level',
"trainingResource.expertiseLevel".split(".")),
"id": extract_nested(raw_trn,
"trainingResource.id".split(".")),
"keyword": extract_nested(raw_trn,
"trainingResource.keywords".split(".")),
"language": extract_map_nested(raw_trn, 'languages', "trainingResource.languages".split(".")),
"learningOutcomes": extract_nested(raw_trn,
"trainingResource.learningOutcomes".split(".")),
"learningResourceType": extract_map_nested(raw_trn, 'tr_dcmi',
"trainingResource.learningResourceTypes".split(".")),
"license": extract_nested(raw_trn,
"trainingResource.license".split(".")),
"organization": extract_nested(organization, "provider.name".split(".")),
"qualifications": extract_map_nested(raw_trn, 'qualification',
"trainingResource.qualifications".split(".")),
"targetGroup": extract_map_nested(raw_trn, 'target_user', "trainingResource.targetGroups".split(".")),
"title": extract_nested(raw_trn,
"trainingResource.title".split(".")),
"type": 'trainingResource',
"url": extract_nested(raw_trn,
"trainingResource.url".split(".")),
"year": datetime.fromtimestamp(
int(extract_nested(raw_trn, "metadata.registeredAt".split("."))) / 1000).year,
}
return delete_none(res)
def map_interoperability(self, raw_itr: dict) -> dict:
organization = self.get_provider(extract_nested(raw_itr, ['interoperabilityRecord', 'providerId']))
service_records = self.get_services(self.get_services_of_interoperability(raw_itr['id']))
res = {
"alternativeIdentifiers": extract_nested(raw_itr,
"interoperabilityRecord.alternativeIdentifiers".split(".")),
"creators": list(map(lambda c: {
"affiliation": extract_nested(c, ['creatorAffiliationInfo', 'affiliation']),
"givenName": extract_nested(c, ['givenName']),
"familyName": extract_nested(c, ['familyName']),
"fullName": extract_nested(c, ['creatorNameTypeInfo', 'creatorName']),
"type": extract_nested(c, ['creatorNameTypeInfo', 'nameType'])
}, extract_nested(raw_itr, "interoperabilityRecord.creators".split(".")))),
"description": extract_nested(raw_itr,
"interoperabilityRecord.description".split(".")),
"doi": extract_nested(raw_itr, ['identifierInfo', 'identifier']) if
extract_nested(raw_itr, ['identifierInfo', 'identifierType']) == 'ir_identifier_type-doi' else None,
"domain": {'domain': extract_map_nested(raw_itr, 'domains',
"interoperabilityRecord.domain".split("."))},
"guidelineType": extract_map_nested(raw_itr, 'guideline_type',
"interoperabilityRecord.eoscGuidelineType".split(".")),
"id": extract_nested(raw_itr,
"interoperabilityRecord.id".split(".")),
"license": extract_nested(raw_itr, "interoperabilityRecord.rights.rightIdentifier".split(".")),
"licenseDetails": list(map(lambda c: {
"identifier": extract_nested(c, ['rightIdentifier']),
"title": extract_nested(c, ['rightTitle']),
"uri": extract_nested(c, ['rightURI'])
}, extract_nested(raw_itr, "interoperabilityRecord.rights".split(".")))),
"organization": extract_nested(organization, "provider.name".split(".")),
"provider": extract_nested(organization, "provider.name".split(".")),
"publicationYear": extract_nested(raw_itr, "interoperabilityRecord.publicationYear".split(".")),
"services": list(map(lambda s: {
"name": extract_nested(organization, "service.name".split(".")),
"organization": extract_nested(organization, "service.organization".split(".")),
# s.organization on already mapped services
}, service_records)),
"status": extract_nested(raw_itr, "interoperabilityRecord.status".split(".")),
"title": extract_nested(raw_itr, "interoperabilityRecord.title".split(".")),
"type": 'interoperabilityRecord',
# "year": datetime.fromtimestamp(int(extract_nested(raw_data, "metadata.registeredAt".split("."))) / 1000).year,
}
return delete_none(res)

View File

@ -0,0 +1,41 @@
from typing import Dict, Any, List
from catalogue.vocabulary import CATALOG_VOCABULARY
def extract_nested(current_value: Dict[str, Any], labels: List[str]) -> Any | None:
if len(labels) <= 0:
return current_value
for label in labels:
if isinstance(current_value, dict) and label in current_value:
current_value = current_value[label]
else:
return None
return current_value
def extract_map_nested(current_value: Dict[str, Any], dictionary: str, labels: List[str]) -> Any | None:
    """Resolve a nested value and translate it through a catalogue vocabulary.

    Looks up *labels* in *current_value*; a string result is mapped through
    CATALOG_VOCABULARY[dictionary], a list result is mapped element-wise
    (falsy elements become None). Anything else yields None.
    """
    raw = extract_nested(current_value, labels)
    if raw is None:
        return None
    if isinstance(raw, list):
        return [CATALOG_VOCABULARY[dictionary][item] if item else None for item in raw]
    if isinstance(raw, str) and raw != '':
        return CATALOG_VOCABULARY[dictionary][raw]
    # non-string scalars and empty strings are deliberately dropped
    return None
def delete_none(_dict):
    """Delete None values recursively from all of the dictionaries, tuples, lists, sets.

    Dictionaries are pruned in place (entries whose key or value is None are
    dropped, container values recursed); lists/sets/tuples are rebuilt as the
    same type with None items removed. Other values pass through untouched.
    """
    if isinstance(_dict, dict):
        for key in list(_dict):
            value = _dict[key]
            if isinstance(value, (list, dict, tuple, set)):
                _dict[key] = delete_none(value)
            elif key is None or value is None:
                del _dict[key]
        return _dict
    if isinstance(_dict, (list, set, tuple)):
        return type(_dict)(delete_none(item) for item in _dict if item is not None)
    return _dict

View File

@ -3,347 +3,11 @@ from typing import Dict, Any, List
from opensearchpy import OpenSearch from opensearchpy import OpenSearch
from catalogue.dictutils import extract_nested, extract_map_nested, delete_none
from catalogue.vocabulary import CATALOG_VOCABULARY from catalogue.vocabulary import CATALOG_VOCABULARY
def extract_nested(current_value: Dict[str, Any], labels: List[str]) -> Any | None:
if len(labels) <= 0:
return current_value
for label in labels:
if isinstance(current_value, dict) and label in current_value:
current_value = current_value[label]
else:
return None
return current_value
def extract_map_nested(current_value: Dict[str, Any], dictionary: str, labels: List[str]) -> Any | None:
value = extract_nested(current_value, labels)
if value is None:
return None
if isinstance(value, list):
return list(map(lambda d: CATALOG_VOCABULARY[dictionary][d] if d else None, value))
if isinstance(value, str) and value != '':
return CATALOG_VOCABULARY[dictionary][value]
return None
def delete_none(_dict):
"""Delete None values recursively from all of the dictionaries, tuples, lists, sets"""
if isinstance(_dict, dict):
for key, value in list(_dict.items()):
if isinstance(value, (list, dict, tuple, set)):
_dict[key] = delete_none(value)
elif value is None or key is None:
del _dict[key]
elif isinstance(_dict, (list, set, tuple)):
_dict = type(_dict)(delete_none(item) for item in _dict if item is not None)
return _dict
def map_service(raw_data: dict, os_client: OpenSearch) -> dict:
response = os_client.search(
body={
'query': {
'term': {
'resourceInteroperabilityRecord.resourceId.keyword': raw_data['id'],
}
},
"fields": [
"resourceInteroperabilityRecord.interoperabilityRecordIds"
],
"_source": False
},
index='resource-interoperability-records_test'
)
ig_ids = []
interoperability_records = []
for hit in response['hits']['hits']:
ig_ids.extend(extract_nested(hit, ['fields', 'resourceInteroperabilityRecord.interoperabilityRecordIds']) or [])
if len(ig_ids) > 0:
response = os_client.search(
body={
"query": {
"ids": {
"values": ig_ids,
}
},
},
index='interoperability-records_test'
)
for hit in response['hits']['hits']:
interoperability_records.append(extract_nested(hit, ['_source']))
organization = {} # "provider.name through service.resourceOrganisation=provider.id", for service.Organization
if extract_nested(raw_data, ['service', 'resourceOrganisation']) is not None:
response = os_client.search(
body={
"query": {
"ids": {
"values": [extract_nested(raw_data, ['service', 'resourceOrganisation'])],
}
},
},
index='providers_test'
)
for hit in response['hits']['hits']:
organization = extract_nested(hit, ['_source'])
break
# INNER JOIN! extract_map_nested(raw_data, 'rel_service', "service.relatedResources".split(".")),
# TODO: relatedResources via query
related_resources_records = []
related_resources_ids = list(
filter(lambda i: len(i) > 0, extract_nested(raw_data, ['service', 'relatedResources']) or []))
print(related_resources_ids)
if related_resources_ids is not None and len(related_resources_ids) > 0:
response = os_client.search(
body={
"query": {
"ids": {
"values": related_resources_ids if isinstance(related_resources_ids, list) else [
related_resources_ids],
}
},
},
index='services_test'
)
for hit in response['hits']['hits']:
related_resources_records.append(extract_nested(hit, ['_source']))
provider_records = [] # "provider.name through s.service.resourceProviders=provider.id",
provider_ids = list(filter(lambda i: len(i) > 0, extract_nested(raw_data, ['service', 'resourceProviders']) or []))
print(provider_ids)
if provider_ids is not None and len(provider_ids) > 0:
response = os_client.search(
body={
"query": {
"ids": {
"values": provider_ids if isinstance(provider_ids, list) else [provider_ids],
}
},
},
index='providers_test'
)
for hit in response['hits']['hits']:
provider_records.append(extract_nested(hit, ['_source']))
datasource = {} # datasource that point to this via serviceID
response = os_client.search(
body={
'query': {
'term': {
'datasource.serviceId.keyword': raw_data['id'],
}
}
},
index='datasources_test'
)
for hit in response['hits']['hits']:
datasource = extract_nested(hit, ['_source'])
break
res = {
"accessRestriction": extract_nested(raw_data,
"service.geographicalAvailabilities".split(".")),
"accessTypes": extract_map_nested(raw_data, 'access_type', "service.accessTypes".split(".")),
"access_modes": extract_map_nested(raw_data, 'access_mode', "service.accessModes".split(".")),
"category": list(map(lambda c: {"category": CATALOG_VOCABULARY['categories'][c['category']],
"subcategory": CATALOG_VOCABULARY['subcategories'][c['subcategory']]},
extract_nested(raw_data, "service.categories".split(".")))),
"description": extract_nested(raw_data, "service.description".split(".")),
"domain": list(map(lambda c: {"domain": CATALOG_VOCABULARY['domains'][c['scientificDomain']],
"subdomain": CATALOG_VOCABULARY['subdomains'][c['scientificSubdomain']]},
extract_nested(raw_data, "service.scientificDomains".split(".")))),
"grantProjectNames": extract_nested(raw_data, "service.grantProjectNames".split(".")),
"helpdeskPage": extract_nested(raw_data, "service.helpdeskPage".split(".")),
"horizontal": extract_nested(raw_data, "service.horizontalService".split(".")) or False,
"id": extract_nested(raw_data, "service.id".split(".")),
"interoperabilityGuidelines": list(
map(lambda ig: ig['interoperabilityRecord']['title'], interoperability_records)),
"language": extract_map_nested(raw_data, 'languages', "service.languageAvailabilities".split(".")),
"name": extract_nested(raw_data, "service.name".split(".")),
"orderType": extract_map_nested(raw_data, 'order_type', "service.orderType".split(".")),
"organization": extract_nested(organization, "provider.name".split(".")),
"pricing": extract_nested(raw_data, "service.pricing".split(".")),
"privacyPolicy": extract_nested(raw_data, "service.privacyPolicy".split(".")),
"providers": list(map(lambda p: p['provider']['name'], provider_records)),
"relatedPlatforms": extract_map_nested(raw_data, 'related_platform', "service.relatedPlatforms".split(".")),
"relatedResources": list(map(lambda p: p['service']['name'], related_resources_records)),
"tags": extract_nested(raw_data, "service.tags".split(".")),
"targetUsers": extract_map_nested(raw_data, 'target_user', "service.targetUsers".split(".")),
"termsOfUse": extract_nested(raw_data, "service.termsOfUse".split(".")),
"thematic": extract_nested(datasource, "datasource.thematic".split(".")) or False,
"trl": extract_map_nested(raw_data, 'trl', "service.trl".split(".")),
"type": 'datasource' if extract_nested(datasource, "datasource.id".split(".")) is not None else 'service',
"useCases": extract_nested(raw_data, "service.useCases".split(".")),
"userManual": extract_nested(raw_data, "service.userManual".split(".")),
"webpage": extract_nested(raw_data, "service.webpage".split(".")),
"year": datetime.fromtimestamp(int(extract_nested(raw_data, "metadata.registeredAt".split("."))) / 1000).year,
}
return delete_none(res)
def map_training(raw_data: dict, os_client: OpenSearch) -> dict:
organization = {} # "provider.name through service.resourceOrganisation=provider.id", for service.Organization
if extract_nested(raw_data, ['trainingResource', 'resourceOrganisation']) is not None:
response = os_client.search(
body={
"query": {
"ids": {
"values": [extract_nested(raw_data, ['trainingResource', 'resourceOrganisation'])],
}
},
},
index='providers_test'
)
for hit in response['hits']['hits']:
organization = extract_nested(hit, ['_source'])
break
res = {
"accessRight": extract_map_nested(raw_data, 'tr_access', "trainingResource.accessRights".split(".")),
"alternativeIdentifiers": extract_nested(raw_data,
"trainingResource.alternativeIdentifiers".split(".")),
"authors": extract_nested(raw_data,
"trainingResource.authors".split(".")),
"contentResourceType": extract_map_nested(raw_data, 'tr_content',
"trainingResource.contentResourceTypes".split(".")),
"description": extract_nested(raw_data,
"trainingResource.description".split(".")),
"domain": list(map(lambda c: {"domain": CATALOG_VOCABULARY['domains'][c['scientificDomain']],
"subdomain": CATALOG_VOCABULARY['subdomains'][c['scientificSubdomain']]},
extract_nested(raw_data, "trainingResource.scientificDomains".split(".")))),
"duration": extract_nested(raw_data,
"trainingResource.duration".split(".")),
"expertiseLevel": extract_map_nested(raw_data, 'expertise_level', "trainingResource.expertiseLevel".split(".")),
"id": extract_nested(raw_data,
"trainingResource.id".split(".")),
"keyword": extract_nested(raw_data,
"trainingResource.keywords".split(".")),
"language": extract_map_nested(raw_data, 'languages', "trainingResource.languages".split(".")),
"learningOutcomes": extract_nested(raw_data,
"trainingResource.learningOutcomes".split(".")),
"learningResourceType": extract_map_nested(raw_data, 'tr_dcmi',
"trainingResource.learningResourceTypes".split(".")),
"license": extract_nested(raw_data,
"trainingResource.license".split(".")),
"organization": extract_nested(organization, "provider.name".split(".")),
"qualifications": extract_map_nested(raw_data, 'qualification', "trainingResource.qualifications".split(".")),
"targetGroup": extract_map_nested(raw_data, 'target_user', "trainingResource.targetGroups".split(".")),
"title": extract_nested(raw_data,
"trainingResource.title".split(".")),
"type": 'trainingResource',
"url": extract_nested(raw_data,
"trainingResource.url".split(".")),
"year": datetime.fromtimestamp(int(extract_nested(raw_data, "metadata.registeredAt".split("."))) / 1000).year,
}
return delete_none(res)
def map_interoperability(raw_data: dict, os_client: OpenSearch) -> dict:
organization = {} # "provider.name through service.resourceOrganisation=provider.id", for service.Organization
if extract_nested(raw_data, ['interoperabilityRecord', 'providerId']) is not None:
response = os_client.search(
body={
"query": {
"ids": {
"values": [extract_nested(raw_data, ['interoperabilityRecord', 'providerId'])],
}
},
},
index='providers_test'
)
for hit in response['hits']['hits']:
organization = extract_nested(hit, ['_source'])
break
response = os_client.search(
body={
'query': {
'term': {
'resourceInteroperabilityRecord.interoperabilityRecordIds.keyword': raw_data['id'],
}
},
"fields": [
"resourceInteroperabilityRecord.resourceId"
],
"_source": False
},
index='resource-interoperability-records_test'
)
svc_ids = []
service_records = []
for hit in response['hits']['hits']:
svc_ids.extend(extract_nested(hit, ['fields', 'resourceInteroperabilityRecord.resourceId']) or [])
print(raw_data)
print(svc_ids)
if len(svc_ids) > 0:
response = os_client.search(
body={
"query": {
"ids": {
"values": svc_ids,
}
},
},
index='services_test'
)
for hit in response['hits']['hits']:
service_records.append(extract_nested(hit, ['_source']))
res = {
"alternativeIdentifiers": extract_nested(raw_data,
"interoperabilityRecord.alternativeIdentifiers".split(".")),
"creators": list(map(lambda c: {
"affiliation": extract_nested(c, ['creatorAffiliationInfo', 'affiliation']),
"givenName": extract_nested(c, ['givenName']),
"familyName": extract_nested(c, ['familyName']),
"fullName": extract_nested(c, ['creatorNameTypeInfo', 'creatorName']),
"type": extract_nested(c, ['creatorNameTypeInfo', 'nameType'])
}, extract_nested(raw_data, "interoperabilityRecord.creators".split(".")))),
"description": extract_nested(raw_data,
"interoperabilityRecord.description".split(".")),
"doi": extract_nested(raw_data, ['identifierInfo', 'identifier']) if
extract_nested(raw_data, ['identifierInfo', 'identifierType']) == 'ir_identifier_type-doi' else None,
"domain": {'domain': extract_map_nested(raw_data, 'domains',
"interoperabilityRecord.domain".split("."))},
"guidelineType": extract_map_nested(raw_data, 'guideline_type',
"interoperabilityRecord.eoscGuidelineType".split(".")),
"id": extract_nested(raw_data,
"interoperabilityRecord.id".split(".")),
"license": extract_nested(raw_data, "interoperabilityRecord.rights.rightIdentifier".split(".")),
"licenseDetails": list(map(lambda c: {
"identifier": extract_nested(c, ['rightIdentifier']),
"title": extract_nested(c, ['rightTitle']),
"uri": extract_nested(c, ['rightURI'])
}, extract_nested(raw_data, "interoperabilityRecord.rights".split(".")))),
"organization": extract_nested(organization, "provider.name".split(".")),
"provider": extract_nested(organization, "provider.name".split(".")),
"publicationYear": extract_nested(raw_data, "interoperabilityRecord.publicationYear".split(".")),
"services": list(map(lambda s: {
"name": extract_nested(organization, "service.name".split(".")),
"organization": extract_nested(organization, "service.organization".split(".")),
# s.organization on already mapped services
}, service_records)),
"status": extract_nested(raw_data, "interoperabilityRecord.status".split(".")),
"title": extract_nested(raw_data, "interoperabilityRecord.title".split(".")),
"type": 'interoperabilityRecord',
# "year": datetime.fromtimestamp(int(extract_nested(raw_data, "metadata.registeredAt".split("."))) / 1000).year,
}
return delete_none(res)

View File

@ -2,8 +2,7 @@ from __future__ import annotations
import json import json
import os import os
from datetime import timedelta, datetime from datetime import timedelta
from typing import Any, List, Dict
import opensearchpy import opensearchpy
import pendulum import pendulum
@ -12,9 +11,9 @@ from airflow.decorators import dag
from airflow.decorators import task from airflow.decorators import task
from airflow.hooks.base import BaseHook from airflow.hooks.base import BaseHook
from airflow.utils.helpers import chain from airflow.utils.helpers import chain
from opensearchpy import OpenSearch from opensearchpy import OpenSearch, helpers
from catalogue.mappers import map_interoperability, map_training, map_service from catalogue.RawCatalogOpensearch import RawCatalogOpensearch
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
@ -34,7 +33,6 @@ default_args = {
default_args=default_args, default_args=default_args,
params={ params={
"OPENSEARCH_CONN_ID": "opensearch_default", "OPENSEARCH_CONN_ID": "opensearch_default",
"BATCH_LOADERS_NUM": 10,
"ENTITIES": ["datasources", "ENTITIES": ["datasources",
"interoperability-records", "interoperability-records",
"providers", "providers",
@ -94,14 +92,18 @@ def import_catalogue_entities():
pool_maxsize=20, pool_maxsize=20,
timeout=180 timeout=180
) )
catalog = RawCatalogOpensearch(client, kwargs["params"]["SUFFIX"])
session = requests.session() session = requests.session()
for entity in kwargs["params"]["ENTITIES"]: for entity in kwargs["params"]["ENTITIES"]:
indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}' indexname = catalog.get_index(entity)
baseurl = "http://vereniki.athenarc.gr:8080/eic-registry" baseurl = "http://vereniki.athenarc.gr:8080/eic-registry"
callurl = f"{baseurl}/{entity}" callurl = f"{baseurl}/{entity}"
params = {"draft": "false", "active": "true", "suspended": "false"} params = {"draft": "false", "active": "true", "suspended": "false"}
if client.indices.exists(indexname):
client.indices.delete(indexname)
while True: while True:
reply = session.get(url=callurl, params=params) reply = session.get(url=callurl, params=params)
reply.raise_for_status() reply.raise_for_status()
@ -112,20 +114,24 @@ def import_catalogue_entities():
if len(results) <= 0: if len(results) <= 0:
break break
for result in results: def streamed_results():
# TODO: mapping code for r in results:
body = {"doc": result, "doc_as_upsert": True} yield {"_index": indexname, "_id": r['id'], "_source": r}
client.update(
index=indexname, succeeded = 0
body=body, failed = 0
id=result['id'], for success, item in helpers.parallel_bulk(client, actions=streamed_results(), timeout=5*60):
refresh=True if success:
) succeeded = succeeded + 1
else:
print("error: " + str(item))
failed = failed + 1
# end of stream conditions # end of stream conditions
if content['to'] >= content['total']: if content['to'] >= content['total']:
break break
params['from'] = content['to'] params['from'] = content['to']
client.indices.refresh(indexname)
@task @task
def map_indexes(**kwargs): def map_indexes(**kwargs):
@ -140,22 +146,31 @@ def import_catalogue_entities():
timeout=180 timeout=180
) )
session = requests.session() catalog = RawCatalogOpensearch(client, kwargs["params"]["SUFFIX"])
for entity in {"interoperability-records", "training-resources", "services"}.intersection(kwargs["params"]["ENTITIES"]):
indexname = f'{entity}_{kwargs["params"]["SUFFIX"]}'
for hit in opensearchpy.helpers.scan(client, index=indexname, query={"query": {"match_all": {}}}): for entity in {"interoperability-records", "training-resources", "services"}.intersection(
kwargs["params"]["ENTITIES"]):
for hit in opensearchpy.helpers.scan(client, index=catalog.get_index(entity),
query={"query": {"match_all": {}}}):
s = hit['_source'] s = hit['_source']
doc = None
match entity: match entity:
case "interoperability-records": case "interoperability-records":
print(json.dumps(map_interoperability(s, client), indent=-1)) doc = catalog.map_interoperability(s)
case "training-resources": case "training-resources":
print(json.dumps(map_training(s, client), indent=-1)) doc = catalog.map_training(s)
case "services": case "services":
print(json.dumps(map_service(s, client), indent=-1)) doc = catalog.map_service(s)
case _:
pass if doc is not None:
client.update(
index=f'{entity}_{kwargs["params"]["SUFFIX"]}',
body={"doc": doc, "doc_as_upsert": True},
id=doc['id'],
refresh=True
)
@task @task
def close_indexes(**kwargs): def close_indexes(**kwargs):
@ -177,7 +192,6 @@ def import_catalogue_entities():
"number_of_replicas": 1, "number_of_replicas": 1,
"refresh_interval": "60s", "refresh_interval": "60s",
} }
}) })
# update aliases # update aliases