diff --git a/workflow/dnet/consistency.py b/workflow/dnet/consistency.py
new file mode 100644
index 0000000..190a754
--- /dev/null
+++ b/workflow/dnet/consistency.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+DAG for the consistency phase of the OpenAIRE graph provision.
+
+It creates two SparkKubernetesOperator tasks that execute sequentially:
+the first submits a SparkApplication that propagates relations over the
+deduplicated graph (PropagateRelation), and the second groups entities
+and writes out the consistent graph (GroupEntities).
+
+The spark-on-k8s operator is required to be already installed on Kubernetes:
+https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
+"""
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+# Operator that submits a SparkApplication resource to Kubernetes
+from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
+from airflow.utils.dates import days_ago
+
+from spark_configurator import SparkConfigurator
+
+# [END import_module]
+
+# [START default_args]
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': days_ago(1),
+    'email': ['airflow@example.com'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 3
+}
+# [END default_args]
+
+# max_active_runs is a DAG-level argument, not a per-operator default
+dag = DAG(
+    'consistency_graph',
+    default_args=default_args,
+    schedule_interval=None,
+    max_active_runs=1,
+    tags=['example', 'spark']
+)
+
+propagate_relation = SparkKubernetesOperator(
+    task_id='PropagateRelation',
+    namespace='dnet-spark-jobs',
+    template_spec=SparkConfigurator(
+        name="propagaterels-{{ ds }}-{{ task_instance.try_number }}",
+        mainClass="eu.dnetlib.dhp.oa.dedup.SparkPropagateRelation",
+        jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
+        arguments=["--graphBasePath", "s3a://graph/tmp/prod_provision/graph/06_graph_dedup",
+                   "--graphOutputPath", "s3a://graph/tmp/prod_provision/graph/07_graph_consistent",
+                   "--workingPath", "s3a://graph/tmp/prod_provision/working_dir/dedup"
+                   ],
+        executor_cores=8,
+        executor_memory="4G",
+        executor_instances=1,
+        executor_memoryOverhead="3G").get_configuration(),
+    kubernetes_conn_id="kubernetes_default",
+    dag=dag
+)
+
+group_entities = SparkKubernetesOperator(
+    task_id='GroupEntities',
+    namespace='dnet-spark-jobs',
+    template_spec=SparkConfigurator(
+        name="groupentities-{{ ds }}-{{ task_instance.try_number }}",
+        mainClass="eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob",
+        jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
+        arguments=["--graphInputPath", "s3a://graph/tmp/prod_provision/graph/06_graph_dedup",
"--checkpointPath", "s3a://graph/tmp/prod_provision/working_dir/dedup/grouped_entities", + "--outputPath", "s3a://graph/tmp/prod_provision/graph/07_graph_consistent", + "--isLookupUrl", "http://services.openaire.eu:8280/is/services/isLookUp?wsdl", + "--filterInvisible", "true" + ], +# + executor_cores=8, + executor_memory="4G", + executor_instances=1, + executor_memoryOverhead="3G").get_configuration(), + kubernetes_conn_id="kubernetes_default", + dag=dag +) + +propagaterel >> group_entities \ No newline at end of file