From 3ec239290433e2e70642df10ab0bd0e7835ff10b Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 29 Jul 2020 16:46:50 +0200 Subject: [PATCH] Added new class to move the place the split is effectively run --- .../graph/dump/community/CommunitySplit.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java new file mode 100644 index 000000000..b05c6646c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java @@ -0,0 +1,76 @@ +package eu.dnetlib.dhp.oa.graph.dump.community; + +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + +import java.io.Serializable; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class CommunitySplit implements Serializable { + + + public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap) { + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + execSplit(spark, inputPath, outputPath, communityMap.keySet());// , inputClazz); + }); + } + + private static void execSplit(SparkSession spark, String inputPath, String outputPath, + Set communities) {// }, Class inputClazz) { + + Dataset result = Utils + .readPath(spark, inputPath + "/publication", CommunityResult.class) + .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); + + communities + .stream() + .forEach(c -> printResult(c, result, outputPath)); + + } + + private static void printResult(String c, Dataset result, String outputPath) { + Dataset community_products = result + .filter(r -> containsCommunity(r, c)); + + try{ + community_products.first(); + community_products + .repartition(1) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath + "/" + c); + }catch(Exception e){ + + } + + } + + private static boolean containsCommunity(CommunityResult r, String c) { + if (Optional.ofNullable(r.getContext()).isPresent()) { + return r + .getContext() + .stream() + .filter(con -> con.getCode().equals(c)) + .collect(Collectors.toList()) + .size() > 0; + } + return false; + } +}