code-infrastructure-lab/workflow/dnet/orcid_propagate.py

56 lines
2.3 KiB
Python

import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
# Task defaults handed to the DAG below. Each value is read from an
# environment variable and falls back to the literal shown when it is unset.
EXECUTION_TIMEOUT = int(os.environ.get("EXECUTION_TIMEOUT", 6))  # used as a number of days

default_args = dict(
    # Per-task execution timeout: EXECUTION_TIMEOUT days.
    execution_timeout=timedelta(days=EXECUTION_TIMEOUT),
    # Retry count, from DEFAULT_TASK_RETRIES (fallback 1).
    retries=int(os.environ.get("DEFAULT_TASK_RETRIES", 1)),
    # Pause between attempts in seconds, from DEFAULT_RETRY_DELAY_SECONDS (fallback 60).
    retry_delay=timedelta(
        seconds=int(os.environ.get("DEFAULT_RETRY_DELAY_SECONDS", 60))
    ),
)
@dag(
    dag_id="orcid_propagation_graph",
    dag_display_name="Propagate ORCID data in graph",
    default_args=default_args,
    params={
        # NOTE(review): S3_CONN_ID is declared but not referenced by the task
        # defined below — confirm whether it is still needed.
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
        # NOTE(review): ORCID_PATH and INPUT_PATH share the same default
        # location — confirm this is intentional.
        "ORCID_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
        "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
        "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/10_graph_propagated", type='string',
                             description=""),
        "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_propagation", type='string',
                             description="")
    },
    tags=["openaire"]
)
def orcid_propagation_dag():
    """Define the single-task DAG that runs the ORCID propagation Spark job.

    The task submits a Spark application to the ``dnet-spark-jobs`` namespace
    through the ``kubernetes_default`` connection. The application runs
    ``SparkPropagateOrcidAuthor`` from the dhp shade jar, with the ORCID,
    graph, target and working-directory paths passed as command-line
    arguments.

    Each path is resolved at render time from the run's ``conf``. If the key
    is absent from ``conf``, the default declared in ``params`` is used
    instead, so a run triggered without an explicit configuration does not
    render the literal string ``None`` as a path.
    """
    # Instantiating the operator inside the @dag-decorated function is what
    # attaches the task to the DAG; no reference to it needs to be kept.
    SparkKubernetesOperator(
        task_id='PropagateGraphWithOrcid',
        task_display_name="Propagate ORCID data",
        namespace='dnet-spark-jobs',
        template_spec=SparkConfigurator(
            # The try number is part of the name so that each attempt gets
            # its own application name.
            name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}",
            mainClass="eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor",
            jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
            arguments=[
                # A value in dag_run.conf wins; otherwise use the Param default.
                "--orcidPath", "{{ dag_run.conf.get('ORCID_PATH', params.ORCID_PATH) }}",
                "--graphPath", "{{ dag_run.conf.get('INPUT_PATH', params.INPUT_PATH) }}",
                "--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH', params.OUTPUT_PATH) }}",
                "--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH', params.WRKDIR_PATH) }}",
                "--matchingSource", "graph"
            ]).get_configuration(),
        kubernetes_conn_id="kubernetes_default"
    )


# Calling the decorated function at import time registers the DAG.
orcid_propagation_dag()