Add resolvereletion step
This commit is contained in:
parent
004be2e97f
commit
c0788fcd10
|
@ -6,8 +6,8 @@ from airflow.models.param import Param
|
||||||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||||
|
|
||||||
from spark_configurator import SparkConfigurator
|
|
||||||
import dag_utils
|
import dag_utils
|
||||||
|
from spark_configurator import SparkConfigurator
|
||||||
|
|
||||||
|
|
||||||
@dag(
|
@dag(
|
||||||
|
@ -99,7 +99,8 @@ def build_new_graph():
|
||||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
|
arguments=["--graphBasePath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
|
||||||
"--targetPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
|
"--targetPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
|
||||||
"--relationPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"]
|
"--relationPath",
|
||||||
|
"{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"] + "/relation"
|
||||||
]).get_configuration(),
|
]).get_configuration(),
|
||||||
kubernetes_conn_id="kubernetes_default"
|
kubernetes_conn_id="kubernetes_default"
|
||||||
),
|
),
|
||||||
|
@ -112,7 +113,8 @@ def build_new_graph():
|
||||||
name="copyresolveents-{{ ds }}-{{ task_instance.try_number }}",
|
name="copyresolveents-{{ ds }}-{{ task_instance.try_number }}",
|
||||||
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
|
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
|
||||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||||
arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
|
arguments=["--graphInputPath",
|
||||||
|
"{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
|
||||||
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
|
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
|
||||||
"--entities", ",".join([item for item in dag_utils.GRAPH_ENTITIES if item != "relation"]),
|
"--entities", ",".join([item for item in dag_utils.GRAPH_ENTITIES if item != "relation"]),
|
||||||
"--format", "text"
|
"--format", "text"
|
||||||
|
|
Loading…
Reference in New Issue