diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py index c8f634f..9b2d424 100644 --- a/airflow/dags/skg_if_pipeline.py +++ b/airflow/dags/skg_if_pipeline.py @@ -173,11 +173,13 @@ def skg_if_pipeline(): for entity in ENTITIES: client.indices.refresh(entity) + parallel_batches = PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches) + chain( unzip_to_s3.override(task_id="unzip_to_s3")("dump.zip", S3_BUCKET_NAME), create_indexes.override(task_id="create_indexes")(), - PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches), - bulk_load.expand_kwargs(compute_batches.output), + parallel_batches, + bulk_load.expand_kwargs(parallel_batches.output), close_indexes.override(task_id="close_indexes")() )