simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-20 15:31:32 +01:00
parent b71bcfabf8
commit 620c6fadea
1 changed file with 19 additions and 2 deletions

View File

@@ -88,6 +88,13 @@ def skg_if_pipeline():
pool_maxsize=20
)
client.cluster.put_settings(body={
"persistent": {
"cluster.routing.allocation.balance.prefer_primary": True,
"segrep.pressure.enabled": True
}
})
for entity in ENTITIES:
if client.indices.exists(entity):
client.indices.delete(entity)
@@ -99,12 +106,22 @@ def skg_if_pipeline():
"number_of_replicas": 0,
"refresh_interval": -1,
"translog.flush_threshold_size": "2048MB",
"codec": "zstd_no_dict",
"replication.type": "SEGMENT"
}
},
"mappings": mappings[entity]
# "mappings": mappings[entity]
"mappings":{
"dynamic": False,
"properties": {
"local_identifier": {
"type": "keyword"
}
}
}
})
def compute_batches(ds=None, **kwargs):
@@ -122,7 +139,7 @@ def skg_if_pipeline():
return list(split_list(pieces, len(pieces)//BULK_PARALLELISM))
@task
@task(execution_timeout=timedelta(days=3))
def bulk_load(files: list[(str, str)]):
client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],