initial stage

This commit is contained in:
Giambattista Bloisi 2024-03-26 10:54:46 +01:00
parent 10c29f86c2
commit 7e41f71d32
1 changed file with 4 additions and 4 deletions

View File

@@ -44,7 +44,7 @@ default_args = {
},
tags=["lot1"],
)
def import_EOSC_catalog(**context):
def import_EOSC_catalog():
@task
def create_indexes():
conn = BaseHook.get_connection('opensearch_default')
@@ -83,7 +83,7 @@ def import_EOSC_catalog(**context):
pieces = []
for entity in ENTITIES:
hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
keys = hook.list_keys(bucket_name=context["params"]["EOSC_CATALOG_BUCKET"], prefix=f'{entity}/')
keys = hook.list_keys(bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"], prefix=f'{entity}/')
for key in keys:
pieces.append((entity, key))
@@ -94,7 +94,7 @@ def import_EOSC_catalog(**context):
return list(split_list(pieces, len(pieces)//BULK_PARALLELISM))
@task
def bulk_load(files: list[(str, str)]):
def bulk_load(files: list[(str, str)], **kwargs):
conn = BaseHook.get_connection('opensearch_default')
client = OpenSearch(
hosts=[{'host': conn.host, 'port': conn.port}],
@@ -109,7 +109,7 @@ def import_EOSC_catalog(**context):
def _generate_data():
for (entity, key) in files:
print(f'{entity}: {key}')
s3_obj = hook.get_key(key, bucket_name=context["params"]["EOSC_CATALOG_BUCKET"])
s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"])
with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
buff = io.BufferedReader(gzipfile)
for line in buff: