From ae1b7fbfdb44b53382ed3a18ecb8358feba1f38c Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 7 Aug 2020 17:32:27 +0200 Subject: [PATCH] changed method signature from set of mapkey entries to String representing path on file system where to find the map --- .../oa/graph/dump/community/CommunitySplit.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java index f0fb16394..de7356aa3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java @@ -1,10 +1,17 @@ - +/** + * This class splits the dumped results according to the research community - research initiative/infrastructure + * they are related to. The information about the community is found in the element "context.id" in the result. + * Since the contexts found in the result may be associated with entities other than communities, a community Map + * is provided. It will guide the splitting process. + * Note: the repartition(1) just before writing the results related to a community. 
This is a choice due + to uploading constraints (just one file for each community). As soon as a better solution is in place, + remove the repartition + */ package eu.dnetlib.dhp.oa.graph.dump.community; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -19,19 +26,19 @@ import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; public class CommunitySplit implements Serializable { - public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap) { + public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath) { SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - execSplit(spark, inputPath, outputPath, communityMap.keySet());// , inputClazz); + execSplit(spark, inputPath, outputPath, Utils.getCommunityMap(spark, communityMapPath).keySet()); }); } private static void execSplit(SparkSession spark, String inputPath, String outputPath, - Set communities) {// }, Class inputClazz) { + Set communities) { Dataset result = Utils .readPath(spark, inputPath + "/publication", CommunityResult.class)