import os
|
|
from datetime import timedelta
|
|
|
|
from airflow.decorators import dag
|
|
from airflow.models.baseoperator import chain
|
|
from airflow.models.param import Param
|
|
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
|
|
|
from spark_configurator import SparkConfigurator
|
|
|
|
def _int_env(name: str, fallback: int) -> int:
    """Read an integer configuration value from the environment, with a default."""
    return int(os.getenv(name, fallback))


# Maximum wall-clock time (in days) a single task may run before Airflow times it out.
EXECUTION_TIMEOUT = _int_env("EXECUTION_TIMEOUT", 6)

# Defaults applied to every task of this DAG unless overridden per operator.
default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": _int_env("DEFAULT_TASK_RETRIES", 1),
    "retry_delay": timedelta(seconds=_int_env("DEFAULT_RETRY_DELAY_SECONDS", 60)),
}
|
|
|
|
|
|
# Location of the shaded DHP jar implementing every Spark job in this DAG.
JAR_LOCATION = "s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar"

# Jinja fragments resolving the DAG params at run time. `params` is used instead
# of `dag_run.conf.get(...)`: conf only contains what was passed at trigger time,
# so the defaults declared in @dag(params=...) would render as None on scheduled
# or unconfigured runs. `params` resolves the declared defaults and still honors
# trigger-time overrides.
INPUT_PATH = "{{ params.INPUT_PATH }}"
OUTPUT_PATH = "{{ params.OUTPUT_PATH }}"
WRKDIR_PATH = "{{ params.WRKDIR_PATH }}"
IS_LOOKUP_URL = "{{ params.IS_LOOKUP_URL }}"
DEDUP_CONFIG_ID = "{{ params.DEDUP_CONFIG_ID }}"


@dag(
    dag_id="results_deduplication",
    dag_display_name="Deduplicate Research Results",
    default_args=default_args,
    params={
        "S3_CONN_ID": Param("s3_conn", type="string", description="Airflow connection of S3 endpoint"),
        "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/05_graph_inferred", type="string", description=""),
        "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type="string", description=""),
        "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/dedup", type="string", description=""),
        "IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type="string",
                               description=""),
        "DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type="string", description=""),
    },
    tags=["openaire"],
)
def results_deduplication_dag():
    """Run the OpenAIRE result-deduplication pipeline as a linear chain of
    Spark-on-Kubernetes jobs: similarity relations -> whitelist -> merge
    relations -> dedup records -> openorgs copies -> entity update ->
    relation copy."""

    def _spark_task(task_id, display_name, app_name, main_class, arguments):
        # Build one SparkKubernetesOperator with the namespace, jar location,
        # Spark app name pattern and k8s connection shared by every step.
        return SparkKubernetesOperator(
            task_id=task_id,
            task_display_name=display_name,
            namespace="dnet-spark-jobs",
            template_spec=SparkConfigurator(
                name=app_name + "-{{ ds }}-{{ task_instance.try_number }}",
                mainClass=main_class,
                jarLocation=JAR_LOCATION,
                arguments=arguments,
            ).get_configuration(),
            kubernetes_conn_id="kubernetes_default",
        )

    # Arguments shared by the dedup-configuration-driven steps.
    dedup_args = [
        "--graphBasePath", INPUT_PATH,
        "--isLookUpUrl", IS_LOOKUP_URL,
        "--actionSetId", DEDUP_CONFIG_ID,
        "--workingPath", WRKDIR_PATH,
    ]

    # Arguments shared by the steps that materialise the deduplicated graph.
    output_args = [
        "--graphBasePath", INPUT_PATH,
        "--workingPath", WRKDIR_PATH,
        "--dedupGraphPath", OUTPUT_PATH,
    ]

    simrel = _spark_task(
        "CreateSimRel",
        "Create Similarity Relations",
        "createsimrels",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels",
        dedup_args + ["--numPartitions", "64"],
    )

    whitelist = _spark_task(
        "WhitelistSimRels",
        "Add Whitelist Similarity Relations",
        "whitelistsimrels",
        "eu.dnetlib.dhp.oa.dedup.SparkWhitelistSimRels",
        dedup_args + [
            "--whiteListPath", "s3a://graph/data/dedup/whitelist_prod",  # TODO: copy!
            "--numPartitions", "64",
        ],
    )

    createmergerel = _spark_task(
        "CreateMergeRels",
        "Create Merge Relations",
        "createmergerels",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels",
        dedup_args + [
            "--cutConnectedComponent", "200",
            "--hiveMetastoreUris", "",
            "--pivotHistoryDatabase", "",
        ],
    )

    creatededuprecord = _spark_task(
        "CreateDedupRecord",
        "Create Dedup Record",
        "creatededuprecord",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord",
        dedup_args,
    )

    copyopenorgsmergerel = _spark_task(
        "CopyOpenorgsMergeRels",
        "Copy Openorgs Merge Relations",
        "copyopenorgsmergerels",
        "eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels",
        dedup_args + ["--numPartitions", "64"],
    )

    createorgsdeduprecord = _spark_task(
        "CreateOrgsDedupRecord",
        "Create Organizations Dedup Records",
        "createorgsdeduprecord",
        "eu.dnetlib.dhp.oa.dedup.SparkCreateOrgsDedupRecord",
        dedup_args,
    )

    updateentity = _spark_task(
        "UpdateEntity",
        "Update Entity",
        "updateentity",
        "eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity",
        output_args,
    )

    copyrelations = _spark_task(
        "copyRelations",
        "Copy Non-Openorgs Relations",
        "copyrelations",
        "eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
        output_args,
    )

    # Strictly linear pipeline: each step consumes the working dir written by
    # the previous one.
    chain(simrel, whitelist, createmergerel, creatededuprecord,
          copyopenorgsmergerel, createorgsdeduprecord, updateentity, copyrelations)


results_deduplication_dag()