diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java index 452f6dc54..41c47c5ed 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java @@ -8,8 +8,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; -import eu.dnetlib.dhp.oa.graph.dump.Utils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; @@ -21,7 +19,9 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.dump.oaf.Result; +import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -51,78 +51,29 @@ public class SparkSplitForCommunity implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); -// final String resultClassName = parser.get("resultTableName"); -// log.info("resultTableName: {}", resultClassName); - final String isLookUpUrl = parser.get("isLookUpUrl"); log.info("isLookUpUrl: {}", isLookUpUrl); - final Optional cm = Optional.ofNullable(parser.get("communityMap")); - - SparkConf conf = new SparkConf(); + CommunitySplit split = new CommunitySplit(); CommunityMap communityMap; - if (!isLookUpUrl.equals("BASEURL:8280/is/services/isLookUp")) { - QueryInformationSystem queryInformationSystem = new QueryInformationSystem(); - queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); - communityMap = queryInformationSystem.getCommunityMap(); - } else { - communityMap = new Gson().fromJson(cm.get(), CommunityMap.class); - } + QueryInformationSystem queryInformationSystem = new QueryInformationSystem(); + queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); + communityMap = queryInformationSystem.getCommunityMap(); + + split.run(isSparkSessionManaged, inputPath, outputPath, communityMap); + + + - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - execSplit(spark, inputPath, outputPath, communityMap.keySet());// , inputClazz); - }); } public static ISLookUpService getIsLookUpService(String isLookUpUrl) { return ISLookupClientFactory.getLookUpService(isLookUpUrl); } - private static void execSplit(SparkSession spark, String inputPath, String outputPath, - Set communities) {// }, Class inputClazz) { - Dataset result = Utils - .readPath(spark, inputPath + "/publication", Result.class) - .union(Utils.readPath(spark, inputPath + "/dataset", Result.class)) - .union(Utils.readPath(spark, inputPath + "/orp", Result.class)) - .union(Utils.readPath(spark, inputPath + "/software", Result.class)); - communities - .stream() - .forEach(c -> printResult(c, result, outputPath)); - } - - private static void printResult(String c, Dataset result, String outputPath) { - Dataset community_products = result - .filter(r -> containsCommunity(r, c)); - - if (community_products.count() > 0) { - community_products - .repartition(1) - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .json(outputPath + "/" + c); - } - - } - - private static boolean containsCommunity(Result r, String c) { - if (Optional.ofNullable(r.getContext()).isPresent()) { - return r - .getContext() - .stream() - .filter(con -> con.getCode().equals(c)) - .collect(Collectors.toList()) - .size() > 0; - } - return false; - } }