diff --git a/workflow/dnet/build_openaire_graph_incremental.py b/workflow/dnet/build_openaire_graph_incremental.py index 3b05d79..39e2223 100644 --- a/workflow/dnet/build_openaire_graph_incremental.py +++ b/workflow/dnet/build_openaire_graph_incremental.py @@ -6,8 +6,8 @@ from airflow.models.param import Param from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator -from spark_configurator import SparkConfigurator import dag_utils +from spark_configurator import SparkConfigurator @dag( @@ -99,7 +99,8 @@ def build_new_graph(): jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar', arguments=["--graphBasePath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"], "--targetPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"], - "--relationPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"] + "--relationPath", + "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"] + "/relation" ]).get_configuration(), kubernetes_conn_id="kubernetes_default" ), @@ -112,7 +113,8 @@ def build_new_graph(): name="copyresolveents-{{ ds }}-{{ task_instance.try_number }}", mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob", jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar', - arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"], + arguments=["--graphInputPath", + "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"], "--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"], "--entities", ",".join([item for item in dag_utils.GRAPH_ENTITIES if item != "relation"]), "--format", "text"