From 906db690d2c4c1cfd69a05c5d5f5dc75db2229e9 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 18 Nov 2020 17:43:08 +0100 Subject: [PATCH] - --- .../SparkPrepareResultProject.java | 47 +------------------ 1 file changed, 1 insertion(+), 46 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkPrepareResultProject.java index 9b254088a7..21b4f4dc77 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkPrepareResultProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkPrepareResultProject.java @@ -65,7 +65,7 @@ public class SparkPrepareResultProject implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - prepareResultProjectList2(spark, inputPath, outputPath, communityMapPath); + prepareResultProjectList(spark, inputPath, outputPath, communityMapPath); }); } @@ -115,51 +115,6 @@ public class SparkPrepareResultProject implements Serializable { .option("compression", "gzip") .json(outputPath + "/" + funder); }); - - } - - private static void prepareResultProjectList2(SparkSession spark, String inputPath, String outputPath, - String communityMapPath) { - - CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath); - - Dataset relation = Utils - .readPath(spark, inputPath + "/relation", Relation.class) - .filter("dataInfo.deletedbyinference = false and relClass = 'produces'"); - - Dataset result = Utils - .readPath(spark, inputPath + "/publication", eu.dnetlib.dhp.schema.oaf.Result.class) - .union(Utils.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Result.class)) - .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", eu.dnetlib.dhp.schema.oaf.Result.class)) - .union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class)); - - result - .joinWith(relation, result.col("id").equalTo(relation.col("target"))) - .groupByKey( - (MapFunction, String>) value -> value - ._2() - .getSource() - .substring(3, 15), - Encoders.STRING()) - .mapGroups( - (MapGroupsFunction, String>) (s, it) -> { - Tuple2 first = it.next(); - List resultList = new ArrayList<>(); - resultList.add(ResultMapper.map(first._1(), communityMap, true)); - it.forEachRemaining(c -> { - resultList.add(ResultMapper.map(c._1(), communityMap, true)); - - }); - spark - .createDataFrame(resultList, Result.class) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/" + s); - - return new String(); - }, Encoders.STRING()); - } }