From 13ac9767c6534cbaf2b205c95f617c166fa1a722 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 6 Nov 2024 15:18:17 +0100 Subject: [PATCH] Orcid propagation step --- workflow/dnet/orcid_propagate.py | 55 ++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 workflow/dnet/orcid_propagate.py diff --git a/workflow/dnet/orcid_propagate.py b/workflow/dnet/orcid_propagate.py new file mode 100644 index 0000000..f8b73dd --- /dev/null +++ b/workflow/dnet/orcid_propagate.py @@ -0,0 +1,55 @@ +import os +from datetime import timedelta + +from airflow.decorators import dag +from airflow.models.param import Param +from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator + +from spark_configurator import SparkConfigurator + +EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) + +default_args = { + "execution_timeout": timedelta(days=EXECUTION_TIMEOUT), + "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)), + "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))) +} + + +@dag( + dag_id="orcid_propagation_graph", + dag_display_name="Propagate ORCID data in graph", + default_args=default_args, + params={ + "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"), + "ORCID_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""), + "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""), + "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/10_graph_propagated", type='string', + description=""), + "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_propagation", type='string', + description="") + }, + tags=["openaire"] +) +def orcid_propagation_dag(): + orcid_propagate = SparkKubernetesOperator( + task_id='PropagateGraphWithOrcid', + task_display_name="Propagate ORCID data", + namespace='dnet-spark-jobs', + template_spec=SparkConfigurator( + name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}", + mainClass="eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor", + jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar', + arguments=["--orcidPath", "{{ dag_run.conf.get('ORCID_PATH') }}", + "--graphPath", "{{ dag_run.conf.get('INPUT_PATH') }}", + "--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}", + "--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}", + "--matchingSource", "graph" + ]).get_configuration(), + kubernetes_conn_id="kubernetes_default" + ) + + orcid_propagate + + +orcid_propagation_dag()