forked from D-Net/dnet-hadoop
Compare commits
5 Commits
master...subjectPro

Author | SHA1 | Date
---|---|---
Miriam Baglioni | 777d0bf560 |
Miriam Baglioni | 50cdc76987 |
Miriam Baglioni | e1317edd23 |
Miriam Baglioni | 3d49668431 |
Miriam Baglioni | c58e0d9910 |
@@ -12,14 +12,13 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Country;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
 
 public class PropagationConstant {
 
@@ -50,6 +49,9 @@ public class PropagationConstant {
 
	public static final String TRUE = "true";
 
+	public static final String PROPAGATION_SUBJECT_RESULT_SEMREL_CLASS_ID = "subject:remrel";
+	public static final String PROPAGATION_SUBJECT_RESULT_SEMREL_CLASS_NAME = "Propagation of subjects through semantic relation";
+
	public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_ID = "country:instrepos";
	public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME = "Propagation of country to result collected from datasources of type institutional repositories";
 
@@ -239,4 +241,30 @@ public class PropagationConstant {
		}
	}
 
+	public static <R extends Oaf> Dataset<R> readOafKryoPath(
+		SparkSession spark, String inputPath, Class<R> clazz) {
+		return spark
+			.read()
+			.textFile(inputPath)
+			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.kryo(clazz));
+	}
+
+	public static Class[] getModelClasses() {
+		List<Class<?>> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses());
+		modelClasses
+			.addAll(
+				Lists
+					.newArrayList(
+						Result.class,
+						Qualifier.class,
+						DataInfo.class,
+						Publication.class,
+						eu.dnetlib.dhp.schema.oaf.Dataset.class,
+						Software.class,
+						OtherResearchProduct.class,
+						Subject.class,
+						AccessRight.class));
+		return modelClasses.toArray(new Class[] {});
+	}
 }
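The two new helpers are meant to be used together: getModelClasses() feeds Kryo registration on the SparkConf, and readOafKryoPath then reads a result table with a Kryo encoder instead of a bean encoder (this is exactly what SparkSubjectPropagationStep2 below does). A minimal sketch, assuming an existing SparkSession `spark` and a graph base path `inputPath`:

	// sketch only: combine the new helpers as the propagation step does below
	SparkConf conf = new SparkConf();
	conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
	conf.registerKryoClasses(PropagationConstant.getModelClasses());

	// read one result table with a Kryo encoder (the Oaf model does not round-trip cleanly through bean encoders)
	Dataset<Publication> publications = PropagationConstant
		.readOafKryoPath(spark, inputPath + "/publication", Publication.class);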
@@ -0,0 +1,162 @@

package eu.dnetlib.dhp.subjecttoresultfromsemrel;

import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Subject;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 04/10/22
 *
 * Selects the results having at least one subject in subjectClassList
 */

public class PrepareResultResultStep1 implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(PrepareResultResultStep1.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				PrepareResultResultStep1.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);
		final String resultClassName = parser.get("resultTableName");
		log.info("resultTableName: {}", resultClassName);

		Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);

		final String resultType = parser.get("resultType");
		log.info("resultType: {}", resultType);

		final List<String> subjectClassList = Arrays
			.asList(
				parser.get("subjectlist").split(";"))
			.stream()
			.map(s -> s.toLowerCase())
			.collect(Collectors.toList());
		log.info("subjectClassList: {}", subjectClassList);

		final List<String> allowedSemRel = Arrays
			.asList(
				parser.get("allowedsemrels").split(";"))
			.stream()
			.map(s -> s.toLowerCase())
			.collect(Collectors.toList());
		log.info("allowedSemRel: {}", allowedSemRel);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				removeOutputDir(spark, outputPath);
				prepareInfo(spark, inputPath, outputPath, subjectClassList, allowedSemRel, resultClazz, resultType);
			});
	}

	private static <R extends Result> void prepareInfo(SparkSession spark,
		String inputPath,
		String outputPath,
		List<String> subjectClassList,
		List<String> allowedSemRel,
		Class<R> resultClazz,
		String resultType) {

		Dataset<R> result = readPath(spark, inputPath + "/" + resultType, resultClazz)
			.filter(
				(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
					!r.getDataInfo().getInvisible() && Optional.ofNullable(r.getSubject()).isPresent() &&
					r
						.getSubject()
						.stream()
						.anyMatch(s -> subjectClassList.contains(s.getQualifier().getClassid().toLowerCase())));

		Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class)
			.filter(
				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
					allowedSemRel.contains(r.getRelClass().toLowerCase()));

		result
			.joinWith(relation, result.col("id").equalTo(relation.col("source")), "right")
			.groupByKey((MapFunction<Tuple2<R, Relation>, String>) t2 -> t2._2().getTarget(), Encoders.STRING())
			.mapGroups(
				(MapGroupsFunction<String, Tuple2<R, Relation>, ResultSubjectList>) (k,
					it) -> getResultSubjectList(subjectClassList, k, it),
				Encoders.bean(ResultSubjectList.class))
			.filter(Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/" + resultType);
	}

	private static <R extends Result> ResultSubjectList getResultSubjectList(List<String> subjectClassList, String k,
		Iterator<Tuple2<R, Relation>> it) {
		Tuple2<R, Relation> first = it.next();
		if (!Optional.ofNullable(first._1()).isPresent()) {
			return null;
		}
		ResultSubjectList rsl = new ResultSubjectList();
		rsl.setResId(k);
		List<SubjectInfo> sbjInfo = new ArrayList<>();
		Set<String> subjectSet = new HashSet<>();
		extracted(subjectClassList, first._1().getSubject(), sbjInfo, subjectSet);
		it.forEachRemaining(t2 -> {
			if (Optional.ofNullable(t2._1()).isPresent())
				extracted(subjectClassList, t2._1().getSubject(), sbjInfo, subjectSet);
		});
		rsl.setSubjectList(sbjInfo);
		return rsl;
	}

	private static <R extends Result> void extracted(List<String> subjectClassList, List<Subject> resultSubject,
		List<SubjectInfo> sbjList, Set<String> subjectSet) {

		resultSubject
			.stream()
			.filter(s -> subjectClassList.contains(s.getQualifier().getClassid().toLowerCase()))
			.forEach(s -> {
				if (!subjectSet.contains(s.getValue()))
					sbjList
						.add(
							SubjectInfo
								.newInstance(
									s.getQualifier().getClassid(), s.getQualifier().getClassname(), s.getValue()));
				subjectSet.add(s.getValue());
			});
	}

}
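For each target of an allowed semantic relation whose sources carry subjects in the configured classes, prepareInfo writes one gzip-compressed JSON record per line, keyed by the target identifier. The test fixture further down in this diff gives the expected shape, e.g.:

	{"resId":"50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba","subjectList":[{"classid":"FOS", "classname":"Field of Science","value":"fake_fos2"}]}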
@@ -0,0 +1,34 @@

package eu.dnetlib.dhp.subjecttoresultfromsemrel;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;

import eu.dnetlib.dhp.schema.oaf.Subject;

/**
 * @author miriam.baglioni
 * @Date 04/10/22
 */
public class ResultSubjectList implements Serializable {
	private String resId;
	List<SubjectInfo> subjectList;

	public String getResId() {
		return resId;
	}

	public void setResId(String resId) {
		this.resId = resId;
	}

	public List<SubjectInfo> getSubjectList() {
		return subjectList;
	}

	public void setSubjectList(List<SubjectInfo> subjectList) {
		this.subjectList = subjectList;
	}

}
@@ -0,0 +1,192 @@

package eu.dnetlib.dhp.subjecttoresultfromsemrel;

import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Subject;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 05/10/22
 */
public class SparkSubjectPropagationStep2 implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(SparkSubjectPropagationStep2.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkSubjectPropagationStep2.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		String preparedPath = parser.get("preparedPath");
		log.info("preparedPath: {}", preparedPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String resultClassName = parser.get("resultTableName");
		log.info("resultTableName: {}", resultClassName);

		Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);

		final String resultType = parser.get("resultType");
		log.info("resultType: {}", resultType);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String workingPath = parser.get("workingPath");
		log.info("workingPath: {}", workingPath);

		SparkConf conf = new SparkConf();
		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
		conf.registerKryoClasses(getModelClasses());
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				removeOutputDir(spark, outputPath);
				execPropagation(spark, inputPath, outputPath, workingPath, preparedPath, resultClazz, resultType);
			});
	}

	private static <R extends Result> void execPropagation(SparkSession spark,
		String inputPath,
		String outputPath,
		String workingPath,
		String preparedPath,
		Class<R> resultClazz,
		String resultType) {

		Dataset<Tuple2<String, R>> results = readOafKryoPath(spark, inputPath + "/" + resultType, resultClazz)
			.map(
				(MapFunction<R, Tuple2<String, R>>) r -> new Tuple2(r.getId(), r),
				Encoders.tuple(Encoders.STRING(), Encoders.kryo(resultClazz)));

		Dataset<ResultSubjectList> preparedResult = readPath(
			spark, preparedPath + "/publication", ResultSubjectList.class)
				.union(readPath(spark, preparedPath + "/dataset", ResultSubjectList.class))
				.union(readPath(spark, preparedPath + "/software", ResultSubjectList.class))
				.union(readPath(spark, preparedPath + "/otherresearchproduct", ResultSubjectList.class));

		results
			.joinWith(
				preparedResult,
				results.col("_1").equalTo(preparedResult.col("resId")),
				"left")
			.map((MapFunction<Tuple2<Tuple2<String, R>, ResultSubjectList>, String>) t2 -> {
				R res = t2._1()._2();
				// extract the subject typologies already present in the result
				Map<String, List<String>> resultMap = new HashMap<>();
				if (Optional.ofNullable(t2._2()).isPresent()) {
					if (Optional.ofNullable(res.getSubject()).isPresent()) {
						res.getSubject().stream().forEach(s -> {
							String cid = s.getQualifier().getClassid();
							if (!cid.equals(ModelConstants.DNET_SUBJECT_KEYWORD)) {
								if (!resultMap.containsKey(cid)) {
									resultMap.put(cid, new ArrayList<>());
								}
								resultMap.get(cid).add(s.getValue());
							}
						});
					} else {
						res.setSubject(new ArrayList<>());
					}

					// Remove from the list all the subjects with the same class already present in the result
					List<String> distinctClassId = t2
						._2()
						.getSubjectList()
						.stream()
						.map(si -> si.getClassid())
						.distinct()
						.collect(Collectors.toList());
					List<SubjectInfo> sbjInfo = new ArrayList<>();
					for (String k : distinctClassId) {
						if (!resultMap.containsKey(k))
							sbjInfo = t2
								._2()
								.getSubjectList()
								.stream()
								.filter(s -> s.getClassid().equalsIgnoreCase(k))
								.collect(Collectors.toList());
						else
							sbjInfo = t2
								._2()
								.getSubjectList()
								.stream()
								.filter(
									s -> s.getClassid().equalsIgnoreCase(k) &&
										!resultMap.get(k).contains(s.getValue()))
								.collect(Collectors.toList());
						// All the subjects not already present in the result are added
						for (SubjectInfo si : sbjInfo) {
							res.getSubject().add(getSubject(si));
						}

					}

				}
				return OBJECT_MAPPER.writeValueAsString(res);
			}, Encoders.STRING())
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.text(workingPath + "/" + resultType);

		readPath(spark, workingPath + "/" + resultType, resultClazz)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/" + resultType);

	}

	private static <R extends Result> Subject getSubject(SubjectInfo si) {
		return OafMapperUtils
			.subject(
				si.getValue(),
				si.getClassid(), si.getClassname(),
				ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES,
				OafMapperUtils
					.dataInfo(
						false, PROPAGATION_DATA_INFO_TYPE,
						true, false,
						OafMapperUtils
							.qualifier(
								PROPAGATION_SUBJECT_RESULT_SEMREL_CLASS_ID,
								PROPAGATION_SUBJECT_RESULT_SEMREL_CLASS_NAME,
								ModelConstants.DNET_PROVENANCE_ACTIONS,
								ModelConstants.DNET_PROVENANCE_ACTIONS),
						"0.85"));

	}

}
@@ -0,0 +1,46 @@

package eu.dnetlib.dhp.subjecttoresultfromsemrel;

import java.io.Serializable;

/**
 * @author miriam.baglioni
 * @Date 06/10/22
 */
public class SubjectInfo implements Serializable {
	private String classid;
	private String value;
	private String classname;

	public static SubjectInfo newInstance(String classid, String classname, String value) {
		SubjectInfo si = new SubjectInfo();
		si.classid = classid;
		si.value = value;
		si.classname = classname;
		return si;
	}

	public String getClassid() {
		return classid;
	}

	public void setClassid(String classid) {
		this.classid = classid;
	}

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}

	public String getClassname() {
		return classname;
	}

	public void setClassname(String classname) {
		this.classname = classname;
	}
}
@@ -0,0 +1,17 @@

package eu.dnetlib.dhp.subjecttoresultfromsemrel;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Stream;

import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Subject;

/**
 * @author miriam.baglioni
 * @Date 05/10/22
 */
public class Utils implements Serializable {

}
@@ -0,0 +1,46 @@
[

	{
		"paramName": "asr",
		"paramLongName": "allowedsemrels",
		"paramDescription": "the set of semantic relations between the results to be exploited to perform the propagation",
		"paramRequired": true
	},
	{
		"paramName": "sl",
		"paramLongName": "subjectlist",
		"paramDescription": "the list of classids of the subjects we want to propagate",
		"paramRequired": true
	},
	{
		"paramName": "rt",
		"paramLongName": "resultType",
		"paramDescription": "the result type",
		"paramRequired": true
	},
	{
		"paramName": "sp",
		"paramLongName": "sourcePath",
		"paramDescription": "the path of the input graph",
		"paramRequired": true
	},
	{
		"paramName": "rtn",
		"paramLongName": "resultTableName",
		"paramDescription": "the class of the result",
		"paramRequired": true
	},
	{
		"paramName": "ssm",
		"paramLongName": "isSparkSessionManaged",
		"paramDescription": "true if the spark session is managed, false otherwise",
		"paramRequired": false
	},
	{
		"paramName": "out",
		"paramLongName": "outputPath",
		"paramDescription": "the path used to store temporary output files",
		"paramRequired": true
	}
]
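Each paramLongName maps onto a `--<name>` argument of the preparation job; an illustrative argument list (paths are placeholders, mirroring the workflow and unit test below):

	// illustrative only: how PrepareResultResultStep1 is invoked; "/path/to/..." are placeholders
	String[] prepareArgs = new String[] {
		"--sourcePath", "/path/to/graph",
		"--allowedsemrels", "IsSupplementedBy;IsSupplementTo",
		"--subjectlist", "fos;sdg",
		"--resultType", "publication",
		"--resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
		"--outputPath", "/path/to/preparedInfo"
	};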
@@ -0,0 +1,45 @@
[

	{
		"paramName": "pp",
		"paramLongName": "preparedPath",
		"paramDescription": "the path to the prepared information",
		"paramRequired": true
	},
	{
		"paramName": "rt",
		"paramLongName": "resultType",
		"paramDescription": "the result type",
		"paramRequired": true
	},
	{
		"paramName": "sp",
		"paramLongName": "sourcePath",
		"paramDescription": "the path of the input graph",
		"paramRequired": true
	},
	{
		"paramName": "rtn",
		"paramLongName": "resultTableName",
		"paramDescription": "the class of the result",
		"paramRequired": true
	},
	{
		"paramName": "ssm",
		"paramLongName": "isSparkSessionManaged",
		"paramDescription": "true if the spark session is managed, false otherwise",
		"paramRequired": false
	},
	{
		"paramName": "out",
		"paramLongName": "outputPath",
		"paramDescription": "the path used to store output files",
		"paramRequired": true
	},
	{
		"paramName": "wp",
		"paramLongName": "workingPath",
		"paramDescription": "the path used to store temporary output files",
		"paramRequired": true
	}
]
@@ -0,0 +1,63 @@
<configuration>
	<property>
		<name>jobTracker</name>
		<value>yarnRM</value>
		<!-- <value>hadoop-rm3.garr-pa1.d4science.org:8032</value>-->
	</property>
	<property>
		<name>nameNode</name>
		<!-- <value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>-->
		<value>hdfs://nameservice1</value>
	</property>
	<property>
		<name>oozie.use.system.libpath</name>
		<value>true</value>
	</property>
	<property>
		<name>oozie.action.sharelib.for.spark</name>
		<value>spark2</value>
	</property>
	<property>
		<name>hive_metastore_uris</name>
		<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
		<!-- <value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>-->
	</property>
	<property>
		<name>spark2YarnHistoryServerAddress</name>
		<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
	</property>
	<property>
		<name>spark2EventLogDir</name>
		<value>/user/spark/spark2ApplicationHistory</value>
	</property>
	<property>
		<name>spark2ExtraListeners</name>
		<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
	</property>
	<property>
		<name>spark2SqlQueryExecutionListeners</name>
		<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
	</property>
	<property>
		<name>sparkExecutorNumber</name>
		<value>4</value>
	</property>
	<property>
		<name>sparkDriverMemory</name>
		<value>15G</value>
	</property>
	<property>
		<name>sparkExecutorMemory</name>
		<value>10G</value>
	</property>
	<property>
		<name>sparkExecutorCores</name>
		<value>1</value>
	</property>
	<property>
		<name>spark2MaxExecutors</name>
		<value>50</value>
	</property>

</configuration>
@@ -0,0 +1,297 @@
<workflow-app name="subject_to_result_propagation" xmlns="uri:oozie:workflow:0.5">
	<parameters>
		<property>
			<name>sourcePath</name>
			<description>the source path</description>
		</property>
		<property>
			<name>subjectlist</name>
			<value>fos;sdg</value>
			<description>the list of subject classids to propagate (split by ;)</description>
		</property>
		<property>
			<name>allowedsemrels</name>
			<value>IsSupplementedBy;IsSupplementTo;IsPreviousVersionOf;IsNewVersionOf;IsIdenticalTo;Obsoletes;IsObsoletedBy;IsVersionOf</value>
			<description>the allowed semantic relations</description>
		</property>
		<property>
			<name>outputPath</name>
			<description>the output path</description>
		</property>
	</parameters>

	<global>
		<job-tracker>${jobTracker}</job-tracker>
		<name-node>${nameNode}</name-node>
		<configuration>
			<property>
				<name>oozie.action.sharelib.for.spark</name>
				<value>${oozieActionShareLibForSpark2}</value>
			</property>
		</configuration>
	</global>

	<start to="reset_outputpath"/>

	<kill name="Kill">
		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>

	<action name="reset_outputpath">
		<fs>
			<delete path="${outputPath}"/>
			<mkdir path="${outputPath}"/>
		</fs>
		<ok to="prepare_subject_propagation"/>
		<error to="Kill"/>
	</action>

	<fork name="prepare_subject_propagation">
		<path start="prepare_subject_propagation_publication"/>
		<path start="prepare_subject_propagation_dataset"/>
		<path start="prepare_subject_propagation_software"/>
		<path start="prepare_subject_propagation_orp"/>
	</fork>

	<action name="prepare_subject_propagation_publication">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>PrepareSubjectResultsAssociation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
			<arg>--subjectlist</arg><arg>${subjectlist}</arg>
			<arg>--resultType</arg><arg>publication</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
			<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_prepare"/>
		<error to="Kill"/>
	</action>

	<action name="prepare_subject_propagation_dataset">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>PrepareSubjectResultsAssociation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
			<arg>--subjectlist</arg><arg>${subjectlist}</arg>
			<arg>--resultType</arg><arg>dataset</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
			<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_prepare"/>
		<error to="Kill"/>
	</action>

	<action name="prepare_subject_propagation_software">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>PrepareSubjectResultsAssociation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
			<arg>--subjectlist</arg><arg>${subjectlist}</arg>
			<arg>--resultType</arg><arg>software</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
			<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_prepare"/>
		<error to="Kill"/>
	</action>

	<action name="prepare_subject_propagation_orp">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>PrepareSubjectResultsAssociation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
			<arg>--subjectlist</arg><arg>${subjectlist}</arg>
			<arg>--resultType</arg><arg>otherresearchproduct</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
			<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_prepare"/>
		<error to="Kill"/>
	</action>

	<join name="wait_prepare" to="propagate_subject"/>

	<fork name="propagate_subject">
		<path start="propagate_subject_publication"/>
		<path start="propagate_subject_dataset"/>
		<path start="propagate_subject_software"/>
		<path start="propagate_subject_otherresearchproduct"/>
	</fork>

	<action name="propagate_subject_publication">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>SubjectToResultPropagation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=8G
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
				--conf spark.dynamicAllocation.enabled=true
				--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
				--conf spark.sql.shuffle.partitions=3840
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
			<arg>--workingPath</arg><arg>${workingDir}/working</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
			<arg>--resultType</arg><arg>publication</arg>
			<arg>--preparedPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_propagation"/>
		<error to="Kill"/>
	</action>

	<action name="propagate_subject_otherresearchproduct">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>SubjectToResultPropagation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
				--conf spark.dynamicAllocation.enabled=true
				--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
				--conf spark.sql.shuffle.partitions=3840
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
			<arg>--workingPath</arg><arg>${workingDir}/working</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
			<arg>--resultType</arg><arg>otherresearchproduct</arg>
			<arg>--preparedPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_propagation"/>
		<error to="Kill"/>
	</action>

	<action name="propagate_subject_dataset">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>SubjectToResultPropagation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
				--conf spark.dynamicAllocation.enabled=true
				--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
				--conf spark.sql.shuffle.partitions=3840
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
			<arg>--workingPath</arg><arg>${workingDir}/working</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
			<arg>--resultType</arg><arg>dataset</arg>
			<arg>--preparedPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_propagation"/>
		<error to="Kill"/>
	</action>

	<action name="propagate_subject_software">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>SubjectToResultPropagation</name>
			<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
			<jar>dhp-enrichment-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
				--conf spark.dynamicAllocation.enabled=true
				--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
				--conf spark.sql.shuffle.partitions=3840
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
			<arg>--workingPath</arg><arg>${workingDir}/working</arg>
			<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
			<arg>--resultType</arg><arg>software</arg>
			<arg>--preparedPath</arg><arg>${workingDir}/preparedInfo</arg>
		</spark>
		<ok to="wait_propagation"/>
		<error to="Kill"/>
	</action>

	<join name="wait_propagation" to="End"/>

	<end name="End"/>

</workflow-app>
@@ -0,0 +1,159 @@

package eu.dnetlib.dhp.subjectpropagation;

import static org.apache.spark.sql.functions.desc;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.resulttocommunityfromsemrel.ResultToCommunityJobTest;
import eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Subject;
import eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1;
import eu.dnetlib.dhp.subjecttoresultfromsemrel.ResultSubjectList;
import eu.dnetlib.dhp.subjecttoresultfromsemrel.SubjectInfo;

/**
 * @author miriam.baglioni
 * @Date 05/10/22
 */
public class SubjectPreparationJobTest {
	private static final Logger log = LoggerFactory.getLogger(SubjectPreparationJobTest.class);

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	private static SparkSession spark;

	private static Path workingDir;

	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files.createTempDirectory(SubjectPreparationJobTest.class.getSimpleName());
		log.info("using work dir {}", workingDir);

		SparkConf conf = new SparkConf();
		conf.setAppName(SubjectPreparationJobTest.class.getSimpleName());

		conf.setMaster("local[*]");
		conf.set("spark.driver.host", "localhost");
		conf.set("hive.metastore.local", "true");
		conf.set("spark.ui.enabled", "false");
		conf.set("spark.sql.warehouse.dir", workingDir.toString());
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

		spark = SparkSession
			.builder()
			.appName(SubjectPreparationJobTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}

	@AfterAll
	public static void afterAll() throws IOException {
		FileUtils.deleteDirectory(workingDir.toFile());
		spark.stop();
	}

	@Test
	void testSparkSubjectToResultThroughSemRelJob() throws Exception {
		PrepareResultResultStep1
			.main(
				new String[] {
					"-allowedsemrels",
					"IsSupplementedBy;IsSupplementTo;IsPreviousVersionOf;IsNewVersionOf;IsIdenticalTo;Obsoletes;IsObsoletedBy;IsVersionOf",
					"-subjectlist", "fos;sdg",
					"-resultType", "publication",
					"-isSparkSessionManaged", Boolean.FALSE.toString(),
					"-sourcePath", getClass()
						.getResource("/eu/dnetlib/dhp/subjectpropagation")
						.getPath(),
					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
					"-outputPath", workingDir.toString()
				});

		// 50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba fake_fos and fake_sdg IsVersionOf
		// 50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98
		// 50|06cdd3ff4700::ff21e3c55d527fa7db171137c5fd1f1f fake_fos2 obsoletes
		// 50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98
		// 50|355e65625b88::046477dc24819c5f1453166aa7bfb75e fake_fos2 isSupplementedBy
		// 50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98
		// 50|355e65625b88::046477dc24819c5f1453166aa7bfb75e fake_fos2 issupplementto
		// 50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		JavaRDD<ResultSubjectList> tmp = sc
			.textFile(workingDir.toString() + "/publication")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultSubjectList.class));

		Assertions.assertEquals(2, tmp.count());

		Assertions
			.assertEquals(
				1, tmp.filter(r -> r.getResId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98")).count());
		Assertions
			.assertEquals(
				1, tmp.filter(r -> r.getResId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba")).count());

		List<SubjectInfo> sbjList = tmp
			.filter(r -> r.getResId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
			.first()
			.getSubjectList();

		Assertions.assertEquals(3, sbjList.size());
		Assertions.assertEquals(1, sbjList.stream().filter(s -> s.getClassid().equals("sdg")).count());
		Assertions.assertEquals(2, sbjList.stream().filter(s -> s.getClassid().equals("fos")).count());

		Assertions
			.assertEquals(
				"fake_sdg",
				sbjList.stream().filter(s -> s.getClassid().equalsIgnoreCase("sdg")).findFirst().get().getValue());
		Assertions
			.assertTrue(
				sbjList
					.stream()
					.filter(s -> s.getClassid().equalsIgnoreCase("fos"))
					.anyMatch(s -> s.getValue().equals("fake_fos")));
		Assertions
			.assertTrue(
				sbjList
					.stream()
					.filter(s -> s.getClassid().equalsIgnoreCase("fos"))
					.anyMatch(s -> s.getValue().equals("fake_fos2")));

		sbjList = tmp
			.filter(r -> r.getResId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
			.first()
			.getSubjectList();

		Assertions.assertEquals(1, sbjList.size());
		Assertions.assertEquals("fos", sbjList.get(0).getClassid().toLowerCase());

		Assertions.assertEquals("fake_fos2", sbjList.get(0).getValue());

		tmp.foreach(s -> System.out.println(OBJECT_MAPPER.writeValueAsString(s)));
	}

}
@ -0,0 +1,333 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.subjectpropagation;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.PropagationConstant;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Subject;
|
||||||
|
import eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 06/10/22
|
||||||
|
*/
|
||||||
|
public class SubjectPropagationJobTest {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SubjectPropagationJobTest.class);
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files.createTempDirectory(SubjectPropagationJobTest.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(SubjectPropagationJobTest.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(SubjectPropagationJobTest.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSparkSubjectToResultThroughSemRelJob() throws Exception {
|
||||||
|
SparkSubjectPropagationStep2
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-preparedPath", getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/subjectpropagation/preparedInfo")
|
||||||
|
.getPath(),
|
||||||
|
"-resultType", "publication",
|
||||||
|
"-sourcePath", getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/subjectpropagation")
|
||||||
|
.getPath(),
|
||||||
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
|
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||||
|
"-workingPath", workingDir.toString() + "/working",
|
||||||
|
"-outputPath", workingDir.toString()
|
||||||
|
});
|
||||||
|
|
||||||
|
// 50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98 should receive fake_fos, fake_sdg and fake_fos2
|
||||||
|
// 50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba should receive fake_fos2
|
||||||
|
|
||||||
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Publication> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/publication")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
||||||
|
|
||||||
|
Assertions.assertEquals(4, tmp.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
2, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getSubject()
|
||||||
|
.stream()
|
||||||
|
.anyMatch(
|
||||||
|
s -> s
|
||||||
|
.getDataInfo()
|
||||||
|
.getInferenceprovenance()
|
||||||
|
.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
JavaRDD<Subject> sbjs = tmp
|
||||||
|
.flatMap((FlatMapFunction<Publication, Subject>) r -> r.getSubject().iterator())
|
||||||
|
.filter(
|
||||||
|
s -> s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE));
|
||||||
|
|
||||||
|
Assertions.assertEquals(4, sbjs.count());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
4, sbjs
|
||||||
|
.filter(
|
||||||
|
s -> s
|
||||||
|
.getDataInfo()
|
||||||
|
.getProvenanceaction()
|
||||||
|
.getClassid()
|
||||||
|
.equals(PropagationConstant.PROPAGATION_SUBJECT_RESULT_SEMREL_CLASS_ID))
|
||||||
|
.count());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
4,
|
				sbjs
					.filter(
						s -> s.getDataInfo().getProvenanceaction().getClassname()
							.equals(PropagationConstant.PROPAGATION_SUBJECT_RESULT_SEMREL_CLASS_NAME))
					.count());

		Assertions.assertEquals(3, sbjs.filter(s -> s.getQualifier().getClassid().equals("FOS")).count());
		Assertions
			.assertEquals(3, sbjs.filter(s -> s.getQualifier().getClassname().equals("Field of Science")).count());
		Assertions.assertEquals(1, sbjs.filter(s -> s.getQualifier().getClassid().equals("SDG")).count());
		Assertions
			.assertEquals(
				1, sbjs.filter(s -> s.getQualifier().getClassname().equals("Support and Development Goals")).count());

		Assertions
			.assertEquals(
				6,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.size());
		Assertions
			.assertEquals(
				3,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.stream()
					.filter(s -> s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE))
					.count());
		Assertions
			.assertEquals(
				3,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.stream()
					.filter(s -> !s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE))
					.count());
		Assertions
			.assertEquals(
				2,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.stream()
					.filter(
						s -> s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
							&& s.getQualifier().getClassid().equals("FOS"))
					.count());
		Assertions
			.assertEquals(
				1,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.stream()
					.filter(
						s -> s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
							&& s.getQualifier().getClassid().equals("SDG"))
					.count());

		Assertions
			.assertTrue(
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.stream()
					.anyMatch(s -> s.getValue().equals("fake_fos")));
		Assertions
			.assertTrue(
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.stream()
					.anyMatch(s -> s.getValue().equals("fake_fos2")));
		Assertions
			.assertTrue(
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98"))
					.first()
					.getSubject()
					.stream()
					.anyMatch(s -> s.getValue().equals("fake_sdg")));

		Assertions
			.assertEquals(
				6,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
					.first()
					.getSubject()
					.size());
		Assertions
			.assertEquals(
				1,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
					.first()
					.getSubject()
					.stream()
					.filter(s -> s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE))
					.count());
		Assertions
			.assertEquals(
				5,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
					.first()
					.getSubject()
					.stream()
					.filter(s -> !s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE))
					.count());
		Assertions
			.assertEquals(
				1,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
					.first()
					.getSubject()
					.stream()
					.filter(
						s -> s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
							&& s.getQualifier().getClassid().equals("FOS"))
					.count());
		Assertions
			.assertEquals(
				0,
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
					.first()
					.getSubject()
					.stream()
					.filter(
						s -> s.getDataInfo().getInferenceprovenance().equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
							&& s.getQualifier().getClassid().equals("SDG"))
					.count());

		Assertions
			.assertTrue(
				tmp
					.filter(r -> r.getId().equals("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
					.first()
					.getSubject()
					.stream()
					.anyMatch(s -> s.getValue().equals("fake_fos2")));

	}
}
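The assertions above repeat the same inline check on every subject: a subject counts as propagated when its dataInfo carries the propagation inference provenance, optionally narrowed to a qualifier class id such as "FOS" or "SDG". The helper below is a minimal sketch of that predicate, assuming the Subject bean and the PropagationConstant values referenced by the test; the class and method names here are illustrative and not part of this change set.

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.schema.oaf.Subject;

// Illustrative helper, not part of the patch: factors out the predicate the
// assertions above apply inline to every subject.
public class SubjectProvenanceChecks {

	// true when the subject was added by the propagation step, i.e. its
	// inference provenance equals PROPAGATION_DATA_INFO_TYPE
	public static boolean isPropagated(Subject s) {
		return s.getDataInfo() != null
			&& PropagationConstant.PROPAGATION_DATA_INFO_TYPE.equals(s.getDataInfo().getInferenceprovenance());
	}

	// same check, further narrowed to a subject scheme such as "FOS" or "SDG"
	public static boolean isPropagated(Subject s, String classid) {
		return isPropagated(s) && classid.equals(s.getQualifier().getClassid());
	}
}

With such a helper, the per-scheme counts above would reduce to a single filter(s -> SubjectProvenanceChecks.isPropagated(s, "FOS")).count() per assertion.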
@ -0,0 +1,2 @@
{"resId":"50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba","subjectList":[{"classid":"FOS", "classname":"Field of Science","value":"fake_fos2"}]}
{"resId":"50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98","subjectList":[{"classid":"FOS", "classname":"Field of Science","value":"fake_fos"},{"classid":"SDG","classname":"Support and Development Goals","value":"fake_sdg"},{"classid":"FOS", "classname":"Field of Science","value":"fake_fos2"}]}
File diff suppressed because one or more lines are too long
@ -0,0 +1,10 @@
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"IsVersionOf","relType":"datasourceOrganization","source":"50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba","subRelType":"provision","target":"50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"Obsolets","relType":"datasourceOrganization","source":"50|06cdd3ff4700::ff21e3c55d527fa7db171137c5fd1f1f","subRelType":"provision","target":"50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"IsSupplementedBy","relType":"datasourceOrganization","source":"50|355e65625b88::046477dc24819c5f1453166aa7bfb75e","subRelType":"provision","target":"50|06cdd3ff4700::cd7711c65d518859f1d87056e2c45d98","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"IsSupplementTo","relType":"datasourceOrganization","source":"50|355e65625b88::046477dc24819c5f1453166aa7bfb75e","subRelType":"provision","target":"50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::8b75543067b50076e70764917e188178","subRelType":"provision","target":"20|doajarticles::50cb15ff7a6a3f8531f063770179e346","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::9f3ff882f023209d9ffb4dc32b77d376","subRelType":"provision","target":"20|doajarticles::ffc1811633b3222e4764c7b0517f83e8","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::ccc002b33c28ea8a1eab16db2eebd7a5","subRelType":"provision","target":"20|pending_org_::ab7b11bb317a6249f9b6becc7dd98043","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::d70ec79cbd10ebee6e6f54689d2cc7a9","subRelType":"provision","target":"20|openorgs____::9d6dceaf5e56ef060226e4ef7faa28a0","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::e2d4766f243c621e9a9300e6aa74d5a0","subRelType":"provision","target":"20|pending_org_::cfea8083b4f958b42314b50497aebd59","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1658906673376,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::fc496a0a4bb49d654649598b04008682","subRelType":"provision","target":"20|pending_org_::a41c5f55ae53f44abb8bc0e89398074e","validated":false}