forked from D-Net/dnet-hadoop

[EOSCTag] last test and change in the implementation to search in title and description

parent e37177e1ce
commit a21fe310e5
Crossref2Oaf.scala

@@ -584,12 +584,10 @@ case object Crossref2Oaf {
       if (dp.length == 10) {
         return GraphCleaningFunctions.cleanDate(dp)
       }
-      }
-      else if (res.size ==2) {
+    } else if (res.size == 2) {
       val dp = f"${res.head}-${res(1)}%02d-01"
       return GraphCleaningFunctions.cleanDate(dp)
-      }
-      else if (res.size ==1) {
+    } else if (res.size == 1) {
       return GraphCleaningFunctions.cleanDate(s"${res.head}-01-01")
     }
   }
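For context, the hunk above pads partial Crossref issued date-parts before cleaning: [year, month] becomes year-MM-01 and a bare [year] becomes year-01-01, so the same GraphCleaningFunctions.cleanDate path can handle them. A minimal standalone sketch of the padding rule (plain Java; padDateParts is a hypothetical name, not the project's API):

    import java.util.List;

    public class DatePartsSketch {
        // Pad partial date-parts the way the hunk above does; a full
        // 10-character "yyyy-MM-dd" value is assumed to be handled upstream.
        static String padDateParts(List<Integer> res) {
            if (res.size() == 2) {
                return String.format("%d-%02d-01", res.get(0), res.get(1));
            } else if (res.size() == 1) {
                return String.format("%d-01-01", res.get(0));
            }
            return null; // unexpected shape: leave it to the caller
        }

        public static void main(String[] args) {
            System.out.println(padDateParts(List.of(2007, 5))); // 2007-05-01
            System.out.println(padDateParts(List.of(2007)));    // 2007-01-01
        }
    }

Padding to the first day keeps the value a parseable full date, which is what the cleanDate call downstream works on.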
CrossrefMappingTest.scala

@@ -73,10 +73,10 @@ class CrossrefMappingTest {

  }

  @Test
  def crossrefIssueDateTest(): Unit = {
-    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/issue_date.json")).mkString
+    val json =
+      Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/issue_date.json")).mkString
    assertNotNull(json)
    assertFalse(json.isEmpty)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
SparkEoscTag.java

@@ -1,10 +1,14 @@

 package eu.dnetlib.dhp.bulktag;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import static eu.dnetlib.dhp.PropagationConstant.readPath;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -14,185 +18,214 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
+import com.fasterxml.jackson.databind.ObjectMapper;

-import static eu.dnetlib.dhp.PropagationConstant.readPath;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;

 public class SparkEoscTag {
     private static final Logger log = LoggerFactory.getLogger(SparkEoscTag.class);
     public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    public static final Qualifier EOSC_QUALIFIER = OafMapperUtils.qualifier("eosc",
-        "European Open Science Cloud",
-        ModelConstants.DNET_SUBJECT_TYPOLOGIES,ModelConstants.DNET_SUBJECT_TYPOLOGIES);
-    public static final DataInfo EOSC_DATAINFO = OafMapperUtils.dataInfo(false, "propagation", true, false,
-        OafMapperUtils.qualifier("propagation:subject","Inferred by OpenAIRE",
-            ModelConstants.DNET_PROVENANCE_ACTIONS,ModelConstants.DNET_PROVENANCE_ACTIONS), "0.9");
-    public final static StructuredProperty EOSC_NOTEBOOK = OafMapperUtils.structuredProperty(
-        "EOSC::Jupyter Notebook", EOSC_QUALIFIER,EOSC_DATAINFO);
-    public final static StructuredProperty EOSC_GALAXY = OafMapperUtils.structuredProperty(
-        "EOSC::Galaxy Workflow", EOSC_QUALIFIER, EOSC_DATAINFO);
-    public final static StructuredProperty EOSC_TWITTER = OafMapperUtils.structuredProperty(
-        "EOSC::Twitter Data", EOSC_QUALIFIER,EOSC_DATAINFO);
+    public static final Qualifier EOSC_QUALIFIER = OafMapperUtils
+        .qualifier(
+            "eosc",
+            "European Open Science Cloud",
+            ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES);
+    public static final DataInfo EOSC_DATAINFO = OafMapperUtils
+        .dataInfo(
+            false, "propagation", true, false,
+            OafMapperUtils
+                .qualifier(
+                    "propagation:subject", "Inferred by OpenAIRE",
+                    ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+            "0.9");
+    public final static StructuredProperty EOSC_NOTEBOOK = OafMapperUtils
+        .structuredProperty(
+            "EOSC::Jupyter Notebook", EOSC_QUALIFIER, EOSC_DATAINFO);
+    public final static StructuredProperty EOSC_GALAXY = OafMapperUtils
+        .structuredProperty(
+            "EOSC::Galaxy Workflow", EOSC_QUALIFIER, EOSC_DATAINFO);
+    public final static StructuredProperty EOSC_TWITTER = OafMapperUtils
+        .structuredProperty(
+            "EOSC::Twitter Data", EOSC_QUALIFIER, EOSC_DATAINFO);

     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils
             .toString(
                 SparkEoscTag.class
                     .getResourceAsStream(
                         "/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json"));

         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
         parser.parseArgument(args);

         Boolean isSparkSessionManaged = Optional
             .ofNullable(parser.get("isSparkSessionManaged"))
             .map(Boolean::valueOf)
             .orElse(Boolean.TRUE);
         log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

         final String inputPath = parser.get("sourcePath");
         log.info("inputPath: {}", inputPath);

         final String workingPath = parser.get("workingPath");
         log.info("workingPath: {}", workingPath);

         SparkConf conf = new SparkConf();

         runWithSparkSession(
             conf,
             isSparkSessionManaged,
             spark -> {
                 execEoscTag(spark, inputPath, workingPath);

             });
     }

     private static void execEoscTag(SparkSession spark, String inputPath, String workingPath) {

         readPath(spark, inputPath + "/software", Software.class)
             .map((MapFunction<Software, Software>) s -> {
                 List<StructuredProperty> sbject;
                 if (!Optional.ofNullable(s.getSubject()).isPresent())
                     s.setSubject(new ArrayList<>());
                 sbject = s.getSubject();

-                if(containsCriteriaNotebook(s)){
+                if (containsCriteriaNotebook(s)) {
                     sbject.add(EOSC_NOTEBOOK);

                 }
-                if(containsCriteriaGalaxy(s)){
+                if (containsCriteriaGalaxy(s)) {
                     sbject.add(EOSC_GALAXY);
                 }
                 return s;
-            }, Encoders.bean(Software.class) )
+            }, Encoders.bean(Software.class))
             .write()
             .mode(SaveMode.Overwrite)
-            .option("compression","gzip")
+            .option("compression", "gzip")
             .json(workingPath + "/software");

-        readPath(spark, workingPath + "/software" , Software.class)
+        readPath(spark, workingPath + "/software", Software.class)
             .write()
             .mode(SaveMode.Overwrite)
-            .option("compression","gzip")
+            .option("compression", "gzip")
             .json(inputPath + "/software");

         readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
-            .map((MapFunction<OtherResearchProduct, OtherResearchProduct>) orp ->
-            {
-                List<StructuredProperty> sbject;
-                if (!Optional.ofNullable(orp.getSubject()).isPresent())
-                    orp.setSubject(new ArrayList<>());
-                sbject = orp.getSubject();
-                if(containsCriteriaGalaxy(orp)){
-                    sbject.add(EOSC_GALAXY);
-                }
-                if(containscriteriaTwitter(orp)){
-                    sbject.add(EOSC_TWITTER);
-                }
-                return orp;
-            }, Encoders.bean(OtherResearchProduct.class))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression","gzip")
-            .json(workingPath + "/otherresearchproduct");
+            .map((MapFunction<OtherResearchProduct, OtherResearchProduct>) orp -> {
+                List<StructuredProperty> sbject;
+                if (!Optional.ofNullable(orp.getSubject()).isPresent())
+                    orp.setSubject(new ArrayList<>());
+                sbject = orp.getSubject();
+                if (containsCriteriaGalaxy(orp)) {
+                    sbject.add(EOSC_GALAXY);
+                }
+                if (containscriteriaTwitter(orp)) {
+                    sbject.add(EOSC_TWITTER);
+                }
+                return orp;
+            }, Encoders.bean(OtherResearchProduct.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingPath + "/otherresearchproduct");

         readPath(spark, workingPath + "/otherresearchproduct", OtherResearchProduct.class)
             .write()
             .mode(SaveMode.Overwrite)
-            .option("compression","gzip")
+            .option("compression", "gzip")
             .json(inputPath + "/otherresearchproduct");

         readPath(spark, inputPath + "/dataset", Dataset.class)
             .map((MapFunction<Dataset, Dataset>) d -> {
                 List<StructuredProperty> sbject;
                 if (!Optional.ofNullable(d.getSubject()).isPresent())
                     d.setSubject(new ArrayList<>());
                 sbject = d.getSubject();
-                if(containscriteriaTwitter(d)){
+                if (containscriteriaTwitter(d)) {
                     sbject.add(EOSC_TWITTER);
                 }
                 return d;
-            } , Encoders.bean(Dataset.class) )
+            }, Encoders.bean(Dataset.class))
             .write()
             .mode(SaveMode.Overwrite)
-            .option("compression","gzip")
+            .option("compression", "gzip")
             .json(workingPath + "/dataset");

-        readPath(spark, workingPath + "/dataset" , Dataset.class)
+        readPath(spark, workingPath + "/dataset", Dataset.class)
             .write()
             .mode(SaveMode.Overwrite)
-            .option("compression","gzip")
+            .option("compression", "gzip")
             .json(inputPath + "/dataset");
     }

     private static boolean containscriteriaTwitter(Result r) {
-        if (r.getTitle().stream().anyMatch(t -> t.getValue().toLowerCase().contains("twitter") &&
-            t.getValue().toLowerCase().contains("data")))
-            return true;
-        if(r.getDescription().stream().anyMatch(d -> d.getValue().toLowerCase().contains("twitter") &&
-            d.getValue().toLowerCase().contains("data") ))
-            return true;
-        if(r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
-            r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
-            return true;
-        return false;
-    }
-
-    private static boolean containsCriteriaGalaxy(Result r) {
-        if (r.getTitle().stream().anyMatch(t -> t.getValue().toLowerCase().contains("galaxy") &&
-            (t.getValue().toLowerCase().contains("workflow") || t.getValue().toLowerCase().contains("software"))))
-            return true;
-        if(r.getDescription().stream().anyMatch(d -> d.getValue().toLowerCase().contains("galaxy") &&
-            (d.getValue().toLowerCase().contains("workflow") || d.getValue().toLowerCase().contains("software"))))
-            return true;
-        if(r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
-            (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow"))) ||
-            r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("software")))
-            return true;
-        return false;
-    }
-
-    private static boolean containsCriteriaNotebook(Software s) {
-        if(s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
-            return true;
-        if(s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python") &&
-            sbj.getValue().toLowerCase().contains("notebook")))
-            return true;
-        if(s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
-            s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
-            return true;
-        return false;
-    }
-
-    private static boolean containsTitleNotebook(Software s) {
-        if (s.getTitle().stream().anyMatch(t -> t.getValue().toLowerCase().contains("jupyter") &&
-            t.getValue().toLowerCase().contains("notebook")))
-            return true;
-        return false;
-    }
+        Set<String> words = getWordsSP(r.getTitle());
+        words.addAll(getWordsF(r.getDescription()));
+        if (words.contains("twitter") &&
+            (words.contains("data") || words.contains("dataset")))
+            return true;
+        if (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
+            r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
+            return true;
+        return false;
+    }
+
+    private static boolean containsCriteriaGalaxy(Result r) {
+        Set<String> words = getWordsSP(r.getTitle());
+        words.addAll(getWordsF(r.getDescription()));
+        if (words.contains("galaxy") &&
+            (words.contains("workflow") || words.contains("software")))
+            return true;
+        if (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
+            (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow"))) ||
+            r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("software")))
+            return true;
+        return false;
+    }
+
+    private static boolean containsCriteriaNotebook(Software s) {
+        if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
+            return true;
+        if (s
+            .getSubject()
+            .stream()
+            .anyMatch(
+                sbj -> sbj.getValue().toLowerCase().contains("python") &&
+                    sbj.getValue().toLowerCase().contains("notebook")))
+            return true;
+        if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
+            s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
+            return true;
+        return false;
+    }
+
+    private static Set<String> getSubjects(List<StructuredProperty> s) {
+        Set<String> subjects = new HashSet<>();
+        s.stream().forEach(sbj -> subjects.addAll(Arrays.asList(sbj.getValue().toLowerCase().split(" "))));
+        s.stream().forEach(sbj -> subjects.add(sbj.getValue().toLowerCase()));
+        return subjects;
+    }
+
+    private static Set<String> getWordsSP(List<StructuredProperty> elem) {
+        Set<String> words = new HashSet<>();
+        elem
+            .forEach(
+                t -> words.addAll(Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" "))));
+        return words;
+    }
+
+    private static Set<String> getWordsF(List<Field<String>> elem) {
+        Set<String> words = new HashSet<>();
+        elem
+            .forEach(
+                t -> words.addAll(Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" "))));
+        return words;

+    }
 }
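The heart of this commit is visible in the criteria methods above: instead of substring tests against whole title and description values, the result's title and description are now tokenized into a word set (getWordsSP/getWordsF) and the EOSC tags are assigned on whole-word hits. A minimal standalone sketch of that idea (plain Java; the helper names here are hypothetical, not the project's API):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class WordMatchSketch {
        // Tokenize free text the way the patch does: lowercase, strip
        // non-letter characters, split on spaces, collect distinct words.
        static Set<String> words(List<String> texts) {
            Set<String> out = new HashSet<>();
            texts.forEach(t -> out.addAll(Arrays.asList(t.toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" "))));
            return out;
        }

        // Twitter criterion from the hunk above: "twitter" plus "data" or
        // "dataset" must appear as whole words in the title or description.
        static boolean looksLikeTwitterData(List<String> titleAndDescription) {
            Set<String> w = words(titleAndDescription);
            return w.contains("twitter") && (w.contains("data") || w.contains("dataset"));
        }

        public static void main(String[] args) {
            System.out.println(looksLikeTwitterData(List.of("A Twitter dataset of climate tweets"))); // true
            System.out.println(looksLikeTwitterData(List.of("Metadata harvesting toolkit")));          // false
        }
    }

Matching on whole words is what lets the new code search titles and descriptions without tripping on substrings; for instance, "metadata" no longer counts as a hit for "data".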
EOSCTagJobTest.java

@@ -1,8 +1,13 @@

 package eu.dnetlib.dhp.bulktag;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.*;
+import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY_INDICATOR;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;

 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -11,6 +16,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -19,26 +25,20 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-
-import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY_INDICATOR;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.oaf.*;

 public class EOSCTagJobTest {

     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

     private static SparkSession spark;

     private static Path workingDir;

     private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);

     @BeforeAll
     public static void beforeAll() throws IOException {
         workingDir = Files.createTempDirectory(EOSCTagJobTest.class.getSimpleName());
@@ -70,23 +70,39 @@ public class EOSCTagJobTest {
     @Test
     void jupyterUpdatesTest() throws Exception {

-        spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/software").getPath())
-            .map((MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class), Encoders.bean(Software.class))
-            .write()
-            .option("compression","gzip")
-            .json(workingDir.toString() + "/input/software");
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/software").getPath())
+            .map(
+                (MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
+                Encoders.bean(Software.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/software");

-        spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/dataset").getPath())
-            .map((MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class), Encoders.bean(Dataset.class))
-            .write()
-            .option("compression","gzip")
-            .json(workingDir.toString() + "/input/dataset");
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/dataset").getPath())
+            .map(
+                (MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
+                Encoders.bean(Dataset.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/dataset");

-        spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct").getPath())
-            .map((MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER.readValue(value, OtherResearchProduct.class), Encoders.bean(OtherResearchProduct.class))
-            .write()
-            .option("compression","gzip")
-            .json(workingDir.toString() + "/input/otherresearchproduct");
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct").getPath())
+            .map(
+                (MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
+                    .readValue(value, OtherResearchProduct.class),
+                Encoders.bean(OtherResearchProduct.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/otherresearchproduct");

         SparkEoscTag
             .main(
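Each test setup block above repeats one pattern: read newline-delimited JSON as text, decode each line into the bean type with Jackson, then rewrite it as gzipped JSON under the test working directory (now with SaveMode.Overwrite so reruns don't fail on existing output). A generic sketch of that round-trip (loadAsJson is a hypothetical helper; assumes a live SparkSession and a Jackson-compatible bean class):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class JsonRoundTrip {
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        // Parse each JSON line of inPath into clazz, then persist the typed
        // dataset as gzipped JSON under outPath, overwriting previous runs.
        static <T> void loadAsJson(SparkSession spark, String inPath, Class<T> clazz, String outPath) {
            spark
                .read()
                .textFile(inPath)
                .map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(outPath);
        }
    }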
@@ -106,36 +122,109 @@ public class EOSCTagJobTest {

         Assertions.assertEquals(10, tmp.count());

-        Assertions.assertEquals(4, tmp.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook"))).count());
+        Assertions
+            .assertEquals(
+                4,
+                tmp
+                    .filter(
+                        s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
+                    .count());

-        Assertions.assertEquals(2, tmp.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
+        Assertions
+            .assertEquals(
+                2, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertTrue(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));

-        Assertions.assertEquals(5, tmp.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertFalse(tmp.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
+        Assertions
+            .assertEquals(
+                5, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertFalse(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));

-        Assertions.assertEquals(9, tmp.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
+        Assertions
+            .assertEquals(
+                9, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertTrue(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));

-        Assertions.assertEquals(5, tmp.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertFalse(tmp.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
+        Assertions
+            .assertEquals(
+                5, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertFalse(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));

-        Assertions.assertEquals(9, tmp.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
+        Assertions
+            .assertEquals(
+                9, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertTrue(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));

-        List<StructuredProperty> subjects = tmp.filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244")).collect()
-            .get(0).getSubject();
+        List<StructuredProperty> subjects = tmp
+            .filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244"))
+            .collect()
+            .get(0)
+            .getSubject();

         Assertions.assertEquals(8, subjects.size());
         Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
         Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("jupyter")));
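The jupyter assertions above follow from the notebook criterion in SparkEoscTag: a software record is tagged when any subject mentions "jupyter", or when "python" and "notebook" both occur (within one subject or across two). A condensed sketch of that rule (plain Java; looksLikeJupyterNotebook is a hypothetical name):

    import java.util.List;

    public class NotebookCriterionSketch {
        // Notebook rule from SparkEoscTag: "jupyter" anywhere in the subjects,
        // or "python" and "notebook" each appearing in some subject.
        static boolean looksLikeJupyterNotebook(List<String> subjects) {
            boolean jupyter = subjects.stream().anyMatch(s -> s.toLowerCase().contains("jupyter"));
            boolean python = subjects.stream().anyMatch(s -> s.toLowerCase().contains("python"));
            boolean notebook = subjects.stream().anyMatch(s -> s.toLowerCase().contains("notebook"));
            return jupyter || (python && notebook);
        }

        public static void main(String[] args) {
            System.out.println(looksLikeJupyterNotebook(List.of("Python", "notebook"))); // true
            System.out.println(looksLikeJupyterNotebook(List.of("Jupyter Notebook")));   // true
            System.out.println(looksLikeJupyterNotebook(List.of("R", "workflow")));      // false
        }
    }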
@@ -146,104 +235,313 @@ public class EOSCTagJobTest {
         Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de gaz")));
         Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de liquide")));

-        Assertions.assertEquals(10, sc
-            .textFile(workingDir.toString() + "/input/dataset")
-            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)).count());
+        Assertions
+            .assertEquals(
+                10, sc
+                    .textFile(workingDir.toString() + "/input/dataset")
+                    .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
+                    .count());

-        Assertions.assertEquals(0, sc
-            .textFile(workingDir.toString() + "/input/dataset")
-            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)).filter(ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook"))).count());
+        Assertions
+            .assertEquals(
+                0, sc
+                    .textFile(workingDir.toString() + "/input/dataset")
+                    .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
+                    .filter(
+                        ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
+                    .count());

-        Assertions.assertEquals(10, sc
-            .textFile(workingDir.toString() + "/input/otherresearchproduct")
-            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)).count());
-
-        Assertions.assertEquals(0, sc
-            .textFile(workingDir.toString() + "/input/otherresearchproduct")
-            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)).filter(ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook"))).count());
+        Assertions
+            .assertEquals(
+                10, sc
+                    .textFile(workingDir.toString() + "/input/otherresearchproduct")
+                    .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
+                    .count());
+
+        Assertions
+            .assertEquals(
+                0, sc
+                    .textFile(workingDir.toString() + "/input/otherresearchproduct")
+                    .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
+                    .filter(
+                        ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
+                    .count());
+
+        // spark.stop();
     }

     @Test
     void galaxyUpdatesTest() throws Exception {
-        spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/software").getPath())
-            .map((MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class), Encoders.bean(Software.class))
-            .write()
-            .option("compression","gzip")
-            .json(workingDir.toString() + "/input/software");
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/software").getPath())
+            .map(
+                (MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
+                Encoders.bean(Software.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/software");

-        spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/dataset").getPath())
-            .map((MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class), Encoders.bean(Dataset.class))
-            .write()
-            .option("compression","gzip")
-            .json(workingDir.toString() + "/input/dataset");
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/dataset").getPath())
+            .map(
+                (MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
+                Encoders.bean(Dataset.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/dataset");

-        spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct").getPath())
-            .map((MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER.readValue(value, OtherResearchProduct.class), Encoders.bean(OtherResearchProduct.class))
-            .write()
-            .option("compression","gzip")
-            .json(workingDir.toString() + "/input/otherresearchproduct");
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct").getPath())
+            .map(
+                (MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
+                    .readValue(value, OtherResearchProduct.class),
+                Encoders.bean(OtherResearchProduct.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/otherresearchproduct");

         SparkEoscTag
             .main(
                 new String[] {
                     "-isSparkSessionManaged", Boolean.FALSE.toString(),
                     "-sourcePath",
                     workingDir.toString() + "/input",
                     "-workingPath", workingDir.toString() + "/working"

                 });

         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

         JavaRDD<Software> tmp = sc
             .textFile(workingDir.toString() + "/input/software")
             .map(item -> OBJECT_MAPPER.readValue(item, Software.class));

         Assertions.assertEquals(10, tmp.count());

-        Assertions.assertEquals(2, tmp.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow"))).count());
+        Assertions
+            .assertEquals(
+                2,
+                tmp
+                    .filter(
+                        s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
+                    .count());

-        Assertions.assertEquals(2, tmp.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
+        Assertions
+            .assertEquals(
+                2, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertTrue(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));

-        Assertions.assertEquals(6, tmp.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
+        Assertions
+            .assertEquals(
+                6, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertTrue(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));

-        Assertions.assertEquals(8, tmp.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertFalse(tmp.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
+        Assertions
+            .assertEquals(
+                8, tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertFalse(
+                tmp
+                    .filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));

-        JavaRDD<OtherResearchProduct> orp = sc.textFile(workingDir.toString() + "/input/otherresearchproduct").map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
+        JavaRDD<OtherResearchProduct> orp = sc
+            .textFile(workingDir.toString() + "/input/otherresearchproduct")
+            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));

         Assertions.assertEquals(10, orp.count());

-        Assertions.assertEquals(2, orp.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow"))).count());
+        Assertions
+            .assertEquals(
+                2,
+                orp
+                    .filter(
+                        s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
+                    .count());

-        Assertions.assertEquals(3, orp.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertTrue(orp.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
+        Assertions
+            .assertEquals(
+                3, orp
+                    .filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertTrue(
+                orp
+                    .filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));

-        Assertions.assertEquals(2, orp.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertFalse(orp.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
+        Assertions
+            .assertEquals(
+                2, orp
+                    .filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertFalse(
+                orp
+                    .filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));

-        Assertions.assertEquals(3, orp.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72")).collect()
-            .get(0).getSubject().size());
-        Assertions.assertTrue(orp.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72")).collect()
-            .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
+        Assertions
+            .assertEquals(
+                3, orp
+                    .filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .size());
+        Assertions
+            .assertTrue(
+                orp
+                    .filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
+                    .collect()
+                    .get(0)
+                    .getSubject()
+                    .stream()
+                    .anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));

     }

+    @Test
+    void twitterUpdatesTest() throws Exception {
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/software").getPath())
+            .map(
+                (MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
+                Encoders.bean(Software.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/software");
+
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/dataset").getPath())
+            .map(
+                (MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
+                Encoders.bean(Dataset.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/dataset");
+
+        spark
+            .read()
+            .textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct").getPath())
+            .map(
+                (MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
+                    .readValue(value, OtherResearchProduct.class),
+                Encoders.bean(OtherResearchProduct.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingDir.toString() + "/input/otherresearchproduct");
+
+        SparkEoscTag
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    workingDir.toString() + "/input",
+                    "-workingPath", workingDir.toString() + "/working"
+
+                });
+
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+        JavaRDD<Software> tmp = sc
+            .textFile(workingDir.toString() + "/input/software")
+            .map(item -> OBJECT_MAPPER.readValue(item, Software.class));
+
+        Assertions.assertEquals(10, tmp.count());
+
+        Assertions
+            .assertEquals(
+                0,
+                tmp
+                    .filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
+                    .count());
+
+        JavaRDD<OtherResearchProduct> orp = sc
+            .textFile(workingDir.toString() + "/input/otherresearchproduct")
+            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
+
+        Assertions.assertEquals(10, orp.count());
+
+        Assertions
+            .assertEquals(
+                3,
+                orp
+                    .filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
+                    .count());
+
+        JavaRDD<Dataset> dats = sc
+            .textFile(workingDir.toString() + "/input/dataset")
+            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+        Assertions.assertEquals(10, dats.count());
+
+        Assertions
+            .assertEquals(
+                2,
+                dats
+                    .filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
+                    .count());
+
+    }
 }
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long