# code-infrastructure-lab/workflow/dnet/dedup.py

import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
# Per-task execution ceiling, in days; overridable through the environment.
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

# Defaults applied to every task of the DAG below; each knob can be tuned
# via its corresponding environment variable.
default_args = dict(
    execution_timeout=timedelta(days=EXECUTION_TIMEOUT),
    retries=int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    retry_delay=timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
)
@dag(
    dag_id="results_deduplication",
    dag_display_name="Deduplicate Research Results",
    default_args=default_args,
    params={
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
        "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/05_graph_inferred", type='string', description=""),
        "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type='string', description=""),
        "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/dedup", type='string', description=""),
        "IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
                               description=""),
        "DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description="")
    },
    tags=["openaire"]
)
def results_deduplication_dag():
    """Deduplicate research results of the OpenAIRE graph.

    Runs a strictly linear chain of Spark-on-Kubernetes jobs (similarity
    relations -> whitelist -> merge relations -> dedup records -> openorgs
    copies -> entity update -> relation copy), reading the inferred graph
    from INPUT_PATH and writing the deduplicated graph to OUTPUT_PATH.
    """
    # NOTE(review): no `schedule` is set, so Airflow's default applies —
    # confirm this DAG is meant to run on the default schedule rather than
    # manual-trigger only (schedule=None).

    # Single shaded jar shared by every dedup Spark job.
    jar = 's3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar'

    # Template on `params` rather than `dag_run.conf.get(...)`: the latter
    # renders to None whenever the DAG runs without an explicit trigger
    # conf, silently discarding the Param defaults declared above. With
    # `params`, the defaults apply and trigger-time conf still overrides
    # them (dag_run_conf_overrides_params).
    graph_path = "{{ params.INPUT_PATH }}"
    lookup_url = "{{ params.IS_LOOKUP_URL }}"
    config_id = "{{ params.DEDUP_CONFIG_ID }}"
    working_path = "{{ params.WRKDIR_PATH }}"
    output_path = "{{ params.OUTPUT_PATH }}"

    def _spark_task(task_id, display_name, job_name, main_class, arguments):
        """Build one SparkKubernetesOperator running `main_class` from the shared jar."""
        return SparkKubernetesOperator(
            task_id=task_id,
            task_display_name=display_name,
            namespace='dnet-spark-jobs',
            template_spec=SparkConfigurator(
                name=job_name + "-{{ ds }}-{{ task_instance.try_number }}",
                mainClass=main_class,
                jarLocation=jar,
                arguments=arguments).get_configuration(),
            kubernetes_conn_id="kubernetes_default"
        )

    simrel = _spark_task(
        'CreateSimRel', "Create Similarity Relations", "createsimrels",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels",
        ["--graphBasePath", graph_path,
         "--isLookUpUrl", lookup_url,
         "--actionSetId", config_id,
         "--workingPath", working_path,
         "--numPartitions", "64"])

    whitelist = _spark_task(
        'WhitelistSimRels', "Add Whitelist Similarity Relations", "whitelistsimrels",
        "eu.dnetlib.dhp.oa.dedup.SparkWhitelistSimRels",
        ["--graphBasePath", graph_path,
         "--isLookUpUrl", lookup_url,
         "--actionSetId", config_id,
         "--workingPath", working_path,
         "--whiteListPath", "s3a://graph/data/dedup/whitelist_prod",  # TODO: copy!
         "--numPartitions", "64"])

    createmergerel = _spark_task(
        'CreateMergeRels', "Create Merge Relations", "createmergerels",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels",
        ["--graphBasePath", graph_path,
         "--isLookUpUrl", lookup_url,
         "--actionSetId", config_id,
         "--workingPath", working_path,
         "--cutConnectedComponent", "200",
         "--hiveMetastoreUris", "",
         "--pivotHistoryDatabase", ""])

    creatededuprecord = _spark_task(
        'CreateDedupRecord', "Create Dedup Record", "creatededuprecord",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord",
        ["--graphBasePath", graph_path,
         "--isLookUpUrl", lookup_url,
         "--actionSetId", config_id,
         "--workingPath", working_path])

    copyopenorgsmergerel = _spark_task(
        'CopyOpenorgsMergeRels', "Copy Openorgs Merge Relations", "copyopenorgsmergerels",
        "eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels",
        ["--graphBasePath", graph_path,
         "--isLookUpUrl", lookup_url,
         "--actionSetId", config_id,
         "--workingPath", working_path,
         "--numPartitions", "64"])

    createorgsdeduprecord = _spark_task(
        'CreateOrgsDedupRecord', "Create Organizations Dedup Records", "createorgsdeduprecord",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateOrgsDedupRecord",
        ["--graphBasePath", graph_path,
         "--isLookUpUrl", lookup_url,
         "--actionSetId", config_id,
         "--workingPath", working_path])

    updateentity = _spark_task(
        'UpdateEntity', "Update Entity", "updateentity",
        "eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity",
        ["--graphBasePath", graph_path,
         "--workingPath", working_path,
         "--dedupGraphPath", output_path])

    copyrelations = _spark_task(
        'copyRelations', "Copy Non-Openorgs Relations", "copyrelations",
        "eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
        ["--graphBasePath", graph_path,
         "--workingPath", working_path,
         "--dedupGraphPath", output_path])

    # Strictly linear pipeline: each stage consumes the previous stage's output.
    chain(simrel, whitelist, createmergerel, creatededuprecord,
          copyopenorgsmergerel, createorgsdeduprecord, updateentity, copyrelations)


results_deduplication_dag()