initial stage

This commit is contained in:
Giambattista Bloisi 2024-03-27 12:49:33 +01:00
parent 6c76a3e0b8
commit e684e4cae5
1 changed files with 4 additions and 4 deletions

View File

@ -92,8 +92,8 @@ def import_EOSC_catalog():
return list(split_list(pieces, len(pieces)))
@task
def bulk_load(files: list[(str, str)], **kwargs):
conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
def bulk_load(files: list[(str, str)], params: dict):
conn = BaseHook.get_connection(params["OPENSEARCH_CONN_ID"])
client = OpenSearch(
hosts=[{'host': conn.host, 'port': conn.port}],
http_auth=(conn.login, conn.password),
@ -102,12 +102,12 @@ def import_EOSC_catalog():
ssl_show_warn=False,
pool_maxsize=20
)
hook = S3Hook(kwargs["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False})
hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})
def _generate_data():
for (entity, key) in files:
print(f'{entity}: {key}')
s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"])
s3_obj = hook.get_key(key, bucket_name=params["EOSC_CATALOG_BUCKET"])
with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
buff = io.BufferedReader(gzipfile)
for line in buff: