56 lines
2.3 KiB
Python
56 lines
2.3 KiB
Python
import os
|
|
from datetime import timedelta
|
|
|
|
from airflow.decorators import dag
|
|
from airflow.models.param import Param
|
|
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
|
|
|
from spark_configurator import SparkConfigurator
|
|
|
|
def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the process environment.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        The variable's value parsed with ``int()``, or ``default``.

    Raises:
        ValueError: If the variable is set to text ``int()`` cannot parse.
            Unlike a bare ``int()`` failure, the message names the variable.
    """
    raw = os.getenv(name)
    # Treat an empty / whitespace-only value like an unset one, so a
    # deployment that exports the variable with no value gets the default.
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name}={raw!r} is not a valid integer"
        ) from err


# Per-attempt task timeout, expressed in DAYS (it is fed to
# timedelta(days=...) below). Read once, when this module is imported.
EXECUTION_TIMEOUT = _env_int("EXECUTION_TIMEOUT", 6)

# Default task arguments; each value can be overridden through the
# environment of the process that imports this module.
default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": _env_int("DEFAULT_TASK_RETRIES", 1),
    # Delay between retries, configured in seconds.
    "retry_delay": timedelta(seconds=_env_int("DEFAULT_RETRY_DELAY_SECONDS", 60)),
}
|
|
|
|
|
|
@dag(
    dag_id="orcid_propagation_graph",
    dag_display_name="Propagate ORCID data in graph",
    default_args=default_args,
    params={
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
        "ORCID_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string',
                            description=""),
        "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string',
                            description=""),
        "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/10_graph_propagated", type='string',
                             description=""),
        "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_propagation", type='string',
                             description=""),
    },
    tags=["openaire"],
)
def orcid_propagation_dag():
    """Define the DAG that propagates ORCID data through the graph.

    It has a single SparkKubernetesOperator task, in namespace
    ``dnet-spark-jobs``. The task submits
    ``eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor``
    from the ``dhp-shade-package`` jar. DAG params map to the job's flags:

    - ``ORCID_PATH``  -> ``--orcidPath``
    - ``INPUT_PATH``  -> ``--graphPath``
    - ``OUTPUT_PATH`` -> ``--targetPath``
    - ``WRKDIR_PATH`` -> ``--workingDir``

    ``--matchingSource`` is always ``graph``. ``S3_CONN_ID`` is not passed
    explicitly here; whether SparkConfigurator consumes it is not visible
    from this file.
    """
    SparkKubernetesOperator(
        task_id='PropagateGraphWithOrcid',
        task_display_name="Propagate ORCID data",
        namespace='dnet-spark-jobs',
        template_spec=SparkConfigurator(
            # Unique Spark application name per logical date and retry attempt.
            name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}",
            mainClass="eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor",
            jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
            # Render from ``params``, not ``dag_run.conf``. Airflow fills
            # ``params`` from the Param defaults above and overlays the run
            # conf on top (with the default dag_run_conf_overrides_params=True).
            # So values given at trigger time still win. Runs triggered without
            # conf get the declared defaults instead of the literal "None"
            # that ``dag_run.conf.get(...)`` would render.
            arguments=[
                "--orcidPath", "{{ params.ORCID_PATH }}",
                "--graphPath", "{{ params.INPUT_PATH }}",
                "--targetPath", "{{ params.OUTPUT_PATH }}",
                "--workingDir", "{{ params.WRKDIR_PATH }}",
                "--matchingSource", "graph",
            ],
        ).get_configuration(),
        kubernetes_conn_id="kubernetes_default",
    )


orcid_propagation_dag()
|