initial stage

This commit is contained in:
Giambattista Bloisi 2024-03-25 15:54:49 +01:00
parent 349db6f602
commit 0c27895e13
1 changed files with 16 additions and 5 deletions

View File

@ -24,8 +24,9 @@ from opensearch_indexes import mappings
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skgif-eosc-eu")
AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
OPENSEARCH_HOST= Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
OPENSEARCH_URL= Variable.get("OPENSEARCH_URL", "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
OPENSEARCH_URL = Variable.get("OPENSEARCH_URL",
"https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
@ -64,9 +65,16 @@ def map_access_right(ar: str) -> str:
def map_product(p: dict) -> dict:
p['accessRight'] = list(filter(lambda ar: ar != '', map(lambda m: map_access_right(m.get('access_rights')), p.get('manifestations'))))
p['accessRight'] = list(
filter(lambda ar: ar != '', map(lambda m: map_access_right(m.get('access_rights')), p.get('manifestations'))))
return p
map_entities = {
'products': map_product
}
@dag(
schedule=None,
dagrun_timeout=None,
@ -152,7 +160,7 @@ def import_EOSC_graph():
for i in range(0, len(list_a), chunk_size):
yield {"files": list_a[i:i + chunk_size]}
return list(split_list(pieces, len(pieces)//BULK_PARALLELISM))
return list(split_list(pieces, len(pieces) // BULK_PARALLELISM))
@task(executor_config={
"pod_override": k8s.V1Pod(
@ -196,6 +204,8 @@ def import_EOSC_graph():
data = json.loads(line)
data['_index'] = entity
data['_id'] = data['local_identifier']
if entity in map_entities:
data = map_entities[entity](data)
yield data
# disable success post logging
@ -212,7 +222,7 @@ def import_EOSC_graph():
succeeded = succeeded + 1
else:
print(item["index"]["error"])
failed = failed+1
failed = failed + 1
if failed > 0:
print(f"There were {failed} errors:")
@ -251,4 +261,5 @@ def import_EOSC_graph():
close_indexes.override(task_id="close_indexes")()
)
import_EOSC_graph()