diff --git a/airflow/dags/import_skg_if.py b/airflow/dags/import_skg_if.py index 1ea28a6..4f34364 100644 --- a/airflow/dags/import_skg_if.py +++ b/airflow/dags/import_skg_if.py @@ -28,9 +28,9 @@ OPENSEARCH_URL= Variable.get("OPENSEARCH_URL", "https://opensearch-cluster.lot1- OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin") OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin") -ENTITIES = [ "datasources", "grants", "organizations", "persons", "products", "topics", "venues"] +ENTITIES = ["datasources", "grants", "organizations", "persons", "products", "topics", "venues"] -BULK_PARALLELISM = 7 +BULK_PARALLELISM = 16 # @@ -171,7 +171,7 @@ def skg_if_pipeline(): for success, item in helpers.parallel_bulk(client, actions=_generate_data(), raise_on_exception=False, raise_on_error=False, - chunk_size=500, max_chunk_bytes=10 * 1024 * 1024, + chunk_size=5000, max_chunk_bytes=50 * 1024 * 1024, timeout=180): if success: succeeded = succeeded + 1