Merge branch 'beta' into 7096-fileGZip-collector-plugin
commit
06b5533d4c
File diff suppressed because one or more lines are too long
@ -0,0 +1,330 @@
|
||||
{
|
||||
"indexed":{
|
||||
"date-parts":[
|
||||
[
|
||||
2022,
|
||||
4,
|
||||
14
|
||||
]
|
||||
],
|
||||
"date-time":"2022-04-14T11:27:30Z",
|
||||
"timestamp":1649935650109
|
||||
},
|
||||
"reference-count":22,
|
||||
"publisher":"SAGE Publications",
|
||||
"issue":"2",
|
||||
"license":[
|
||||
{
|
||||
"start":{
|
||||
"date-parts":[
|
||||
[
|
||||
1980,
|
||||
4,
|
||||
1
|
||||
]
|
||||
],
|
||||
"date-time":"1980-04-01T00:00:00Z",
|
||||
"timestamp":323395200000
|
||||
},
|
||||
"content-version":"tdm",
|
||||
"delay-in-days":0,
|
||||
"URL":"http:\/\/journals.sagepub.com\/page\/policies\/text-and-data-mining-license"
|
||||
}
|
||||
],
|
||||
"content-domain":{
|
||||
"domain":[
|
||||
|
||||
],
|
||||
"crossmark-restriction":false
|
||||
},
|
||||
"short-container-title":[
|
||||
"Perception"
|
||||
],
|
||||
"published-print":{
|
||||
"date-parts":[
|
||||
[
|
||||
1980,
|
||||
4
|
||||
]
|
||||
]
|
||||
},
|
||||
"abstract":"<jats:p> To answer the question \u2018What is suppressed during binocular rivalry?\u2019 a series of three experiments was performed. In the first experiment observers viewed binocular rivalry between orthogonally oriented patterns. When the dominant and suppressed patterns were interchanged between the eyes observers continued seeing with the dominant eye, indicating that an eye, not a pattern, is suppressed during rivalry. In a second experiment it was found that a suppressed eye was able to contribute to stereopsis. A third experiment demonstrated that the predominance of an eye could be influenced by prior adaptation of the other eye, indicating that binocular mechanisms participate in the rivalry process. <\/jats:p>",
|
||||
"DOI":"10.1068\/p090223",
|
||||
"type":"journal-article",
|
||||
"created":{
|
||||
"date-parts":[
|
||||
[
|
||||
2007,
|
||||
1,
|
||||
23
|
||||
]
|
||||
],
|
||||
"date-time":"2007-01-23T15:21:36Z",
|
||||
"timestamp":1169565696000
|
||||
},
|
||||
"page":"223-231",
|
||||
"source":"Crossref",
|
||||
"is-referenced-by-count":123,
|
||||
"title":[
|
||||
"What is Suppressed during Binocular Rivalry?"
|
||||
],
|
||||
"prefix":"10.1177",
|
||||
"volume":"9",
|
||||
"author":[
|
||||
{
|
||||
"given":"Randolph",
|
||||
"family":"Blake",
|
||||
"sequence":"first",
|
||||
"affiliation":[
|
||||
{
|
||||
"name":"Cresap Neuroscience Laboratory, Northwestern University, Evanston, Illinois 60201, USA"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"given":"David H",
|
||||
"family":"Westendorf",
|
||||
"sequence":"additional",
|
||||
"affiliation":[
|
||||
{
|
||||
"name":"Department of Psychology, University of Arkansas, Fayetteville, Arkansas 72701, USA"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"given":"Randall",
|
||||
"family":"Overton",
|
||||
"sequence":"additional",
|
||||
"affiliation":[
|
||||
{
|
||||
"name":"Department of Psychology, Illinois State University, Normal, Illinois 61761, USA"
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"member":"179",
|
||||
"published-online":{
|
||||
"date-parts":[
|
||||
[
|
||||
2016,
|
||||
6,
|
||||
25
|
||||
]
|
||||
]
|
||||
},
|
||||
"reference":[
|
||||
{
|
||||
"key":"bibr1-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1136\/bjo.37.1.37"
|
||||
},
|
||||
{
|
||||
"key":"bibr2-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1037\/0096-1523.5.2.315"
|
||||
},
|
||||
{
|
||||
"key":"bibr3-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1016\/0042-6989(74)90065-0"
|
||||
},
|
||||
{
|
||||
"key":"bibr4-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1068\/p080143"
|
||||
},
|
||||
{
|
||||
"key":"bibr5-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1016\/0042-6989(70)90036-2"
|
||||
},
|
||||
{
|
||||
"key":"bibr6-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1113\/jphysiol.1969.sp008862"
|
||||
},
|
||||
{
|
||||
"key":"bibr7-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1113\/jphysiol.1972.sp010006"
|
||||
},
|
||||
{
|
||||
"key":"bibr8-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1146\/annurev.ps.23.020172.002213"
|
||||
},
|
||||
{
|
||||
"key":"bibr9-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1126\/science.166.3902.245"
|
||||
},
|
||||
{
|
||||
"key":"bibr10-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1037\/h0075805"
|
||||
},
|
||||
{
|
||||
"key":"bibr11-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1113\/jphysiol.1968.sp008552"
|
||||
},
|
||||
{
|
||||
"key":"bibr12-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1113\/jphysiol.1965.sp007784"
|
||||
},
|
||||
{
|
||||
"key":"bibr13-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1037\/h0032455"
|
||||
},
|
||||
{
|
||||
"key":"bibr14-p090223",
|
||||
"volume-title":"Treatise on Physiological Optics",
|
||||
"volume":"3",
|
||||
"author":"von Helmholtz H",
|
||||
"year":"1866",
|
||||
"edition":"3"
|
||||
},
|
||||
{
|
||||
"key":"bibr15-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1068\/p040125"
|
||||
},
|
||||
{
|
||||
"key":"bibr16-p090223",
|
||||
"volume-title":"On Binocular Rivalry",
|
||||
"author":"Levelt W J M",
|
||||
"year":"1965"
|
||||
},
|
||||
{
|
||||
"key":"bibr17-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1001\/archopht.1935.00840020011001"
|
||||
},
|
||||
{
|
||||
"key":"bibr18-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.3758\/BF03205796"
|
||||
},
|
||||
{
|
||||
"key":"bibr19-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.3758\/BF03210180"
|
||||
},
|
||||
{
|
||||
"key":"bibr20-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1037\/0033-2909.85.2.376"
|
||||
},
|
||||
{
|
||||
"key":"bibr21-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.1016\/0042-6989(79)90169-X"
|
||||
},
|
||||
{
|
||||
"key":"bibr22-p090223",
|
||||
"doi-asserted-by":"publisher",
|
||||
"DOI":"10.3758\/BF03210465"
|
||||
}
|
||||
],
|
||||
"container-title":[
|
||||
"Perception"
|
||||
],
|
||||
"original-title":[
|
||||
|
||||
],
|
||||
"language":"en",
|
||||
"link":[
|
||||
{
|
||||
"URL":"http:\/\/journals.sagepub.com\/doi\/pdf\/10.1068\/p090223",
|
||||
"content-type":"application\/pdf",
|
||||
"content-version":"vor",
|
||||
"intended-application":"text-mining"
|
||||
},
|
||||
{
|
||||
"URL":"http:\/\/journals.sagepub.com\/doi\/pdf\/10.1068\/p090223",
|
||||
"content-type":"unspecified",
|
||||
"content-version":"vor",
|
||||
"intended-application":"similarity-checking"
|
||||
}
|
||||
],
|
||||
"deposited":{
|
||||
"date-parts":[
|
||||
[
|
||||
2021,
|
||||
12,
|
||||
3
|
||||
]
|
||||
],
|
||||
"date-time":"2021-12-03T11:49:48Z",
|
||||
"timestamp":1638532188000
|
||||
},
|
||||
"score":1,
|
||||
"resource":{
|
||||
"primary":{
|
||||
"URL":"http:\/\/journals.sagepub.com\/doi\/10.1068\/p090223"
|
||||
}
|
||||
},
|
||||
"subtitle":[
|
||||
|
||||
],
|
||||
"short-title":[
|
||||
|
||||
],
|
||||
"issued":{
|
||||
"date-parts":[
|
||||
[
|
||||
1980,
|
||||
4
|
||||
]
|
||||
]
|
||||
},
|
||||
"references-count":22,
|
||||
"journal-issue":{
|
||||
"issue":"2",
|
||||
"published-print":{
|
||||
"date-parts":[
|
||||
[
|
||||
1980,
|
||||
4
|
||||
]
|
||||
]
|
||||
}
|
||||
},
|
||||
"alternative-id":[
|
||||
"10.1068\/p090223"
|
||||
],
|
||||
"URL":"http:\/\/dx.doi.org\/10.1068\/p090223",
|
||||
"relation":{
|
||||
|
||||
},
|
||||
"ISSN":[
|
||||
"0301-0066",
|
||||
"1468-4233"
|
||||
],
|
||||
"issn-type":[
|
||||
{
|
||||
"value":"0301-0066",
|
||||
"type":"print"
|
||||
},
|
||||
{
|
||||
"value":"1468-4233",
|
||||
"type":"electronic"
|
||||
}
|
||||
],
|
||||
"subject":[
|
||||
"Artificial Intelligence",
|
||||
"Sensory Systems",
|
||||
"Experimental and Cognitive Psychology",
|
||||
"Ophthalmology"
|
||||
],
|
||||
"published":{
|
||||
"date-parts":[
|
||||
[
|
||||
1980,
|
||||
4
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
@ -0,0 +1,251 @@
|
||||
|
||||
package eu.dnetlib.dhp.bulktag;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.readPath;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
||||
public class SparkEoscTag {
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkEoscTag.class);
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
public static final Qualifier EOSC_QUALIFIER = OafMapperUtils
|
||||
.qualifier(
|
||||
"EOSC",
|
||||
"European Open Science Cloud",
|
||||
ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES);
|
||||
public static final DataInfo EOSC_DATAINFO = OafMapperUtils
|
||||
.dataInfo(
|
||||
false, "propagation", true, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
"propagation:subject", "Inferred by OpenAIRE",
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.9");
|
||||
public final static StructuredProperty EOSC_NOTEBOOK = OafMapperUtils
|
||||
.structuredProperty(
|
||||
"EOSC::Jupyter Notebook", EOSC_QUALIFIER, EOSC_DATAINFO);
|
||||
public final static StructuredProperty EOSC_GALAXY = OafMapperUtils
|
||||
.structuredProperty(
|
||||
"EOSC::Galaxy Workflow", EOSC_QUALIFIER, EOSC_DATAINFO);
|
||||
public final static StructuredProperty EOSC_TWITTER = OafMapperUtils
|
||||
.structuredProperty(
|
||||
"EOSC::Twitter Data", EOSC_QUALIFIER, EOSC_DATAINFO);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkEoscTag.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
execEoscTag(spark, inputPath, workingPath);
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private static void execEoscTag(SparkSession spark, String inputPath, String workingPath) {
|
||||
|
||||
readPath(spark, inputPath + "/software", Software.class)
|
||||
.map((MapFunction<Software, Software>) s -> {
|
||||
List<StructuredProperty> sbject;
|
||||
if (!Optional.ofNullable(s.getSubject()).isPresent())
|
||||
s.setSubject(new ArrayList<>());
|
||||
sbject = s.getSubject();
|
||||
|
||||
if (containsCriteriaNotebook(s)) {
|
||||
sbject.add(EOSC_NOTEBOOK);
|
||||
if (sbject.stream().anyMatch(sb -> sb.getValue().equals("EOSC Jupyter Notebook"))) {
|
||||
sbject = sbject.stream().map(sb -> {
|
||||
if (sb.getValue().equals("EOSC Jupyter Notebook")) {
|
||||
return null;
|
||||
}
|
||||
return sb;
|
||||
}).filter(Objects::nonNull).collect(Collectors.toList());
|
||||
s.setSubject(sbject);
|
||||
}
|
||||
}
|
||||
if (containsCriteriaGalaxy(s)) {
|
||||
sbject.add(EOSC_GALAXY);
|
||||
}
|
||||
return s;
|
||||
}, Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/software");
|
||||
|
||||
readPath(spark, workingPath + "/software", Software.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/software");
|
||||
|
||||
readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
|
||||
.map((MapFunction<OtherResearchProduct, OtherResearchProduct>) orp -> {
|
||||
List<StructuredProperty> sbject;
|
||||
if (!Optional.ofNullable(orp.getSubject()).isPresent())
|
||||
orp.setSubject(new ArrayList<>());
|
||||
sbject = orp.getSubject();
|
||||
if (containsCriteriaGalaxy(orp)) {
|
||||
sbject.add(EOSC_GALAXY);
|
||||
}
|
||||
if (containscriteriaTwitter(orp)) {
|
||||
sbject.add(EOSC_TWITTER);
|
||||
}
|
||||
return orp;
|
||||
}, Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/otherresearchproduct");
|
||||
|
||||
readPath(spark, workingPath + "/otherresearchproduct", OtherResearchProduct.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/otherresearchproduct");
|
||||
|
||||
readPath(spark, inputPath + "/dataset", Dataset.class)
|
||||
.map((MapFunction<Dataset, Dataset>) d -> {
|
||||
List<StructuredProperty> sbject;
|
||||
if (!Optional.ofNullable(d.getSubject()).isPresent())
|
||||
d.setSubject(new ArrayList<>());
|
||||
sbject = d.getSubject();
|
||||
if (containscriteriaTwitter(d)) {
|
||||
sbject.add(EOSC_TWITTER);
|
||||
}
|
||||
return d;
|
||||
}, Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/dataset");
|
||||
|
||||
readPath(spark, workingPath + "/dataset", Dataset.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/dataset");
|
||||
}
|
||||
|
||||
private static boolean containscriteriaTwitter(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
|
||||
if (words.contains("twitter") &&
|
||||
(words.contains("data") || words.contains("dataset")))
|
||||
return true;
|
||||
|
||||
if (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
|
||||
r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaGalaxy(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
if (words.contains("galaxy") &&
|
||||
words.contains("workflow"))
|
||||
return true;
|
||||
|
||||
if (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
|
||||
r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaNotebook(Software s) {
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
|
||||
return true;
|
||||
if (s
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
sbj -> sbj.getValue().toLowerCase().contains("python") &&
|
||||
sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
|
||||
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Set<String> getSubjects(List<StructuredProperty> s) {
|
||||
Set<String> subjects = new HashSet<>();
|
||||
s.stream().forEach(sbj -> subjects.addAll(Arrays.asList(sbj.getValue().toLowerCase().split(" "))));
|
||||
s.stream().forEach(sbj -> subjects.add(sbj.getValue().toLowerCase()));
|
||||
return subjects;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
return words;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsF(List<Field<String>> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
// elem
|
||||
// .forEach(
|
||||
// t -> words.addAll(Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" "))));
|
||||
return words;
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
[
|
||||
{
|
||||
"paramName":"s",
|
||||
"paramLongName":"sourcePath",
|
||||
"paramDescription": "the path of the sequencial file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "wp",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ssm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||
"paramRequired": false
|
||||
}
|
||||
|
||||
]
|
@ -0,0 +1,538 @@
|
||||
|
||||
package eu.dnetlib.dhp.bulktag;
|
||||
|
||||
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY_INDICATOR;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class EOSCTagJobTest {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(EOSCTagJobTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(EOSCTagJobTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(EOSCTagJobTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void jupyterUpdatesTest() throws Exception {
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/software").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
|
||||
Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/software");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/dataset").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct").getPath())
|
||||
.map(
|
||||
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
|
||||
.readValue(value, OtherResearchProduct.class),
|
||||
Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/input/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
4,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
9, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
9, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
List<StructuredProperty> subjects = tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject();
|
||||
Assertions.assertEquals(8, subjects.size());
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("jupyter")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("Modeling and Simulation")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("structure granulaire")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("algorithme")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("simulation numérique")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de gaz")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de liquide")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.filter(
|
||||
ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.filter(
|
||||
ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
// spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void galaxyUpdatesTest() throws Exception {
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/software").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
|
||||
Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/software");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/dataset").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct").getPath())
|
||||
.map(
|
||||
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
|
||||
.readValue(value, OtherResearchProduct.class),
|
||||
Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/input/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(11, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
8, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
JavaRDD<OtherResearchProduct> orp = sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
|
||||
|
||||
Assertions.assertEquals(10, orp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
orp
|
||||
.filter(
|
||||
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void twitterUpdatesTest() throws Exception {
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/software").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
|
||||
Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/software");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/dataset").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct").getPath())
|
||||
.map(
|
||||
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
|
||||
.readValue(value, OtherResearchProduct.class),
|
||||
Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/input/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
JavaRDD<OtherResearchProduct> orp = sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
|
||||
|
||||
Assertions.assertEquals(10, orp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3,
|
||||
orp
|
||||
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
JavaRDD<Dataset> dats = sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(11, dats.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3,
|
||||
dats
|
||||
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
}
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -1,136 +0,0 @@
|
||||
package eu.dnetlib.dhp.sx.graph.pangaea
|
||||
|
||||
import org.apache.spark.sql.expressions.Aggregator
|
||||
import org.apache.spark.sql.{Encoder, Encoders}
|
||||
import org.json4s
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.jackson.JsonMethods.parse
|
||||
|
||||
import java.util.regex.Pattern
|
||||
import scala.language.postfixOps
|
||||
import scala.xml.{Elem, Node, XML}
|
||||
|
||||
case class PangaeaDataModel(
|
||||
identifier: String,
|
||||
title: List[String],
|
||||
objectType: List[String],
|
||||
creator: List[String],
|
||||
publisher: List[String],
|
||||
dataCenter: List[String],
|
||||
subject: List[String],
|
||||
language: String,
|
||||
rights: String,
|
||||
parent: String,
|
||||
relation: List[String],
|
||||
linkage: List[(String, String)]
|
||||
) {}
|
||||
|
||||
object PangaeaUtils {
|
||||
|
||||
def toDataset(input: String): PangaeaDataModel = {
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: json4s.JValue = parse(input)
|
||||
val xml = (json \ "xml").extract[String]
|
||||
parseXml(xml)
|
||||
}
|
||||
|
||||
def findDOIInRelation(input: List[String]): List[String] = {
|
||||
val pattern = Pattern.compile("\\b(10[.][0-9]{4,}(?:[.][0-9]+)*\\/(?:(?![\"&\\'<>])\\S)+)\\b")
|
||||
input
|
||||
.map(i => {
|
||||
val matcher = pattern.matcher(i)
|
||||
if (matcher.find())
|
||||
matcher.group(0)
|
||||
else
|
||||
null
|
||||
})
|
||||
.filter(i => i != null)
|
||||
}
|
||||
|
||||
def attributeOpt(attribute: String, node: Node): Option[String] =
|
||||
node.attribute(attribute) flatMap (_.headOption) map (_.text)
|
||||
|
||||
def extractLinkage(node: Elem): List[(String, String)] = {
|
||||
(node \ "linkage")
|
||||
.map(n => (attributeOpt("type", n), n.text))
|
||||
.filter(t => t._1.isDefined)
|
||||
.map(t => (t._1.get, t._2))(collection.breakOut)
|
||||
}
|
||||
|
||||
def parseXml(input: String): PangaeaDataModel = {
|
||||
val xml = XML.loadString(input)
|
||||
|
||||
val identifier = (xml \ "identifier").text
|
||||
val title: List[String] = (xml \ "title").map(n => n.text)(collection.breakOut)
|
||||
val pType: List[String] = (xml \ "type").map(n => n.text)(collection.breakOut)
|
||||
val creators: List[String] = (xml \ "creator").map(n => n.text)(collection.breakOut)
|
||||
val publisher: List[String] = (xml \ "publisher").map(n => n.text)(collection.breakOut)
|
||||
val dataCenter: List[String] = (xml \ "dataCenter").map(n => n.text)(collection.breakOut)
|
||||
val subject: List[String] = (xml \ "subject").map(n => n.text)(collection.breakOut)
|
||||
val language = (xml \ "language").text
|
||||
val rights = (xml \ "rights").text
|
||||
val parentIdentifier = (xml \ "parentIdentifier").text
|
||||
val relation: List[String] = (xml \ "relation").map(n => n.text)(collection.breakOut)
|
||||
val relationFiltered = findDOIInRelation(relation)
|
||||
val linkage: List[(String, String)] = extractLinkage(xml)
|
||||
|
||||
PangaeaDataModel(
|
||||
identifier,
|
||||
title,
|
||||
pType,
|
||||
creators,
|
||||
publisher,
|
||||
dataCenter,
|
||||
subject,
|
||||
language,
|
||||
rights,
|
||||
parentIdentifier,
|
||||
relationFiltered,
|
||||
linkage
|
||||
)
|
||||
}
|
||||
|
||||
def getDatasetAggregator(): Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] =
|
||||
new Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] {
|
||||
|
||||
override def zero: PangaeaDataModel = null
|
||||
|
||||
override def reduce(b: PangaeaDataModel, a: (String, PangaeaDataModel)): PangaeaDataModel = {
|
||||
if (b == null)
|
||||
a._2
|
||||
else {
|
||||
if (a == null)
|
||||
b
|
||||
else {
|
||||
if (b.title != null && b.title.nonEmpty)
|
||||
b
|
||||
else
|
||||
a._2
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def merge(b1: PangaeaDataModel, b2: PangaeaDataModel): PangaeaDataModel = {
|
||||
if (b1 == null)
|
||||
b2
|
||||
else {
|
||||
if (b2 == null)
|
||||
b1
|
||||
else {
|
||||
if (b1.title != null && b1.title.nonEmpty)
|
||||
b1
|
||||
else
|
||||
b2
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
override def finish(reduction: PangaeaDataModel): PangaeaDataModel = reduction
|
||||
|
||||
override def bufferEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
|
||||
|
||||
override def outputEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
|
||||
}
|
||||
|
||||
}
|
@ -1,58 +0,0 @@
|
||||
package eu.dnetlib.dhp.sx.graph.pangaea
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
object SparkGeneratePanagaeaDataset {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
val conf: SparkConf = new SparkConf()
|
||||
val parser = new ArgumentApplicationParser(
|
||||
Source
|
||||
.fromInputStream(
|
||||
getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json")
|
||||
)
|
||||
.mkString
|
||||
)
|
||||
parser.parseArgument(args)
|
||||
|
||||
val spark: SparkSession =
|
||||
SparkSession
|
||||
.builder()
|
||||
.config(conf)
|
||||
.appName(SparkGeneratePanagaeaDataset.getClass.getSimpleName)
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate()
|
||||
|
||||
parser.getObjectMap.asScala.foreach(s => logger.info(s"${s._1} -> ${s._2}"))
|
||||
logger.info("Converting sequential file into Dataset")
|
||||
val sc: SparkContext = spark.sparkContext
|
||||
|
||||
val workingPath: String = parser.get("workingPath")
|
||||
|
||||
implicit val pangaeaEncoders: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
|
||||
|
||||
val inputRDD: RDD[PangaeaDataModel] =
|
||||
sc.textFile(s"$workingPath/update").map(s => PangaeaUtils.toDataset(s))
|
||||
|
||||
spark
|
||||
.createDataset(inputRDD)
|
||||
.as[PangaeaDataModel]
|
||||
.map(s => (s.identifier, s))(Encoders.tuple(Encoders.STRING, pangaeaEncoders))
|
||||
.groupByKey(_._1)(Encoders.STRING)
|
||||
.agg(PangaeaUtils.getDatasetAggregator().toColumn)
|
||||
.map(s => s._2)
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$workingPath/dataset")
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
package eu.dnetlib.dhp.sx.pangaea
|
||||
|
||||
import eu.dnetlib.dhp.sx.graph.pangaea.PangaeaUtils
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
import java.util.TimeZone
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
import scala.io.Source
|
||||
class PangaeaTransformTest {
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
def test_dateStamp() :Unit ={
|
||||
|
||||
|
||||
|
||||
val d = new Date()
|
||||
|
||||
val s:String = s"${new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")format d}Z"
|
||||
println(s)
|
||||
|
||||
|
||||
val xml = Source.fromInputStream(getClass.getResourceAsStream("input.xml")).mkString
|
||||
println(PangaeaUtils.parseXml(xml))
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because one or more lines are too long
@ -1 +1,5 @@
|
||||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }}
|
||||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" },
|
||||
"originalId": ["eurocrisdris::1234"],
|
||||
"collectedfrom": [{"key": "eurocrisdris::2b29d08e383ff4cd8a2b6b226ce37e38", "value": "Directory of Research Information System (DRIS)"}],
|
||||
"pid": [{"value": "10.1010.xyx", "qualifier": {"classid": "doi"}}]
|
||||
}
|
@ -1 +1,4 @@
|
||||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }}
|
||||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" },
|
||||
"originalId": ["opendoar____::1234"],
|
||||
"collectedfrom": [{"key": "openaire____::47ce9e9f4fad46e732cff06419ecaabb", "value": "OpenDOAR"}]
|
||||
}
|
@ -0,0 +1,103 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<record xmlns:datacite="http://datacite.org/schema/kernel-4"
|
||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
|
||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||
xmlns:oaf="http://namespace.openaire.eu/oaf"
|
||||
xmlns:oaire="http://namespace.openaire.eu/schema/oaire/"
|
||||
xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<header xmlns="http://www.openarchives.org/OAI/2.0/">
|
||||
<dri:objIdentifier>eosca5322f5f::4dd1aaf93ae136b65dc9ee4e6f76eac9</dri:objIdentifier>
|
||||
<dri:recordIdentifier>53aa90bf-c593-4e6d-923f-d4711ac4b0e1</dri:recordIdentifier>
|
||||
<dri:dateOfCollection>2022-05-25T15:35:48.262Z</dri:dateOfCollection>
|
||||
<oaf:datasourceprefix>eosca5322f5f</oaf:datasourceprefix>
|
||||
<identifier>53aa90bf-c593-4e6d-923f-d4711ac4b0e1</identifier>
|
||||
<datestamp>2022-05-25T15:35:38Z</datestamp>
|
||||
<setSpec>rohub_data</setSpec>
|
||||
<setSpec>ro-crate_data</setSpec>
|
||||
<dr:dateOfTransformation>2022-05-25T15:36:11.094Z</dr:dateOfTransformation>
|
||||
</header>
|
||||
<metadata>
|
||||
<oaire:resource xmlns="http://namespace.openaire.eu/schema/oaire/">
|
||||
<datacite:identifier identifierType="landingPage">https://w3id.org/ro-id/53aa90bf-c593-4e6d-923f-d4711ac4b0e1</datacite:identifier>
|
||||
<datacite:alternateIdentifiers>
|
||||
<datacite:alternateIdentifier alternateIdentifierType="URL">http://api.rohub.org/api/ros/53aa90bf-c593-4e6d-923f-d4711ac4b0e1/</datacite:alternateIdentifier>
|
||||
</datacite:alternateIdentifiers>
|
||||
<datacite:relatedIdentifiers>
|
||||
<datacite:relatedIdentifier relatedIdentifierType="" relationType="">
|
||||
https://github.com/NordicESMhub/RELIANCE/blob/main/content/science/notebooks/air_quality_lockdown.ipynb
|
||||
</datacite:relatedIdentifier>
|
||||
<datacite:relatedIdentifier relatedIdentifierType="URI" relationType="IsPartOf">https://github.com/NordicESMhub/RELIANCE/blob/main/content/science/notebooks/air_quality_lockdown.ipynb</datacite:relatedIdentifier>
|
||||
<datacite:relatedIdentifier relatedIdentifierType="" relationType="">
|
||||
https://nordicesmhub.github.io/RELIANCE/science/notebooks/air_quality_lockdown.html
|
||||
</datacite:relatedIdentifier>
|
||||
<datacite:relatedIdentifier relatedIdentifierType="URI" relationType="IsPartOf">https://nordicesmhub.github.io/RELIANCE/science/notebooks/air_quality_lockdown.html</datacite:relatedIdentifier>
|
||||
</datacite:relatedIdentifiers>
|
||||
<creators xmlns="http://datacite.org/schema/kernel-4">
|
||||
<creator>
|
||||
<creator>
|
||||
<creatorName>Anne Fouilloux</creatorName>
|
||||
</creator>
|
||||
</creator>
|
||||
</creators>
|
||||
<dates xmlns="http://datacite.org/schema/kernel-4">
|
||||
<date dateType="Created">2021-12-19T21:18:33Z</date>
|
||||
</dates>
|
||||
<dc:descriptions>
|
||||
<dc:description descriptionType="Abstract">The COVID-19 pandemic has led to significant reductions in economic activity, especially during lockdowns. Several studies has shown that the concentration of nitrogen dioxyde and particulate matter levels have reduced during lockdown events. Reductions in transportation sector emissions are most likely largely responsible for the NO2 anomalies. In this study, we analyze the impact of lockdown events on the air quality using data from Copernicus Atmosphere Monitoring Service over Europe and at selected locations.</dc:description>
|
||||
</dc:descriptions>
|
||||
<oaire:fundingReferences>
|
||||
<oaire:fundingReference>
|
||||
<oaire:funderName>European Commission</oaire:funderName>
|
||||
<oaire:funderIdentifier funderIdentifierType="Crossref Funder ID">10.13039/501100000781</oaire:funderIdentifier>
|
||||
<oaire:awardNumber awardURI="">101017502</oaire:awardNumber>
|
||||
<oaire:awardTitle>Research Lifecycle Management for Earth Science Communities and Copernicus Users</oaire:awardTitle>
|
||||
</oaire:fundingReference>
|
||||
</oaire:fundingReferences>
|
||||
<oaire:licenseCondition uri="https://opensource.org/licenses/MIT">MIT License</oaire:licenseCondition>
|
||||
<dc:publisher>University of Oslo</dc:publisher>
|
||||
<dc:publicationYear>2021</dc:publicationYear>
|
||||
<oaire:resourceType resourceTypeGeneral="other research product" uri="http://purl.org/coar/resource_type/c_1843">RO-crate</oaire:resourceType>
|
||||
<rightsList xmlns="http://datacite.org/schema/kernel-4">
|
||||
<rights rightsURI="http://purl.org/coar/access_right/c_abf2">open access</rights>
|
||||
</rightsList>
|
||||
<sizes xmlns="http://datacite.org/schema/kernel-4">
|
||||
<size>11.971 MB</size>
|
||||
</sizes>
|
||||
<subjects xmlns="http://datacite.org/schema/kernel-4">
|
||||
<subject>Applied sciences</subject>
|
||||
<subject>Meteorology</subject>
|
||||
<subject>EOSC::RO-crate</subject>
|
||||
</subjects>
|
||||
<titles xmlns="http://datacite.org/schema/kernel-4">
|
||||
<title>Impact of the Covid-19 Lockdown on Air quality over Europe</title>
|
||||
</titles>
|
||||
</oaire:resource>
|
||||
<oaf:identifier identifierType="URL">https://w3id.org/ro-id/53aa90bf-c593-4e6d-923f-d4711ac4b0e1</oaf:identifier>
|
||||
<dr:CobjCategory type="other">0048</dr:CobjCategory>
|
||||
<oaf:dateAccepted/>
|
||||
<oaf:accessrights>OPEN</oaf:accessrights>
|
||||
<oaf:license>https://opensource.org/licenses/MIT</oaf:license>
|
||||
<oaf:language>und</oaf:language>
|
||||
<oaf:hostedBy id="eosc________::psnc::psnc.rohub" name="ROHub"/>
|
||||
<oaf:collectedFrom id="eosc________::psnc::psnc.rohub" name="ROHub"/>
|
||||
</metadata>
|
||||
<about xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance">
|
||||
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
||||
<originDescription altered="true" harvestDate="2022-05-25T15:35:48.262Z">
|
||||
<baseURL>https%3A%2F%2Fapi.rohub.org%2Fapi%2Foai2d%2F</baseURL>
|
||||
<identifier>53aa90bf-c593-4e6d-923f-d4711ac4b0e1</identifier>
|
||||
<datestamp>2022-05-25T15:35:38Z</datestamp>
|
||||
<metadataNamespace/>
|
||||
</originDescription>
|
||||
</provenance>
|
||||
<oaf:datainfo>
|
||||
<oaf:inferred>false</oaf:inferred>
|
||||
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
||||
<oaf:trust>0.9</oaf:trust>
|
||||
<oaf:inferenceprovenance/>
|
||||
<oaf:provenanceaction classid="sysimport:crosswalk"
|
||||
classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
|
||||
</oaf:datainfo>
|
||||
</about>
|
||||
</record>
|
Binary file not shown.
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue