From 968c59d97a2728ae206043b8ee656624d41e430d Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 24 Jul 2020 17:25:19 +0200
Subject: [PATCH] added the logic to also dump the products for the whole
 graph. They will miss "collected from" and context information, which will
 be materialized as new relations

---
 .../dhp/oa/graph/dump/DumpProducts.java       | 138 ++++++++++--------
 1 file changed, 76 insertions(+), 62 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
index b8781e257..5febfb33e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
@@ -1,17 +1,7 @@
+
 package eu.dnetlib.dhp.oa.graph.dump;
 
-import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
-import eu.dnetlib.dhp.oa.graph.dump.Utils;
-import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
-import eu.dnetlib.dhp.schema.oaf.Context;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
 import java.util.List;
@@ -20,68 +10,92 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative;
+import eu.dnetlib.dhp.schema.oaf.*;
 
 public class DumpProducts implements Serializable {
 
-    public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap, Class<? extends OafEntity> inputClazz, boolean graph) {
+	public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap,
+		Class<? extends OafEntity> inputClazz,
+		Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> outputClazz,
+		boolean graph) {
 
-        SparkConf conf = new SparkConf();
+		SparkConf conf = new SparkConf();
 
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
-                Utils.removeOutputDir(spark, outputPath);
-                execDump(spark, inputPath, outputPath, communityMap, inputClazz, graph);// , dumpClazz);
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				Utils.removeOutputDir(spark, outputPath);
+				execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// , dumpClazz);
 
-            });
-    }
+			});
+	}
 
-    public static <I extends OafEntity> void execDump(SparkSession spark,
-        String inputPath,
-        String outputPath,
-        CommunityMap communityMap,
-        Class<I> inputClazz,
-        boolean graph) {
+	public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(
+		SparkSession spark,
+		String inputPath,
+		String outputPath,
+		CommunityMap communityMap,
+		Class<I> inputClazz,
+		Class<O> outputClazz,
+		boolean graph) throws ClassNotFoundException {
 
-        Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
+		Utils
+			.readPath(spark, inputPath, inputClazz)
+			.map(value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz))
+			.filter(Objects::nonNull)
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
 
-        tmp
-            .map(value -> execMap(value, communityMap, graph), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class))
-            .filter(Objects::nonNull)
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(outputPath);
+	}
 
-    }
+	private static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> O execMap(I value,
+		CommunityMap communityMap,
+		boolean graph) {
 
-    private static <I extends OafEntity> eu.dnetlib.dhp.schema.dump.oaf.Result execMap(I value,
-        CommunityMap communityMap,
-        boolean graph) {
+		Optional<DataInfo> odInfo = Optional.ofNullable(value.getDataInfo());
+		if (odInfo.isPresent()) {
+			if (odInfo.get().getDeletedbyinference()) {
+				return null;
+			}
+		} else {
+			return null;
+		}
 
-        if (!graph) {
-            Set<String> communities = communityMap.keySet();
+		if (!graph) {
+			Set<String> communities = communityMap.keySet();
 
-            Optional<List<Context>> inputContext = Optional.ofNullable(((eu.dnetlib.dhp.schema.oaf.Result)value).getContext());
-            if (!inputContext.isPresent()) {
-                return null;
-            }
-            List<String> toDumpFor = inputContext.get().stream().map(c -> {
-                if (communities.contains(c.getId())) {
-                    return c.getId();
-                }
-                if (c.getId().contains("::") && communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
-                    return c.getId().substring(0, 3);
-                }
-                return null;
-            }).filter(Objects::nonNull).collect(Collectors.toList());
-            if (toDumpFor.size() == 0) {
-                return null;
-            }
-        }
-        return ResultMapper.map(value, communityMap);
+			Optional<List<Context>> inputContext = Optional
+				.ofNullable(((eu.dnetlib.dhp.schema.oaf.Result) value).getContext());
+			if (!inputContext.isPresent()) {
+				return null;
+			}
+			List<String> toDumpFor = inputContext.get().stream().map(c -> {
+				if (communities.contains(c.getId())) {
+					return c.getId();
+				}
+				if (c.getId().contains("::") && communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
+					return c.getId().substring(0, 3);
+				}
+				return null;
+			}).filter(Objects::nonNull).collect(Collectors.toList());
+			if (toDumpFor.size() == 0) {
+				return null;
+			}
+		}
+		return (O) ResultMapper.map(value, communityMap, graph);
 
-    }
+	}
 }
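
Usage sketch (not part of the patch): the snippet below shows how the extended
run(...) signature introduced by this commit might be invoked for the
whole-graph dump. The driver class, the input/output paths, and the
Publication/Result class pair are illustrative assumptions; only the parameter
list of run(...) comes from this commit. With graph = true the community
filter in execMap is skipped, so every record not deleted by inference is
dumped.

	import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
	import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;

	public class DumpPublicationsSketch {

		public static void main(String[] args) {
			// Assumption: CommunityMap maps community id -> label; for a
			// whole-graph dump (graph = true) it is not used for filtering,
			// so an empty map is sufficient here.
			CommunityMap communityMap = new CommunityMap();

			new DumpProducts()
				.run(
					Boolean.TRUE, // isSparkSessionManaged: helper creates and closes the session
					"/tmp/prod_provision/graph/publication", // inputPath (illustrative)
					"/tmp/prod_provision/dump/publication", // outputPath (illustrative)
					communityMap,
					eu.dnetlib.dhp.schema.oaf.Publication.class, // inputClazz: OAF entity to read
					eu.dnetlib.dhp.schema.dump.oaf.Result.class, // outputClazz: dump model to write
					true); // graph = true -> dump the products for the whole graph
		}
	}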