From c6a12baeba71ed6d8ed35f5acffee39b7cfb8e94 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 26 Jul 2024 22:23:33 +0200 Subject: [PATCH] initial stage --- airflow/dags/import_Catalogues.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/airflow/dags/import_Catalogues.py b/airflow/dags/import_Catalogues.py index da618b0..a0aac75 100644 --- a/airflow/dags/import_Catalogues.py +++ b/airflow/dags/import_Catalogues.py @@ -145,15 +145,15 @@ def import_catalogue_entities(): yield {"_index": mapped_index, "_id": doc['id'], "_source": doc} - succeeded = 0 - failed = 0 - for success, item in helpers.parallel_bulk(client, actions=streamed_results(), timeout=5 * 60): - if success: - succeeded = succeeded + 1 - else: - print("error: " + str(item)) - failed = failed + 1 - print(f"Entity: {entity} succes: {success} error: {failed}") + succeeded = 0 + failed = 0 + for success, item in helpers.parallel_bulk(client, actions=streamed_results(), timeout=5 * 60): + if success: + succeeded = succeeded + 1 + else: + print("error: " + str(item)) + failed = failed + 1 + print(f"Entity: {entity} succes: {success} error: {failed}") @task def close_indexes(**kwargs):