Clean DAG
This commit is contained in:
parent
b3d7dda0c1
commit
6c25db9ac2
|
@ -0,0 +1,107 @@
|
|||
import os
|
||||
from datetime import timedelta
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.baseoperator import chain
|
||||
from airflow.models.param import Param
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
import dag_utils
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
def _env_int(var_name, fallback):
    # Read an integer configuration value from the environment, falling back
    # to the given default when the variable is unset.
    return int(os.getenv(var_name, fallback))


# Maximum wall-clock time (in days) a task may run before Airflow times it out.
EXECUTION_TIMEOUT = _env_int("EXECUTION_TIMEOUT", 6)

# Defaults applied to every task of the DAGs in this module.
default_args = dict(
    execution_timeout=timedelta(days=EXECUTION_TIMEOUT),
    retries=_env_int("DEFAULT_TASK_RETRIES", 1),
    retry_delay=timedelta(seconds=_env_int("DEFAULT_RETRY_DELAY_SECONDS", 60)),
)
|
||||
|
||||
|
||||
@dag(
    dag_id="clean_graph",
    dag_display_name="Cleaning of Graph",
    default_args=default_args,
    params={
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
        # FIX: description previously copy-pasted the S3 text.
        "POSTGRES_CONN_ID": Param("postgres_conn", type='string', description="Airflow connection of Postgres endpoint"),

        "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/02_graph_grouped", type='string', description=""),
        "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/03_graph_cleaned", type='string', description=""),
        "WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/clean", type='string', description=""),
        "IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
                               description=""),
        "COUNTRY": Param("NL", type='string', description=""),
        "SHOULD_CLEAN": Param("false", type='string', description=""),
        "CONTEXT_ID": Param("sobigdata", type='string', description=""),
        "VERIFY_PARAM": Param("gcube", type='string', description=""),
        "VERIFY_COUNTRY_PARAM": Param("10.17632;10.5061", type='string', description=""),
        "COLLECTED_FROM": Param("NARCIS", type='string', description="")
    },
    tags=["openaire"]
)
def clean_graph_dag():
    """Clean the OpenAIRE graph.

    Pipeline: collect the datasources of the configured country and the
    master/duplicate mappings from the Postgres registry, then run one
    CleanGraphSparkJob per graph entity table.
    """

    # Step 1: collect datasources for the configured country; the output in
    # WRKDIR_PATH/working/hostedby feeds the per-entity cleaning jobs.
    getdatasourcefromcountry = SparkKubernetesOperator(
        task_id='getdatasourcefromcountry',
        # FIX: display name previously said "Propagate Relations" (copy-paste error).
        task_display_name="Get Datasource From Country",
        namespace='dnet-spark-jobs',
        template_spec=SparkConfigurator(
            name="getdatasourcefromcountry-{{ ds }}-{{ task_instance.try_number }}",
            mainClass="eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry",
            jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
            arguments=["--inputPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
                       "--country", "{{ dag_run.conf.get('COUNTRY') }}",
                       "--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}/working/hostedby"
                       ]).get_configuration(),
        kubernetes_conn_id="kubernetes_default"
    )

    # Step 2: dump the master/duplicate datasource mapping from the Postgres
    # registry (credentials resolved at runtime from the Airflow connection
    # named by POSTGRES_CONN_ID) into WRKDIR_PATH/masterduplicate.
    masterduplicateaction = SparkKubernetesOperator(
        task_id='masterduplicateaction',
        task_display_name="MasterDuplicateAction",
        namespace='dnet-spark-jobs',
        template_spec=SparkConfigurator(
            name="masterduplicateaction-{{ ds }}-{{ task_instance.try_number }}",
            mainClass="eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction",
            jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
            arguments=["--hdfsNameNode", "s3a://graph/",
                       "--hdfsPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/masterduplicate",
                       "--postgresUrl", "jdbc:postgresql://{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).host }}:{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).port }}/dnet_openaireplus",
                       "--postgresUser", "{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).login }}",
                       "--postgresPassword", "{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).password }}"
                       ]).get_configuration(),
        kubernetes_conn_id="kubernetes_default"
    )

    # Step 3: one cleaning Spark job per graph entity table.
    clean_tasks = []
    for entity in dag_utils.GRAPH_ENTITIES:
        clean_tasks.append(SparkKubernetesOperator(
            # FIX: task_id and display name were copy-pasted from the
            # masterduplicateaction task above; a duplicated task_id makes the
            # DAG fail to import (DuplicateTaskIdFound). Make them unique per
            # entity instead.
            task_id='clean_' + entity,
            task_display_name="Clean " + entity,
            namespace='dnet-spark-jobs',
            template_spec=SparkConfigurator(
                # Include the entity in the Spark app name so concurrently
                # running per-entity jobs of the same run do not collide.
                name="cleansparkjob-" + entity + "-{{ ds }}-{{ task_instance.try_number }}",
                mainClass="eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob",
                jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
                arguments=[
                    "--inputPath", "{{ dag_run.conf.get('INPUT_PATH') }}/" + entity,
                    "--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}/" + entity,
                    "--graphTableClassName", dag_utils.GRAPH_ENTITIES_CLASS_NAMES[entity],
                    "--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
                    "--contextId", "{{ dag_run.conf.get('CONTEXT_ID') }}",
                    "--verifyParam", "{{ dag_run.conf.get('VERIFY_PARAM') }}",
                    "--country", "{{ dag_run.conf.get('COUNTRY') }}",
                    "--verifyCountryParam", "{{ dag_run.conf.get('VERIFY_COUNTRY_PARAM') }}",
                    "--hostedBy", "{{ dag_run.conf.get('WRKDIR_PATH') }}/working/hostedby",
                    "--collectedfrom", "{{ dag_run.conf.get('COLLECTED_FROM') }}",
                    "--masterDuplicatePath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/masterduplicate",
                    "--deepClean", "{{ dag_run.conf.get('SHOULD_CLEAN') }}"
                ]).get_configuration(),
            kubernetes_conn_id="kubernetes_default"
        ))

    # Both preparation steps must complete before any cleaning task starts.
    chain([getdatasourcefromcountry, masterduplicateaction], clean_tasks)


clean_graph_dag()
|
|
@ -26,3 +26,18 @@ def get_default_bucket():
|
|||
return hook.service_config['bucket_name']
|
||||
except KeyError:
|
||||
return ''
|
||||
|
||||
|
||||
# Fully-qualified OAF schema class for each graph entity table; Spark jobs use
# the class name to deserialise the corresponding table.
GRAPH_ENTITIES_CLASS_NAMES = {
    "publication": "eu.dnetlib.dhp.schema.oaf.Publication",
    "dataset": "eu.dnetlib.dhp.schema.oaf.Dataset",
    "otherresearchproduct": "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
    "software": "eu.dnetlib.dhp.schema.oaf.Software",
    "datasource": "eu.dnetlib.dhp.schema.oaf.Datasource",
    "organization": "eu.dnetlib.dhp.schema.oaf.Organization",
    "project": "eu.dnetlib.dhp.schema.oaf.Project",
    "relation": "eu.dnetlib.dhp.schema.oaf.Relation",
}

# Entity tables that make up the graph, derived from the mapping above so the
# two constants cannot drift apart (dict insertion order preserves the list).
GRAPH_ENTITIES = list(GRAPH_ENTITIES_CLASS_NAMES)
|
Loading…
Reference in New Issue