WIP: bulktag_refactor #268
|
@ -5,9 +5,12 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
|
|||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import eu.dnetlib.dhp.bulktag.eosc.DatasourceMaster;
|
||||
import eu.dnetlib.dhp.bulktag.eosc.EoscTagFunctions;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
|
@ -64,6 +67,9 @@ public class SparkBulkTagJob {
|
|||
final String resultClassName = parser.get("resultTableName");
|
||||
log.info("resultTableName: {}", resultClassName);
|
||||
|
||||
final String datasourceMapPath = parser.get("datasourceMapPath");
|
||||
log.info("datasourceMapPath: {}", datasourceMapPath);
|
||||
|
||||
final Boolean saveGraph = Optional
|
||||
.ofNullable(parser.get("saveGraph"))
|
||||
.map(Boolean::valueOf)
|
||||
|
@ -88,7 +94,7 @@ public class SparkBulkTagJob {
|
|||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
|
||||
execBulkTag(spark, inputPath, outputPath, protoMappingParams, datasourceMapPath, resultClazz, cc);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -97,9 +103,14 @@ public class SparkBulkTagJob {
|
|||
String inputPath,
|
||||
String outputPath,
|
||||
ProtoMap protoMappingParams,
|
||||
String datasourceMapPath,
|
||||
Class<R> resultClazz,
|
||||
CommunityConfiguration communityConfiguration) {
|
||||
|
||||
List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class)
|
||||
.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
|
||||
.collectAsList();
|
||||
|
||||
ResultTagger resultTagger = new ResultTagger();
|
||||
readPath(spark, inputPath, resultClazz)
|
||||
.map(patchResult(), Encoders.bean(resultClazz))
|
||||
|
@ -109,6 +120,10 @@ public class SparkBulkTagJob {
|
|||
.enrichContextCriteria(
|
||||
value, communityConfiguration, protoMappingParams),
|
||||
Encoders.bean(resultClazz))
|
||||
.map((MapFunction<R, R>) EoscTagFunctions::execEoscTagResult, Encoders.bean(resultClazz))
|
||||
.map(
|
||||
(MapFunction<R, R>) value -> EoscTagFunctions.enrich(value, hostedByList),
|
||||
Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
|
|
|
@ -0,0 +1,188 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
|
||||
|
||||
public class EoscTagFunctions {
|
||||
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
public static final String EOSC_GALAXY_WORKFLOW = "EOSC::Galaxy Workflow";
|
||||
public static final String EOSC_TWITTER_DATA = "EOSC::Twitter Data";
|
||||
public static final String EOSC_JUPYTER_NOTEBOOK = "EOSC::Jupyter Notebook";
|
||||
public static final String COMPLIES_WITH = "compliesWith";
|
||||
|
||||
public static <R extends Result> R enrich(R value, List<String> hostedByList) {
|
||||
if (value
|
||||
.getInstance()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
i -> (hostedByList.contains(i.getHostedby().getKey())))
|
||||
&&
|
||||
!value.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) {
|
||||
Context context = new Context();
|
||||
context.setId("eosc");
|
||||
context
|
||||
.setDataInfo(
|
||||
Arrays
|
||||
.asList(
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, BULKTAG_DATA_INFO_TYPE, true, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
|
||||
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
|
||||
TAGGING_TRUST)));
|
||||
value.getContext().add(context);
|
||||
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public static <R extends Result> R execEoscTagResult(R result) {
|
||||
if (result instanceof Software) {
|
||||
Software s = (Software) result;
|
||||
if (containsCriteriaNotebook(s)) {
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_JUPYTER_NOTEBOOK, EOSC_JUPYTER_NOTEBOOK, "",
|
||||
COMPLIES_WITH);
|
||||
}
|
||||
if (containsCriteriaGalaxy(s)) {
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "", COMPLIES_WITH);
|
||||
}
|
||||
return (R) s;
|
||||
|
||||
} else if (result instanceof OtherResearchProduct) {
|
||||
OtherResearchProduct orp = (OtherResearchProduct) result;
|
||||
|
||||
if (!Optional.ofNullable(orp.getEoscifguidelines()).isPresent())
|
||||
orp.setEoscifguidelines(new ArrayList<>());
|
||||
|
||||
if (containsCriteriaGalaxy(orp)) {
|
||||
addEIG(
|
||||
orp.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "",
|
||||
COMPLIES_WITH);
|
||||
}
|
||||
if (containscriteriaTwitter(orp)) {
|
||||
addEIG(orp.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
return (R) orp;
|
||||
} else if (result instanceof Dataset) {
|
||||
Dataset d = (Dataset) result;
|
||||
|
||||
if (!Optional.ofNullable(d.getEoscifguidelines()).isPresent())
|
||||
d.setEoscifguidelines(new ArrayList<>());
|
||||
if (containscriteriaTwitter(d)) {
|
||||
addEIG(d.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
return (R) d;
|
||||
}
|
||||
|
||||
// this is a Publication, return it as it is.
|
||||
return result;
|
||||
}
|
||||
|
||||
private static void addEIG(List<EoscIfGuidelines> eoscifguidelines, String code, String label, String url,
|
||||
String sem) {
|
||||
if (!eoscifguidelines.stream().anyMatch(eig -> eig.getCode().equals(code)))
|
||||
eoscifguidelines.add(newInstance(code, label, url, sem));
|
||||
}
|
||||
|
||||
public static EoscIfGuidelines newInstance(String code, String label, String url, String semantics) {
|
||||
EoscIfGuidelines eig = new EoscIfGuidelines();
|
||||
eig.setCode(code);
|
||||
eig.setLabel(label);
|
||||
eig.setUrl(url);
|
||||
eig.setSemanticRelation(semantics);
|
||||
return eig;
|
||||
}
|
||||
|
||||
private static boolean containscriteriaTwitter(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
|
||||
if (words.contains("twitter") &&
|
||||
(words.contains("data") || words.contains("dataset")))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaGalaxy(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
if (words.contains("galaxy") &&
|
||||
words.contains("workflow"))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaNotebook(Software s) {
|
||||
if (!Optional.ofNullable(s.getSubject()).isPresent())
|
||||
return false;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
|
||||
return true;
|
||||
if (s
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
sbj -> sbj.getValue().toLowerCase().contains("python") &&
|
||||
sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
|
||||
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
return words;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsF(List<Field<String>> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
|
||||
return words;
|
||||
}
|
||||
}
|
|
@ -1,18 +1,10 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -22,23 +14,19 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 21/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.function.Function;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.common.RelationInverse;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
||||
public class ReadMasterDatasourceFromDB implements Closeable {
|
||||
public class ReadMasterEoscDatasourceFromDB implements Closeable {
|
||||
|
||||
private final DbClient dbClient;
|
||||
private static final Log log = LogFactory.getLog(ReadMasterDatasourceFromDB.class);
|
||||
private static final Log log = LogFactory.getLog(ReadMasterEoscDatasourceFromDB.class);
|
||||
|
||||
private final BufferedWriter writer;
|
||||
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
@ -53,7 +41,7 @@ public class ReadMasterDatasourceFromDB implements Closeable {
|
|||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
ReadMasterDatasourceFromDB.class
|
||||
ReadMasterEoscDatasourceFromDB.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/datasourcemaster_parameters.json")));
|
||||
|
||||
|
@ -66,12 +54,11 @@ public class ReadMasterDatasourceFromDB implements Closeable {
|
|||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||
|
||||
try (
|
||||
final ReadMasterDatasourceFromDB rmd = new ReadMasterDatasourceFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
|
||||
final ReadMasterEoscDatasourceFromDB rmd = new ReadMasterEoscDatasourceFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
|
||||
dbPassword)) {
|
||||
|
||||
log.info("Processing datasources...");
|
||||
rmd.execute(QUERY, rmd::datasourceMasterMap);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,7 +90,7 @@ public class ReadMasterDatasourceFromDB implements Closeable {
|
|||
writer.close();
|
||||
}
|
||||
|
||||
public ReadMasterDatasourceFromDB(
|
||||
public ReadMasterEoscDatasourceFromDB(
|
||||
final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword)
|
||||
throws IOException {
|
||||
|
|
@ -1,169 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.readPath;
|
||||
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
|
||||
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
|
||||
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.TAGGING_TRUST;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.print.attribute.DocAttributeSet;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.ForeachFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.gson.Gson;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
|
||||
import eu.dnetlib.dhp.bulktag.community.*;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 21/07/22
|
||||
*/
|
||||
public class SparkEoscBulkTag implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class);
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkEoscBulkTag.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/input_eosc_bulkTag_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
String datasourceMapPath = parser.get("datasourceMapPath");
|
||||
log.info("datasourceMapPath: {}", datasourceMapPath);
|
||||
|
||||
final String resultClassName = parser.get("resultTableName");
|
||||
log.info("resultTableName: {}", resultClassName);
|
||||
|
||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
CommunityConfiguration cc;
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, workingPath);
|
||||
execBulkTag(spark, inputPath, workingPath, datasourceMapPath, resultClazz);
|
||||
});
|
||||
}
|
||||
|
||||
private static <R extends Result> void execBulkTag(
|
||||
SparkSession spark,
|
||||
String inputPath,
|
||||
String workingPath,
|
||||
String datasourceMapPath,
|
||||
Class<R> resultClazz) {
|
||||
|
||||
List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class)
|
||||
.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
|
||||
.collectAsList();
|
||||
|
||||
readPath(spark, inputPath, resultClazz)
|
||||
.map(patchResult(), Encoders.bean(resultClazz))
|
||||
.filter(Objects::nonNull)
|
||||
.map(
|
||||
(MapFunction<R, R>) value -> enrich(value, hostedByList),
|
||||
Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath);
|
||||
|
||||
readPath(spark, workingPath, resultClazz)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath);
|
||||
|
||||
}
|
||||
|
||||
private static <R extends Result> R enrich(R value, List<String> hostedByList) {
|
||||
if (value
|
||||
.getInstance()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
i -> (hostedByList.contains(i.getHostedby().getKey())))
|
||||
&&
|
||||
!value.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) {
|
||||
Context context = new Context();
|
||||
context.setId("eosc");
|
||||
context
|
||||
.setDataInfo(
|
||||
Arrays
|
||||
.asList(
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, BULKTAG_DATA_INFO_TYPE, true, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
|
||||
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
|
||||
TAGGING_TRUST)));
|
||||
value.getContext().add(context);
|
||||
|
||||
}
|
||||
return value;
|
||||
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
// TODO remove this hack as soon as the values fixed by this method will be provided as NON null
|
||||
private static <R extends Result> MapFunction<R, R> patchResult() {
|
||||
return r -> {
|
||||
if (r.getDataInfo().getDeletedbyinference() == null) {
|
||||
r.getDataInfo().setDeletedbyinference(false);
|
||||
}
|
||||
if (r.getContext() == null) {
|
||||
r.setContext(new ArrayList<>());
|
||||
}
|
||||
return r;
|
||||
};
|
||||
}
|
||||
|
||||
}
|
|
@ -1,237 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag.eosc;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.readPath;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class SparkEoscTag {
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkEoscTag.class);
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
public static final String EOSC_GALAXY_WORKFLOW = "EOSC::Galaxy Workflow";
|
||||
public static final String EOSC_TWITTER_DATA = "EOSC::Twitter Data";
|
||||
public static final String EOSC_JUPYTER_NOTEBOOK = "EOSC::Jupyter Notebook";
|
||||
public static final String COMPLIES_WITH = "compliesWith";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkEoscTag.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
execEoscTag(spark, inputPath, workingPath);
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
public static EoscIfGuidelines newInstance(String code, String label, String url, String semantics) {
|
||||
EoscIfGuidelines eig = new EoscIfGuidelines();
|
||||
eig.setCode(code);
|
||||
eig.setLabel(label);
|
||||
eig.setUrl(url);
|
||||
eig.setSemanticRelation(semantics);
|
||||
return eig;
|
||||
|
||||
}
|
||||
|
||||
private static void execEoscTag(SparkSession spark, String inputPath, String workingPath) {
|
||||
|
||||
readPath(spark, inputPath + "/software", Software.class)
|
||||
.map((MapFunction<Software, Software>) s -> {
|
||||
|
||||
if (containsCriteriaNotebook(s)) {
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_JUPYTER_NOTEBOOK, EOSC_JUPYTER_NOTEBOOK, "",
|
||||
COMPLIES_WITH);
|
||||
|
||||
}
|
||||
if (containsCriteriaGalaxy(s)) {
|
||||
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
|
||||
s.setEoscifguidelines(new ArrayList<>());
|
||||
|
||||
addEIG(
|
||||
s.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "", COMPLIES_WITH);
|
||||
}
|
||||
return s;
|
||||
}, Encoders.bean(Software.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/software");
|
||||
|
||||
readPath(spark, workingPath + "/software", Software.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/software");
|
||||
|
||||
readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
|
||||
.map((MapFunction<OtherResearchProduct, OtherResearchProduct>) orp -> {
|
||||
|
||||
if (!Optional.ofNullable(orp.getEoscifguidelines()).isPresent())
|
||||
orp.setEoscifguidelines(new ArrayList<>());
|
||||
|
||||
if (containsCriteriaGalaxy(orp)) {
|
||||
addEIG(
|
||||
orp.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "",
|
||||
COMPLIES_WITH);
|
||||
}
|
||||
if (containscriteriaTwitter(orp)) {
|
||||
addEIG(orp.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
return orp;
|
||||
}, Encoders.bean(OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/otherresearchproduct");
|
||||
|
||||
readPath(spark, workingPath + "/otherresearchproduct", OtherResearchProduct.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/otherresearchproduct");
|
||||
|
||||
readPath(spark, inputPath + "/dataset", Dataset.class)
|
||||
.map((MapFunction<Dataset, Dataset>) d -> {
|
||||
|
||||
if (!Optional.ofNullable(d.getEoscifguidelines()).isPresent())
|
||||
d.setEoscifguidelines(new ArrayList<>());
|
||||
if (containscriteriaTwitter(d)) {
|
||||
addEIG(d.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
|
||||
}
|
||||
return d;
|
||||
}, Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/dataset");
|
||||
|
||||
readPath(spark, workingPath + "/dataset", Dataset.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath + "/dataset");
|
||||
}
|
||||
|
||||
private static void addEIG(List<EoscIfGuidelines> eoscifguidelines, String code, String label, String url,
|
||||
String sem) {
|
||||
if (!eoscifguidelines.stream().anyMatch(eig -> eig.getCode().equals(code)))
|
||||
eoscifguidelines.add(newInstance(code, label, url, sem));
|
||||
}
|
||||
|
||||
private static boolean containscriteriaTwitter(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
|
||||
if (words.contains("twitter") &&
|
||||
(words.contains("data") || words.contains("dataset")))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaGalaxy(Result r) {
|
||||
Set<String> words = getWordsSP(r.getTitle());
|
||||
words.addAll(getWordsF(r.getDescription()));
|
||||
if (words.contains("galaxy") &&
|
||||
words.contains("workflow"))
|
||||
return true;
|
||||
|
||||
return Optional
|
||||
.ofNullable(r.getSubject())
|
||||
.map(
|
||||
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
|
||||
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private static boolean containsCriteriaNotebook(Software s) {
|
||||
if (!Optional.ofNullable(s.getSubject()).isPresent())
|
||||
return false;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
|
||||
return true;
|
||||
if (s
|
||||
.getSubject()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
sbj -> sbj.getValue().toLowerCase().contains("python") &&
|
||||
sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
|
||||
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
return words;
|
||||
}
|
||||
|
||||
private static Set<String> getWordsF(List<Field<String>> elem) {
|
||||
Set<String> words = new HashSet<>();
|
||||
Optional
|
||||
.ofNullable(elem)
|
||||
.ifPresent(
|
||||
e -> e
|
||||
.forEach(
|
||||
t -> words
|
||||
.addAll(
|
||||
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
|
||||
|
||||
return words;
|
||||
}
|
||||
}
|
|
@ -46,6 +46,11 @@
|
|||
"paramLongName": "taggingConf",
|
||||
"paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "dmp",
|
||||
"paramLongName": "datasourceMapPath",
|
||||
"paramDescription": "mapping between the datasource duplicates and the corresponding master",
|
||||
"paramRequired": true
|
||||
}
|
||||
|
||||
]
|
|
@ -102,7 +102,20 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="copy_wait" to="fork_exec_bulktag"/>
|
||||
<join name="copy_wait" to="eosc_get_datasource_master"/>
|
||||
|
||||
<action name="eosc_get_datasource_master">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.bulktag.eosc.ReadMasterEoscDatasourceFromDB</main-class>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="fork_exec_bulktag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_exec_bulktag">
|
||||
<path start="bulktag_publication"/>
|
||||
|
@ -133,6 +146,7 @@
|
|||
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
|
||||
<arg>--pathMap</arg><arg>${pathMap}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -160,6 +174,7 @@
|
|||
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
|
||||
<arg>--pathMap</arg><arg>${pathMap}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -187,6 +202,7 @@
|
|||
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
|
||||
<arg>--pathMap</arg><arg>${pathMap}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -214,159 +230,12 @@
|
|||
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
|
||||
<arg>--pathMap</arg><arg>${pathMap}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait" to="eosc_tag"/>
|
||||
|
||||
<action name="eosc_tag">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC_tagging</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscTag</arg>
|
||||
</spark>
|
||||
<ok to="eosc_get_datasource_master"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="eosc_get_datasource_master">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.bulktag.eosc.ReadMasterDatasourceFromDB</main-class>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="fork_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_eosc_context_tag">
|
||||
<path start="eosc_context_tag_publication"/>
|
||||
<path start="eosc_context_tag_dataset"/>
|
||||
<path start="eosc_context_tag_otherresearchproduct"/>
|
||||
<path start="eosc_context_tag_software"/>
|
||||
</fork>
|
||||
|
||||
<action name="eosc_context_tag_publication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging publication</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/publication</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/publication</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="eosc_context_tag_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging dataset</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/dataset</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/dataset</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="eosc_context_tag_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging software</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/software</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/software</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="eosc_context_tag_otherresearchproduct">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC tagging ORP</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${outputPath}/otherresearchproduct</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/otherresearchproduct</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
|
||||
</spark>
|
||||
<ok to="wait_eosc_context_tag"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<join name="wait_eosc_context_tag" to="End"/>
|
||||
<end name="End"/>
|
||||
<join name="wait" to="End"/>
|
||||
|
||||
</workflow-app>
|
|
@ -112,7 +112,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
@ -152,7 +156,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
@ -249,7 +257,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
@ -329,7 +341,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-outputPath", workingDir.toString() + "/publication",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
@ -402,7 +418,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
|
||||
"-outputPath", workingDir.toString() + "/orp",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
@ -523,7 +543,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
@ -649,7 +673,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"-outputPath", workingDir.toString() + "/software",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
@ -749,7 +777,11 @@ public class BulkTagJobTest {
|
|||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
|
||||
package eu.dnetlib.dhp.bulktag;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
@ -21,17 +19,9 @@ import org.junit.jupiter.api.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 22/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
//"50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea" has instance hostedby eosc
|
||||
//"50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1" has instance hostedby eosc
|
||||
|
@ -39,13 +29,39 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
|||
//"50|475c1990cbb2::3894c94123e96df8a21249957cf160cb" has EoscTag
|
||||
|
||||
public class EOSCContextTaggingTest {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(EOSCContextTaggingTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(EOSCContextTaggingTest.class);
|
||||
public static final String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp";
|
||||
|
||||
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\","
|
||||
+ " \"title\" : \"$['title'][*]['value']\","
|
||||
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
|
||||
+ " \"contributor\" : \"$['contributor'][*]['value']\","
|
||||
+ " \"description\" : \"$['description'][*]['value']\", "
|
||||
+ " \"subject\" :\"$['subject'][*]['value']\" , " +
|
||||
|
||||
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='subject:fos')].value\"} ";
|
||||
|
||||
private static String taggingConf = "";
|
||||
|
||||
static {
|
||||
try {
|
||||
taggingConf = IOUtils
|
||||
.toString(
|
||||
BulkTagJobTest.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf.xml"));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
|
@ -89,18 +105,22 @@ public class EOSCContextTaggingTest {
|
|||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/dataset");
|
||||
|
||||
SparkEoscBulkTag
|
||||
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input/dataset",
|
||||
"-workingPath", workingDir.toString() + "/working/dataset",
|
||||
"-sourcePath", workingDir.toString() + "/input/dataset",
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath(),
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
|
|
@ -7,6 +7,7 @@ import java.nio.file.Path;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
@ -23,18 +24,43 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.bulktag.eosc.SparkEoscTag;
|
||||
import eu.dnetlib.dhp.bulktag.eosc.EoscTagFunctions;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class EOSCTagJobTest {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
|
||||
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\","
|
||||
+ " \"title\" : \"$['title'][*]['value']\","
|
||||
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
|
||||
+ " \"contributor\" : \"$['contributor'][*]['value']\","
|
||||
+ " \"description\" : \"$['description'][*]['value']\", "
|
||||
+ " \"subject\" :\"$['subject'][*]['value']\" , " +
|
||||
|
||||
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='subject:fos')].value\"} ";
|
||||
|
||||
public static final String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp";
|
||||
|
||||
private static String taggingConf = "";
|
||||
|
||||
static {
|
||||
try {
|
||||
taggingConf = IOUtils
|
||||
.toString(
|
||||
BulkTagJobTest.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf.xml"));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
|
@ -101,14 +127,21 @@ public class EOSCTagJobTest {
|
|||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
"-sourcePath", workingDir.toString() + "/input",
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"-outputPath", workingDir.toString() + "/software",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
@ -401,13 +434,14 @@ public class EOSCTagJobTest {
|
|||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
|
||||
|
||||
});
|
||||
|
||||
|
@ -644,14 +678,21 @@ public class EOSCTagJobTest {
|
|||
.option("compression", "gzip")
|
||||
.json(workingDir.toString() + "/input/otherresearchproduct");
|
||||
|
||||
SparkEoscTag
|
||||
SparkBulkTagJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
workingDir.toString() + "/input",
|
||||
"-workingPath", workingDir.toString() + "/working"
|
||||
|
||||
"-sourcePath", workingDir.toString() + "/input",
|
||||
"-taggingConf", taggingConf,
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"-outputPath", workingDir.toString() + "/software",
|
||||
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
|
||||
"-pathMap", pathMap,
|
||||
"-datasourceMapPath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
|
||||
.getPath()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
|
Loading…
Reference in New Issue