From 9ba598a9b585e91220e08923ecabd6cea5f43ab4 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 23 Mar 2022 17:10:19 +0100
Subject: [PATCH] [Dump Funders] -

---
 .../funderresults/SparkDumpFunderResults.java | 26 ++++++++++++++-----
 .../dump/funderresult/SplitPerFunderTest.java | 14 +++++-----
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
index 950c74a8f..d7e973ca0 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
@@ -84,6 +84,15 @@ public class SparkDumpFunderResults implements Serializable {
 				, Encoders.STRING())
 			.distinct();
 
+		Dataset<CommunityResult> pubs;
+		Dataset<CommunityResult> result;
+		pubs = Utils
+			.readPath(spark, inputPath + "/publication", CommunityResult.class);
+		Dataset<CommunityResult> dats = Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class);
+		Dataset<CommunityResult> orp = Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class);
+		Dataset<CommunityResult> sw = Utils.readPath(spark, inputPath + "/software", CommunityResult.class);
+		result = pubs.union(dats).union(orp).union(sw);
+
 		funderList.foreach((ForeachFunction<String>) funder -> getFunderResult(funder, inputPath, spark)
 			.write()
@@ -99,12 +108,15 @@ public class SparkDumpFunderResults implements Serializable {
 
 	@Nullable
 	private static Dataset<CommunityResult> getFunderResult(String funderName, String inputPath, SparkSession spark) {
-		Dataset<CommunityResult> result = Utils
-			.readPath(spark, inputPath + "/publication", CommunityResult.class)
-			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
-			.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
-			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
-		return result.map((MapFunction<CommunityResult, CommunityResult>) cr -> {
+		Dataset<CommunityResult> pubs;
+		Dataset<CommunityResult> result;
+		pubs = Utils
+			.readPath(spark, inputPath + "/publication", CommunityResult.class);
+		Dataset<CommunityResult> dats = Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class);
+		Dataset<CommunityResult> orp = Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class);
+		Dataset<CommunityResult> sw = Utils.readPath(spark, inputPath + "/software", CommunityResult.class);
+		result = pubs.union(dats).union(orp).union(sw);
+		Dataset<CommunityResult> tmp = result.map((MapFunction<CommunityResult, CommunityResult>) cr -> {
 			if (!Optional.ofNullable(cr.getProjects()).isPresent()) {
 				return null;
 			}
@@ -116,6 +128,8 @@ public class SparkDumpFunderResults implements Serializable {
 			return null;
 		}, Encoders.bean(CommunityResult.class))
 			.filter(Objects::nonNull);
+		System.out.println(tmp.count());
+		return tmp;
 	}
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java
index 477481c08..10dff6aae 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java
@@ -81,15 +81,15 @@ public class SplitPerFunderTest {
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-		// FP7 3
+		// FP7 3 and H2020 3
 		JavaRDD<CommunityResult> tmp = sc
-			.textFile(workingDir.toString() + "/split/EC_FP7")
+			.textFile(workingDir.toString() + "/split/EC")
 			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
 
 		org.apache.spark.sql.Dataset<CommunityResult> verificationDataset = spark
 			.createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class));
 
-		Assertions.assertEquals(3, verificationDataset.count());
+		Assertions.assertEquals(6, verificationDataset.count());
 
 		Assertions
 			.assertEquals(
@@ -132,10 +132,10 @@ public class SplitPerFunderTest {
 		Assertions.assertEquals(1, tmp.count());
 
 		// H2020 3
-		tmp = sc
-			.textFile(workingDir.toString() + "/split/EC_H2020")
-			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
-		Assertions.assertEquals(3, tmp.count());
+//		tmp = sc
+//			.textFile(workingDir.toString() + "/split/EC_H2020")
+//			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
+//		Assertions.assertEquals(3, tmp.count());
 
 		// MZOS 1
 		tmp = sc