[bulkTag] refactor to enrich each result single step
This commit is contained in:
parent
697a134504
commit
efc4f6a658
|
@ -4,12 +4,12 @@ package eu.dnetlib.dhp.bulktag;
|
|||
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.ForeachFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
|
@ -23,10 +23,17 @@ import com.google.gson.Gson;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.bulktag.community.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class SparkBulkTagJob {
|
||||
|
||||
private static String OPENAIRE_3 = "openaire3.0";
|
||||
private static String OPENAIRE_4 = "openaire-pub_4.0";
|
||||
private static String OPENAIRE_CRIS = "openaire-cris_1.1";
|
||||
private static String OPENAIRE_DATA = "openaire2.0_data";
|
||||
private static String EOSC = "10|openaire____::2e06c1122c7df43765fdcf91080824fa";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob.class);
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
|
@ -88,10 +95,47 @@ public class SparkBulkTagJob {
|
|||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
extendCommunityConfigurationForEOSC(spark, inputPath, cc);
|
||||
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
|
||||
});
|
||||
}
|
||||
|
||||
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
|
||||
CommunityConfiguration cc) {
|
||||
|
||||
Dataset<String> datasources = readPath(
|
||||
spark, inputPath
|
||||
.substring(
|
||||
0,
|
||||
inputPath.lastIndexOf("/"))
|
||||
+ "/datasource",
|
||||
Datasource.class)
|
||||
.filter((FilterFunction<Datasource>) ds -> isOKDatasource(ds))
|
||||
.map((MapFunction<Datasource, String>) ds -> ds.getId(), Encoders.STRING());
|
||||
|
||||
Map<String, List<Pair<String, SelectionConstraints>>> dsm = cc.getEoscDatasourceMap();
|
||||
|
||||
for (String ds : datasources.collectAsList()) {
|
||||
final String dsId = ds.substring(3);
|
||||
if (!dsm.containsKey(dsId)) {
|
||||
ArrayList<Pair<String, SelectionConstraints>> eoscList = new ArrayList<>();
|
||||
dsm.put(dsId, eoscList);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static boolean isOKDatasource(Datasource ds) {
|
||||
final String compatibility = ds.getOpenairecompatibility().getClassid();
|
||||
boolean isOk = (compatibility.equalsIgnoreCase(OPENAIRE_3) ||
|
||||
compatibility.equalsIgnoreCase(OPENAIRE_4) ||
|
||||
compatibility.equalsIgnoreCase(OPENAIRE_CRIS) ||
|
||||
compatibility.equalsIgnoreCase(OPENAIRE_DATA)) &&
|
||||
ds.getCollectedfrom().stream().anyMatch(cf -> cf.getKey().equals(EOSC));
|
||||
|
||||
return isOk;
|
||||
}
|
||||
|
||||
private static <R extends Result> void execBulkTag(
|
||||
SparkSession spark,
|
||||
String inputPath,
|
||||
|
|
|
@ -5,8 +5,6 @@ import java.io.Serializable;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.avro.generic.GenericData;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
||||
/** Created by miriam on 01/08/2018. */
|
||||
|
@ -16,7 +14,7 @@ public class Community implements Serializable {
|
|||
private List<String> subjects = new ArrayList<>();
|
||||
private List<Provider> providers = new ArrayList<>();
|
||||
private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
|
||||
private SelectionConstraints constraints;
|
||||
private SelectionConstraints constraints = new SelectionConstraints();
|
||||
|
||||
public String toJson() {
|
||||
final Gson g = new Gson();
|
||||
|
@ -27,7 +25,7 @@ public class Community implements Serializable {
|
|||
return !getSubjects().isEmpty()
|
||||
|| !getProviders().isEmpty()
|
||||
|| !getZenodoCommunities().isEmpty()
|
||||
|| !getConstraints().getCriteria().isEmpty();
|
||||
|| getConstraints().getCriteria() != null;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
|
|
@ -26,6 +26,16 @@ public class CommunityConfiguration implements Serializable {
|
|||
private Map<String, List<Pair<String, SelectionConstraints>>> zenodocommunityMap = new HashMap<>();
|
||||
// map communityid -> selectionconstraints
|
||||
private Map<String, SelectionConstraints> selectionConstraintsMap = new HashMap<>();
|
||||
// map eosc datasource -> communityid
|
||||
private Map<String, List<Pair<String, SelectionConstraints>>> eoscDatasourceMap = new HashMap<>();
|
||||
|
||||
public Map<String, List<Pair<String, SelectionConstraints>>> getEoscDatasourceMap() {
|
||||
return eoscDatasourceMap;
|
||||
}
|
||||
|
||||
public void setEoscDatasourceMap(Map<String, List<Pair<String, SelectionConstraints>>> eoscDatasourceMap) {
|
||||
this.eoscDatasourceMap = eoscDatasourceMap;
|
||||
}
|
||||
|
||||
public Map<String, List<Pair<String, SelectionConstraints>>> getSubjectMap() {
|
||||
return subjectMap;
|
||||
|
@ -146,6 +156,10 @@ public class CommunityConfiguration implements Serializable {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public boolean isEoscDatasource(final String dts) {
|
||||
return eoscDatasourceMap.containsKey(dts);
|
||||
}
|
||||
|
||||
public List<Pair<String, SelectionConstraints>> getCommunityForZenodoCommunity(String zc) {
|
||||
return zenodocommunityMap.get(zc);
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
|||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -17,9 +16,7 @@ import com.google.gson.Gson;
|
|||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
|
||||
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
||||
|
@ -67,15 +64,18 @@ public class ResultTagger implements Serializable {
|
|||
return result;
|
||||
}
|
||||
|
||||
//Execute the EOSCTag for the services
|
||||
switch (result.getResulttype().getClassid()){
|
||||
// Execute the EOSCTag for the services
|
||||
switch (result.getResulttype().getClassid()) {
|
||||
case PUBLICATION_RESULTTYPE_CLASSID:
|
||||
break;
|
||||
case SOFTWARE_RESULTTYPE_CLASSID:
|
||||
EoscIFTag.tagForSoftware(result);
|
||||
break;
|
||||
case DATASET_RESULTTYPE_CLASSID:
|
||||
EoscIFTag.tagForDataset(result);
|
||||
break;
|
||||
case ORP_RESULTTYPE_CLASSID:
|
||||
EoscIFTag.tagForOther(result);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -101,24 +101,34 @@ public class ResultTagger implements Serializable {
|
|||
|
||||
// Tagging for datasource
|
||||
final Set<String> datasources = new HashSet<>();
|
||||
final Set<String> tmp = new HashSet<>();
|
||||
final Set<String> collfrom = new HashSet<>();
|
||||
final Set<String> hostdby = new HashSet<>();
|
||||
|
||||
if (Objects.nonNull(result.getInstance())) {
|
||||
for (Instance i : result.getInstance()) {
|
||||
if (Objects.nonNull(i.getCollectedfrom()) && Objects.nonNull(i.getCollectedfrom().getKey())) {
|
||||
tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|"));
|
||||
collfrom.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|"));
|
||||
}
|
||||
if (Objects.nonNull(i.getHostedby()) && Objects.nonNull(i.getHostedby().getKey())) {
|
||||
tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|"));
|
||||
hostdby.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
tmp
|
||||
collfrom
|
||||
.forEach(
|
||||
dsId -> datasources
|
||||
.addAll(
|
||||
conf.getCommunityForDatasource(dsId, param)));
|
||||
hostdby.forEach(dsId -> {
|
||||
datasources
|
||||
.addAll(
|
||||
conf.getCommunityForDatasource(dsId, param));
|
||||
if (conf.isEoscDatasource(dsId)) {
|
||||
datasources.add("eosc");
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
communities.addAll(datasources);
|
||||
|
@ -154,7 +164,7 @@ public class ResultTagger implements Serializable {
|
|||
.getSelectionConstraintsMap()
|
||||
.keySet()
|
||||
.forEach(communityId -> {
|
||||
if (conf.getSelectionConstraintsMap().get(communityId) != null &&
|
||||
if (conf.getSelectionConstraintsMap().get(communityId).getCriteria() != null &&
|
||||
conf
|
||||
.getSelectionConstraintsMap()
|
||||
.get(communityId)
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
package eu.dnetlib.dhp.bulktag.criteria;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Locale;
|
||||
|
||||
@VerbClass("starts_with_caseinsensitive")
|
||||
public class StartsWithVerbIgnoreCase implements Selection, Serializable {
|
||||
|
|
|
@ -1,29 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 21/07/22
|
||||
*/
|
||||
public class DatasourceMaster implements Serializable {
|
||||
private String datasource;
|
||||
private String master;
|
||||
|
||||
public String getDatasource() {
|
||||
return datasource;
|
||||
}
|
||||
|
||||
public void setDatasource(String datasource) {
|
||||
this.datasource = datasource;
|
||||
}
|
||||
|
||||
public String getMaster() {
|
||||
return master;
|
||||
}
|
||||
|
||||
public void setMaster(String master) {
|
||||
this.master = master;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class EoscIFTag {
|
||||
|
||||
public static final String EOSC_GALAXY_WORKFLOW = "EOSC::Galaxy Workflow";
|
||||
public static final String EOSC_TWITTER_DATA = "EOSC::Twitter Data";
|
||||
public static final String EOSC_JUPYTER_NOTEBOOK = "EOSC::Jupyter Notebook";
|
||||
public static final String COMPLIES_WITH = "compliesWith";
|
||||
|
||||
public static EoscIfGuidelines newInstance(String code, String label, String url, String semantics) {
|
||||
EoscIfGuidelines eig = new EoscIfGuidelines();
|
||||
eig.setCode(code);
|
||||
eig.setLabel(label);
|
||||
eig.setUrl(url);
|
||||
eig.setSemanticRelation(semantics);
|
||||
return eig;
|
||||
|
||||
}
|
||||
|
||||
public static <R extends Result> void tagForSoftware(R s) {
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
if (containsCriteriaNotebook(s)) {
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_JUPYTER_NOTEBOOK, EOSC_JUPYTER_NOTEBOOK, "",
|
||||
COMPLIES_WITH);
|
||||
}
|
||||
if (containsCriteriaGalaxy(s)) {
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "", COMPLIES_WITH);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static <R extends Result> void tagForOther(R orp) {
|
||||
if (!Optional.ofNullable(orp.getEoscifguidelines()).isPresent())
|
||||
orp.setEoscifguidelines(new ArrayList<>());
|
||||
|
||||
if (containsCriteriaGalaxy(orp)) {
|
||||
addEIG(
|
||||
orp.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "",
|
||||
COMPLIES_WITH);
|
||||
}
|
||||
if (containscriteriaTwitter(orp)) {
|
||||
addEIG(orp.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static <R extends Result> void tagForDataset(R d) {
|
||||
if (!Optional.ofNullable(d.getEoscifguidelines()).isPresent())
|
||||
d.setEoscifguidelines(new ArrayList<>());
|
||||
if (containscriteriaTwitter(d)) {
|
||||
addEIG(d.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static void addEIG(List<EoscIfGuidelines> eoscifguidelines, String code, String label, String url,
|
||||
String sem) {
|
||||
if (!eoscifguidelines.stream().anyMatch(eig -> eig.getCode().equals(code)))
|
||||
eoscifguidelines.add(newInstance(code, label, url, sem));
|
||||
}
|
||||
|
||||
private static boolean containscriteriaTwitter(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
|
||||
if (words.contains("twitter") &&
|
||||
(words.contains("data") || words.contains("dataset")))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaGalaxy(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
if (words.contains("galaxy") &&
|
||||
words.contains("workflow"))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static <R extends Result> boolean containsCriteriaNotebook(R s) {
|
||||
if (!Optional.ofNullable(s.getSubject()).isPresent())
|
||||
return false;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
|
||||
return true;
|
||||
if (s
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
sbj -> sbj.getValue().toLowerCase().contains("python") &&
|
||||
sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
|
||||
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
return words;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsF(List<Field<String>> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
|
||||
return words;
|
||||
}
|
||||
}
|
|
@ -1,136 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 21/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.common.RelationInverse;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
||||
public class ReadMasterDatasourceFromDB implements Closeable {
|
||||
|
||||
private final DbClient dbClient;
|
||||
private static final Log log = LogFactory.getLog(ReadMasterDatasourceFromDB.class);
|
||||
|
||||
private final BufferedWriter writer;
|
||||
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final String QUERY = "SELECT dso.id datasource, d.id master FROM " +
|
||||
"(SELECT id FROM dsm_services WHERE id like 'eosc%') dso " +
|
||||
"FULL JOIN " +
|
||||
"(SELECT id, duplicate FROM dsm_dedup_services WHERE duplicate like 'eosc%')d " +
|
||||
"ON dso.id = d.duplicate";
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
ReadMasterDatasourceFromDB.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/datasourcemaster_parameters.json")));
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
final String dbUrl = parser.get("postgresUrl");
|
||||
final String dbUser = parser.get("postgresUser");
|
||||
final String dbPassword = parser.get("postgresPassword");
|
||||
final String hdfsPath = parser.get("hdfsPath");
|
||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||
|
||||
try (
|
||||
final ReadMasterDatasourceFromDB rmd = new ReadMasterDatasourceFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
|
||||
dbPassword)) {
|
||||
|
||||
log.info("Processing datasources...");
|
||||
rmd.execute(QUERY, rmd::datasourceMasterMap);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public void execute(final String sql, final Function<ResultSet, DatasourceMaster> producer) {
|
||||
|
||||
dbClient.processResults(sql, rs -> writeMap(producer.apply(rs)));
|
||||
}
|
||||
|
||||
public DatasourceMaster datasourceMasterMap(ResultSet rs) {
|
||||
try {
|
||||
DatasourceMaster dm = new DatasourceMaster();
|
||||
String datasource = rs.getString("datasource");
|
||||
dm.setDatasource(datasource);
|
||||
String master = rs.getString("master");
|
||||
if (StringUtils.isNotBlank(master))
|
||||
dm.setMaster(OafMapperUtils.createOpenaireId(10, master, true));
|
||||
else
|
||||
dm.setMaster(OafMapperUtils.createOpenaireId(10, datasource, true));
|
||||
return dm;
|
||||
|
||||
} catch (final SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
dbClient.close();
|
||||
writer.close();
|
||||
}
|
||||
|
||||
public ReadMasterDatasourceFromDB(
|
||||
final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword)
|
||||
throws IOException {
|
||||
|
||||
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.defaultFS", hdfsNameNode);
|
||||
|
||||
FileSystem fileSystem = FileSystem.get(conf);
|
||||
Path hdfsWritePath = new Path(hdfsPath);
|
||||
FSDataOutputStream fsDataOutputStream = null;
|
||||
if (fileSystem.exists(hdfsWritePath)) {
|
||||
fsDataOutputStream = fileSystem.append(hdfsWritePath);
|
||||
} else {
|
||||
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||
}
|
||||
|
||||
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
protected void writeMap(final DatasourceMaster dm) {
|
||||
try {
|
||||
writer.write(OBJECT_MAPPER.writeValueAsString(dm));
|
||||
writer.newLine();
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,209 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.readPath;
|
||||
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
|
||||
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
|
||||
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.TAGGING_TRUST;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.print.attribute.DocAttributeSet;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.ForeachFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.gson.Gson;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
|
||||
import eu.dnetlib.dhp.bulktag.community.*;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 21/07/22
|
||||
*/
|
||||
public class SparkEoscBulkTag implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class);
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static String OPENAIRE_3 = "openaire3.0";
|
||||
private static String OPENAIRE_4 = "openaire-pub_4.0";
|
||||
private static String OPENAIRE_CRIS = "openaire-cris_1.1";
|
||||
private static String OPENAIRE_DATA = "openaire2.0_data";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkEoscBulkTag.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/input_eosc_bulkTag_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
String datasourceMapPath = parser.get("datasourceMapPath");
|
||||
log.info("datasourceMapPath: {}", datasourceMapPath);
|
||||
|
||||
final String resultClassName = parser.get("resultTableName");
|
||||
log.info("resultTableName: {}", resultClassName);
|
||||
|
||||
final String resultType = parser.get("resultType");
|
||||
log.info("resultType: {}", resultType);
|
||||
|
||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
CommunityConfiguration cc;
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, workingPath);
|
||||
selectCompliantDatasources(spark, inputPath, workingPath, datasourceMapPath);
|
||||
execBulkTag(spark, inputPath, workingPath, resultType, resultClazz);
|
||||
});
|
||||
}
|
||||
|
||||
private static void selectCompliantDatasources(SparkSession spark, String inputPath, String workingPath,
|
||||
String datasourceMapPath) {
|
||||
Dataset<Datasource> datasources = readPath(spark, inputPath + "datasource", Datasource.class)
|
||||
.filter((FilterFunction<Datasource>) ds -> {
|
||||
final String compatibility = ds.getOpenairecompatibility().getClassid();
|
||||
return compatibility.equalsIgnoreCase(OPENAIRE_3) ||
|
||||
compatibility.equalsIgnoreCase(OPENAIRE_4) ||
|
||||
compatibility.equalsIgnoreCase(OPENAIRE_CRIS) ||
|
||||
compatibility.equalsIgnoreCase(OPENAIRE_DATA);
|
||||
});
|
||||
|
||||
Dataset<DatasourceMaster> datasourceMaster = readPath(spark, datasourceMapPath, DatasourceMaster.class);
|
||||
|
||||
datasources
|
||||
.joinWith(datasourceMaster, datasources.col("id").equalTo(datasourceMaster.col("master")), "left")
|
||||
.map(
|
||||
(MapFunction<Tuple2<Datasource, DatasourceMaster>, DatasourceMaster>) t2 -> t2._2(),
|
||||
Encoders.bean(DatasourceMaster.class))
|
||||
.filter(Objects::nonNull)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "datasource");
|
||||
}
|
||||
|
||||
private static <R extends Result> void execBulkTag(
|
||||
SparkSession spark,
|
||||
String inputPath,
|
||||
String workingPath,
|
||||
String resultType,
|
||||
Class<R> resultClazz) {
|
||||
|
||||
List<String> hostedByList = readPath(spark, workingPath + "datasource", DatasourceMaster.class)
|
||||
.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
|
||||
.collectAsList();
|
||||
|
||||
readPath(spark, inputPath + resultType, resultClazz)
|
||||
.map(
|
||||
(MapFunction<R, R>) value -> enrich(value, hostedByList),
|
||||
Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + resultType);
|
||||
|
||||
readPath(spark, workingPath + resultType, resultClazz)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + resultType);
|
||||
|
||||
}
|
||||
|
||||
private static <R extends Result> R enrich(R value, List<String> hostedByList) {
|
||||
if (value.getDataInfo().getDeletedbyinference() == null) {
|
||||
value.getDataInfo().setDeletedbyinference(false);
|
||||
}
|
||||
if (value.getContext() == null) {
|
||||
value.setContext(new ArrayList<>());
|
||||
}
|
||||
if (value
|
||||
.getInstance()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
i -> (hostedByList.contains(i.getHostedby().getKey())))
|
||||
&&
|
||||
!value.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) {
|
||||
Context context = new Context();
|
||||
context.setId("eosc");
|
||||
context
|
||||
.setDataInfo(
|
||||
Arrays
|
||||
.asList(
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, BULKTAG_DATA_INFO_TYPE, true, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
|
||||
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
|
||||
TAGGING_TRUST)));
|
||||
value.getContext().add(context);
|
||||
|
||||
}
|
||||
return value;
|
||||
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
// TODO remove this hack as soon as the values fixed by this method will be provided as NON null
|
||||
private static <R extends Result> MapFunction<R, R> patchResult() {
|
||||
return r -> {
|
||||
if (r.getDataInfo().getDeletedbyinference() == null) {
|
||||
r.getDataInfo().setDeletedbyinference(false);
|
||||
}
|
||||
if (r.getContext() == null) {
|
||||
r.setContext(new ArrayList<>());
|
||||
}
|
||||
return r;
|
||||
};
|
||||
}
|
||||
|
||||
}
|
|
@ -1,252 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.readPath;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class SparkEoscTag {
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkEoscTag.class);
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
public static final String EOSC_GALAXY_WORKFLOW = "EOSC::Galaxy Workflow";
|
||||
public static final String EOSC_TWITTER_DATA = "EOSC::Twitter Data";
|
||||
public static final String EOSC_JUPYTER_NOTEBOOK = "EOSC::Jupyter Notebook";
|
||||
public static final String COMPLIES_WITH = "compliesWith";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkEoscTag.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
execEoscTag(spark, inputPath, workingPath);
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
public static EoscIfGuidelines newInstance(String code, String label, String url, String semantics) {
|
||||
EoscIfGuidelines eig = new EoscIfGuidelines();
|
||||
eig.setCode(code);
|
||||
eig.setLabel(label);
|
||||
eig.setUrl(url);
|
||||
eig.setSemanticRelation(semantics);
|
||||
return eig;
|
||||
|
||||
}
|
||||
|
||||
public static <R extends Result> R tagForSoftware(R s){
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
if (containsCriteriaNotebook(s)) {
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_JUPYTER_NOTEBOOK, EOSC_JUPYTER_NOTEBOOK, "",
|
||||
COMPLIES_WITH);
|
||||
}
|
||||
if (containsCriteriaGalaxy(s)) {
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "", COMPLIES_WITH);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
private static void execEoscTag(SparkSession spark, String inputPath, String workingPath) {
|
||||
|
||||
readPath(spark, inputPath + "/software", Software.class)
|
||||
.map((MapFunction<Software, Software>) s -> {
|
||||
|
||||
if (containsCriteriaNotebook(s)) {
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_JUPYTER_NOTEBOOK, EOSC_JUPYTER_NOTEBOOK, "",
|
||||
COMPLIES_WITH);
|
||||
|
||||
}
|
||||
if (containsCriteriaGalaxy(s)) {
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "", COMPLIES_WITH);
|
||||
}
|
||||
return s;
|
||||
}, Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/software");
|
||||
|
||||
readPath(spark, workingPath + "/software", Software.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/software");
|
||||
|
||||
readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
|
||||
.map((MapFunction<OtherResearchProduct, OtherResearchProduct>) orp -> {
|
||||
|
||||
if (!Optional.ofNullable(orp.getEoscifguidelines()).isPresent())
|
||||
orp.setEoscifguidelines(new ArrayList<>());
|
||||
|
||||
if (containsCriteriaGalaxy(orp)) {
|
||||
addEIG(
|
||||
orp.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "",
|
||||
COMPLIES_WITH);
|
||||
}
|
||||
if (containscriteriaTwitter(orp)) {
|
||||
addEIG(orp.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
return orp;
|
||||
}, Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/otherresearchproduct");
|
||||
|
||||
readPath(spark, workingPath + "/otherresearchproduct", OtherResearchProduct.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/otherresearchproduct");
|
||||
|
||||
readPath(spark, inputPath + "/dataset", Dataset.class)
|
||||
.map((MapFunction<Dataset, Dataset>) d -> {
|
||||
|
||||
if (!Optional.ofNullable(d.getEoscifguidelines()).isPresent())
|
||||
d.setEoscifguidelines(new ArrayList<>());
|
||||
if (containscriteriaTwitter(d)) {
|
||||
addEIG(d.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
return d;
|
||||
}, Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/dataset");
|
||||
|
||||
readPath(spark, workingPath + "/dataset", Dataset.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/dataset");
|
||||
}
|
||||
|
||||
private static void addEIG(List<EoscIfGuidelines> eoscifguidelines, String code, String label, String url,
|
||||
String sem) {
|
||||
if (!eoscifguidelines.stream().anyMatch(eig -> eig.getCode().equals(code)))
|
||||
eoscifguidelines.add(newInstance(code, label, url, sem));
|
||||
}
|
||||
|
||||
private static boolean containscriteriaTwitter(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
|
||||
if (words.contains("twitter") &&
|
||||
(words.contains("data") || words.contains("dataset")))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaGalaxy(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
if (words.contains("galaxy") &&
|
||||
words.contains("workflow"))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static <R extends Result> boolean containsCriteriaNotebook(R s) {
|
||||
if (!Optional.ofNullable(s.getSubject()).isPresent())
|
||||
return false;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
|
||||
return true;
|
||||
if (s
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
sbj -> sbj.getValue().toLowerCase().contains("python") &&
|
||||
sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
|
||||
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
return words;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsF(List<Field<String>> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
|
||||
return words;
|
||||
}
|
||||
}
|
|
@ -219,158 +219,9 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait" to="eosc_tag"/>
|
||||
<join name="wait" to="End"/>
|
||||
|
||||
<action name="eosc_tag">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC_tagging</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscTag</arg>
|
||||
</spark>
|
||||
<ok to="eosc_get_datasource_master"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="eosc_get_datasource_master">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.bulktag.eosc.ReadMasterDatasourceFromDB</main-class>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="fork_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_eosc_context_tag">
|
||||
<path start="eosc_context_tag_publication"/>
|
||||
<path start="eosc_context_tag_dataset"/>
|
||||
<path start="eosc_context_tag_otherresearchproduct"/>
|
||||
<path start="eosc_context_tag_software"/>
|
||||
</fork>
|
||||
|
||||
<action name="eosc_context_tag_publication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging publication</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/</arg>
|
||||
<arg>--resultType</arg><arg>publication</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="eosc_context_tag_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging dataset</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/</arg>
|
||||
<arg>--resultType</arg><arg>dataset</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="eosc_context_tag_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging software</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/</arg>
|
||||
<arg>--resultType</arg><arg>software</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="eosc_context_tag_otherresearchproduct">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging ORP</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/</arg>
|
||||
<arg>--resultType</arg><arg>otherresearchproduct</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<join name="wait_eosc_context_tag" to="End"/>
|
||||
<end name="End"/>
|
||||
|
||||
</workflow-app>
|
|
@ -6,17 +6,13 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
|
|||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.ForeachFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
@ -28,11 +24,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.gson.Gson;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
|
||||
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class BulkTagJobTest {
|
||||
|
@ -644,7 +636,8 @@ public class BulkTagJobTest {
|
|||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath(),
|
||||
"-sourcePath",
|
||||
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/software_10.json.gz").getPath(),
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"-outputPath", workingDir.toString() + "/software",
|
||||
|
@ -772,14 +765,764 @@ public class BulkTagJobTest {
|
|||
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
|
||||
|
||||
idExplodeCommunity.show(false);
|
||||
// Assertions.assertEquals(5, idExplodeCommunity.count());
|
||||
//
|
||||
// Assertions
|
||||
// .assertEquals(
|
||||
// 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
|
||||
// Assertions
|
||||
// .assertEquals(
|
||||
// 2, idExplodeCommunity.filter("provenance = 'community:advconstraint'").count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void bulkTagOtherJupyter() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct")
|
||||
.getPath();
|
||||
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
|
||||
"-outputPath", workingDir.toString() + "/otherresearchproduct",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, sc
|
||||
.textFile(workingDir.toString() + "/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.filter(
|
||||
orp -> orp
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.filter(
|
||||
orp -> orp
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bulkTagDatasetJupyter() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/jupyter/dataset")
|
||||
.getPath();
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.filter(
|
||||
ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.filter(
|
||||
ds -> ds
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bulkTagSoftwareJupyter() throws Exception {
|
||||
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/jupyter/software")
|
||||
.getPath();
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"-outputPath", workingDir.toString() + "/software",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
4,
|
||||
tmp
|
||||
.filter(s -> s.getEoscifguidelines() != null)
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
8, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
8, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
List<Subject> subjects = tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject();
|
||||
Assertions.assertEquals(7, subjects.size());
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("jupyter")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("Modeling and Simulation")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("structure granulaire")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("algorithme")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("simulation numérique")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de gaz")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de liquide")));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void galaxyOtherTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct")
|
||||
.getPath();
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
|
||||
"-outputPath", workingDir.toString() + "/otherresearchproduct",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<OtherResearchProduct> orp = sc
|
||||
.textFile(workingDir.toString() + "/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
|
||||
|
||||
Assertions.assertEquals(10, orp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
orp
|
||||
.filter(
|
||||
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
orp.foreach(o -> System.out.println(OBJECT_MAPPER.writeValueAsString(o)));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, orp
|
||||
.filter(o -> o.getEoscifguidelines() != null)
|
||||
.filter(
|
||||
o -> o
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void galaxySoftwareTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/galaxy/software")
|
||||
.getPath();
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"-outputPath", workingDir.toString() + "/software",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(11, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getEoscifguidelines().size() > 0)
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getEoscifguidelines().size() > 0)
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
8, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void twitterDatasetTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/twitter/dataset")
|
||||
.getPath();
|
||||
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<Dataset> dats = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(11, dats.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3,
|
||||
dats
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void twitterOtherTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct")
|
||||
.getPath();
|
||||
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
|
||||
"-outputPath", workingDir.toString() + "/otherresearchproduct",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<OtherResearchProduct> orp = sc
|
||||
.textFile(workingDir.toString() + "/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
|
||||
|
||||
Assertions.assertEquals(10, orp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
orp
|
||||
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3,
|
||||
orp
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
}
|
||||
|
||||
@Test
|
||||
void twitterSoftwareTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/eosctag/twitter/software")
|
||||
.getPath();
|
||||
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"-outputPath", workingDir.toString() + "/software",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void EoscContextTagTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json")
|
||||
.getPath();
|
||||
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(8, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
4,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,162 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 22/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
//"50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea" has instance hostedby eosc
|
||||
//"50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1" has instance hostedby eosc
|
||||
//"50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7" has two instance one hostedby eosc
|
||||
//"50|475c1990cbb2::3894c94123e96df8a21249957cf160cb" has EoscTag
|
||||
|
||||
public class EOSCContextTaggingTest {
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(EOSCContextTaggingTest.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(EOSCContextTaggingTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(EOSCContextTaggingTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(EOSCTagJobTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void EoscContextTagTest() throws Exception {
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
SparkEoscBulkTag
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input/dataset",
|
||||
"-workingPath", workingDir.toString() + "/working/dataset",
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
4,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
|
||||
&&
|
||||
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
|
||||
.count());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,713 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.bulktag.eosc.SparkEoscTag;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class EOSCTagJobTest {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(EOSCTagJobTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(EOSCTagJobTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(EOSCTagJobTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void jupyterUpdatesTest() throws Exception {
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/software").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
|
||||
Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/software");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/dataset").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct").getPath())
|
||||
.map(
|
||||
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
|
||||
.readValue(value, OtherResearchProduct.class),
|
||||
Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/input/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
4,
|
||||
tmp
|
||||
.filter(s -> s.getEoscifguidelines() != null)
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines() == null);
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
8, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines() == null);
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
8, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Jupyter Notebook")));
|
||||
|
||||
List<Subject> subjects = tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject();
|
||||
Assertions.assertEquals(7, subjects.size());
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("jupyter")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("Modeling and Simulation")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("structure granulaire")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("algorithme")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("simulation numérique")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de gaz")));
|
||||
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de liquide")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.filter(
|
||||
ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
|
||||
.filter(
|
||||
ds -> ds
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.filter(
|
||||
orp -> orp
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0, sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
|
||||
.filter(
|
||||
orp -> orp
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getValue().equals("EOSC::Jupyter Notebook")))
|
||||
.count());
|
||||
|
||||
// spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void galaxyUpdatesTest() throws Exception {
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/software").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
|
||||
Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/software");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/dataset").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct").getPath())
|
||||
.map(
|
||||
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
|
||||
.readValue(value, OtherResearchProduct.class),
|
||||
Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/input/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(11, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getEoscifguidelines() != null)
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(
|
||||
s -> s.getEoscifguidelines() != null)
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
5, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
8, tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
tmp
|
||||
.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
JavaRDD<OtherResearchProduct> orp = sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
|
||||
|
||||
Assertions.assertEquals(10, orp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
orp
|
||||
.filter(
|
||||
s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
orp.foreach(o -> System.out.println(OBJECT_MAPPER.writeValueAsString(o)));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, orp
|
||||
.filter(o -> o.getEoscifguidelines() != null)
|
||||
.filter(
|
||||
o -> o
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Galaxy Workflow")))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getCode().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1bd97baef19dbd2db3203b112bb83bc5"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
Assertions
|
||||
.assertFalse(
|
||||
orp
|
||||
.filter(sw -> sw.getId().equals("50|od______2017::1e400f1747487fd15998735c41a55c72"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(s -> s.getValue().equals("EOSC::Galaxy Workflow")));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void twitterUpdatesTest() throws Exception {
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/software").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class),
|
||||
Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/software");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/dataset").getPath())
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct").getPath())
|
||||
.map(
|
||||
(MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER
|
||||
.readValue(value, OtherResearchProduct.class),
|
||||
Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Software> tmp = sc
|
||||
.textFile(workingDir.toString() + "/input/software")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Software.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
JavaRDD<OtherResearchProduct> orp = sc
|
||||
.textFile(workingDir.toString() + "/input/otherresearchproduct")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
|
||||
|
||||
Assertions.assertEquals(10, orp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
orp
|
||||
.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3,
|
||||
orp
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
JavaRDD<Dataset> dats = sc
|
||||
.textFile(workingDir.toString() + "/input/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(11, dats.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3,
|
||||
dats
|
||||
.filter(
|
||||
s -> s
|
||||
.getEoscifguidelines()
|
||||
.stream()
|
||||
.anyMatch(eig -> eig.getCode().equals("EOSC::Twitter Data")))
|
||||
.count());
|
||||
|
||||
}
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue