migrate relation from RDD to Dataset

This commit is contained in:
Sandro La Bruzzo 2020-03-13 09:13:20 +01:00
parent 7b28783fb4
commit addaaa091f
25 changed files with 1131 additions and 115 deletions

View File

@ -0,0 +1,24 @@
package eu.dnetlib.scholexplorer.relation;
import java.io.Serializable;
public class RelInfo implements Serializable {
private String original;
private String inverse;
public String getOriginal() {
return original;
}
public void setOriginal(String original) {
this.original = original;
}
public String getInverse() {
return inverse;
}
public void setInverse(String inverse) {
this.inverse = inverse;
}
}

View File

@ -0,0 +1,19 @@
package eu.dnetlib.scholexplorer.relation;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import java.io.Serializable;
import java.util.HashMap;
public class RelationMapper extends HashMap<String,RelInfo > implements Serializable {
public static RelationMapper load() throws Exception {
final String json = IOUtils.toString(RelationMapper.class.getResourceAsStream("relations.json"));
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, RelationMapper.class);
}
}

View File

@ -0,0 +1,158 @@
{
"cites":{
"original":"Cites",
"inverse":"IsCitedBy"
},
"compiles":{
"original":"Compiles",
"inverse":"IsCompiledBy"
},
"continues":{
"original":"Continues",
"inverse":"IsContinuedBy"
},
"derives":{
"original":"IsSourceOf",
"inverse":"IsDerivedFrom"
},
"describes":{
"original":"Describes",
"inverse":"IsDescribedBy"
},
"documents":{
"original":"Documents",
"inverse":"IsDocumentedBy"
},
"hasmetadata":{
"original":"HasMetadata",
"inverse":"IsMetadataOf"
},
"hasassociationwith":{
"original":"HasAssociationWith",
"inverse":"HasAssociationWith"
},
"haspart":{
"original":"HasPart",
"inverse":"IsPartOf"
},
"hasversion":{
"original":"HasVersion",
"inverse":"IsVersionOf"
},
"iscitedby":{
"original":"IsCitedBy",
"inverse":"Cites"
},
"iscompiledby":{
"original":"IsCompiledBy",
"inverse":"Compiles"
},
"iscontinuedby":{
"original":"IsContinuedBy",
"inverse":"Continues"
},
"isderivedfrom":{
"original":"IsDerivedFrom",
"inverse":"IsSourceOf"
},
"isdescribedby":{
"original":"IsDescribedBy",
"inverse":"Describes"
},
"isdocumentedby":{
"original":"IsDocumentedBy",
"inverse":"Documents"
},
"isidenticalto":{
"original":"IsIdenticalTo",
"inverse":"IsIdenticalTo"
},
"ismetadatafor":{
"original":"IsMetadataFor",
"inverse":"IsMetadataOf"
},
"ismetadataof":{
"original":"IsMetadataOf",
"inverse":"IsMetadataFor"
},
"isnewversionof":{
"original":"IsNewVersionOf",
"inverse":"IsPreviousVersionOf"
},
"isobsoletedby":{
"original":"IsObsoletedBy",
"inverse":"Obsoletes"
},
"isoriginalformof":{
"original":"IsOriginalFormOf",
"inverse":"IsVariantFormOf"
},
"ispartof":{
"original":"IsPartOf",
"inverse":"HasPart"
},
"ispreviousversionof":{
"original":"IsPreviousVersionOf",
"inverse":"IsNewVersionOf"
},
"isreferencedby":{
"original":"IsReferencedBy",
"inverse":"References"
},
"isrelatedto":{
"original":"IsRelatedTo",
"inverse":"IsRelatedTo"
},
"isrequiredby":{
"original":"IsRequiredBy",
"inverse":"Requires"
},
"isreviewedby":{
"original":"IsReviewedBy",
"inverse":"Reviews"
},
"issourceof":{
"original":"IsSourceOf",
"inverse":"IsDerivedFrom"
},
"issupplementedby":{
"original":"IsSupplementedBy",
"inverse":"IsSupplementTo"
},
"issupplementto":{
"original":"IsSupplementTo",
"inverse":"IsSupplementedBy"
},
"isvariantformof":{
"original":"IsVariantFormOf",
"inverse":"IsOriginalFormOf"
},
"isversionof":{
"original":"IsVersionOf",
"inverse":"HasVersion"
},
"obsoletes":{
"original":"Obsoletes",
"inverse":"IsObsoletedBy"
},
"references":{
"original":"References",
"inverse":"IsReferencedBy"
},
"requires":{
"original":"Requires",
"inverse":"IsRequiredBy"
},
"related":{
"original":"IsRelatedTo",
"inverse":"IsRelatedTo"
},
"reviews":{
"original":"Reviews",
"inverse":"IsReviewedBy"
},
"unknown":{
"original":"Unknown",
"inverse":"Unknown"
}
}

