diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java index e53f4ca30d..5f6550e10e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.graph.merge; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; +import java.util.stream.Collectors; import javax.xml.crypto.Data; @@ -52,22 +53,22 @@ public class MergeGraphTableSparkJob { public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils - .toString( - CleanGraphSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json")); + .toString( + CleanGraphSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); String priority = Optional - .ofNullable(parser.get("priority")) - .orElse(PRIORITY_DEFAULT); + .ofNullable(parser.get("priority")) + .orElse(PRIORITY_DEFAULT); log.info("priority: {}", priority); Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); String betaInputPath = parser.get("betaInputPath"); @@ -89,48 +90,55 @@ public class MergeGraphTableSparkJob { conf.registerKryoClasses(ModelSupport.getOafModelClasses()); runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - removeOutputDir(spark, outputPath); - mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath); - }); + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath); + }); } private static
void mergeGraphTable( - SparkSession spark, - String priority, - String betaInputPath, - String prodInputPath, - Class
p_clazz, - Class b_clazz, - String outputPath) { + SparkSession spark, + String priority, + String betaInputPath, + String prodInputPath, + Class
p_clazz,
+ Class b_clazz,
+ String outputPath) {
Dataset p = Optional.ofNullable(value._1()).map(Tuple2::_2);
- Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2);
+ .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
+ .map((MapFunction p = Optional.ofNullable(value._1()).map(Tuple2::_2);
+ Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2);
- if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
- return mergeDatasource(p, b);
- }
- switch (priority) {
- default:
- case "BETA":
- return mergeWithPriorityToBETA(p, b);
- case "PROD":
- return mergeWithPriorityToPROD(p, b);
- }
- }, Encoders.bean(p_clazz))
- .filter((FilterFunction ) Objects::nonNull)
- .write()
- .mode(SaveMode.Overwrite)
- .option("compression", "gzip")
- .json(outputPath);
+ if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
+ return mergeDatasource(p, b);
+ }
+ switch (priority) {
+ default:
+ case "BETA":
+ return mergeWithPriorityToBETA(p, b);
+ case "PROD":
+ return mergeWithPriorityToPROD(p, b);
+ }
+ }, Encoders.bean(p_clazz))
+ .filter((FilterFunction ) Objects::nonNull)
+ .filter((FilterFunction ) o -> {
+ HashSet