forked from antonis.lempesis/dnet-hadoop
modified code to split the Croatian funder
This commit is contained in:
parent
da88c850c6
commit
86e50f7311
|
@ -8,6 +8,7 @@ import java.util.*;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -102,29 +103,49 @@ public class SparkDumpFunderResults implements Serializable {
|
||||||
} else {
|
} else {
|
||||||
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
|
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
|
||||||
}
|
}
|
||||||
writeFunderResult(funder, result, outputPath + "/" + funderdump);
|
writeFunderResult(funder, result, outputPath , funderdump);
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath) {
|
private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath, String funderName, String funderDump) {
|
||||||
|
|
||||||
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
|
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
|
||||||
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
|
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
for (Project p : r.getProjects()) {
|
for (Project p : r.getProjects()) {
|
||||||
if (p.getId().startsWith(funder)) {
|
if (p.getId().startsWith(nsp)) {
|
||||||
|
|
||||||
|
if (nsp.equals("40|irb")) {
|
||||||
|
if (p.getFunder().getShortName().equals(funderName))
|
||||||
|
return r;
|
||||||
|
else
|
||||||
|
return null;
|
||||||
|
}
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}, Encoders.bean(CommunityResult.class))
|
}, Encoders.bean(CommunityResult.class))
|
||||||
.filter(Objects::nonNull)
|
.filter((FilterFunction<CommunityResult>) r -> r!= null)
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(outputPath);
|
.json(outputPath + "/" + funderDump);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
|
||||||
|
String funderDump) {
|
||||||
|
|
||||||
|
if (funder.equals("40|irb")) {
|
||||||
|
dumpResults(funder, results, outputPath, "CSF", "HRZZ");
|
||||||
|
dumpResults(funder, results, outputPath, "MSES", "MZOS");
|
||||||
|
} else
|
||||||
|
dumpResults(funder, results, outputPath, funderDump, null);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue