# code-infrastructure-lab/workflow/dnet/clean.py (Python, 110 lines, 5.5 KiB)

import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
import dag_utils
from spark_configurator import SparkConfigurator
# Per-task execution timeout, in days (env var EXECUTION_TIMEOUT, default 6).
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

# Arguments applied to every task of the DAG below; each setting is read from
# the environment of the process that parses this file.
default_args = dict(
    execution_timeout=timedelta(days=EXECUTION_TIMEOUT),
    # Retries per task (env var DEFAULT_TASK_RETRIES, default 1).
    retries=int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    # Pause between retries, in seconds (env var DEFAULT_RETRY_DELAY_SECONDS, default 60).
    retry_delay=timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
)
# Shaded jar that contains every Spark job launched by this DAG.
SPARK_JAR_LOCATION = 's3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar'
# Kubernetes namespace the Spark jobs are submitted to.
SPARK_NAMESPACE = 'dnet-spark-jobs'
# Airflow connection used to reach the Kubernetes API.
KUBERNETES_CONN_ID = "kubernetes_default"


def _conf_expr(key: str) -> str:
    """Return a Jinja *expression* evaluating to run-configuration value ``key``.

    A value supplied in ``dag_run.conf`` wins, as before.  When the key is
    missing from the trigger payload (e.g. an API trigger with a partial conf),
    the declared DAG ``Param`` default is used instead of rendering ``None``.
    """
    return "dag_run.conf.get('" + key + "', params." + key + ")"


def _conf(key: str) -> str:
    """Return a complete Jinja template rendering run-configuration value ``key``."""
    return "{{ " + _conf_expr(key) + " }}"


def _spark_task(task_id: str, display_name: str, app_name: str, main_class: str, arguments):
    """Create a SparkKubernetesOperator running ``main_class`` from the shared jar.

    :param task_id: Airflow task id.
    :param display_name: label shown for the task in the Airflow UI.
    :param app_name: prefix of the Spark application name; the logical date and
        the try number are appended, so retries get a fresh name.
    :param main_class: fully-qualified Java main class to run.
    :param arguments: command-line arguments for the job (may contain Jinja).
    :return: the operator, attached to the DAG currently being defined.
    """
    return SparkKubernetesOperator(
        task_id=task_id,
        task_display_name=display_name,
        namespace=SPARK_NAMESPACE,
        template_spec=SparkConfigurator(
            name=app_name + "-{{ ds }}-{{ task_instance.try_number }}",
            mainClass=main_class,
            jarLocation=SPARK_JAR_LOCATION,
            arguments=arguments,
        ).get_configuration(),
        kubernetes_conn_id=KUBERNETES_CONN_ID,
    )


