From 620c6fadeaa6222cbf0d7f28d5b09447e7b0cbcd Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 20 Mar 2024 15:31:32 +0100 Subject: [PATCH] simple test DAG --- airflow/dags/import_skg_if.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/airflow/dags/import_skg_if.py b/airflow/dags/import_skg_if.py index cd6ca28..1ea28a6 100644 --- a/airflow/dags/import_skg_if.py +++ b/airflow/dags/import_skg_if.py @@ -88,6 +88,13 @@ def skg_if_pipeline(): pool_maxsize=20 ) + client.cluster.put_settings(body={ + "persistent": { + "cluster.routing.allocation.balance.prefer_primary": True, + "segrep.pressure.enabled": True + } + }) + for entity in ENTITIES: if client.indices.exists(entity): client.indices.delete(entity) @@ -99,12 +106,22 @@ def skg_if_pipeline(): "number_of_replicas": 0, "refresh_interval": -1, + "translog.flush_threshold_size": "2048MB", + "codec": "zstd_no_dict", "replication.type": "SEGMENT" } }, - "mappings": mappings[entity] + # "mappings": mappings[entity] + "mappings":{ + "dynamic": False, + "properties": { + "local_identifier": { + "type": "keyword" + } + } + } }) def compute_batches(ds=None, **kwargs): @@ -122,7 +139,7 @@ def skg_if_pipeline(): return list(split_list(pieces, len(pieces)//BULK_PARALLELISM)) - @task + @task(execution_timeout=timedelta(days=3)) def bulk_load(files: list[(str, str)]): client = OpenSearch( hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],