WIP: dump of the OpenAIRE graph - Changes #103

Closed
miriam.baglioni wants to merge 77 commits from miriam.baglioni/dnet-hadoop:dump into master
1 changed files with 22 additions and 4 deletions
Showing only changes of commit 9841086ef3 - Show all commits

View File

@ -102,19 +102,26 @@ public class SparkDumpFunderResults implements Serializable {
} else {
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
}
writeFunderResult(funder, result, outputPath + "/" + funderdump);
writeFunderResult(funder, result, outputPath, funderdump);
});
}
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath) {
private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath,
String funderName, String funderDump) {
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
return null;
}
for (Project p : r.getProjects()) {
if (p.getId().startsWith(funder)) {
if (p.getId().startsWith(nsp)) {
if (nsp.equals("40|irb")) {
if (p.getFunder().getShortName().equals(funderName))
return r;
else
return null;
}
return r;
}
}
@ -124,7 +131,18 @@ public class SparkDumpFunderResults implements Serializable {
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
.json(outputPath + "/" + funderDump);
}
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
String funderDump) {
if (funder.equals("40|irb")) {
dumpResults(funder, results, outputPath, "CSF", "HRZZ");
dumpResults(funder, results, outputPath, "MSES", "MZOS");
} else
dumpResults(funder, results, outputPath, funderDump, null);
}
}