diff --git a/airflow/dags/test_dag.py b/airflow/dags/test_dag.py index 601200e..2df3eae 100644 --- a/airflow/dags/test_dag.py +++ b/airflow/dags/test_dag.py @@ -6,6 +6,7 @@ from datetime import timedelta import pendulum from airflow.decorators import dag, task_group from airflow.decorators import task +from airflow.exceptions import AirflowSkipException from airflow.operators.empty import EmptyOperator from airflow.operators.python import get_current_context from airflow.utils.helpers import chain @@ -51,6 +52,8 @@ def import_s3_openaire_dump(): def compute_batches(): nonlocal entity kwargs = get_current_context() + if entity not in kwargs["params"]["ENTITIES"]: + raise AirflowSkipException(f"Skipping {entity}") return [[(entity, '1'), (entity, '2')], [], []] @task(executor_config={