forked from D-Net/dnet-hadoop
changes to mirror the last dump of the graph with the ols data model.
This commit is contained in:
parent
886617afd0
commit
774cdb190e
|
@ -70,9 +70,15 @@ class MAGMappingTest {
|
||||||
|
|
||||||
implicit val formats = DefaultFormats
|
implicit val formats = DefaultFormats
|
||||||
|
|
||||||
val conf = new SparkConf().setAppName("test").setMaster("local[2]")
|
|
||||||
val sc = new SparkContext(conf)
|
val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.driver.host", "localhost")
|
||||||
val spark = SparkSession.builder.config(sc.getConf).getOrCreate()
|
|
||||||
|
val spark: SparkSession =
|
||||||
|
SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(getClass.getSimpleName)
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate()
|
||||||
val path = getClass.getResource("magPapers.json").getPath
|
val path = getClass.getResource("magPapers.json").getPath
|
||||||
|
|
||||||
import org.apache.spark.sql.Encoders
|
import org.apache.spark.sql.Encoders
|
||||||
|
@ -95,9 +101,15 @@ class MAGMappingTest {
|
||||||
|
|
||||||
implicit val formats = DefaultFormats
|
implicit val formats = DefaultFormats
|
||||||
|
|
||||||
val conf = new SparkConf().setAppName("test").setMaster("local[2]")
|
|
||||||
val sc = new SparkContext(conf)
|
val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.driver.host", "localhost")
|
||||||
val spark = SparkSession.builder.config(sc.getConf).getOrCreate()
|
|
||||||
|
val spark: SparkSession =
|
||||||
|
SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(getClass.getSimpleName)
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate()
|
||||||
val path = getClass.getResource("duplicatedMagPapers.json").getPath
|
val path = getClass.getResource("duplicatedMagPapers.json").getPath
|
||||||
|
|
||||||
import org.apache.spark.sql.Encoders
|
import org.apache.spark.sql.Encoders
|
||||||
|
|
|
@ -37,7 +37,8 @@ public class DumpProducts implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
Utils.removeOutputDir(spark, outputPath);
|
Utils.removeOutputDir(spark, outputPath);
|
||||||
execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, dumpType);
|
execDump(
|
||||||
|
spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, dumpType);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,14 +48,14 @@ public class SendToZenodoHDFS implements Serializable {
|
||||||
.orElse(false);
|
.orElse(false);
|
||||||
|
|
||||||
final String depositionId = Optional.ofNullable(parser.get("depositionId")).orElse(null);
|
final String depositionId = Optional.ofNullable(parser.get("depositionId")).orElse(null);
|
||||||
final String communityMapPath = parser.get("communityMapPath");
|
//final String communityMapPath = parser.get("communityMapPath");
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("fs.defaultFS", hdfsNameNode);
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
FileSystem fileSystem = FileSystem.get(conf);
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
|
||||||
CommunityMap communityMap = Utils.readCommunityMap(fileSystem, communityMapPath);
|
//CommunityMap communityMap = Utils.readCommunityMap(fileSystem, communityMapPath);
|
||||||
|
|
||||||
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
.listFiles(
|
.listFiles(
|
||||||
|
@ -87,10 +87,10 @@ public class SendToZenodoHDFS implements Serializable {
|
||||||
if (!p_string.endsWith("_SUCCESS")) {
|
if (!p_string.endsWith("_SUCCESS")) {
|
||||||
// String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
|
// String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
|
||||||
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
|
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
|
||||||
log.info("Sending information for community: " + name);
|
// log.info("Sending information for community: " + name);
|
||||||
if (communityMap.containsKey(name.substring(0, name.lastIndexOf(".")))) {
|
// if (communityMap.containsKey(name.substring(0, name.lastIndexOf(".")))) {
|
||||||
name = communityMap.get(name.substring(0, name.lastIndexOf("."))).replace(" ", "_") + ".tar";
|
// name = communityMap.get(name.substring(0, name.lastIndexOf("."))).replace(" ", "_") + ".tar";
|
||||||
}
|
// }
|
||||||
|
|
||||||
FSDataInputStream inputStream = fileSystem.open(p);
|
FSDataInputStream inputStream = fileSystem.open(p);
|
||||||
zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen());
|
zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen());
|
||||||
|
|
|
@ -34,12 +34,13 @@ public class CommunitySplit implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
Utils.removeOutputDir(spark, outputPath);
|
Utils.removeOutputDir(spark, outputPath);
|
||||||
execSplit(spark, inputPath, outputPath, Utils.getCommunityMap(spark, communityMapPath).keySet());
|
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
|
||||||
|
execSplit(spark, inputPath, outputPath, communityMap);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void execSplit(SparkSession spark, String inputPath, String outputPath,
|
private static void execSplit(SparkSession spark, String inputPath, String outputPath,
|
||||||
Set<String> communities) {
|
CommunityMap communities) {
|
||||||
|
|
||||||
Dataset<CommunityResult> result = Utils
|
Dataset<CommunityResult> result = Utils
|
||||||
.readPath(spark, inputPath + "/publication", CommunityResult.class)
|
.readPath(spark, inputPath + "/publication", CommunityResult.class)
|
||||||
|
@ -48,8 +49,9 @@ public class CommunitySplit implements Serializable {
|
||||||
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
|
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
|
||||||
|
|
||||||
communities
|
communities
|
||||||
|
.keySet()
|
||||||
.stream()
|
.stream()
|
||||||
.forEach(c -> printResult(c, result, outputPath));
|
.forEach(c -> printResult(c, result, outputPath + "/" + communities.get(c).replace(" ", "_")));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,7 +63,7 @@ public class CommunitySplit implements Serializable {
|
||||||
.write()
|
.write()
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.json(outputPath + "/" + c);
|
.json(outputPath);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -103,12 +103,13 @@ public class SparkDumpFunderResults implements Serializable {
|
||||||
} else {
|
} else {
|
||||||
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
|
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
|
||||||
}
|
}
|
||||||
writeFunderResult(funder, result, outputPath , funderdump);
|
writeFunderResult(funder, result, outputPath, funderdump);
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath, String funderName) {
|
private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath,
|
||||||
|
String funderName) {
|
||||||
|
|
||||||
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
|
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
|
||||||
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
|
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
|
||||||
|
@ -128,14 +129,13 @@ public class SparkDumpFunderResults implements Serializable {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}, Encoders.bean(CommunityResult.class))
|
}, Encoders.bean(CommunityResult.class))
|
||||||
.filter((FilterFunction<CommunityResult>) r -> r!= null)
|
.filter((FilterFunction<CommunityResult>) r -> r != null)
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(outputPath + "/" + funderName);
|
.json(outputPath + "/" + funderName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
|
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
|
||||||
String funderDump) {
|
String funderDump) {
|
||||||
|
|
||||||
|
@ -147,5 +147,4 @@ public class SparkDumpFunderResults implements Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,23 @@
|
||||||
{
|
{
|
||||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
"definitions": {
|
"definitions": {
|
||||||
|
"AccessRight":{
|
||||||
|
"type":"object",
|
||||||
|
"properties":{
|
||||||
|
"code": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "COAR access mode code: http://vocabularies.coar-repositories.org/documentation/access_rights/"
|
||||||
|
},
|
||||||
|
"label": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Label for the access mode"
|
||||||
|
},
|
||||||
|
"scheme": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Scheme of reference for access right code. Always set to COAR access rights vocabulary: http://vocabularies.coar-repositories.org/documentation/access_rights/"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
"ControlledField": {
|
"ControlledField": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
|
@ -266,6 +283,57 @@
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"instance":{
|
||||||
|
"type":"array",
|
||||||
|
"items":{
|
||||||
|
"type":"object",
|
||||||
|
"properties":{
|
||||||
|
"accessright":{
|
||||||
|
"allOf":[
|
||||||
|
{
|
||||||
|
"$ref":"#/definitions/AccessRight"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"description":"The accessright of this materialization of the result"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"articleprocessingcharge":{
|
||||||
|
"type":"object",
|
||||||
|
"properties":{
|
||||||
|
"amount":{
|
||||||
|
"type":"string"
|
||||||
|
},
|
||||||
|
"currency":{
|
||||||
|
"type":"string"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"license":{
|
||||||
|
"type":"string"
|
||||||
|
},
|
||||||
|
"publicationdate":{
|
||||||
|
"type":"string"
|
||||||
|
},
|
||||||
|
"refereed":{
|
||||||
|
"type":"string"
|
||||||
|
},
|
||||||
|
"type":{
|
||||||
|
"type":"string",
|
||||||
|
"description":"The specific sub-type of this materialization of the result (see https://api.openaire.eu/vocabularies/dnet:result_typologies following the links)"
|
||||||
|
},
|
||||||
|
"url":{
|
||||||
|
"description":"Description of url",
|
||||||
|
"type":"array",
|
||||||
|
"items":{
|
||||||
|
"type":"string",
|
||||||
|
"description":"urls where it is possible to access the materialization of the result"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"description":"One of the materialization for this result"
|
||||||
|
}
|
||||||
|
},
|
||||||
"programmingLanguage": {
|
"programmingLanguage": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"description": "Only for results with type 'software': the programming language"
|
"description": "Only for results with type 'software': the programming language"
|
||||||
|
@ -302,7 +370,7 @@
|
||||||
"subject": {
|
"subject": {
|
||||||
"allOf": [
|
"allOf": [
|
||||||
{"$ref": "#/definitions/ControlledField"},
|
{"$ref": "#/definitions/ControlledField"},
|
||||||
{"description": "OpenAIRE subject classification scheme (https://api.openaire.eu/vocabularies/dnet:subject_classification_typologies) and value. When the scheme is 'keyword', it means that the subject is free-text (i.e. not a term from a controlled vocabulary)."},
|
{"description": "OpenAIRE subject classification scheme (https://api.openaire.eu/vocabularies/dnet:subject_classification_typologies) and value. When the scheme is 'keyword', it means that the subject is free-text (i.e. not a term from a controlled vocabulary)."}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class QueryInformationSystemTest {
|
||||||
lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap);
|
lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap);
|
||||||
queryInformationSystem = new QueryInformationSystem();
|
queryInformationSystem = new QueryInformationSystem();
|
||||||
queryInformationSystem.setIsLookUp(isLookUpService);
|
queryInformationSystem.setIsLookUp(isLookUpService);
|
||||||
map = queryInformationSystem.getCommunityMap();
|
map = queryInformationSystem.getCommunityMap(false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -379,7 +379,7 @@ public class CreateRelationTest {
|
||||||
" <param name=\"rule\"/>\n" +
|
" <param name=\"rule\"/>\n" +
|
||||||
" <param name=\"CD_PROJECT_NUMBER\">675858</param>\n" +
|
" <param name=\"CD_PROJECT_NUMBER\">675858</param>\n" +
|
||||||
" <param name=\"url\"/>\n" +
|
" <param name=\"url\"/>\n" +
|
||||||
" <param name=\"funding\">H2020-EINFRA-2015-1</param>\n" +
|
" <param name=\"funding\">EC | H2020 | RIA</param>\n" +
|
||||||
" <param name=\"funder\">EC</param>\n" +
|
" <param name=\"funder\">EC</param>\n" +
|
||||||
" <param name=\"acronym\">West-Life</param>\n" +
|
" <param name=\"acronym\">West-Life</param>\n" +
|
||||||
" </concept>\n" +
|
" </concept>\n" +
|
||||||
|
|
Loading…
Reference in New Issue