simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-17 19:56:26 +01:00
parent 0c272f7ff2
commit fd25f9bf59
1 changed files with 8 additions and 10 deletions

View File

@ -30,7 +30,7 @@ OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
ENTITIES = ["dataset", "datasource", "organization", "otherresearchproduct", ENTITIES = ["dataset", "datasource", "organization", "otherresearchproduct",
"project", "publication", "relation", "software"] "project", "publication", "relation", "software"]
BULK_PARALLELISM = 4 BULK_PARALLELISM = 2
# #
@ -133,23 +133,21 @@ def import_raw_graph():
yield data yield data
succeeded = 0 succeeded = 0
failed = [] failed = 0
for success, item in helpers.parallel_bulk(client, actions=_generate_data()): for success, item in helpers.parallel_bulk(client, actions=_generate_data(), raise_on_exception=False,
chunk_size=500, max_chunk_bytes=10 * 1024 * 1024):
if success: if success:
succeeded = succeeded + 1 succeeded = succeeded + 1
else: else:
failed.append(item)
if len(failed) > 0:
print(f"There were {len(failed)} errors:")
for item in failed:
print(item["index"]["error"]) print(item["index"]["error"])
failed = failed+1
if failed > 0:
print(f"There were {failed} errors:")
if len(succeeded) > 0: if len(succeeded) > 0:
print(f"Bulk-inserted {succeeded} items (streaming_bulk).") print(f"Bulk-inserted {succeeded} items (streaming_bulk).")
@task @task
def close_indexes(): def close_indexes():
client = OpenSearch( client = OpenSearch(