diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
index 1279ede533..d20a3036e8 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
@@ -11,17 +11,12 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 
-import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
-import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
-import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative;
 import eu.dnetlib.dhp.schema.oaf.*;
 
 /**
@@ -33,7 +28,7 @@ public class DumpProducts implements Serializable {
 
 	public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath,
 		Class<? extends OafEntity> inputClazz, Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> outputClazz,
-		boolean graph) {
+		String dumpType) {
 
 		SparkConf conf = new SparkConf();
 
@@ -42,7 +37,7 @@ public class DumpProducts implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, outputPath);
-				execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, graph);
+				execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, dumpType);
 			});
 	}
 
@@ -53,13 +48,13 @@ public class DumpProducts implements Serializable {
 		String communityMapPath,
 		Class<I> inputClazz,
 		Class<O> outputClazz,
-		boolean graph) {
+		String dumpType) {
 
 		CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
 
 		Utils
 			.readPath(spark, inputPath, inputClazz)
-			.map((MapFunction<I, O>) value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz))
+			.map((MapFunction<I, O>) value -> execMap(value, communityMap, dumpType), Encoders.bean(outputClazz))
 			.filter(Objects::nonNull)
 			.write()
 			.mode(SaveMode.Overwrite)
@@ -70,18 +65,18 @@ public class DumpProducts implements Serializable {
 
 	private static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> O execMap(I value,
 		CommunityMap communityMap,
-		boolean graph) {
+		String dumpType) {
 
 		Optional<DataInfo> odInfo = Optional.ofNullable(value.getDataInfo());
 		if (odInfo.isPresent()) {
-			if (odInfo.get().getDeletedbyinference()) {
+			if (odInfo.get().getDeletedbyinference() || odInfo.get().getInvisible()) {
 				return null;
 			}
 		} else {
 			return null;
 		}
 
-		if (!graph) {
+		if (Constants.DUMPTYPE.COMMUNITY.getType().equals(dumpType)) {
 			Set<String> communities = communityMap.keySet();
 
 			Optional<List<Context>> inputContext = Optional
@@ -102,7 +97,8 @@ public class DumpProducts implements Serializable {
 
 				return null;
 			}
 		}
-		return (O) ResultMapper.map(value, communityMap, graph);
+
+		return (O) ResultMapper.map(value, communityMap, dumpType);
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
index 4c3bc0dd52..cb052ebaa8 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
@@ -21,10 +21,10 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 public class ResultMapper implements Serializable {
 
 	public static <E extends eu.dnetlib.dhp.schema.oaf.OafEntity> Result map(
-		E in, Map<String, String> communityMap, boolean graph) {
+		E in, Map<String, String> communityMap, String dumpType) {
 		Result out;
-		if (graph) {
+		if (Constants.DUMPTYPE.COMPLETE.getType().equals(dumpType)) {
 			out = new GraphResult();
 		} else {
 			out = new CommunityResult();
 		}
@@ -217,7 +217,7 @@ public class ResultMapper implements Serializable {
 				.ofNullable(input.getInstance());
 
 			if (oInst.isPresent()) {
-				if (graph) {
+				if (Constants.DUMPTYPE.COMPLETE.getType().equals(dumpType)) {
 					((GraphResult) out)
 						.setInstance(oInst.get().stream().map(i -> getGraphInstance(i)).collect(Collectors.toList()));
 				} else {
@@ -296,7 +296,7 @@ public class ResultMapper implements Serializable {
 			out.setType(input.getResulttype().getClassid());
 		}
 
-		if (!graph) {
+		if (!Constants.DUMPTYPE.COMPLETE.getType().equals(dumpType)) {
 			((CommunityResult) out)
 				.setCollectedfrom(
 					input
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
index c4b89936ff..63970d14bc 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
@@ -4,12 +4,15 @@ package eu.dnetlib.dhp.oa.graph.dump.community;
 import java.io.Serializable;
 import java.util.*;
 
+import javax.swing.text.html.Option;
+
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import eu.dnetlib.dhp.schema.oaf.Result;
 
@@ -48,6 +51,11 @@ public class SparkDumpCommunityProducts implements Serializable {
 
 		String communityMapPath = parser.get("communityMapPath");
 
+		final String dumpType = Optional
+			.ofNullable(parser.get("dumpType"))
+			.map(String::valueOf)
+			.orElse("community");
+
 		Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
 
 		DumpProducts dump = new DumpProducts();
@@ -55,7 +63,7 @@ public class SparkDumpCommunityProducts implements Serializable {
 
 		dump
 			.run(
 				isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, CommunityResult.class,
-				false);
+				dumpType);
 	}
 
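Note: the new branches above call into a Constants class exposing a DUMPTYPE enum with a getType() accessor, which is not part of this diff. The sketch below shows only the shape the changed code expects; the package, the set of enum constants, and the exact string values are assumptions inferred from the calls Constants.DUMPTYPE.COMMUNITY.getType() / Constants.DUMPTYPE.COMPLETE.getType() and from the "community" default read in SparkDumpCommunityProducts, not the repository's actual definition.

// Hypothetical sketch, not the repository's actual Constants class.
package eu.dnetlib.dhp.oa.graph.dump;

import java.io.Serializable;

public class Constants implements Serializable {

	public enum DUMPTYPE {
		// Values assumed: COMMUNITY must yield "community" so that the default
		// chosen in SparkDumpCommunityProducts selects the community branch in
		// DumpProducts.execMap and ResultMapper.map.
		COMPLETE("complete"), COMMUNITY("community");

		private final String type;

		DUMPTYPE(String type) {
			this.type = type;
		}

		public String getType() {
			return type;
		}
	}
}

With a definition of this shape, omitting the dumpType parameter preserves the previous behaviour of SparkDumpCommunityProducts (a community dump), while a workflow that passes the COMPLETE value takes the GraphResult branches that previously required graph == true.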