From 7af06fbda5bfaff63c3a05ac3b2f2b89982d3d37 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Mon, 28 Oct 2024 21:10:38 +0100
Subject: [PATCH] Clean DAG

---
 workflow/dnet/clean.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/workflow/dnet/clean.py b/workflow/dnet/clean.py
index 93ca057..d6b0e48 100644
--- a/workflow/dnet/clean.py
+++ b/workflow/dnet/clean.py
@@ -24,7 +24,7 @@ default_args = {
     default_args=default_args,
     params={
         "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
-        "POSTGRES_CONN_ID": Param("postgres_conn", type='string', description="Airflow connection of S3 endpoint"),
+        "POSTGRES_CONN_ID": Param("postgres_conn", type='string', description="Airflow connection of Postgres endpoint"),
         "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/02_graph_grouped", type='string', description=""),
         "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/03_graph_cleaned", type='string', description=""),
@@ -43,7 +43,7 @@ def clean_graph_dag():
     getdatasourcefromcountry = SparkKubernetesOperator(
         task_id='getdatasourcefromcountry',
-        task_display_name="Propagate Relations",
+        task_display_name="Get datasource from Country",
         namespace='dnet-spark-jobs',
         template_spec=SparkConfigurator(
             name="getdatasourcefromcountry-{{ ds }}-{{ task_instance.try_number }}",
@@ -101,7 +101,7 @@ def clean_graph_dag():
             kubernetes_conn_id="kubernetes_default"
         ))

-    chain([getdatasourcefromcountry, masterduplicateaction], clean_tasks)
+    chain(getdatasourcefromcountry, masterduplicateaction, clean_tasks)

 clean_graph_dag()