Add resolvereletion step

This commit is contained in:
Giambattista Bloisi 2024-10-29 14:39:01 +01:00
parent 9058dbb957
commit 004be2e97f
2 changed files with 47 additions and 0 deletions

View File

@ -72,6 +72,52 @@ def build_new_graph():
"--format", "text" "--format", "text"
]).get_configuration(), ]).get_configuration(),
kubernetes_conn_id="kubernetes_default" kubernetes_conn_id="kubernetes_default"
),
TriggerDagRunOperator(
task_id="clean_graph",
task_display_name="Clean Results",
trigger_dag_id="clean_graph",
wait_for_completion=True,
conf={
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
"IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
}
),
SparkKubernetesOperator(
task_id='resolverels',
task_display_name="Resolve Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="resolverels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelationById",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
"--targetPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
"--relationPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"]
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
),
SparkKubernetesOperator(
task_id='copyresolveents',
task_display_name="Copy entities to Resolved Graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="copyresolveents-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
"--entities", ",".join([item for item in dag_utils.GRAPH_ENTITIES if item != "relation"]),
"--format", "text"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
) )
# , TriggerDagRunOperator( # , TriggerDagRunOperator(

View File

@ -5,6 +5,7 @@ BUILD_PHASES = {
"raw": "01_graph_raw", "raw": "01_graph_raw",
"grouped": "02_graph_grouped", "grouped": "02_graph_grouped",
"clean": "03_graph_cleaned", "clean": "03_graph_cleaned",
"resolve": "04_graph_resolved",
"inference": "05_graph_inferred", "inference": "05_graph_inferred",
"dedup": "06_graph_dedup", "dedup": "06_graph_dedup",