This commit is contained in:
Claudio Atzori 2022-05-05 11:38:08 +02:00
commit 658450d9a3
13 changed files with 928 additions and 1 deletions

View File

@ -0,0 +1,243 @@
package eu.dnetlib.dhp.bulktag;
import static eu.dnetlib.dhp.PropagationConstant.readPath;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
public class SparkEoscTag {
private static final Logger log = LoggerFactory.getLogger(SparkEoscTag.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final Qualifier EOSC_QUALIFIER = OafMapperUtils
.qualifier(
"eosc",
"European Open Science Cloud",
ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES);
public static final DataInfo EOSC_DATAINFO = OafMapperUtils
.dataInfo(
false, "propagation", true, false,
OafMapperUtils
.qualifier(
"propagation:subject", "Inferred by OpenAIRE",
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.9");
public final static StructuredProperty EOSC_NOTEBOOK = OafMapperUtils
.structuredProperty(
"EOSC::Jupyter Notebook", EOSC_QUALIFIER, EOSC_DATAINFO);
public final static StructuredProperty EOSC_GALAXY = OafMapperUtils
.structuredProperty(
"EOSC::Galaxy Workflow", EOSC_QUALIFIER, EOSC_DATAINFO);
public final static StructuredProperty EOSC_TWITTER = OafMapperUtils
.structuredProperty(
"EOSC::Twitter Data", EOSC_QUALIFIER, EOSC_DATAINFO);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkEoscTag.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
execEoscTag(spark, inputPath, workingPath);
});
}
private static void execEoscTag(SparkSession spark, String inputPath, String workingPath) {
readPath(spark, inputPath + "/software", Software.class)
.map((MapFunction<Software, Software>) s -> {
List<StructuredProperty> sbject;
if (!Optional.ofNullable(s.getSubject()).isPresent())
s.setSubject(new ArrayList<>());
sbject = s.getSubject();
if (containsCriteriaNotebook(s)) {
sbject.add(EOSC_NOTEBOOK);
}
if (containsCriteriaGalaxy(s)) {
sbject.add(EOSC_GALAXY);
}
return s;
}, Encoders.bean(Software.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/software");
readPath(spark, workingPath + "/software", Software.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "/software");
readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
.map((MapFunction<OtherResearchProduct, OtherResearchProduct>) orp -> {
List<StructuredProperty> sbject;
if (!Optional.ofNullable(orp.getSubject()).isPresent())
orp.setSubject(new ArrayList<>());
sbject = orp.getSubject();
if (containsCriteriaGalaxy(orp)) {
sbject.add(EOSC_GALAXY);
}
if (containscriteriaTwitter(orp)) {
sbject.add(EOSC_TWITTER);
}
return orp;
}, Encoders.bean(OtherResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/otherresearchproduct");
readPath(spark, workingPath + "/otherresearchproduct", OtherResearchProduct.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "/otherresearchproduct");
readPath(spark, inputPath + "/dataset", Dataset.class)
.map((MapFunction<Dataset, Dataset>) d -> {
List<StructuredProperty> sbject;
if (!Optional.ofNullable(d.getSubject()).isPresent())
d.setSubject(new ArrayList<>());
sbject = d.getSubject();
if (containscriteriaTwitter(d)) {
sbject.add(EOSC_TWITTER);
}
return d;
}, Encoders.bean(Dataset.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/dataset");
readPath(spark, workingPath + "/dataset", Dataset.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "/dataset");
}
private static boolean containscriteriaTwitter(Result r) {
Set<String> words = getWordsSP(r.getTitle());
words.addAll(getWordsF(r.getDescription()));
if (words.contains("twitter") &&
(words.contains("data") || words.contains("dataset")))
return true;
if (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
return true;
return false;
}
private static boolean containsCriteriaGalaxy(Result r) {
Set<String> words = getWordsSP(r.getTitle());
words.addAll(getWordsF(r.getDescription()));
if (words.contains("galaxy") &&
words.contains("workflow"))
return true;
if (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
return true;
return false;
}
private static boolean containsCriteriaNotebook(Software s) {
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
return true;
if (s
.getSubject()
.stream()
.anyMatch(
sbj -> sbj.getValue().toLowerCase().contains("python") &&
sbj.getValue().toLowerCase().contains("notebook")))
return true;
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
return true;
return false;
}
private static Set<String> getSubjects(List<StructuredProperty> s) {
Set<String> subjects = new HashSet<>();
s.stream().forEach(sbj -> subjects.addAll(Arrays.asList(sbj.getValue().toLowerCase().split(" "))));
s.stream().forEach(sbj -> subjects.add(sbj.getValue().toLowerCase()));
return subjects;
}
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
Set<String> words = new HashSet<>();
Optional
.ofNullable(elem)
.ifPresent(
e -> e
.forEach(
t -> words
.addAll(
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
return words;
}
private static Set<String> getWordsF(List<Field<String>> elem) {
Set<String> words = new HashSet<>();
Optional
.ofNullable(elem)
.ifPresent(
e -> e
.forEach(
t -> words
.addAll(
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
// elem
// .forEach(
// t -> words.addAll(Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" "))));
return words;
}
}

View File

@ -0,0 +1,21 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@ -204,7 +204,31 @@
<error to="Kill"/>
</action>
<join name="wait" to="End"/>
<join name="wait" to="eosc_tag"/>
<action name="eosc_tag">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>EOSC_tagging</name>
<class>eu.dnetlib.dhp.bulktag.SparkEoscTag</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscTag</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>

View File

@ -0,0 +1,547 @@
package eu.dnetlib.dhp.bulktag;
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY_INDICATOR;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.*;
public class EOSCTagJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(EOSCTagJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(EOSCTagJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(EOSCTagJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void jupyterUpdatesTest() throws Exception {
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/software").getPath())
.map(
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
Encoders.bean(Software.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/software");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/dataset").getPath())
.map(
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
Encoders.bean(Dataset.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/dataset");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct").getPath())
.map(
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
.readValue(value, OtherResearchProduct.class),
Encoders.bean(OtherResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/otherresearchproduct");
SparkEoscTag
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input",
"-workingPath", workingDir.toString() + "/working"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Software> tmp = sc
.textFile(workingDir.toString() + "/input/software")
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
Assertions.assertEquals(10, tmp.count());
Assertions
.assertEquals(
4,
tmp
.filter(
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
.count());
Assertions
.assertEquals(
2, tmp
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertTrue(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions
.assertEquals(
5, tmp
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertFalse(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions
.assertEquals(
9, tmp
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertTrue(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions
.assertEquals(
5, tmp
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertFalse(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions
.assertEquals(
9, tmp
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertTrue(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
List<StructuredProperty> subjects = tmp
.filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244"))
.collect()
.get(0)
.getSubject();
Assertions.assertEquals(8, subjects.size());
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("jupyter")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("Modeling and Simulation")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("structure granulaire")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("algorithme")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("simulation numérique")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de gaz")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de liquide")));
Assertions
.assertEquals(
10, sc
.textFile(workingDir.toString() + "/input/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
.count());
Assertions
.assertEquals(
0, sc
.textFile(workingDir.toString() + "/input/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
.filter(
ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
.count());
Assertions
.assertEquals(
10, sc
.textFile(workingDir.toString() + "/input/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
.count());
Assertions
.assertEquals(
0, sc
.textFile(workingDir.toString() + "/input/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
.filter(
ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
.count());
// spark.stop();
}
@Test
void galaxyUpdatesTest() throws Exception {
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/software").getPath())
.map(
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
Encoders.bean(Software.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/software");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/dataset").getPath())
.map(
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
Encoders.bean(Dataset.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/dataset");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct").getPath())
.map(
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
.readValue(value, OtherResearchProduct.class),
Encoders.bean(OtherResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/otherresearchproduct");
SparkEoscTag
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input",
"-workingPath", workingDir.toString() + "/working"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Software> tmp = sc
.textFile(workingDir.toString() + "/input/software")
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
Assertions.assertEquals(11, tmp.count());
Assertions
.assertEquals(
2,
tmp
.filter(
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
.count());
Assertions
.assertEquals(
2, tmp
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertTrue(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
Assertions
.assertEquals(
6, tmp
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertTrue(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
Assertions
.assertEquals(
8, tmp
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertFalse(
tmp
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
JavaRDD<OtherResearchProduct> orp = sc
.textFile(workingDir.toString() + "/input/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
Assertions.assertEquals(10, orp.count());
Assertions
.assertEquals(
2,
orp
.filter(
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
.count());
Assertions
.assertEquals(
3, orp
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertTrue(
orp
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
Assertions
.assertEquals(
2, orp
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertFalse(
orp
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
Assertions
.assertEquals(
3, orp
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
.collect()
.get(0)
.getSubject()
.size());
Assertions
.assertTrue(
orp
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
.collect()
.get(0)
.getSubject()
.stream()
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
}
@Test
void twitterUpdatesTest() throws Exception {
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/software").getPath())
.map(
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
Encoders.bean(Software.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/software");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/dataset").getPath())
.map(
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
Encoders.bean(Dataset.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/dataset");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct").getPath())
.map(
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
.readValue(value, OtherResearchProduct.class),
Encoders.bean(OtherResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/otherresearchproduct");
SparkEoscTag
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input",
"-workingPath", workingDir.toString() + "/working"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Software> tmp = sc
.textFile(workingDir.toString() + "/input/software")
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
Assertions.assertEquals(10, tmp.count());
Assertions
.assertEquals(
0,
tmp
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
.count());
JavaRDD<OtherResearchProduct> orp = sc
.textFile(workingDir.toString() + "/input/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
Assertions.assertEquals(10, orp.count());
Assertions
.assertEquals(
3,
orp
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
.count());
JavaRDD<Dataset> dats = sc
.textFile(workingDir.toString() + "/input/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(11, dats.count());
Assertions
.assertEquals(
3,
dats
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
.count());
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long