initial stage

Giambattista Bloisi 2024-04-08 14:15:02 +02:00
parent b89d7f2646
commit 51b695c1b7
1 changed file with 35 additions and 36 deletions

@@ -153,45 +153,44 @@ for config_name, config in configs.items():
                     continue
                 print(f'Processing {indexname}: {key}')
                 s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"])
-                with s3_obj.get()["Body"] as body:
-                    with gzip.GzipFile(fileobj=body) if key.endswith(".gz") else body as gzipfile:
-                        def _generate_data():
-                            buff = io.BufferedReader(gzipfile)
-                            for line in buff:
-                                data = json.loads(line)
-                                data['_index'] = indexname
-                                if entity in transform_entities:
-                                    data = transform_entities[entity](data)
-                                yield data
+                with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) if key.endswith(".gz") else s3_obj.get()["Body"] as gzipfile:
+                    def _generate_data():
+                        buff = io.BufferedReader(gzipfile)
+                        for line in buff:
+                            data = json.loads(line)
+                            data['_index'] = indexname
+                            if entity in transform_entities:
+                                data = transform_entities[entity](data)
+                            yield data
 
-                        # disable success post logging
-                        logging.getLogger("opensearch").setLevel(logging.WARN)
-                        succeeded = 0
-                        failed = 0
-                        for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
-                                                                   raise_on_exception=False,
-                                                                   raise_on_error=False,
-                                                                   chunk_size=5000,
-                                                                   max_chunk_bytes=50 * 1024 * 1024,
-                                                                   timeout=180):
-                            if success:
-                                succeeded = succeeded + 1
-                            else:
-                                print(item["index"]["error"])
-                                failed = failed + 1
+                    # disable success post logging
+                    logging.getLogger("opensearch").setLevel(logging.WARN)
+                    succeeded = 0
+                    failed = 0
+                    for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
+                                                               raise_on_exception=False,
+                                                               raise_on_error=False,
+                                                               chunk_size=5000,
+                                                               max_chunk_bytes=50 * 1024 * 1024,
+                                                               timeout=180):
+                        if success:
+                            succeeded = succeeded + 1
+                        else:
+                            print(item["index"]["error"])
+                            failed = failed + 1
 
-                        if failed > 0:
-                            print(f"There were {failed} errors:")
-                        else:
-                            hook.load_string(
-                                "",
-                                f"{key}.PROCESSED",
-                                bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"],
-                                replace=False
-                            )
-                        if succeeded > 0:
-                            print(f"Bulk-inserted {succeeded} items (streaming_bulk).")
+                    if failed > 0:
+                        print(f"There were {failed} errors:")
+                    else:
+                        hook.load_string(
+                            "",
+                            f"{key}.PROCESSED",
+                            bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"],
+                            replace=False
+                        )
+                    if succeeded > 0:
+                        print(f"Bulk-inserted {succeeded} items (streaming_bulk).")
 
     @task
     def close_indexes(**kwargs):
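
The hunk above boils down to one pattern: stream an object from S3, wrap it in gzip.GzipFile(fileobj=...) only when the key ends in .gz, and feed the decoded NDJSON lines to OpenSearch through helpers.parallel_bulk. A minimal standalone sketch of that pattern follows, assuming a plain boto3 client, a local OpenSearch cluster, and a hypothetical load_key helper; none of these names come from this repository.

# Illustrative sketch only: client setup and the load_key helper are assumptions, not code from this commit.
import gzip
import io
import json
import logging

import boto3
from opensearchpy import OpenSearch, helpers

# Keep the opensearch logger quiet so successful bulk posts are not logged per request.
logging.getLogger("opensearch").setLevel(logging.WARN)

s3 = boto3.client("s3")                                            # assumed: plain boto3 instead of Airflow's S3Hook
client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])   # assumed: local cluster


def load_key(bucket: str, key: str, indexname: str):
    """Stream one NDJSON(.gz) object from S3 into an OpenSearch index; return (succeeded, failed)."""
    body = s3.get_object(Bucket=bucket, Key=key)["Body"]
    # Wrap the streaming body in GzipFile only when the object is compressed.
    stream = gzip.GzipFile(fileobj=body) if key.endswith(".gz") else body

    def _generate_data():
        # io.BufferedReader gives line-by-line iteration over the stream without reading it all into memory.
        for line in io.BufferedReader(stream):
            doc = json.loads(line)
            doc["_index"] = indexname
            yield doc

    succeeded = failed = 0
    for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
                                               raise_on_exception=False,
                                               raise_on_error=False,
                                               chunk_size=5000,
                                               max_chunk_bytes=50 * 1024 * 1024,
                                               timeout=180):
        if success:
            succeeded += 1
        else:
            print(item["index"]["error"])
            failed += 1
    return succeeded, failed

Because _generate_data is a generator, documents are parsed lazily and parallel_bulk bounds memory use through chunk_size and max_chunk_bytes rather than loading the whole dump at once.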