diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
index 67068e072..b723de955 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
@@ -37,6 +37,8 @@ public class MergeGraphSparkJob {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
+
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@@ -47,6 +49,11 @@ public class MergeGraphSparkJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
+ String priority = Optional
+ .ofNullable(parser.get("priority"))
+ .orElse(PRIORITY_DEFAULT);
+ log.info("priority: {}", priority);
+
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
@@ -76,12 +83,13 @@ public class MergeGraphSparkJob {
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
- mergeGraphTable(spark, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
+ mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
});
}
private static
void mergeGraphTable(
SparkSession spark,
+ String priority,
String betaInputPath,
String prodInputPath,
Class
p_clazz,
@@ -96,13 +104,13 @@ public class MergeGraphSparkJob {
.map((MapFunction, Tuple2>, P>) value -> {
Optional p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2);
- if (b.isPresent() & !p.isPresent()) {
- return (P)b.get();
+ switch (priority) {
+ default:
+ case "BETA":
+ return mergeWithPriorityToBETA(p, b);
+ case "PROD":
+ return mergeWithPriorityToPROD(p, b);
}
- if (p.isPresent()) {
- return p.get();
- }
- return null;
}, Encoders.bean(p_clazz))
.filter((FilterFunction
) Objects::nonNull)
.write()
@@ -111,6 +119,26 @@ public class MergeGraphSparkJob {
.json(outputPath);
}
+ private static
P mergeWithPriorityToPROD(Optional
p, Optional b) {
+ if (b.isPresent() & !p.isPresent()) {
+ return (P) b.get();
+ }
+ if (p.isPresent()) {
+ return p.get();
+ }
+ return null;
+ }
+
+ private static
P mergeWithPriorityToBETA(Optional
p, Optional b) {
+ if (p.isPresent() & !b.isPresent()) {
+ return p.get();
+ }
+ if (b.isPresent()) {
+ return (P) b.get();
+ }
+ return null;
+ }
+
private static Dataset> readTableFromPath(
SparkSession spark, String inputEntityPath, Class clazz) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
index 0a512fb6a..07a125fb6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
@@ -13,6 +13,10 @@
graphOutputPath
the output merged graph root path
+
+ priority
+ decides from which infrastructure the content must win in case of ID clash
+
sparkDriverMemory
@@ -88,6 +92,7 @@
--prodInputPath${prodInputGgraphPath}/publication
--outputPath${graphOutputPath}/publication
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
+ --priority${priority}
@@ -114,6 +119,7 @@
--prodInputPath${prodInputGgraphPath}/dataset
--outputPath${graphOutputPath}/dataset
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
+ --priority${priority}
@@ -140,6 +146,7 @@
--prodInputPath${prodInputGgraphPath}/otherresearchproduct
--outputPath${graphOutputPath}/otherresearchproduct
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+ --priority${priority}
@@ -166,6 +173,7 @@
--prodInputPath${prodInputGgraphPath}/software
--outputPath${graphOutputPath}/software
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software
+ --priority${priority}
@@ -192,6 +200,7 @@
--prodInputPath${prodInputGgraphPath}/datasource
--outputPath${graphOutputPath}/datasource
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
+ --priority${priority}
@@ -218,6 +227,7 @@
--prodInputPath${prodInputGgraphPath}/organization
--outputPath${graphOutputPath}/organization
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
+ --priority${priority}
@@ -244,6 +254,7 @@
--prodInputPath${prodInputGgraphPath}/project
--outputPath${graphOutputPath}/project
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
+ --priority${priority}
@@ -266,11 +277,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
-
--betaInputPath${betaInputGgraphPath}/relation
--prodInputPath${prodInputGgraphPath}/relation
--outputPath${graphOutputPath}/relation
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation
+ --priority${priority}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json
index 6018b7e93..1a612807b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json
@@ -28,5 +28,11 @@
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
+ },
+ {
+ "paramName": "pr",
+ "paramLongName": "priority",
+ "paramDescription": "decides from which infrastructure the content must win in case of ID clash",
+ "paramRequired": false
}
]
\ No newline at end of file