diff --git a/workflow/dnet/orcid_enrich.py b/workflow/dnet/orcid_enrich.py index c1e3bcd..b18f8bb 100644 --- a/workflow/dnet/orcid_enrich.py +++ b/workflow/dnet/orcid_enrich.py @@ -2,6 +2,7 @@ import os from datetime import timedelta from airflow.decorators import dag +from airflow.models.baseoperator import chain from airflow.models.param import Param from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator @@ -32,7 +33,7 @@ default_args = { tags=["openaire"] ) def orcid_enrichment_dag(): - orcid_enrich = SparkKubernetesOperator( + chain(SparkKubernetesOperator( task_id='EnrichGraphWithOrcidAuthors', task_display_name='Enrich Authors with ORCID', namespace='dnet-spark-jobs', @@ -47,9 +48,23 @@ def orcid_enrichment_dag(): "--master", "" ]).get_configuration(), kubernetes_conn_id="kubernetes_default" + ), + SparkKubernetesOperator( + task_id='copyorcidenrichrels', + task_display_name="Copy relations to ORCID Enriched graph", + namespace='dnet-spark-jobs', + template_spec=SparkConfigurator( + name="copygroupedrels-{{ ds }}-{{ task_instance.try_number }}", + mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob", + jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar', + arguments=["--graphInputPath", "{{ dag_run.conf.get('INPUT_PATH') }}/", + "--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}/", + "--entities", "relation", + "--format", "text" + ]).get_configuration(), + kubernetes_conn_id="kubernetes_default" + ) ) - orcid_enrich - orcid_enrichment_dag()