Copy relations in ORCID enrichment DAG

This commit is contained in:
Giambattista Bloisi 2024-11-06 16:07:34 +01:00
parent 13ac9767c6
commit 9e69ded5ef
1 changed files with 18 additions and 3 deletions

View File

@ -2,6 +2,7 @@ import os
from datetime import timedelta from datetime import timedelta
from airflow.decorators import dag from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
@ -32,7 +33,7 @@ default_args = {
tags=["openaire"] tags=["openaire"]
) )
def orcid_enrichment_dag(): def orcid_enrichment_dag():
orcid_enrich = SparkKubernetesOperator( chain(SparkKubernetesOperator(
task_id='EnrichGraphWithOrcidAuthors', task_id='EnrichGraphWithOrcidAuthors',
task_display_name='Enrich Authors with ORCID', task_display_name='Enrich Authors with ORCID',
namespace='dnet-spark-jobs', namespace='dnet-spark-jobs',
@ -47,9 +48,23 @@ def orcid_enrichment_dag():
"--master", "" "--master", ""
]).get_configuration(), ]).get_configuration(),
kubernetes_conn_id="kubernetes_default" kubernetes_conn_id="kubernetes_default"
),
SparkKubernetesOperator(
task_id='copyorcidenrichrels',
task_display_name="Copy relations to ORCID Enriched graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="copygroupedrels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphInputPath", "{{ dag_run.conf.get('INPUT_PATH') }}/",
"--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}/",
"--entities", "relation",
"--format", "text"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
) )
orcid_enrich
orcid_enrichment_dag() orcid_enrichment_dag()