DAG to build the graph from a delta

Giambattista Bloisi 2024-10-28 17:04:44 +01:00
parent e64b3ec6b1
commit b3d7dda0c1
2 changed files with 129 additions and 0 deletions
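The new DAG takes every path and service endpoint from the run configuration (dag_run.conf), so each run is parameterized at trigger time. As a minimal, hypothetical sketch (not part of this commit), another DAG could launch it with the same TriggerDagRunOperator pattern that appears, commented out, at the end of the file; the conf values below simply mirror the Param defaults declared in the DAG:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Hypothetical upstream task: launch the incremental build with explicit
# delta and graph locations passed through dag_run.conf.
trigger_incremental_build = TriggerDagRunOperator(
    task_id="trigger_incremental_build",  # illustrative task id
    trigger_dag_id="build_openaire_graph_incremental",
    wait_for_completion=True,
    conf={
        "DELTA_PATH": "s3a://graph/data/delta",
        "GRAPH_PATH": "s3a://graph/tmp/prod_provision/graph",
        "WRKDIR_PATH": "s3a://graph/tmp/prod_provision/working_dir",
        "IS_LOOKUP_URL": "http://services.openaire.eu:8280/is/services/isLookUp?wsdl",
    },
)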


@@ -0,0 +1,125 @@
from __future__ import annotations
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
import dag_utils
@dag(
    dag_id="build_openaire_graph_incremental",
    dag_display_name="Build the OpenAIRE graph incrementally",
    params={
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection for S3 endpoint"),
        "GRAPH_PATH": Param("s3a://graph/tmp/prod_provision/graph", type='string', description=""),
        "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir", type='string', description=""),
        "IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
                               description=""),
        "DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description=""),
        "ORCID_PATH": Param("s3a://graph/data/orcid_2023/tables", type='string', description=""),
        "DELTA_PATH": Param("s3a://graph/data/delta", type='string', description=""),
    },
    tags=["openaire"]
)
def build_new_graph():
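    # Three Spark-on-Kubernetes jobs run in sequence: import the delta into the
    # raw graph, group entities by id, then copy the relations into the grouped graph.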
    chain(
        SparkKubernetesOperator(
            task_id='raw_graph',
            task_display_name="Generate Raw Graph",
            namespace='dnet-spark-jobs',
            template_spec=SparkConfigurator(
                name="rawgraph-{{ ds }}-{{ task_instance.try_number }}",
                mainClass="eu.dnetlib.dhp.oa.graph.raw.CopyIncrementalOafSparkApplication",
                jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
                arguments=["--inputPath", "{{ dag_run.conf.get('DELTA_PATH') }}",
                           "--graphOutputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"]
                           ]).get_configuration(),
            kubernetes_conn_id="kubernetes_default"
        ),
        SparkKubernetesOperator(
            task_id='grouped_graph',
            task_display_name="Generate Grouped-by-id Graph",
            namespace='dnet-spark-jobs',
            template_spec=SparkConfigurator(
                name="groupedgraph-{{ ds }}-{{ task_instance.try_number }}",
                mainClass="eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob",
                jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
                arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"],
                           "--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
                           "--checkpointPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/grouped_entities",
                           "--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
                           "--filterInvisible", "false"
                           ]).get_configuration(),
            kubernetes_conn_id="kubernetes_default"
        ),
        SparkKubernetesOperator(
            task_id='copygroupedrels',
            task_display_name="Copy relations to Grouped-by-id Graph",
            namespace='dnet-spark-jobs',
            template_spec=SparkConfigurator(
                name="copygroupedrels-{{ ds }}-{{ task_instance.try_number }}",
                mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
                jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
                arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"],
                           "--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
                           "--entities", "relation",
                           "--format", "text"
                           ]).get_configuration(),
            kubernetes_conn_id="kubernetes_default"
        )
        # , TriggerDagRunOperator(
        #     task_id="dedup",
        #     task_display_name="Deduplicate Research Results",
        #     trigger_dag_id="results_deduplication",
        #     wait_for_completion=True,
        #     conf={
        #         "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
        #
        #         "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["inference"],
        #         "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
        #         "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
        #         "IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
        #         "DEDUP_CONFIG_ID": "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}"
        #     }
        # ),
        # TriggerDagRunOperator(
        #     task_id="consistency",
        #     task_display_name="Enforce Consistency of Graph",
        #     trigger_dag_id="consistency_graph",
        #     wait_for_completion=True,
        #
        #     conf={
        #         "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
        #
        #         "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
        #         "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
        #         "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
        #         "IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
        #     }
        # ),
        # TriggerDagRunOperator(
        #     task_id="orcid_enrichment",
        #     task_display_name="Enrich Graph with ORCID data",
        #     trigger_dag_id="orcid_enrichment_graph",
        #     wait_for_completion=True,
        #
        #     conf={
        #         "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
        #
        #         "ORCID_PATH": "{{ dag_run.conf.get('ORCID_PATH') }}",
        #         "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
        #         "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["orcid_enhancement"],
        #         "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/orcid_enrichment"
        #     }
        # )
    )
build_new_graph()
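Each Spark job writes into a phase-specific subfolder of GRAPH_PATH; the subfolder names come from the BUILD_PHASES mapping in the dag_utils module, which the second changed file extends. For a run that keeps the default GRAPH_PATH, the templated arguments resolve roughly as in this illustrative snippet (the concrete values are shown only to make the layout explicit):

# Illustrative only: how the templated paths resolve with the default GRAPH_PATH
# and the BUILD_PHASES entries added below in dag_utils.
graph_path = "s3a://graph/tmp/prod_provision/graph"
raw_output = graph_path + "/" + "01_graph_raw"          # --graphOutputPath of 'raw_graph'
grouped_output = graph_path + "/" + "02_graph_grouped"  # --outputPath of 'grouped_graph' and 'copygroupedrels'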


@@ -2,6 +2,10 @@ from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
BUILD_PHASES = {
    "raw": "01_graph_raw",
    "grouped": "02_graph_grouped",
    "clean": "03_graph_cleaned",
    "inference": "05_graph_inferred",
    "dedup": "06_graph_dedup",
    "consistency": "07_graph_consistent",