Orcid propagation step
This commit is contained in:
parent
ac2fbbb9f9
commit
13ac9767c6
|
@ -0,0 +1,55 @@
|
|||
import os
|
||||
from datetime import timedelta
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.param import Param
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||
|
||||
default_args = {
|
||||
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
|
||||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
|
||||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
|
||||
}
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="orcid_propagation_graph",
|
||||
dag_display_name="Propagate ORCID data in graph",
|
||||
default_args=default_args,
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
|
||||
"ORCID_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
|
||||
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
|
||||
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/10_graph_propagated", type='string',
|
||||
description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_propagation", type='string',
|
||||
description="")
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def orcid_propagation_dag():
|
||||
orcid_propagate = SparkKubernetesOperator(
|
||||
task_id='PropagateGraphWithOrcid',
|
||||
task_display_name="Propagate ORCID data",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--orcidPath", "{{ dag_run.conf.get('ORCID_PATH') }}",
|
||||
"--graphPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
|
||||
"--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--matchingSource", "graph"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
orcid_propagate
|
||||
|
||||
|
||||
orcid_propagation_dag()
|
Loading…
Reference in New Issue