@dag(
    dag_id="clean_graph",
    dag_display_name="Cleaning of Graph",
    default_args=default_args,
    params={
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
        # Connection whose host/port/login/password build MasterDuplicateAction's JDBC access.
        "POSTGRES_CONN_ID": Param("postgres_conn", type='string', description=""),
        # Graph to clean; the clean jobs read one sub-directory per entity.
        "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/02_graph_grouped", type='string', description=""),
        # Destination of the cleaned graph, one sub-directory per entity.
        "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/03_graph_cleaned", type='string', description=""),
        # Working directory for intermediate data (working/hostedby, masterduplicate).
        "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/clean", type='string', description=""),
        # Passed to the clean jobs as --isLookupUrl.
        "IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
                               description=""),
        # Passed as --country to GetDatasourceFromCountry and to the clean jobs.
        "COUNTRY": Param("NL", type='string', description=""),
        # Passed to the clean jobs as --deepClean.
        "SHOULD_CLEAN": Param("false", type='string', description=""),
        # The next four are forwarded to the clean jobs as --contextId,
        # --verifyParam, --verifyCountryParam and --collectedfrom respectively.
        "CONTEXT_ID": Param("sobigdata", type='string', description=""),
        "VERIFY_PARAM": Param("gcube", type='string', description=""),
        "VERIFY_COUNTRY_PARAM": Param("10.17632;10.5061", type='string', description=""),
        "COLLECTED_FROM": Param("NARCIS", type='string', description="")
    },
    tags=["openaire"]
)
def clean_graph_dag():
    """Clean every graph entity under INPUT_PATH and write the result to OUTPUT_PATH.

    Task order: getdatasourcefromcountry -> masterduplicateaction ->
    cleansparkjob_<entity> (one task per entity, run in parallel).
    """
    # Its --workingDir is the same path the clean jobs read through --hostedBy.
    getdatasourcefromcountry = _spark_task(
        task_id='getdatasourcefromcountry',
        display_name="Get datasource from Country",
        app_name="getdatasourcefromcountry",
        main_class="eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry",
        arguments=["--inputPath", _conf('INPUT_PATH'),
                   "--country", _conf('COUNTRY'),
                   "--workingDir", _conf('WRKDIR_PATH') + "/working/hostedby"],
    )

    # Jinja expression for the Postgres connection selected by POSTGRES_CONN_ID.
    postgres = "conn.get(" + _conf_expr('POSTGRES_CONN_ID') + ")"
    # Its --hdfsPath is the same path the clean jobs read through --masterDuplicatePath.
    masterduplicateaction = _spark_task(
        task_id='masterduplicateaction',
        display_name="MasterDuplicateAction",
        app_name="masterduplicateaction",
        main_class="eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction",
        arguments=["--hdfsNameNode", "s3a://graph/",
                   "--hdfsPath", _conf('WRKDIR_PATH') + "/masterduplicate",
                   "--postgresUrl",
                   "jdbc:postgresql://{{ " + postgres + ".host }}:{{ " + postgres + ".port }}/dnet_openaireplus",
                   "--postgresUser", "{{ " + postgres + ".login }}",
                   # NOTE(review): the password is a plain command-line argument, so it is
                   # likely visible in the rendered task spec / driver command line;
                   # consider mounting it from a Kubernetes secret instead.
                   "--postgresPassword", "{{ " + postgres + ".password }}"],
    )

    clean_tasks = [
        _spark_task(
            task_id='cleansparkjob_' + entity,
            display_name="Clean " + entity,
            # These tasks run in parallel: without the entity in the name,
            # every one of them would request the same Spark application name
            # (same ds, same try number) and they would collide.
            app_name="cleansparkjob-" + entity.lower().replace('_', '-'),
            main_class="eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob",
            arguments=[
                "--inputPath", _conf('INPUT_PATH') + "/" + entity,
                "--outputPath", _conf('OUTPUT_PATH') + "/" + entity,
                "--graphTableClassName", dag_utils.GRAPH_ENTITIES_CLASS_NAMES[entity],
                "--isLookupUrl", _conf('IS_LOOKUP_URL'),
                "--contextId", _conf('CONTEXT_ID'),
                "--verifyParam", _conf('VERIFY_PARAM'),
                "--country", _conf('COUNTRY'),
                "--verifyCountryParam", _conf('VERIFY_COUNTRY_PARAM'),
                "--hostedBy", _conf('WRKDIR_PATH') + "/working/hostedby",
                "--collectedfrom", _conf('COLLECTED_FROM'),
                "--masterDuplicatePath", _conf('WRKDIR_PATH') + "/masterduplicate",
                "--deepClean", _conf('SHOULD_CLEAN'),
            ],
        )
        for entity in dag_utils.GRAPH_ENTITIES
    ]

    # Every operator created in this function belongs to the DAG, so leaving
    # masterduplicateaction out of the chain would not disable it: it would
    # just run unordered and race with the clean jobs that read its output.
    chain(getdatasourcefromcountry, masterduplicateaction, clean_tasks)


clean_graph_dag()