View File

@ -0,0 +1,15 @@
package eu.dnetlib.scholexplorer.relation;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
public class RelationMapperTest {
@Test
public void testLoadRels() throws Exception{
RelationMapper relationMapper = RelationMapper.load();
relationMapper.keySet().forEach(System.out::println);
}
}

View File

@ -0,0 +1,158 @@
{
"cites":{
"original":"Cites",
"inverse":"IsCitedBy"
},
"compiles":{
"original":"Compiles",
"inverse":"IsCompiledBy"
},
"continues":{
"original":"Continues",
"inverse":"IsContinuedBy"
},
"derives":{
"original":"IsSourceOf",
"inverse":"IsDerivedFrom"
},
"describes":{
"original":"Describes",
"inverse":"IsDescribedBy"
},
"documents":{
"original":"Documents",
"inverse":"IsDocumentedBy"
},
"hasmetadata":{
"original":"HasMetadata",
"inverse":"IsMetadataOf"
},
"hasassociationwith":{
"original":"HasAssociationWith",
"inverse":"HasAssociationWith"
},
"haspart":{
"original":"HasPart",
"inverse":"IsPartOf"
},
"hasversion":{
"original":"HasVersion",
"inverse":"IsVersionOf"
},
"iscitedby":{
"original":"IsCitedBy",
"inverse":"Cites"
},
"iscompiledby":{
"original":"IsCompiledBy",
"inverse":"Compiles"
},
"iscontinuedby":{
"original":"IsContinuedBy",
"inverse":"Continues"
},
"isderivedfrom":{
"original":"IsDerivedFrom",
"inverse":"IsSourceOf"
},
"isdescribedby":{
"original":"IsDescribedBy",
"inverse":"Describes"
},
"isdocumentedby":{
"original":"IsDocumentedBy",
"inverse":"Documents"
},
"isidenticalto":{
"original":"IsIdenticalTo",
"inverse":"IsIdenticalTo"
},
"ismetadatafor":{
"original":"IsMetadataFor",
"inverse":"IsMetadataOf"
},
"ismetadataof":{
"original":"IsMetadataOf",
"inverse":"IsMetadataFor"
},
"isnewversionof":{
"original":"IsNewVersionOf",
"inverse":"IsPreviousVersionOf"
},
"isobsoletedby":{
"original":"IsObsoletedBy",
"inverse":"Obsoletes"
},
"isoriginalformof":{
"original":"IsOriginalFormOf",
"inverse":"IsVariantFormOf"
},
"ispartof":{
"original":"IsPartOf",
"inverse":"HasPart"
},
"ispreviousversionof":{
"original":"IsPreviousVersionOf",
"inverse":"IsNewVersionOf"
},
"isreferencedby":{
"original":"IsReferencedBy",
"inverse":"References"
},
"isrelatedto":{
"original":"IsRelatedTo",
"inverse":"IsRelatedTo"
},
"isrequiredby":{
"original":"IsRequiredBy",
"inverse":"Requires"
},
"isreviewedby":{
"original":"IsReviewedBy",
"inverse":"Reviews"
},
"issourceof":{
"original":"IsSourceOf",
"inverse":"IsDerivedFrom"
},
"issupplementedby":{
"original":"IsSupplementedBy",
"inverse":"IsSupplementTo"
},
"issupplementto":{
"original":"IsSupplementTo",
"inverse":"IsSupplementedBy"
},
"isvariantformof":{
"original":"IsVariantFormOf",
"inverse":"IsOriginalFormOf"
},
"isversionof":{
"original":"IsVersionOf",
"inverse":"HasVersion"
},
"obsoletes":{
"original":"Obsoletes",
"inverse":"IsObsoletedBy"
},
"references":{
"original":"References",
"inverse":"IsReferencedBy"
},
"requires":{
"original":"Requires",
"inverse":"IsRequiredBy"
},
"related":{
"original":"IsRelatedTo",
"inverse":"IsRelatedTo"
},
"reviews":{
"original":"Reviews",
"inverse":"IsReviewedBy"
},
"unknown":{
"original":"Unknown",
"inverse":"Unknown"
}
}

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.schema.oaf;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
public class Relation extends Oaf {
@ -63,4 +64,22 @@ public class Relation extends Oaf {
public void setCollectedFrom(List<KeyValue> collectedFrom) {
this.collectedFrom = collectedFrom;
}
public void mergeFrom(Relation other) {
this.mergeOAFDataInfo(other);
if (other.getCollectedFrom() == null || other.getCollectedFrom().size() == 0)
return;
if (collectedFrom == null && other.getCollectedFrom() != null) {
collectedFrom = other.getCollectedFrom();
return;
}
if (other.getCollectedFrom() != null) {
collectedFrom.addAll(other.getCollectedFrom());
collectedFrom = new ArrayList<>(collectedFrom
.stream()
.collect(Collectors.toMap(KeyValue::toComparableString, x -> x, (x1, x2) -> x1))
.values());
}
}
}

View File

@ -11,6 +11,8 @@ import java.util.Map;
public class DLIDataset extends Dataset {
private String originalObjIdentifier;
private List<ProvenaceInfo> dlicollectedfrom;
private String completionStatus;
@ -31,6 +33,14 @@ public class DLIDataset extends Dataset {
this.dlicollectedfrom = dlicollectedfrom;
}
public String getOriginalObjIdentifier() {
return originalObjIdentifier;
}
public void setOriginalObjIdentifier(String originalObjIdentifier) {
this.originalObjIdentifier = originalObjIdentifier;
}
@Override
public void mergeFrom(OafEntity e) {
super.mergeFrom(e);

View File

@ -7,6 +7,9 @@ import java.io.Serializable;
import java.util.*;
public class DLIPublication extends Publication implements Serializable {
private String originalObjIdentifier;
private List<ProvenaceInfo> dlicollectedfrom;
private String completionStatus;
@ -27,6 +30,14 @@ public class DLIPublication extends Publication implements Serializable {
this.dlicollectedfrom = dlicollectedfrom;
}
public String getOriginalObjIdentifier() {
return originalObjIdentifier;
}
public void setOriginalObjIdentifier(String originalObjIdentifier) {
this.originalObjIdentifier = originalObjIdentifier;
}
@Override
public void mergeFrom(OafEntity e) {
super.mergeFrom(e);

View File

@ -13,11 +13,9 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import scala.Tuple2;
import java.io.IOException;
@ -45,42 +43,31 @@ public class SparkPropagateRelationsJob {
final String targetRelPath = parser.get("targetRelPath");
final Dataset<Relation> df = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
final Dataset<Relation> merge = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)).where("relClass == 'merges'");
final Dataset<Relation> rels= spark.read().load(relationPath).as(Encoders.bean(Relation.class));
final Dataset<Relation> firstJoin = rels.joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
.map((MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
final Relation mergeRelation = r._2();
final Relation relation = r._1();
final JavaPairRDD<String, String> mergedIds = df
.where("relClass == 'merges'")
.select(df.col("source"),df.col("target"))
.distinct()
.toJavaRDD()
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(1), r.getString(0)));
if(mergeRelation!= null)
relation.setSource(mergeRelation.getSource());
return relation;
}, Encoders.bean(Relation.class));
final Dataset<Relation> secondJoin = firstJoin.joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
.map((MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
final Relation mergeRelation = r._2();
final Relation relation = r._1();
if (mergeRelation != null )
relation.setTarget(mergeRelation.getSource());
return relation;
}, Encoders.bean(Relation.class));
final JavaRDD<String> sourceEntity = sc.textFile(relationPath);
JavaRDD<String> newRels = sourceEntity.mapToPair(
(PairFunction<String, String, String>) s ->
new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
if (v1._2()._2().isPresent()) {
return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.SOURCE);
}
return v1._2()._1();
})
.mapToPair(
(PairFunction<String, String, String>) s ->
new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
if (v1._2()._2().isPresent()) {
return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.TARGET);
}
return v1._2()._1();
}).filter(SparkPropagateRelationsJob::containsDedup)
.repartition(500);
newRels.union(sourceEntity).repartition(1000).saveAsTextFile(targetRelPath, GzipCodec.class);
secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
}
private static boolean containsDedup(final String json) {

View File

@ -15,11 +15,9 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import scala.Tuple2;
import java.io.IOException;
@ -55,17 +53,6 @@ public class SparkUpdateEntityJob {
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
final JavaRDD<String> sourceEntity = sc.textFile(entityPath);
if ("relation".equalsIgnoreCase(entity)) {
sourceEntity.mapToPair(
(PairFunction<String, String, String>) s ->
new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1())
.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1())
.saveAsTextFile(destination, GzipCodec.class);
} else {
final JavaRDD<String> dedupEntity = sc.textFile(dedupRecordPath);
JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, s), s));
Class<? extends Oaf> mainClass;
@ -83,19 +70,12 @@ public class SparkUpdateEntityJob {
throw new IllegalArgumentException("Illegal type " + entity);
}
JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1());
map.union(dedupEntity).saveAsTextFile(destination, GzipCodec.class);
}
}
private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) {
final ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {

View File

@ -26,7 +26,7 @@
</property>
</parameters>
<start to="updateDeletedByInferenceRelation"/>
<start to="DeleteWorkingPath"/>
<kill name="Kill">
@ -132,7 +132,7 @@
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--mergeRelPath</arg><arg>${targetPath}/${entity}/mergeRel</arg>
<arg>--relationPath</arg><arg>${sourcePath}/relation</arg>
<arg>--targetRelPath</arg><arg>${targetPath}/${entity}/relation_propagated</arg>
<arg>--targetRelPath</arg><arg>${targetPath}/${entity}/updated_relation</arg>
</spark>
<ok to="updateDeletedByInferenceEntity"/>
<error to="Kill"/>
@ -160,35 +160,35 @@
<arg>--dedupRecordPath</arg><arg>${targetPath}/${entity}/dedup_records</arg>
<arg>--targetPath</arg><arg>${targetPath}/${entity}/updated_record</arg>
</spark>
<ok to="updateDeletedByInferenceRelation"/>
<error to="Kill"/>
</action>
<action name="updateDeletedByInferenceRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Update ${entity} set deleted by Inference</name>
<class>eu.dnetlib.dedup.SparkUpdateEntityJob</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--entityPath</arg><arg>${targetPath}/${entity}/relation_propagated</arg>
<arg>--mergeRelPath</arg><arg>${targetPath}/${entity}/mergeRel</arg>
<arg>--entity</arg><arg>relation</arg>
<arg>--dedupRecordPath</arg><arg>${targetPath}/${entity}/dedup_records</arg>
<arg>--targetPath</arg><arg>${targetPath}/${entity}/updated_relation</arg>
</spark>
<ok to="replaceEntity"/>
<error to="Kill"/>
</action>
<!-- <action name="updateDeletedByInferenceRelation">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <job-tracker>${jobTracker}</job-tracker>-->
<!-- <name-node>${nameNode}</name-node>-->
<!-- <master>yarn-cluster</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Update ${entity} set deleted by Inference</name>-->
<!-- <class>eu.dnetlib.dedup.SparkUpdateEntityJob</class>-->
<!-- <jar>dhp-dedup-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory ${sparkExecutorMemory}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- ${sparkExtraOPT}-->
<!-- </spark-opts>-->
<!-- <arg>-mt</arg><arg>yarn-cluster</arg>-->
<!-- <arg>&#45;&#45;entityPath</arg><arg>${targetPath}/${entity}/relation_propagated</arg>-->
<!-- <arg>&#45;&#45;mergeRelPath</arg><arg>${targetPath}/${entity}/mergeRel</arg>-->
<!-- <arg>&#45;&#45;entity</arg><arg>relation</arg>-->
<!-- <arg>&#45;&#45;dedupRecordPath</arg><arg>${targetPath}/${entity}/dedup_records</arg>-->
<!-- <arg>&#45;&#45;targetPath</arg><arg>${targetPath}/${entity}/updated_relation</arg>-->
<!-- </spark>-->
<!-- <ok to="End"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="replaceEntity">
<fs>

View File

@ -21,15 +21,19 @@ public class SparkCreateDedupTest {
}
@Test
@Ignore
public void createSimRelsTest() throws Exception {
SparkCreateSimRels.main(new String[] {
public void PropagateRelationsTest() throws Exception {
SparkPropagateRelationsJob.main(new String[] {
"-mt", "local[*]",
"-s", "/Users/miconis/dumps",
"-e", entity,
"-c", ArgumentApplicationParser.compressArgument(configuration),
"-t", "/tmp/dedup",
"-ep", "/Users/sandro/Downloads/scholix/graph/relation",
"-mr", "/Users/sandro/Downloads/scholix/dedupGraphWD/publication/mergeRel",
"-mt", "local[*]",
"-t", "/Users/sandro/Downloads/scholix/dedupGraphWD/publication/rel_fixed",
});
}

View File

@ -0,0 +1,56 @@
package eu.dnetlib.dhp.graph.scholexplorer;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.SparkGraphImporterJob;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
public class SparkScholexplorerGenerateSimRel {
final static String IDJSONPATH = "$.id";
final static String OBJIDPATH = "$.originalObjIdentifier";
public static void generateDataFrame(final SparkSession spark, final JavaSparkContext sc, final String inputPath, final String targetPath) {
final JavaPairRDD<String, String> datasetSimRel = sc.textFile(inputPath+"/dataset/*")
.mapToPair((PairFunction<String, String, String>) k ->
new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, k),DHPUtils.getJPathString(OBJIDPATH, k)))
.filter(t ->
!StringUtils.substringAfter(t._1(), "|")
.equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
.distinct();
final JavaPairRDD<String, String> publicationSimRel = sc.textFile(inputPath+"/publication/*")
.mapToPair((PairFunction<String, String, String>) k ->
new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, k),DHPUtils.getJPathString(OBJIDPATH, k)))
.filter(t ->
!StringUtils.substringAfter(t._1(), "|")
.equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
.distinct();
JavaRDD<Relation> simRel = datasetSimRel.union(publicationSimRel).map(s -> {
final Relation r = new Relation();
r.setSource(s._1());
r.setTarget(s._2());
r.setRelType("similar");
return r;
}
);
spark.createDataset(simRel.rdd(), Encoders.bean(Relation.class)).distinct().write()
.mode(SaveMode.Overwrite).save(targetPath+"/pid_simRel");
}
}

View File

@ -6,6 +6,7 @@ import eu.dnetlib.dhp.graph.SparkGraphImporterJob;
import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
import eu.dnetlib.dhp.graph.scholexplorer.parser.PublicationScholexplorerParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@ -29,15 +30,17 @@ public class SparkScholexplorerGraphImporter {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
RelationMapper relationMapper = RelationMapper.load();
sc.sequenceFile(inputPath, IntWritable.class, Text.class).map(Tuple2::_2).map(Text::toString).repartition(500)
.flatMap((FlatMapFunction<String, Oaf>) record -> {
switch (parser.get("entity")) {
case "dataset":
final DatasetScholexplorerParser d = new DatasetScholexplorerParser();
return d.parseObject(record).iterator();
return d.parseObject(record,relationMapper).iterator();
case "publication":
final PublicationScholexplorerParser p = new PublicationScholexplorerParser();
return p.parseObject(record).iterator();
return p.parseObject(record,relationMapper).iterator();
default:
throw new IllegalArgumentException("wrong values of entities");
}

View File

@ -12,16 +12,23 @@ import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.utils.DHPUtils;
import net.minidev.json.JSONArray;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import sun.rmi.log.ReliableLog;
import javax.xml.crypto.Data;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -41,6 +48,8 @@ public class SparkScholexplorerMergeEntitiesJob {
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.config(new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
.appName(SparkGraphImporterJob.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
@ -102,21 +111,54 @@ public class SparkScholexplorerMergeEntitiesJob {
}).saveAsTextFile(targetPath, GzipCodec.class);
break;
case "relation":
union.mapToPair((PairFunction<String, String, Relation>) f -> {
SparkScholexplorerGenerateSimRel.generateDataFrame(spark, sc, inputPath.replace("/relation",""),targetPath.replace("/relation","") );
RDD<Relation> rdd = union.mapToPair((PairFunction<String, String, Relation>) f -> {
final String source = getJPathString(SOURCEJSONPATH, f);
final String target = getJPathString(TARGETJSONPATH, f);
final String reltype = getJPathString(RELJSONPATH, f);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source, reltype, target)), mapper.readValue(f, Relation.class));
return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, Relation.class));
}).reduceByKey((a, b) -> {
a.mergeOAFDataInfo(b);
a.mergeFrom(b);
return a;
}).map(item -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item._2());
}).saveAsTextFile(targetPath, GzipCodec.class);
break;
}).map(Tuple2::_2).rdd();
spark.createDataset(rdd, Encoders.bean(Relation.class)).write().mode(SaveMode.Overwrite).save(targetPath);
Dataset<Relation> rel_ds =spark.read().load(targetPath).as(Encoders.bean(Relation.class));
System.out.println("LOADING PATH :"+targetPath.replace("/relation","")+"/pid_simRel");
Dataset<Relation>sim_ds =spark.read().load(targetPath.replace("/relation","")+"/pid_simRel").as(Encoders.bean(Relation.class));
TargetFunction tf = new TargetFunction();
Dataset<Relation> ids = sim_ds.map(tf, Encoders.bean(Relation.class));
final Dataset<Relation> firstJoin = rel_ds
.joinWith(ids, ids.col("target")
.equalTo(rel_ds.col("source")), "left_outer")
.map((MapFunction<Tuple2<Relation, Relation>, Relation>) s ->
{
if (s._2() != null) {
s._1().setSource(s._2().getSource());
}
return s._1();
}
, Encoders.bean(Relation.class));
Dataset<Relation> secondJoin = firstJoin.joinWith(ids, ids.col("target").equalTo(firstJoin.col("target")),"left_outer")
.map((MapFunction<Tuple2<Relation, Relation>, Relation>) s ->
{
if (s._2() != null) {
s._1().setTarget(s._2().getSource());
}
return s._1();
}
, Encoders.bean(Relation.class));
secondJoin.write().mode(SaveMode.Overwrite).save(targetPath+"_fixed");
}
}

