WIP: bulktag_refactor #268

Closed
miriam.baglioni wants to merge 2 commits from bulktag_refactor into beta
10 changed files with 383 additions and 632 deletions

View File

@ -5,9 +5,12 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.bulktag.eosc.DatasourceMaster;
import eu.dnetlib.dhp.bulktag.eosc.EoscTagFunctions;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -64,6 +67,9 @@ public class SparkBulkTagJob {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String datasourceMapPath = parser.get("datasourceMapPath");
log.info("datasourceMapPath: {}", datasourceMapPath);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
@ -88,7 +94,7 @@ public class SparkBulkTagJob {
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, datasourceMapPath, resultClazz, cc);
});
}
@ -97,9 +103,14 @@ public class SparkBulkTagJob {
String inputPath,
String outputPath,
ProtoMap protoMappingParams,
String datasourceMapPath,
Class<R> resultClazz,
CommunityConfiguration communityConfiguration) {
List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class)
.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
.collectAsList();
ResultTagger resultTagger = new ResultTagger();
readPath(spark, inputPath, resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
@ -109,6 +120,10 @@ public class SparkBulkTagJob {
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.map((MapFunction<R, R>) EoscTagFunctions::execEoscTagResult, Encoders.bean(resultClazz))
.map(
(MapFunction<R, R>) value -> EoscTagFunctions.enrich(value, hostedByList),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")

View File

@ -0,0 +1,188 @@
package eu.dnetlib.dhp.bulktag.eosc;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.util.*;
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
public class EoscTagFunctions {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String EOSC_GALAXY_WORKFLOW = "EOSC::Galaxy Workflow";
public static final String EOSC_TWITTER_DATA = "EOSC::Twitter Data";
public static final String EOSC_JUPYTER_NOTEBOOK = "EOSC::Jupyter Notebook";
public static final String COMPLIES_WITH = "compliesWith";
public static <R extends Result> R enrich(R value, List<String> hostedByList) {
if (value
.getInstance()
.stream()
.anyMatch(
i -> (hostedByList.contains(i.getHostedby().getKey())))
&&
!value.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) {
Context context = new Context();
context.setId("eosc");
context
.setDataInfo(
Arrays
.asList(
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST)));
value.getContext().add(context);
}
return value;
}
public static <R extends Result> R execEoscTagResult(R result) {
if (result instanceof Software) {
Software s = (Software) result;
if (containsCriteriaNotebook(s)) {
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
s.setEoscifguidelines(new ArrayList<>());
addEIG(
s.getEoscifguidelines(), EOSC_JUPYTER_NOTEBOOK, EOSC_JUPYTER_NOTEBOOK, "",
COMPLIES_WITH);
}
if (containsCriteriaGalaxy(s)) {
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
s.setEoscifguidelines(new ArrayList<>());
addEIG(
s.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "", COMPLIES_WITH);
}
return (R) s;
} else if (result instanceof OtherResearchProduct) {
OtherResearchProduct orp = (OtherResearchProduct) result;
if (!Optional.ofNullable(orp.getEoscifguidelines()).isPresent())
orp.setEoscifguidelines(new ArrayList<>());
if (containsCriteriaGalaxy(orp)) {
addEIG(
orp.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "",
COMPLIES_WITH);
}
if (containscriteriaTwitter(orp)) {
addEIG(orp.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
}
return (R) orp;
} else if (result instanceof Dataset) {
Dataset d = (Dataset) result;
if (!Optional.ofNullable(d.getEoscifguidelines()).isPresent())
d.setEoscifguidelines(new ArrayList<>());
if (containscriteriaTwitter(d)) {
addEIG(d.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
}
return (R) d;
}
// this is a Publication, return it as it is.
return result;
}
private static void addEIG(List<EoscIfGuidelines> eoscifguidelines, String code, String label, String url,
String sem) {
if (!eoscifguidelines.stream().anyMatch(eig -> eig.getCode().equals(code)))
eoscifguidelines.add(newInstance(code, label, url, sem));
}
public static EoscIfGuidelines newInstance(String code, String label, String url, String semantics) {
EoscIfGuidelines eig = new EoscIfGuidelines();
eig.setCode(code);
eig.setLabel(label);
eig.setUrl(url);
eig.setSemanticRelation(semantics);
return eig;
}
private static boolean containscriteriaTwitter(Result r) {
Set<String> words = getWordsSP(r.getTitle());
words.addAll(getWordsF(r.getDescription()));
if (words.contains("twitter") &&
(words.contains("data") || words.contains("dataset")))
return true;
return Optional
.ofNullable(r.getSubject())
.map(
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
.orElse(false);
}
private static boolean containsCriteriaGalaxy(Result r) {
Set<String> words = getWordsSP(r.getTitle());
words.addAll(getWordsF(r.getDescription()));
if (words.contains("galaxy") &&
words.contains("workflow"))
return true;
return Optional
.ofNullable(r.getSubject())
.map(
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
.orElse(false);
}
private static boolean containsCriteriaNotebook(Software s) {
if (!Optional.ofNullable(s.getSubject()).isPresent())
return false;
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
return true;
if (s
.getSubject()
.stream()
.anyMatch(
sbj -> sbj.getValue().toLowerCase().contains("python") &&
sbj.getValue().toLowerCase().contains("notebook")))
return true;
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
return true;
return false;
}
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
Set<String> words = new HashSet<>();
Optional
.ofNullable(elem)
.ifPresent(
e -> e
.forEach(
t -> words
.addAll(
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
return words;
}
private static Set<String> getWordsF(List<Field<String>> elem) {
Set<String> words = new HashSet<>();
Optional
.ofNullable(elem)
.ifPresent(
e -> e
.forEach(
t -> words
.addAll(
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
return words;
}
}
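EoscTagFunctions centralises the keyword-based EOSC tagging previously spread across SparkEoscTag and SparkEoscBulkTag: Jupyter Notebook and Galaxy Workflow guidelines for software, Galaxy Workflow and Twitter Data for other research products, Twitter Data for datasets, plus the "eosc" context for results hosted by an EOSC master datasource. A minimal usage sketch, assuming the usual dhp-schemas setters (setSubject on Result, setValue on StructuredProperty); the subject value is made up for illustration:

    import java.util.Arrays;
    import eu.dnetlib.dhp.bulktag.eosc.EoscTagFunctions;
    import eu.dnetlib.dhp.schema.oaf.Software;
    import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

    Software s = new Software();
    StructuredProperty subject = new StructuredProperty();
    subject.setValue("Jupyter Notebook");      // triggers containsCriteriaNotebook
    s.setSubject(Arrays.asList(subject));

    Software tagged = EoscTagFunctions.execEoscTagResult(s);
    // tagged.getEoscifguidelines() now holds one entry with code and label
    // "EOSC::Jupyter Notebook" and semantic relation "compliesWith"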

View File

@ -1,18 +1,10 @@
package eu.dnetlib.dhp.bulktag.eosc;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -22,23 +14,19 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* @author miriam.baglioni
* @Date 21/07/22
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Function;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.common.RelationInverse;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
public class ReadMasterDatasourceFromDB implements Closeable {
public class ReadMasterEoscDatasourceFromDB implements Closeable {
private final DbClient dbClient;
private static final Log log = LogFactory.getLog(ReadMasterDatasourceFromDB.class);
private static final Log log = LogFactory.getLog(ReadMasterEoscDatasourceFromDB.class);
private final BufferedWriter writer;
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -53,7 +41,7 @@ public class ReadMasterDatasourceFromDB implements Closeable {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ReadMasterDatasourceFromDB.class
ReadMasterEoscDatasourceFromDB.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/datasourcemaster_parameters.json")));
@ -66,12 +54,11 @@ public class ReadMasterDatasourceFromDB implements Closeable {
final String hdfsNameNode = parser.get("hdfsNameNode");
try (
final ReadMasterDatasourceFromDB rmd = new ReadMasterDatasourceFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
final ReadMasterEoscDatasourceFromDB rmd = new ReadMasterEoscDatasourceFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
dbPassword)) {
log.info("Processing datasources...");
rmd.execute(QUERY, rmd::datasourceMasterMap);
}
}
@ -103,7 +90,7 @@ public class ReadMasterDatasourceFromDB implements Closeable {
writer.close();
}
public ReadMasterDatasourceFromDB(
public ReadMasterEoscDatasourceFromDB(
final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword)
throws IOException {

View File

@ -1,169 +0,0 @@
package eu.dnetlib.dhp.bulktag.eosc;
import static eu.dnetlib.dhp.PropagationConstant.readPath;
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.TAGGING_TRUST;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import javax.print.attribute.DocAttributeSet;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
/**
* @author miriam.baglioni
* @Date 21/07/22
*/
public class SparkEoscBulkTag implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkEoscBulkTag.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/input_eosc_bulkTag_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
String datasourceMapPath = parser.get("datasourceMapPath");
log.info("datasourceMapPath: {}", datasourceMapPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
CommunityConfiguration cc;
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, workingPath);
execBulkTag(spark, inputPath, workingPath, datasourceMapPath, resultClazz);
});
}
private static <R extends Result> void execBulkTag(
SparkSession spark,
String inputPath,
String workingPath,
String datasourceMapPath,
Class<R> resultClazz) {
List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class)
.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
.collectAsList();
readPath(spark, inputPath, resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> enrich(value, hostedByList),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
readPath(spark, workingPath, resultClazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath);
}
private static <R extends Result> R enrich(R value, List<String> hostedByList) {
if (value
.getInstance()
.stream()
.anyMatch(
i -> (hostedByList.contains(i.getHostedby().getKey())))
&&
!value.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) {
Context context = new Context();
context.setId("eosc");
context
.setDataInfo(
Arrays
.asList(
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST)));
value.getContext().add(context);
}
return value;
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
// TODO remove this hack as soon as the values fixed by this method will be provided as NON null
private static <R extends Result> MapFunction<R, R> patchResult() {
return r -> {
if (r.getDataInfo().getDeletedbyinference() == null) {
r.getDataInfo().setDeletedbyinference(false);
}
if (r.getContext() == null) {
r.setContext(new ArrayList<>());
}
return r;
};
}
}

View File

@ -1,237 +0,0 @@
package eu.dnetlib.dhp.bulktag.eosc;
import static eu.dnetlib.dhp.PropagationConstant.readPath;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
public class SparkEoscTag {
private static final Logger log = LoggerFactory.getLogger(SparkEoscTag.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String EOSC_GALAXY_WORKFLOW = "EOSC::Galaxy Workflow";
public static final String EOSC_TWITTER_DATA = "EOSC::Twitter Data";
public static final String EOSC_JUPYTER_NOTEBOOK = "EOSC::Jupyter Notebook";
public static final String COMPLIES_WITH = "compliesWith";
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkEoscTag.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
execEoscTag(spark, inputPath, workingPath);
});
}
public static EoscIfGuidelines newInstance(String code, String label, String url, String semantics) {
EoscIfGuidelines eig = new EoscIfGuidelines();
eig.setCode(code);
eig.setLabel(label);
eig.setUrl(url);
eig.setSemanticRelation(semantics);
return eig;
}
private static void execEoscTag(SparkSession spark, String inputPath, String workingPath) {
readPath(spark, inputPath + "/software", Software.class)
.map((MapFunction<Software, Software>) s -> {
if (containsCriteriaNotebook(s)) {
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
s.setEoscifguidelines(new ArrayList<>());
addEIG(
s.getEoscifguidelines(), EOSC_JUPYTER_NOTEBOOK, EOSC_JUPYTER_NOTEBOOK, "",
COMPLIES_WITH);
}
if (containsCriteriaGalaxy(s)) {
if (!Optional.ofNullable(s.getEoscifguidelines()).isPresent())
s.setEoscifguidelines(new ArrayList<>());
addEIG(
s.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "", COMPLIES_WITH);
}
return s;
}, Encoders.bean(Software.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/software");
readPath(spark, workingPath + "/software", Software.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "/software");
readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
.map((MapFunction<OtherResearchProduct, OtherResearchProduct>) orp -> {
if (!Optional.ofNullable(orp.getEoscifguidelines()).isPresent())
orp.setEoscifguidelines(new ArrayList<>());
if (containsCriteriaGalaxy(orp)) {
addEIG(
orp.getEoscifguidelines(), EOSC_GALAXY_WORKFLOW, EOSC_GALAXY_WORKFLOW, "",
COMPLIES_WITH);
}
if (containscriteriaTwitter(orp)) {
addEIG(orp.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
}
return orp;
}, Encoders.bean(OtherResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/otherresearchproduct");
readPath(spark, workingPath + "/otherresearchproduct", OtherResearchProduct.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "/otherresearchproduct");
readPath(spark, inputPath + "/dataset", Dataset.class)
.map((MapFunction<Dataset, Dataset>) d -> {
if (!Optional.ofNullable(d.getEoscifguidelines()).isPresent())
d.setEoscifguidelines(new ArrayList<>());
if (containscriteriaTwitter(d)) {
addEIG(d.getEoscifguidelines(), EOSC_TWITTER_DATA, EOSC_TWITTER_DATA, "", COMPLIES_WITH);
}
return d;
}, Encoders.bean(Dataset.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/dataset");
readPath(spark, workingPath + "/dataset", Dataset.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "/dataset");
}
private static void addEIG(List<EoscIfGuidelines> eoscifguidelines, String code, String label, String url,
String sem) {
if (!eoscifguidelines.stream().anyMatch(eig -> eig.getCode().equals(code)))
eoscifguidelines.add(newInstance(code, label, url, sem));
}
private static boolean containscriteriaTwitter(Result r) {
Set<String> words = getWordsSP(r.getTitle());
words.addAll(getWordsF(r.getDescription()));
if (words.contains("twitter") &&
(words.contains("data") || words.contains("dataset")))
return true;
return Optional
.ofNullable(r.getSubject())
.map(
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) &&
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data")))
.orElse(false);
}
private static boolean containsCriteriaGalaxy(Result r) {
Set<String> words = getWordsSP(r.getTitle());
words.addAll(getWordsF(r.getDescription()));
if (words.contains("galaxy") &&
words.contains("workflow"))
return true;
return Optional
.ofNullable(r.getSubject())
.map(
s -> s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) &&
s.stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow")))
.orElse(false);
}
private static boolean containsCriteriaNotebook(Software s) {
if (!Optional.ofNullable(s.getSubject()).isPresent())
return false;
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter")))
return true;
if (s
.getSubject()
.stream()
.anyMatch(
sbj -> sbj.getValue().toLowerCase().contains("python") &&
sbj.getValue().toLowerCase().contains("notebook")))
return true;
if (s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python")) &&
s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("notebook")))
return true;
return false;
}
private static Set<String> getWordsSP(List<StructuredProperty> elem) {
Set<String> words = new HashSet<>();
Optional
.ofNullable(elem)
.ifPresent(
e -> e
.forEach(
t -> words
.addAll(
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
return words;
}
private static Set<String> getWordsF(List<Field<String>> elem) {
Set<String> words = new HashSet<>();
Optional
.ofNullable(elem)
.ifPresent(
e -> e
.forEach(
t -> words
.addAll(
Arrays.asList(t.getValue().toLowerCase().replaceAll("[^a-zA-Z ]", "").split(" ")))));
return words;
}
}

View File

@ -46,6 +46,11 @@
"paramLongName": "taggingConf",
"paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed",
"paramRequired": false
},
{
"paramName": "dmp",
"paramLongName": "datasourceMapPath",
"paramDescription": "mapping between the datasource duplicates and the corresponding master",
"paramRequired": true
}
]
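Since datasourceMapPath is a required parameter, every bulk tagging invocation now has to supply it alongside the existing arguments. A hedged sketch of a full argument list, mirroring the Oozie action and the updated tests further down (all paths and the lookup URL are placeholders; taggingConf and pathMap are the same String constants used in the tests):

    // illustrative only: placeholder paths, not an authoritative invocation
    String[] args = new String[] {
        "-isSparkSessionManaged", Boolean.FALSE.toString(),
        "-sourcePath", "/tmp/graph/publication",
        "-taggingConf", taggingConf,                 // test-only shortcut, optional
        "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
        "-outputPath", "/tmp/bulktag/publication",
        "-isLookUpUrl", "http://localhost:8280/is/services/isLookUp",
        "-pathMap", pathMap,
        "-datasourceMapPath", "/tmp/workingDir/datasourcemaster"
    };
    SparkBulkTagJob.main(args);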

View File

@ -102,7 +102,20 @@
<error to="Kill"/>
</action>
<join name="copy_wait" to="fork_exec_bulktag"/>
<join name="copy_wait" to="eosc_get_datasource_master"/>
<action name="eosc_get_datasource_master">
<java>
<main-class>eu.dnetlib.dhp.bulktag.eosc.ReadMasterEoscDatasourceFromDB</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/datasourcemaster</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</java>
<ok to="fork_exec_bulktag"/>
<error to="Kill"/>
</action>
<fork name="fork_exec_bulktag">
<path start="bulktag_publication"/>
@ -133,6 +146,7 @@
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -160,6 +174,7 @@
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -187,6 +202,7 @@
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -214,159 +230,12 @@
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="eosc_tag"/>
<action name="eosc_tag">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>EOSC_tagging</name>
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscTag</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscTag</arg>
</spark>
<ok to="eosc_get_datasource_master"/>
<error to="Kill"/>
</action>
<action name="eosc_get_datasource_master">
<java>
<main-class>eu.dnetlib.dhp.bulktag.eosc.ReadMasterDatasourceFromDB</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/datasourcemaster</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</java>
<ok to="fork_eosc_context_tag"/>
<error to="Kill"/>
</action>
<fork name="fork_eosc_context_tag">
<path start="eosc_context_tag_publication"/>
<path start="eosc_context_tag_dataset"/>
<path start="eosc_context_tag_otherresearchproduct"/>
<path start="eosc_context_tag_software"/>
</fork>
<action name="eosc_context_tag_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>EOSC tagging publication</name>
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait_eosc_context_tag"/>
<error to="Kill"/>
</action>
<action name="eosc_context_tag_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>EOSC tagging dataset</name>
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait_eosc_context_tag"/>
<error to="Kill"/>
</action>
<action name="eosc_context_tag_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>EOSC tagging software</name>
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/software</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait_eosc_context_tag"/>
<error to="Kill"/>
</action>
<action name="eosc_context_tag_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>EOSC tagging ORP</name>
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark>
<ok to="wait_eosc_context_tag"/>
<error to="Kill"/>
</action>
<join name="wait_eosc_context_tag" to="End"/>
<end name="End"/>
<join name="wait" to="End"/>
</workflow-app>

View File

@ -112,7 +112,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -152,7 +156,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -249,7 +257,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -329,7 +341,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -402,7 +418,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/orp",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -523,7 +543,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -649,7 +673,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -749,7 +777,11 @@ public class BulkTagJobTest {
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

View File

@ -1,12 +1,10 @@
package eu.dnetlib.dhp.bulktag;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@ -21,17 +19,9 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author miriam.baglioni
* @Date 22/07/22
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
//"50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea" has instance hostedby eosc
//"50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1" has instance hostedby eosc
@ -39,13 +29,39 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
//"50|475c1990cbb2::3894c94123e96df8a21249957cf160cb" has EoscTag
public class EOSCContextTaggingTest {
private static final Logger log = LoggerFactory.getLogger(EOSCContextTaggingTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(EOSCContextTaggingTest.class);
public static final String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp";
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\","
+ " \"title\" : \"$['title'][*]['value']\","
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+ " \"contributor\" : \"$['contributor'][*]['value']\","
+ " \"description\" : \"$['description'][*]['value']\", "
+ " \"subject\" :\"$['subject'][*]['value']\" , " +
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='subject:fos')].value\"} ";
private static String taggingConf = "";
static {
try {
taggingConf = IOUtils
.toString(
BulkTagJobTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf.xml"));
} catch (IOException e) {
e.printStackTrace();
}
}
@BeforeAll
public static void beforeAll() throws IOException {
@ -89,18 +105,22 @@ public class EOSCContextTaggingTest {
.option("compression", "gzip")
.json(workingDir.toString() + "/input/dataset");
SparkEoscBulkTag
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input/dataset",
"-workingPath", workingDir.toString() + "/working/dataset",
"-sourcePath", workingDir.toString() + "/input/dataset",
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

View File

@ -7,6 +7,7 @@ import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@ -23,18 +24,43 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.bulktag.eosc.SparkEoscTag;
import eu.dnetlib.dhp.bulktag.eosc.EoscTagFunctions;
import eu.dnetlib.dhp.schema.oaf.*;
public class EOSCTagJobTest {
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\","
+ " \"title\" : \"$['title'][*]['value']\","
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+ " \"contributor\" : \"$['contributor'][*]['value']\","
+ " \"description\" : \"$['description'][*]['value']\", "
+ " \"subject\" :\"$['subject'][*]['value']\" , " +
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='subject:fos')].value\"} ";
public static final String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp";
private static String taggingConf = "";
static {
try {
taggingConf = IOUtils
.toString(
BulkTagJobTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf.xml"));
} catch (IOException e) {
e.printStackTrace();
}
}
@BeforeAll
public static void beforeAll() throws IOException {
@ -101,14 +127,21 @@ public class EOSCTagJobTest {
.option("compression", "gzip")
.json(workingDir.toString() + "/input/otherresearchproduct");
SparkEoscTag
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input",
"-workingPath", workingDir.toString() + "/working"
"-sourcePath", workingDir.toString() + "/input",
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath()
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -401,13 +434,14 @@ public class EOSCTagJobTest {
.option("compression", "gzip")
.json(workingDir.toString() + "/input/otherresearchproduct");
SparkEoscTag
SparkBulkTagJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input",
"-workingPath", workingDir.toString() + "/working"
"-workingPath", workingDir.toString() + "/working",
});
@ -644,14 +678,21 @@ public class EOSCTagJobTest {
.option("compression", "gzip")
.json(workingDir.toString() + "/input/otherresearchproduct");
SparkEoscTag
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input",
"-workingPath", workingDir.toString() + "/working"
"-sourcePath", workingDir.toString() + "/input",
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap,
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath()
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());