WIP: subjectPropagation #269

Draft
miriam.baglioni wants to merge 5 commits from subjectPropagation into beta
13 changed files with 99 additions and 68 deletions
Showing only changes of commit 3d49668431 - Show all commits

View File

@ -12,14 +12,13 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class PropagationConstant {
@ -237,4 +236,30 @@ public class PropagationConstant {
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
public static <R extends Oaf> Dataset<R> readOafKryoPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.kryo(clazz));
}
public static Class[] getModelClasses() {
List<Class<?>> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses());
modelClasses
.addAll(
Lists
.newArrayList(
Result.class,
Qualifier.class,
DataInfo.class,
Publication.class,
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Software.class,
OtherResearchProduct.class,
Subject.class,
AccessRight.class));
return modelClasses.toArray(new Class[] {});
}
}

View File

@ -70,7 +70,7 @@ public class PrepareResultResultStep1 implements Serializable {
final List<String> allowedSemRel = Arrays
.asList(
parser.get("allowedSemRel").split(";"))
parser.get("allowedsemrels").split(";"))
.stream()
.map(s -> s.toLowerCase())
.collect(Collectors.toList());
@ -98,7 +98,7 @@ public class PrepareResultResultStep1 implements Serializable {
Dataset<R> result = readPath(spark, inputPath + "/" + resultType, resultClazz)
.filter(
(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible() &&
!r.getDataInfo().getInvisible() && Optional.ofNullable(r.getSubject()).isPresent() &&
r
.getSubject()
.stream()
@ -116,22 +116,28 @@ public class PrepareResultResultStep1 implements Serializable {
(MapGroupsFunction<String, Tuple2<R, Relation>, ResultSubjectList>) (k,
it) -> getResultSubjectList(subjectClassList, k, it),
Encoders.bean(ResultSubjectList.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + resultType);
}
@NotNull
private static <R extends Result> ResultSubjectList getResultSubjectList(List<String> subjectClassList, String k,
Iterator<Tuple2<R, Relation>> it) {
Tuple2<R, Relation> first = it.next();
if (!Optional.ofNullable(first._1()).isPresent()) {
return null;
}
ResultSubjectList rsl = new ResultSubjectList();
rsl.setResId(k);
Tuple2<R, Relation> first = it.next();
List<SubjectInfo> sbjInfo = new ArrayList<>();
Set<String> subjectSet = new HashSet<>();
extracted(subjectClassList, first._1().getSubject(), sbjInfo, subjectSet);
it.forEachRemaining(t2 -> extracted(subjectClassList, t2._1().getSubject(), sbjInfo, subjectSet));
it.forEachRemaining(t2 -> {
if (Optional.ofNullable(t2._1()).isPresent())
extracted(subjectClassList, t2._1().getSubject(), sbjInfo, subjectSet);
});
rsl.setSubjectList(sbjInfo);
return rsl;
}

View File

@ -50,6 +50,7 @@ public class SparkSubjectPropagationStep2 implements Serializable {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
@ -58,14 +59,15 @@ public class SparkSubjectPropagationStep2 implements Serializable {
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
final String inputPath = parser.get("inputPath");
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(getModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
@ -83,7 +85,11 @@ public class SparkSubjectPropagationStep2 implements Serializable {
Class<R> resultClazz,
String resultType) {
Dataset<R> results = readPath(spark, inputPath + "/" + resultType, resultClazz);
Dataset<Tuple2<String, R>> results = readOafKryoPath(spark, inputPath + "/" + resultType, resultClazz)
.map(
(MapFunction<R, Tuple2<String, R>>) r -> new Tuple2(r.getId(), r),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(resultClazz)));
Dataset<ResultSubjectList> preparedResult = readPath(
spark, preparedPath + "/publication", ResultSubjectList.class)
.union(readPath(spark, preparedPath + "/dataset", ResultSubjectList.class))
@ -93,20 +99,26 @@ public class SparkSubjectPropagationStep2 implements Serializable {
results
.joinWith(
preparedResult,
results.col("id").equalTo(preparedResult.col("resId")),
results.col("_1").equalTo(preparedResult.col("resId")),
"left")
.map((MapFunction<Tuple2<R, ResultSubjectList>, R>) t2 -> {
R res = t2._1();
.map((MapFunction<Tuple2<Tuple2<String, R>, ResultSubjectList>, String>) t2 -> {
R res = t2._1()._2();
// estraggo le tipologie di subject dal result
Map<String, List<String>> resultMap = new HashMap<>();
if (Optional.ofNullable(t2._2()).isPresent()) {
// estraggo le tipologie di subject dal result
Map<String, List<String>> resultMap = new HashMap<>();
res.getSubject().stream().forEach(s -> {
String cid = s.getQualifier().getClassid();
if (!resultMap.containsKey(cid)) {
resultMap.put(cid, new ArrayList<>());
}
resultMap.get(cid).add(s.getValue());
});
if(Optional.ofNullable(res.getSubject()).isPresent()){
res.getSubject().stream().forEach(s -> {
String cid = s.getQualifier().getClassid();
if(!cid.equals(ModelConstants.DNET_SUBJECT_KEYWORD)){
if (!resultMap.containsKey(cid)) {
resultMap.put(cid, new ArrayList<>());
}
resultMap.get(cid).add(s.getValue());
}
});
}else{
res.setSubject(new ArrayList<>());
}
// Remove from the list all the subjects with the same class already present in the result
List<String> distinctClassId = t2
@ -142,12 +154,12 @@ public class SparkSubjectPropagationStep2 implements Serializable {
}
}
return res;
}, Encoders.bean(resultClazz))
return OBJECT_MAPPER.writeValueAsString(res);
}, Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/" + resultType);
.text(workingPath + "/" + resultType);
readPath(spark, workingPath + "/" + resultType, resultClazz)
.write()

View File

@ -3,7 +3,7 @@
{
"paramName":"asr",
"paramLongName":"allowedSemRel",
"paramLongName":"allowedsemrels",
"paramDescription": "the set of semantic relations between the results to be exploited to perform the propagation",
"paramRequired": true
},

View File

@ -13,8 +13,8 @@
"paramRequired": true
},
{
"paramName":"ip",
"paramLongName":"inputPath",
"paramName":"sp",
"paramLongName":"sourcePath",
"paramDescription": "the path of the input graph",
"paramRequired": true
},

View File

@ -48,7 +48,7 @@
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
<value>10G</value>
</property>
<property>
<name>sparkExecutorCores</name>

View File

@ -1,12 +1,4 @@
<workflow-app name="subject_to_result_propagation" xmlns="uri:oozie:workflow:0.5">
<!-- {-->
<!-- "paramName": "out",-->
<!-- "paramLongName": "outputPath",-->
<!-- "paramDescription": "the path used to store temporary output files",-->
<!-- "paramRequired": true-->
<!-- }-->
<parameters>
<property>
<name>sourcePath</name>
@ -16,14 +8,6 @@
<name>subjectlist</name>
<description>the list of subject classid to propagate (split by ;)</description>
</property>
<property>
<name>resultType</name>
<description>the result tapy</description>
</property>
<property>
<name>resultTableName</name>
<description>the class of the result</description>
</property>
<property>
<name>allowedsemrels</name>
<description>the allowed semantics </description>
@ -64,14 +48,14 @@
<path start="prepare_subject_propagation_publication"/>
<path start="prepare_subject_propagation_dataset"/>
<path start="prepare_subject_propagation_software"/>
<path start="prepare_subject_propagation_otherresearchproduct"/>
<path start="prepare_subject_propagation_orp"/>
</fork>
<action name="prepare_subject_propagation_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProjectResultsAssociation</name>
<name>PrepareSubjectResultsAssociation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
@ -98,7 +82,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProjectResultsAssociation</name>
<name>PrepareSubjectResultsAssociation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
@ -125,7 +109,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProjectResultsAssociation</name>
<name>PrepareSubjectResultsAssociation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
@ -152,7 +136,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProjectResultsAssociation</name>
<name>PrepareSubjectResultsAssociation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
@ -188,12 +172,12 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectToResultPropagation</name>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob</class>
<name>SubjectToResultPropagation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--executor-memory=8G
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -201,8 +185,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${sourcePath}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
@ -217,8 +202,8 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectToResultPropagation</name>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob</class>
<name>SubjectToResultPropagation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -230,8 +215,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${sourcePath}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
@ -246,8 +232,8 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectToResultPropagation</name>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob</class>
<name>SubjectToResultPropagation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -259,8 +245,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${sourcePath}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
@ -275,8 +262,8 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectToResultPropagation</name>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob</class>
<name>SubjectToResultPropagation</name>
<class>eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -288,8 +275,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${sourcePath}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
@ -300,7 +288,7 @@
<error to="Kill"/>
</action>
<join name="wait_prepare" to="End"/>
<join name="wait_propagation" to="End"/>
<end name="End"/>

View File

@ -81,7 +81,7 @@ public class SubjectPreparationJobTest {
PrepareResultResultStep1
.main(
new String[] {
"-allowedSemRel",
"-allowedsemrels",
"IsSupplementedBy;IsSupplementTo;IsPreviousVersionOf;IsNewVersionOf;IsIdenticalTo;Obsoletes;IsObsoletedBy;IsVersionOf",
"-subjectlist", "fos;sdg",
"-resultType", "publication",

View File

@ -76,7 +76,7 @@ public class SubjectPropagationJobTest {
.getResource("/eu/dnetlib/dhp/subjectpropagation/preparedInfo")
.getPath(),
"-resultType", "publication",
"-inputPath", getClass()
"-sourcePath", getClass()
.getResource("/eu/dnetlib/dhp/subjectpropagation")
.getPath(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),