initial stage

This commit is contained in:
Giambattista Bloisi 2024-03-26 10:52:07 +01:00
parent 4398546095
commit 10c29f86c2
1 changed files with 7 additions and 5 deletions

View File

@ -18,10 +18,8 @@ from opensearchpy import OpenSearch, helpers
from EOSC_indexes import mappings
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EOSC_CATALOG_BUCKET = os.getenv("EOSC_CATALOG_BUCKET", "eosc-portal-import")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
ENTITIES = ["interoperability", "services", "training"]
BULK_PARALLELISM = 2
@ -40,9 +38,13 @@ default_args = {
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
default_args=default_args,
params={
"S3_CONN_ID": "s3_conn",
"EOSC_CATALOG_BUCKET": "eosc-portal-import"
},
tags=["lot1"],
)
def import_EOSC_catalog():
def import_EOSC_catalog(**context):
@task
def create_indexes():
conn = BaseHook.get_connection('opensearch_default')
@ -81,7 +83,7 @@ def import_EOSC_catalog():
pieces = []
for entity in ENTITIES:
hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
keys = hook.list_keys(bucket_name=EOSC_CATALOG_BUCKET, prefix=f'{entity}/')
keys = hook.list_keys(bucket_name=context["params"]["EOSC_CATALOG_BUCKET"], prefix=f'{entity}/')
for key in keys:
pieces.append((entity, key))
@ -107,7 +109,7 @@ def import_EOSC_catalog():
def _generate_data():
for (entity, key) in files:
print(f'{entity}: {key}')
s3_obj = hook.get_key(key, bucket_name=EOSC_CATALOG_BUCKET)
s3_obj = hook.get_key(key, bucket_name=context["params"]["EOSC_CATALOG_BUCKET"])
with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
buff = io.BufferedReader(gzipfile)
for line in buff: