initial stage

Giambattista Bloisi 2024-03-27 12:54:10 +01:00
parent e684e4cae5
commit 6aa4108b2d
1 changed file with 6 additions and 5 deletions

@@ -74,7 +74,7 @@ def import_EOSC_catalog():
             "mappings": mappings[entity]
         })
 
-    def compute_batches(ds=None, **kwargs) -> list[dict]:
+    def compute_batches(ds=None, **kwargs):
         pieces = []
         for entity in ENTITIES:
             hook = S3Hook(kwargs["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False})
@@ -92,8 +92,9 @@ def import_EOSC_catalog():
         return list(split_list(pieces, len(pieces)))
 
     @task
-    def bulk_load(files: list[(str, str)], params: dict):
-        conn = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
+    def bulk_load(**kwargs):
+        files = kwargs["files"]
+        conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
         client = OpenSearch(
             hosts=[{'host': conn.host, 'port': conn.port}],
             http_auth=(conn.login, conn.password),
@@ -102,12 +103,12 @@ def import_EOSC_catalog():
             ssl_show_warn=False,
             pool_maxsize=20
         )
-        hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})
+        hook = S3Hook(kwargs["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False})
 
         def _generate_data():
             for (entity, key) in files:
                 print(f'{entity}: {key}')
-                s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"])
+                s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"])
                 with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
                     buff = io.BufferedReader(gzipfile)
                     for line in buff:
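
The change drops the typed TaskFlow signatures in favour of **kwargs. In Airflow 2, when a @task callable declares a **kwargs catch-all, the full task context is injected into it, so DAG-level params become reachable as kwargs["params"], and explicitly passed arguments such as files arrive alongside them. A minimal sketch of that pattern (the DAG name and param values below are made up for illustration, not taken from this repository):

from airflow.decorators import dag, task

@dag(params={"S3_CONN_ID": "s3_conn", "OPENSEARCH_CONN_ID": "opensearch_default"})
def kwargs_demo():
    @task
    def bulk_load(**kwargs):
        files = kwargs["files"]                   # passed explicitly at call time below
        conn_id = kwargs["params"]["S3_CONN_ID"]  # injected from the DAG-level params
        print(conn_id, files)

    bulk_load(files=[("services", "services/part-00000.json.gz")])

kwargs_demo()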
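The unchanged tail of _generate_data streams each gzipped dump straight from S3: the object body is wrapped in gzip.GzipFile and read line by line through io.BufferedReader, so no dump is fully materialised in memory. A condensed sketch of that read path, assuming a boto3 S3 object as returned by S3Hook.get_key (the helper name iter_ndjson is invented here):

import gzip
import io
import json

def iter_ndjson(s3_obj):
    # Decompress the S3 object's streaming body on the fly and yield one
    # parsed JSON document per line, without buffering the whole file.
    with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
        for line in io.BufferedReader(gzipfile):
            yield json.loads(line)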