From e0c4cf6f7bff8894eb12e1d72e99f57d8aafc17a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 20 Jul 2020 10:48:01 +0200 Subject: [PATCH] added parameter to drive the graph merge strategy: priority (BETA|PROD) --- .../oa/graph/merge/MergeGraphSparkJob.java | 42 +++++++++++++++---- .../dhp/oa/graph/merge/oozie_app/workflow.xml | 13 +++++- .../dhp/oa/graph/merge_graphs_parameters.json | 6 +++ 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java index 67068e072..b723de955 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java @@ -37,6 +37,8 @@ public class MergeGraphSparkJob { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -47,6 +49,11 @@ public class MergeGraphSparkJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); + String priority = Optional + .ofNullable(parser.get("priority")) + .orElse(PRIORITY_DEFAULT); + log.info("priority: {}", priority); + Boolean isSparkSessionManaged = Optional .ofNullable(parser.get("isSparkSessionManaged")) .map(Boolean::valueOf) @@ -76,12 +83,13 @@ public class MergeGraphSparkJob { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - mergeGraphTable(spark, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath); + mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath); }); } private static

void mergeGraphTable( SparkSession spark, + String priority, String betaInputPath, String prodInputPath, Class

p_clazz, @@ -96,13 +104,13 @@ public class MergeGraphSparkJob { .map((MapFunction, Tuple2>, P>) value -> { Optional

p = Optional.ofNullable(value._1()).map(Tuple2::_2); Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (b.isPresent() & !p.isPresent()) { - return (P)b.get(); + switch (priority) { + default: + case "BETA": + return mergeWithPriorityToBETA(p, b); + case "PROD": + return mergeWithPriorityToPROD(p, b); } - if (p.isPresent()) { - return p.get(); - } - return null; }, Encoders.bean(p_clazz)) .filter((FilterFunction

) Objects::nonNull) .write() @@ -111,6 +119,26 @@ public class MergeGraphSparkJob { .json(outputPath); } + private static

P mergeWithPriorityToPROD(Optional

p, Optional b) { + if (b.isPresent() & !p.isPresent()) { + return (P) b.get(); + } + if (p.isPresent()) { + return p.get(); + } + return null; + } + + private static

P mergeWithPriorityToBETA(Optional

p, Optional b) { + if (p.isPresent() & !b.isPresent()) { + return p.get(); + } + if (b.isPresent()) { + return (P) b.get(); + } + return null; + } + private static Dataset> readTableFromPath( SparkSession spark, String inputEntityPath, Class clazz) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 0a512fb6a..07a125fb6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -13,6 +13,10 @@ graphOutputPath the output merged graph root path + + priority + decides from which infrastructure the content must win in case of ID clash + sparkDriverMemory @@ -88,6 +92,7 @@ --prodInputPath${prodInputGgraphPath}/publication --outputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --priority${priority} @@ -114,6 +119,7 @@ --prodInputPath${prodInputGgraphPath}/dataset --outputPath${graphOutputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --priority${priority} @@ -140,6 +146,7 @@ --prodInputPath${prodInputGgraphPath}/otherresearchproduct --outputPath${graphOutputPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --priority${priority} @@ -166,6 +173,7 @@ --prodInputPath${prodInputGgraphPath}/software --outputPath${graphOutputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --priority${priority} @@ -192,6 +200,7 @@ --prodInputPath${prodInputGgraphPath}/datasource --outputPath${graphOutputPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --priority${priority} @@ -218,6 +227,7 @@ --prodInputPath${prodInputGgraphPath}/organization --outputPath${graphOutputPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --priority${priority} @@ -244,6 +254,7 @@ --prodInputPath${prodInputGgraphPath}/project --outputPath${graphOutputPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --priority${priority} @@ -266,11 +277,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/relation --prodInputPath${prodInputGgraphPath}/relation --outputPath${graphOutputPath}/relation --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation + --priority${priority} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json index 6018b7e93..1a612807b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json @@ -28,5 +28,11 @@ "paramLongName": "graphTableClassName", "paramDescription": "class name moelling the graph table", "paramRequired": true + }, + { + "paramName": "pr", + "paramLongName": "priority", + "paramDescription": "decides from which infrastructure the content must win in case of ID clash", + "paramRequired": false } ] \ No newline at end of file