simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-15 16:34:17 +01:00
parent c0bfa81d97
commit 5cc3b050ce
1 changed files with 4 additions and 2 deletions

View File

@ -173,11 +173,13 @@ def skg_if_pipeline():
for entity in ENTITIES: for entity in ENTITIES:
client.indices.refresh(entity) client.indices.refresh(entity)
parallel_batches = PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches)
chain( chain(
unzip_to_s3.override(task_id="unzip_to_s3")("dump.zip", S3_BUCKET_NAME), unzip_to_s3.override(task_id="unzip_to_s3")("dump.zip", S3_BUCKET_NAME),
create_indexes.override(task_id="create_indexes")(), create_indexes.override(task_id="create_indexes")(),
PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches), parallel_batches,
bulk_load.expand_kwargs(compute_batches.output), bulk_load.expand_kwargs(parallel_batches.output),
close_indexes.override(task_id="close_indexes")() close_indexes.override(task_id="close_indexes")()
) )