From a4e82a65aacdc7aed07dc5eb9287a98ed8ac932d Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Fri, 19 Mar 2021 11:34:44 +0100
Subject: [PATCH] integrated filter applied when merging BETA & PROD graphs to
 rule out records from Datacite

---
 .../graph/merge/MergeGraphTableSparkJob.java  | 112 ++++++++++--------
 1 file changed, 60 insertions(+), 52 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
index e53f4ca30..5f6550e10 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.graph.merge;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 import javax.xml.crypto.Data;
 
@@ -52,22 +53,22 @@ public class MergeGraphTableSparkJob {
 
     public static void main(String[] args) throws Exception {
 
         String jsonConfiguration = IOUtils
-                .toString(
-                    CleanGraphSparkJob.class
-                        .getResourceAsStream(
-                            "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"));
+            .toString(
+                CleanGraphSparkJob.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"));
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
         parser.parseArgument(args);
         String priority = Optional
-                .ofNullable(parser.get("priority"))
-                .orElse(PRIORITY_DEFAULT);
+            .ofNullable(parser.get("priority"))
+            .orElse(PRIORITY_DEFAULT);
         log.info("priority: {}", priority);
 
         Boolean isSparkSessionManaged = Optional
-                .ofNullable(parser.get("isSparkSessionManaged"))
-                .map(Boolean::valueOf)
-                .orElse(Boolean.TRUE);
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
         log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
         String betaInputPath = parser.get("betaInputPath");
@@ -89,48 +90,55 @@ public class MergeGraphTableSparkJob {
 
         conf.registerKryoClasses(ModelSupport.getOafModelClasses());
 
         runWithSparkSession(
-                conf,
-                isSparkSessionManaged,
-                spark -> {
-                    removeOutputDir(spark, outputPath);
-                    mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
-                });
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                removeOutputDir(spark, outputPath);
+                mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
+            });
     }
 
     private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
-            SparkSession spark,
-            String priority,
-            String betaInputPath,
-            String prodInputPath,
-            Class<P> p_clazz,
-            Class<B> b_clazz,
-            String outputPath) {
+        SparkSession spark,
+        String priority,
+        String betaInputPath,
+        String prodInputPath,
+        Class<P> p_clazz,
+        Class<B> b_clazz,
+        String outputPath) {
         Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
         Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
 
         prod
-                .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
-                .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
-                    Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
-                    Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
+            .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
+            .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
+                Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
+                Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
 
-                    if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
-                        return mergeDatasource(p, b);
-                    }
-                    switch (priority) {
-                        default:
-                        case "BETA":
-                            return mergeWithPriorityToBETA(p, b);
-                        case "PROD":
-                            return mergeWithPriorityToPROD(p, b);
-                    }
-                }, Encoders.bean(p_clazz))
-                .filter((FilterFunction<P>) Objects::nonNull)
-                .write()
-                .mode(SaveMode.Overwrite)
-                .option("compression", "gzip")
-                .json(outputPath);
+                if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
+                    return mergeDatasource(p, b);
+                }
+                switch (priority) {
+                    default:
+                    case "BETA":
+                        return mergeWithPriorityToBETA(p, b);
+                    case "PROD":
+                        return mergeWithPriorityToPROD(p, b);
+                }
+            }, Encoders.bean(p_clazz))
+            .filter((FilterFunction<P>) Objects::nonNull)
+            .filter((FilterFunction<P>) o -> {
+                HashSet<String> collectedFromNames = Optional
+                    .ofNullable(o.getCollectedfrom())
+                    .map(c -> c.stream().map(KeyValue::getValue).collect(Collectors.toCollection(HashSet::new)))
+                    .orElse(new HashSet<String>());
+                return !collectedFromNames.contains("Datacite");
+            })
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
     }
 
     /**
@@ -184,19 +192,19 @@ public class MergeGraphTableSparkJob {
     }
 
     private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
-            SparkSession spark, String inputEntityPath, Class<T> clazz) {
+        SparkSession spark, String inputEntityPath, Class<T> clazz) {
 
         log.info("Reading Graph table from: {}", inputEntityPath);
         return spark
-                .read()
-                .textFile(inputEntityPath)
-                .map(
-                    (MapFunction<String, Tuple2<String, T>>) value -> {
-                        final T t = OBJECT_MAPPER.readValue(value, clazz);
-                        final String id = ModelSupport.idFn().apply(t);
-                        return new Tuple2<>(id, t);
-                    },
-                    Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+            .read()
+            .textFile(inputEntityPath)
+            .map(
+                (MapFunction<String, Tuple2<String, T>>) value -> {
+                    final T t = OBJECT_MAPPER.readValue(value, clazz);
+                    final String id = ModelSupport.idFn().apply(t);
+                    return new Tuple2<>(id, t);
+                },
+                Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
     }
 
     private static void removeOutputDir(SparkSession spark, String path) {
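
Reviewer note: the behavioural change in this patch is the second FilterFunction added to the merged dataset. Extracted from the lambda, the predicate reads as below — a minimal sketch, assuming the eu.dnetlib.dhp.schema.oaf.Oaf base type whose getCollectedfrom() returns a List<KeyValue> carrying the datasource name in KeyValue.getValue(), as the diff implies; the class and method names here are hypothetical:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.stream.Collectors;

import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;

public class DataciteFilterSketch {

    // Keep a record unless one of its collectedfrom entries is named "Datacite".
    // A null collectedfrom list yields an empty set, so such records are kept.
    public static boolean keep(Oaf oaf) {
        HashSet<String> collectedFromNames = Optional
            .ofNullable(oaf.getCollectedfrom())
            .map(c -> c.stream().map(KeyValue::getValue).collect(Collectors.toCollection(HashSet::new)))
            .orElse(new HashSet<String>());
        return !collectedFromNames.contains("Datacite");
    }
}
```

Matching on the datasource display name rather than its identifier keeps the rule easy to read, but it ties the filter to the literal "Datacite" label.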
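The mergeWithPriorityToBETA / mergeWithPriorityToPROD helpers invoked in the map step sit outside this hunk. For context, a sketch of how the priority resolution behaves after the full_outer join, where every joined row carries at least one non-null side — illustrative only, not the repository's implementation:

```java
import java.util.Optional;

import eu.dnetlib.dhp.schema.oaf.Oaf;

public class PriorityMergeSketch {

    // With BETA priority, the BETA record wins whenever it exists, otherwise
    // the PROD record passes through. The null fallback is defensive: such
    // rows are dropped by the Objects::nonNull filter in the main pipeline.
    @SuppressWarnings("unchecked")
    static <P extends Oaf, B extends Oaf> P mergeWithPriorityToBETA(Optional<P> p, Optional<B> b) {
        if (b.isPresent()) {
            return (P) b.get();
        }
        return p.orElse(null);
    }

    // PROD priority is symmetric: the PROD record wins when present.
    @SuppressWarnings("unchecked")
    static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
        if (p.isPresent()) {
            return p.get();
        }
        return (P) b.orElse(null);
    }
}
```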