diff --git a/airflow/dags/import_raw_graph.py b/airflow/dags/import_raw_graph.py
index dc59222..1ac436e 100644
--- a/airflow/dags/import_raw_graph.py
+++ b/airflow/dags/import_raw_graph.py
@@ -30,7 +30,7 @@
 OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
 
 ENTITIES = ["dataset", "datasource", "organization", "otherresearchproduct", "project", "publication", "relation", "software"]
 
-BULK_PARALLELISM = 4
+BULK_PARALLELISM = 2
 
 #
@@ -133,23 +133,21 @@ def import_raw_graph():
 
                 yield data
 
         succeeded = 0
-        failed = []
-        for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
+        failed = 0
+        for success, item in helpers.parallel_bulk(client, actions=_generate_data(), raise_on_exception=False,
+                                                   chunk_size=500, max_chunk_bytes=10 * 1024 * 1024):
             if success:
                 succeeded = succeeded + 1
             else:
-                failed.append(item)
-
-        if len(failed) > 0:
-            print(f"There were {len(failed)} errors:")
-            for item in failed: print(item["index"]["error"])
+                failed = failed + 1
+
+        if failed > 0:
+            print(f"There were {failed} errors:")
 
         if len(succeeded) > 0:
             print(f"Bulk-inserted {succeeded} items (streaming_bulk).")
-
-
 
     @task
     def close_indexes():
         client = OpenSearch(
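
For context, the tuned call in this diff boils down to the following pattern. This is a minimal, standalone sketch assuming opensearch-py, not the DAG code itself; the host, credentials, the "raw_graph_dataset" index name and the generated documents are placeholders.

    # Sketch of the parallel_bulk usage from the diff above (placeholders, not DAG code).
    from opensearchpy import OpenSearch, helpers

    client = OpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],   # placeholder host
        http_auth=("admin", "admin"),                  # placeholder credentials
        use_ssl=False,
    )

    def _generate_data():
        # Yield one bulk action per document; parallel_bulk groups them into chunks.
        for i in range(1000):
            yield {"_index": "raw_graph_dataset", "_id": i, "value": f"doc-{i}"}

    succeeded = 0
    failed = 0
    # raise_on_exception=False reports failures from a bulk request as failed items
    # instead of aborting the whole import; chunk_size and max_chunk_bytes cap the
    # size of each request sent to the cluster.
    for ok, item in helpers.parallel_bulk(
        client,
        actions=_generate_data(),
        thread_count=2,
        raise_on_exception=False,
        chunk_size=500,
        max_chunk_bytes=10 * 1024 * 1024,
    ):
        if ok:
            succeeded += 1
        else:
            failed += 1

    print(f"Bulk-inserted {succeeded} items, {failed} errors.")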