[Dump Funders] -

This commit is contained in:
Miriam Baglioni 2022-03-23 17:10:19 +01:00
parent 13d1d73b2e
commit 9ba598a9b5
2 changed files with 27 additions and 13 deletions

View File

@ -84,6 +84,15 @@ public class SparkDumpFunderResults implements Serializable {
, Encoders.STRING()) , Encoders.STRING())
.distinct(); .distinct();
Dataset<CommunityResult> pubs;
Dataset<CommunityResult> result ;
pubs = Utils
.readPath(spark, inputPath + "/publication", CommunityResult.class);
Dataset<CommunityResult> dats = Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class);
Dataset<CommunityResult> orp = Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class);
Dataset<CommunityResult> sw = Utils.readPath(spark, inputPath + "/software", CommunityResult.class);
result = pubs.union(dats).union(orp).union(sw);
funderList.foreach((ForeachFunction<String>) funder -> funderList.foreach((ForeachFunction<String>) funder ->
getFunderResult(funder, inputPath, spark) getFunderResult(funder, inputPath, spark)
.write() .write()
@ -99,12 +108,15 @@ public class SparkDumpFunderResults implements Serializable {
@Nullable @Nullable
private static Dataset<CommunityResult> getFunderResult(String funderName, String inputPath, SparkSession spark) { private static Dataset<CommunityResult> getFunderResult(String funderName, String inputPath, SparkSession spark) {
Dataset<CommunityResult> result = Utils Dataset<CommunityResult> pubs;
.readPath(spark, inputPath + "/publication", CommunityResult.class) Dataset<CommunityResult> result ;
.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) pubs = Utils
.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class)) .readPath(spark, inputPath + "/publication", CommunityResult.class);
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); Dataset<CommunityResult> dats = Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class);
return result.map((MapFunction<CommunityResult, CommunityResult>) cr -> { Dataset<CommunityResult> orp = Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class);
Dataset<CommunityResult> sw = Utils.readPath(spark, inputPath + "/software", CommunityResult.class);
result = pubs.union(dats).union(orp).union(sw);
Dataset<CommunityResult> tmp = result.map((MapFunction<CommunityResult, CommunityResult>) cr -> {
if (!Optional.ofNullable(cr.getProjects()).isPresent()) { if (!Optional.ofNullable(cr.getProjects()).isPresent()) {
return null; return null;
} }
@ -116,6 +128,8 @@ public class SparkDumpFunderResults implements Serializable {
return null; return null;
}, Encoders.bean(CommunityResult.class)) }, Encoders.bean(CommunityResult.class))
.filter(Objects::nonNull); .filter(Objects::nonNull);
System.out.println(tmp.count());
return tmp;
} }

View File

@ -81,15 +81,15 @@ public class SplitPerFunderTest {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// FP7 3 // FP7 3 and H2020 3
JavaRDD<CommunityResult> tmp = sc JavaRDD<CommunityResult> tmp = sc
.textFile(workingDir.toString() + "/split/EC_FP7") .textFile(workingDir.toString() + "/split/EC")
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
org.apache.spark.sql.Dataset<CommunityResult> verificationDataset = spark org.apache.spark.sql.Dataset<CommunityResult> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class)); .createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class));
Assertions.assertEquals(3, verificationDataset.count()); Assertions.assertEquals(6, verificationDataset.count());
Assertions Assertions
.assertEquals( .assertEquals(
@ -132,10 +132,10 @@ public class SplitPerFunderTest {
Assertions.assertEquals(1, tmp.count()); Assertions.assertEquals(1, tmp.count());
// H2020 3 // H2020 3
tmp = sc // tmp = sc
.textFile(workingDir.toString() + "/split/EC_H2020") // .textFile(workingDir.toString() + "/split/EC_H2020")
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); // .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
Assertions.assertEquals(3, tmp.count()); // Assertions.assertEquals(3, tmp.count());
// MZOS 1 // MZOS 1
tmp = sc tmp = sc