diff --git a/airflow/dags/test_dag.py b/airflow/dags/test_dag.py index a09ae15..9f55563 100644 --- a/airflow/dags/test_dag.py +++ b/airflow/dags/test_dag.py @@ -47,13 +47,11 @@ def import_s3_openaire_dump(): @task_group def load_and_map_entity(entity: str): - kwargs = get_current_context() - @task def compute_batches(): nonlocal entity kwargs = get_current_context() - return [(entity, '1'), (entity, '2')] + return [[(entity, '1'), (entity, '2')], [], []] @task(executor_config={ "pod_override": k8s.V1Pod( @@ -72,24 +70,21 @@ def import_s3_openaire_dump(): ) ) }) - def bulk_load(files: list[(str, str)], **kwargs): + def parallel_load(files: list[(str, str)], **kwargs): kwargs = get_current_context() print(files) - if entity in kwargs["params"]["ENTITIES"]: - bulk_load(compute_batches()) - else: - EmptyOperator(task_id=f"${entity}_skipped") + parallel_load.expand(compute_batches()) @task def merge_curation_db(**kwargs): pass - @task(task_display_name="Cleanup missing curated entities") + @task def delete_missing_curated(**kwargs): pass - @task(task_display_name="Refresh indexes and link them to aliases") + @task def close_indexes(**kwargs): pass