View File

@ -0,0 +1,15 @@
package eu.dnetlib.dhp.graph.scholexplorer;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
public class TargetFunction implements MapFunction<Relation, Relation> {
@Override
public Relation call(Relation relation) throws Exception {
final String type = StringUtils.substringBefore(relation.getSource(), "|");
relation.setTarget(String.format("%s|%s", type, StringUtils.substringAfter(relation.getTarget(),"::")));
return relation;
}
}

View File

@ -6,6 +6,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -21,7 +22,7 @@ public abstract class AbstractScholexplorerParser {
final static Pattern pattern = Pattern.compile("10\\.\\d{4,9}/[-._;()/:A-Z0-9]+$", Pattern.CASE_INSENSITIVE);
private List<String> datasetSubTypes = Arrays.asList("dataset", "software", "film", "sound", "physicalobject", "audiovisual", "collection", "other", "study", "metadata");
public abstract List<Oaf> parseObject(final String record);
public abstract List<Oaf> parseObject(final String record, final RelationMapper relMapper);
protected Map<String, String> getAttributes(final XMLStreamReader parser) {
final Map<String, String> attributesMap = new HashMap<>();

View File

@ -10,6 +10,8 @@ import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo;
import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node;
import eu.dnetlib.scholexplorer.relation.RelInfo;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
@ -21,7 +23,7 @@ import java.util.stream.Collectors;
public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
@Override
public List<Oaf> parseObject(String record) {
public List<Oaf> parseObject(String record, final RelationMapper relationMapper) {
try {
final DLIDataset parsedObject = new DLIDataset();
final VTDGen vg = new VTDGen();
@ -40,7 +42,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']")));
parsedObject.setOriginalObjIdentifier(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"));
parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"));
final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']");
@ -145,9 +147,20 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
final String relationSemantic = n.getAttributes().get("relationType");
final String inverseRelation = n.getAttributes().get("inverseRelationType");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation = n.getAttributes().get("inverseRelationType");
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
inverseRelation = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setRelClass("datacite");

View File

@ -8,6 +8,8 @@ import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo;
import eu.dnetlib.scholexplorer.relation.RelInfo;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
@ -19,7 +21,7 @@ import java.util.stream.Collectors;
public class PublicationScholexplorerParser extends AbstractScholexplorerParser {
@Override
public List<Oaf> parseObject(final String record) {
public List<Oaf> parseObject(final String record, final RelationMapper relationMapper) {
try {
final List<Oaf> result = new ArrayList<>();
final DLIPublication parsedObject = new DLIPublication();
@ -63,6 +65,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
final String sourceId = generateId(currentPid.getValue(), currentPid.getQualifier().getClassid(), "publication");
parsedObject.setId(sourceId);
parsedObject.setOriginalObjIdentifier(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"));
String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']");
List<Node> collectedFromNodes =
@ -125,9 +129,19 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
final String relationSemantic = n.getAttributes().get("relationType");
final String inverseRelation = n.getAttributes().get("inverseRelationType");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation = "Unknown";
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setCollectedFrom(parsedObject.getCollectedfrom());

View File

@ -1,4 +1,4 @@
<workflow-app name="import_infospace_graph" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Infospace Merge Entities" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>

View File

@ -0,0 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true}
]

View File

@ -0,0 +1,158 @@
{
"cites":{
"original":"Cites",
"inverse":"IsCitedBy"
},
"compiles":{
"original":"Compiles",
"inverse":"IsCompiledBy"
},
"continues":{
"original":"Continues",
"inverse":"IsContinuedBy"
},
"derives":{
"original":"IsSourceOf",
"inverse":"IsDerivedFrom"
},
"describes":{
"original":"Describes",
"inverse":"IsDescribedBy"
},
"documents":{
"original":"Documents",
"inverse":"IsDocumentedBy"
},
"hasmetadata":{
"original":"HasMetadata",
"inverse":"IsMetadataOf"
},
"hasassociationwith":{
"original":"HasAssociationWith",
"inverse":"HasAssociationWith"
},
"haspart":{
"original":"HasPart",
"inverse":"IsPartOf"
},
"hasversion":{
"original":"HasVersion",
"inverse":"IsVersionOf"
},
"iscitedby":{
"original":"IsCitedBy",
"inverse":"Cites"
},
"iscompiledby":{
"original":"IsCompiledBy",
"inverse":"Compiles"
},
"iscontinuedby":{
"original":"IsContinuedBy",
"inverse":"Continues"
},
"isderivedfrom":{
"original":"IsDerivedFrom",
"inverse":"IsSourceOf"
},
"isdescribedby":{
"original":"IsDescribedBy",
"inverse":"Describes"
},
"isdocumentedby":{
"original":"IsDocumentedBy",
"inverse":"Documents"
},
"isidenticalto":{
"original":"IsIdenticalTo",
"inverse":"IsIdenticalTo"
},
"ismetadatafor":{
"original":"IsMetadataFor",
"inverse":"IsMetadataOf"
},
"ismetadataof":{
"original":"IsMetadataOf",
"inverse":"IsMetadataFor"
},
"isnewversionof":{
"original":"IsNewVersionOf",
"inverse":"IsPreviousVersionOf"
},
"isobsoletedby":{
"original":"IsObsoletedBy",
"inverse":"Obsoletes"
},
"isoriginalformof":{
"original":"IsOriginalFormOf",
"inverse":"IsVariantFormOf"
},
"ispartof":{
"original":"IsPartOf",
"inverse":"HasPart"
},
"ispreviousversionof":{
"original":"IsPreviousVersionOf",
"inverse":"IsNewVersionOf"
},
"isreferencedby":{
"original":"IsReferencedBy",
"inverse":"References"
},
"isrelatedto":{
"original":"IsRelatedTo",
"inverse":"IsRelatedTo"
},
"isrequiredby":{
"original":"IsRequiredBy",
"inverse":"Requires"
},
"isreviewedby":{
"original":"IsReviewedBy",
"inverse":"Reviews"
},
"issourceof":{
"original":"IsSourceOf",
"inverse":"IsDerivedFrom"
},
"issupplementedby":{
"original":"IsSupplementedBy",
"inverse":"IsSupplementTo"
},
"issupplementto":{
"original":"IsSupplementTo",
"inverse":"IsSupplementedBy"
},
"isvariantformof":{
"original":"IsVariantFormOf",
"inverse":"IsOriginalFormOf"
},
"isversionof":{
"original":"IsVersionOf",
"inverse":"HasVersion"
},
"obsoletes":{
"original":"Obsoletes",
"inverse":"IsObsoletedBy"
},
"references":{
"original":"References",
"inverse":"IsReferencedBy"
},
"requires":{
"original":"Requires",
"inverse":"IsRequiredBy"
},
"related":{
"original":"IsRelatedTo",
"inverse":"IsRelatedTo"
},
"reviews":{
"original":"Reviews",
"inverse":"IsReviewedBy"
},
"unknown":{
"original":"Unknown",
"inverse":"Unknown"
}
}

View File

@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
@ -15,11 +16,11 @@ public class ScholexplorerParserTest {
@Test
public void testDataciteParser() throws IOException {
public void testDataciteParser() throws Exception {
String xml = IOUtils.toString(this.getClass().getResourceAsStream("dmf.xml"));
DatasetScholexplorerParser p = new DatasetScholexplorerParser();
List<Oaf> oaves = p.parseObject(xml);
List<Oaf> oaves = p.parseObject(xml, RelationMapper.load());
ObjectMapper m = new ObjectMapper();
m.enable(SerializationFeature.INDENT_OUTPUT);

View File

@ -0,0 +1,18 @@
package eu.dnetlib.dhp.graph.scholexplorer;
import org.junit.Ignore;
import org.junit.Test;
public class SparkScholexplorerMergeEntitiesJobTest {
@Test
@Ignore
public void testMerge() throws Exception {
SparkScholexplorerMergeEntitiesJob.main(new String[]{
"-mt", "local[*]",
"-e", "relation",
"-s", "file:///Users/sandro/Downloads/scholix/relation",
"-t", "file:///Users/sandro/Downloads/scholix/relation"}
);
}
}

File diff suppressed because one or more lines are too long