From 86e50f7311f9b914a01c0bbd1eaee54b925225ad Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 13 Jul 2021 14:31:45 +0200
Subject: [PATCH] modified code to split the Croatian funder

---
 .../funderresults/SparkDumpFunderResults.java | 51 +++++++++++++++++--
 1 file changed, 46 insertions(+), 5 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
index 00f604b14b..261fb68bb1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
@@ -8,6 +8,7 @@ import java.util.*;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
@@ -102,29 +103,69 @@ public class SparkDumpFunderResults implements Serializable {
 			} else {
 				funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
 			}
-			writeFunderResult(funder, result, outputPath + "/" + funderdump);
+			writeFunderResult(funder, result, outputPath, funderdump);
 		});
 
 	}
 
-	private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath) {
+	/**
+	 * Writes to {@code outputPath/funderDump}, as gzip-compressed JSON, the results having at least one project
+	 * whose id starts with {@code nsp}.
+	 *
+	 * @param nsp project id prefix to match
+	 * @param results the results to select from
+	 * @param outputPath base output path
+	 * @param funderName funder short name a matching project must also carry; only checked when
+	 *        {@code nsp} is "40|irb"
+	 * @param funderDump name of the directory under {@code outputPath} the selection is written to
+	 */
+	private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath,
+		String funderName, String funderDump) {
 
 		results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
 			if (!Optional.ofNullable(r.getProjects()).isPresent()) {
 				return null;
 			}
 			for (Project p : r.getProjects()) {
-				if (p.getId().startsWith(funder)) {
+				if (p.getId().startsWith(nsp)) {
+					// "40|irb" is shared by two funders: a project of the other one must not end the scan,
+					// because a later project of the same result may belong to the requested funder
+					if (nsp.equals("40|irb")
+						&& (p.getFunder() == null || !funderName.equals(p.getFunder().getShortName()))) {
+						continue;
+					}
 					return r;
 				}
 			}
 			return null;
 		}, Encoders.bean(CommunityResult.class))
-			.filter(Objects::nonNull)
+			.filter((FilterFunction<CommunityResult>) r -> r != null)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath);
+			.json(outputPath + "/" + funderDump);
 	}
+
+	/**
+	 * Dumps the results of one funder. The "40|irb" prefix is shared by two Croatian funders, so it yields
+	 * two dumps, HRZZ (funder short name CSF) and MZOS (funder short name MSES); any other funder yields a
+	 * single dump named after {@code funderDump}.
+	 *
+	 * @param funder project id prefix identifying the funder
+	 * @param results the results to select from
+	 * @param outputPath base output path
+	 * @param funderDump name of the output directory for funders other than "40|irb"
+	 */
+	private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
+		String funderDump) {
+
+		if (funder.equals("40|irb")) {
+			dumpResults(funder, results, outputPath, "CSF", "HRZZ");
+			dumpResults(funder, results, outputPath, "MSES", "MZOS");
+		} else {
+			// no funder-name filtering here: funderDump is the directory name and must be the last argument
+			dumpResults(funder, results, outputPath, null, funderDump);
+		}
+	}
 
 }