forked from D-Net/dnet-hadoop

[Dump Funders] -

commit 13d1d73b2e (parent faf79db4d5)
@@ -4,14 +4,21 @@ package eu.dnetlib.dhp.oa.graph.dump.funderresults;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.*;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -19,6 +26,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
+import scala.Tuple2;
 
 /**
  * Splits the dumped results by funder and stores them in a folder named as the funder nsp (for all the funders, but the EC
@@ -66,74 +74,50 @@ public class SparkDumpFunderResults implements Serializable {
 	private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
 		String graphPath) {
 
-		Dataset<eu.dnetlib.dhp.schema.oaf.Project> project = Utils
-			.readPath(spark, graphPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
+		Dataset<String> funderList = Utils
+			.readPath(spark, inputPath + "/publication", CommunityResult.class)
+			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
+			.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
+			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class))
+			.flatMap((FlatMapFunction<CommunityResult, String>) cr ->
+				cr.getProjects().stream().map(p -> p.getFunder().getShortName()).collect(Collectors.toList()).iterator()
+			, Encoders.STRING())
+			.distinct();
+
+		funderList.foreach((ForeachFunction<String>) funder ->
+			getFunderResult(funder, inputPath, spark)
+				.write()
+				.mode(SaveMode.Overwrite)
+				.option("compression", "gzip")
+				.json(outputPath + "/" + funder)
+		);
+	}
 
+	@Nullable
+	private static Dataset<CommunityResult> getFunderResult(String funderName, String inputPath, SparkSession spark) {
 		Dataset<CommunityResult> result = Utils
 			.readPath(spark, inputPath + "/publication", CommunityResult.class)
 			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
-			.union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class))
+			.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
 			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
 
-		List<String> funderList = project
-			.map((MapFunction<eu.dnetlib.dhp.schema.oaf.Project, String>) p -> p.getId(), Encoders.STRING())
-			.distinct()
-			.collectAsList();
-
-		funderList.forEach(funder -> {
-			String fundernsp = funder.substring(3);
-			String funderdump;
-			if (fundernsp.startsWith("corda")) {
-				funderdump = "EC_";
-				if (fundernsp.endsWith("h2020")) {
-					funderdump += "H2020";
-				} else {
-					funderdump += "FP7";
-				}
-			} else {
-				funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
-			}
-			writeFunderResult(funder, result, outputPath, funderdump);
-		});
-
-	}
-
-	private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath,
-		String funderName) {
-
-		results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
-			if (!Optional.ofNullable(r.getProjects()).isPresent()) {
+		return result.map((MapFunction<CommunityResult, CommunityResult>) cr -> {
+			if (!Optional.ofNullable(cr.getProjects()).isPresent()) {
 				return null;
 			}
-			for (Project p : r.getProjects()) {
-				if (p.getId().startsWith(nsp)) {
-					if (nsp.startsWith("40|irb")) {
-						if (p.getFunder().getShortName().equals(funderName))
-							return r;
-						else
-							return null;
-					}
-					return r;
+			for (Project p : cr.getProjects()) {
+				if (p.getFunder().getShortName().equalsIgnoreCase(funderName)) {
+					return cr;
 				}
 			}
 			return null;
 		}, Encoders.bean(CommunityResult.class))
-			.filter(Objects::nonNull)
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(outputPath + "/" + funderName);
+			.filter(Objects::nonNull);
 	}
 
-	private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
-		String funderDump) {
-
-		if (funder.startsWith("40|irb")) {
-			dumpResults(funder, results, outputPath, "HRZZ");
-			dumpResults(funder, results, outputPath, "MZOS");
-		} else
-			dumpResults(funder, results, outputPath, funderDump);
-	}
-
 }
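The rewritten writeResultProjectList above derives the funder keys from the dumped results themselves (a flatMap over each result's project funders, then distinct) and writes one gzip-compressed JSON folder per funder. Below is a minimal self-contained sketch of the same technique, assuming a simplified Result bean in place of the dnet-hadoop CommunityResult/Project model (all names in the sketch are hypothetical). Unlike the patch, which triggers the writes from inside Dataset.foreach, the sketch collects the small funder list to the driver first, since Dataset actions cannot be nested inside executor-side lambdas.

import java.util.List;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class FunderSplitSketch {

	// simplified stand-in for CommunityResult: a result plus its funder short names
	public static class Result implements java.io.Serializable {
		private String id;
		private List<String> funders;

		public String getId() { return id; }
		public void setId(String id) { this.id = id; }
		public List<String> getFunders() { return funders; }
		public void setFunders(List<String> funders) { this.funders = funders; }
	}

	public static void main(String[] args) {
		String inputPath = args[0];  // dumped results, one JSON record per line
		String outputPath = args[1]; // one sub-folder per funder is created here

		SparkSession spark = SparkSession.builder().appName("FunderSplitSketch").getOrCreate();

		Dataset<Result> results = spark.read().json(inputPath).as(Encoders.bean(Result.class));

		// distinct funder short names across all results; small enough to collect
		List<String> funders = results
			.flatMap((FlatMapFunction<Result, String>) r -> r.getFunders().iterator(), Encoders.STRING())
			.distinct()
			.collectAsList();

		// one filtered, gzip-compressed JSON dump per funder
		for (String funder : funders) {
			results
				.filter((FilterFunction<Result>) r -> r.getFunders().contains(funder))
				.write()
				.mode(SaveMode.Overwrite)
				.option("compression", "gzip")
				.json(outputPath + "/" + funder);
		}
	}
}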
@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import eu.dnetlib.dhp.oa.graph.dump.Constants;
 import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
@@ -15,6 +16,7 @@ import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -104,12 +106,12 @@ public class SparkResultLinkedToProject implements Serializable {
 				cr.setProjects(t2._2().getProjectsList());
 				return cr;
 			}
 
 			, Encoders.bean(CommunityResult.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.json(outputPath);
 
 	}
 }
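The hunk above shows only the tail of SparkResultLinkedToProject: each result gets the prepared project list copied over and is written out as CommunityResult. For orientation, a sketch of that Tuple2-based pattern using Spark's joinWith; the join key (result id against resultId) is an assumption, as it is not visible in the hunk, and the method and dataset names are hypothetical.

// Hypothetical reconstruction of the surrounding join: pair each result with
// its prepared ResultProject record and copy the project links over.
private static Dataset<CommunityResult> linkProjects(
	Dataset<CommunityResult> results, Dataset<ResultProject> resultProjects) {
	return results
		.joinWith(resultProjects, results.col("id").equalTo(resultProjects.col("resultId"))) // assumed key
		.map((MapFunction<Tuple2<CommunityResult, ResultProject>, CommunityResult>) t2 -> {
			CommunityResult cr = t2._1();
			cr.setProjects(t2._2().getProjectsList());
			return cr;
		}, Encoders.bean(CommunityResult.class));
}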
@@ -28,6 +28,12 @@
 		"paramLongName":"graphPath",
 		"paramDescription": "the path to the relations",
 		"paramRequired": true
+	},
+	{
+		"paramName":"cmp",
+		"paramLongName":"communityMapPath",
+		"paramDescription": "the path to the relations",
+		"paramRequired": true
 	}
 ]
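The added communityMapPath parameter is passed alongside the existing ones; the updated tests further down in this commit do exactly that. For illustration, a hedged invocation sketch with placeholder paths (only the parameter names come from this commit):

SparkResultLinkedToProject.main(new String[] {
	"-isSparkSessionManaged", Boolean.FALSE.toString(),
	"-outputPath", "/tmp/funder_dump/preparedInfo",              // placeholder path
	"-sourcePath", "/tmp/graph/publication",                     // placeholder path
	"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
	"-graphPath", "/tmp/graph",                                  // placeholder path
	"-communityMapPath", "/tmp/communityMap"                     // the new parameter
});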
@@ -321,4 +321,27 @@ public class PrepareResultProjectJobTest {
 			3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count());
 
 	}
 
+	@Test
+	void testMatchx() throws Exception {
+
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match")
+			.getPath();
+
+		SparkPrepareResultProject.main(new String[] {
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-outputPath", workingDir.toString() + "/preparedInfo",
+			"-sourcePath", sourcePath
+		});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<ResultProject> tmp = sc
+			.textFile(workingDir.toString() + "/preparedInfo")
+			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
+
+		tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
+	}
+
 }
@@ -6,6 +6,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.HashMap;
 
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -76,7 +77,11 @@ public class ResultLinkedToProjectTest {
 			.getPath();
 
 		final String graphPath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/nomatch")
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo")
+			.getPath();
+
+		final String communityMapPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
 			.getPath();
 
 		SparkResultLinkedToProject.main(new String[] {
@@ -84,20 +89,18 @@ public class ResultLinkedToProjectTest {
 			"-outputPath", workingDir.toString() + "/preparedInfo",
 			"-sourcePath", sourcePath,
 			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
-			"-graphPath", graphPath
+			"-graphPath", graphPath,
+			"-communityMapPath", communityMapPath
 		});
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-		JavaRDD<Result> tmp = sc
+		JavaRDD<CommunityResult> tmp = sc
 			.textFile(workingDir.toString() + "/preparedInfo")
-			.map(item -> OBJECT_MAPPER.readValue(item, Result.class));
+			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
 
-		org.apache.spark.sql.Dataset<Result> verificationDataset = spark
-			.createDataset(tmp.rdd(), Encoders.bean(Result.class));
-
-		Assertions.assertEquals(0, verificationDataset.count());
+		Assertions.assertEquals(0, tmp.count());
 
 	}
@@ -108,8 +111,12 @@ public class ResultLinkedToProjectTest {
 			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match/papers.json")
 			.getPath();
 
-		final String relationPath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match")
+		final String graphPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo")
+			.getPath();
+
+		final String communityMapPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
 			.getPath();
 
 		SparkResultLinkedToProject.main(new String[] {
@@ -117,20 +124,18 @@ public class ResultLinkedToProjectTest {
 			"-outputPath", workingDir.toString() + "/preparedInfo",
 			"-sourcePath", sourcePath,
 			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
-			"-graphPath", relationPath
+			"-graphPath", graphPath,
+			"-communityMapPath", communityMapPath
 		});
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-		JavaRDD<Publication> tmp = sc
+		JavaRDD<CommunityResult> tmp = sc
 			.textFile(workingDir.toString() + "/preparedInfo")
-			.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
+			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
 
-		org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
-			.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
-
-		Assertions.assertEquals(1, verificationDataset.count());
+		Assertions.assertEquals(1, tmp.count());
 
 	}
@@ -0,0 +1 @@
+{"ee":"SDSN - Greece","epos":"EPOS","enrmaps":"Energy Research","fet-h2020":"FET H2020","instruct":"Instruct-Eric","egi":"EGI Federation","euromarine":"Euromarine","covid-19":"COVID-19","dariah":"DARIAH EU","rda":"Research Data Alliance","clarin":"CLARIN","aginfra":"Agricultural and Food Sciences","risis":"RISI","fam":"Fisheries and Aquaculture Management","beopen":"Transport Research","elixir-gr":"ELIXIR GR","fet-fp7":"FET FP7","ifremer":"Ifremer","science-innovation-policy":"Science and Innovation Policy Studies","mes":"European Marine Scinece","oa-pg":"EC Post-Grant Open Access Pilot","ni":"Neuroinformatics","dh-ch":"Digital Humanities and Cultural Heritage"}
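The file added above is a flat JSON object mapping community short names to display labels. A minimal sketch of loading it with Jackson into a HashMap (the class name and file handling here are hypothetical; the dump code has its own community-map loader):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CommunityMapReaderSketch {
	public static void main(String[] args) throws Exception {
		// read the whole one-line JSON file given as first argument
		String json = new String(Files.readAllBytes(Paths.get(args[0])));
		HashMap<String, String> communityMap = new ObjectMapper()
			.readValue(json, new TypeReference<HashMap<String, String>>() {});
		// e.g. "dh-ch" -> "Digital Humanities and Cultural Heritage"
		System.out.println(communityMap.get("dh-ch"));
	}
}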
File diff suppressed because one or more lines are too long
@@ -0,0 +1 @@
+{"resultId":"50|a89337edbe55::43e8b61e5e8d682545cb867be8118585","projectsList":[{"id":"40|aka_________::01bb7b48e29d732a1c7bc5150b9195c4","code":"135027","acronym":null,"title":"Dynamic 3D resolution-enhanced low-coherence interferometric imaging / Consortium: Hi-Lo","funder":{"shortName":"AKA","name":"Academy of Finland","jurisdiction":"FI","fundingStream":null},"provenance":{"provenance":"Harvested","trust":"0.900000000000000022"},"validated":null},{"id":"40|aka_________::9d1af21dbd0f5bc719f71553d19a6b3a","code":"316061","acronym":null,"title":"Finnish Imaging of Degenerative Shoulder Study (FIMAGE): A study on the prevalence of degenerative imaging changes of the shoulder and their relevance to clinical symptoms in the general population.","funder":{"shortName":"AKA","name":"Academy of Finland","jurisdiction":"FI","fundingStream":null},"provenance":{"provenance":"Harvested","trust":"0.900000000000000022"},"validated":null}]}
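The prepared-info record above pairs a result identifier with its list of project links, each carrying a funder block. Hypothetical minimal beans mirroring that shape, for orientation only (the real ResultProject and Project classes live in the dnet-hadoop schema modules):

import java.io.Serializable;
import java.util.List;

// Sketch only: field names follow the JSON record above.
public class ResultProjectSketch implements Serializable {
	public String resultId;
	public List<ProjectSketch> projectsList;

	public static class ProjectSketch implements Serializable {
		public String id;    // e.g. "40|aka_________::01bb..."
		public String code;  // grant code, e.g. "135027"
		public String title;
		public FunderSketch funder;
	}

	public static class FunderSketch implements Serializable {
		public String shortName;    // e.g. "AKA"
		public String name;         // e.g. "Academy of Finland"
		public String jurisdiction; // e.g. "FI"
	}
}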