forked from D-Net/dnet-hadoop
save adjacency list as JoinedEntity
This commit is contained in:
parent
1ecca69f49
commit
b2691a3b0a
|
@ -2,3 +2,4 @@ sparkDriverMemory=7G
|
||||||
sparkExecutorMemory=7G
|
sparkExecutorMemory=7G
|
||||||
hive_db_name=claudio
|
hive_db_name=claudio
|
||||||
sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06
|
sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06
|
||||||
|
outputPath=/tmp/openaire_provision
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.graph;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class EntityRelEntity implements Serializable {
|
public class EntityRelEntity implements Serializable {
|
||||||
|
|
||||||
private TypedRow source;
|
private TypedRow source;
|
||||||
private TypedRow relation;
|
private TypedRow relation;
|
||||||
private TypedRow target;
|
private TypedRow target;
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
package eu.dnetlib.dhp.graph;
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
@ -15,8 +16,10 @@ import org.apache.spark.api.java.function.PairFunction;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
|
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
|
||||||
|
@ -40,21 +43,32 @@ public class GraphJoiner implements Serializable {
|
||||||
|
|
||||||
public static final int MAX_RELS = 10;
|
public static final int MAX_RELS = 10;
|
||||||
|
|
||||||
public void join(final SparkSession spark, final String inputPath, final String hiveDbName, final String outPath) {
|
private SparkSession spark;
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
private String inputPath;
|
||||||
|
|
||||||
|
private String outPath;
|
||||||
|
|
||||||
|
public GraphJoiner(SparkSession spark, String inputPath, String outPath) {
|
||||||
|
this.spark = spark;
|
||||||
|
this.inputPath = inputPath;
|
||||||
|
this.outPath = outPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GraphJoiner adjacencyLists() {
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
|
||||||
|
|
||||||
// read each entity
|
// read each entity
|
||||||
JavaPairRDD<String, TypedRow> datasource = readPathEntity(sc, inputPath, "datasource");
|
JavaPairRDD<String, TypedRow> datasource = readPathEntity(sc, getInputPath(), "datasource");
|
||||||
JavaPairRDD<String, TypedRow> organization = readPathEntity(sc, inputPath, "organization");
|
JavaPairRDD<String, TypedRow> organization = readPathEntity(sc, getInputPath(), "organization");
|
||||||
JavaPairRDD<String, TypedRow> project = readPathEntity(sc, inputPath, "project");
|
JavaPairRDD<String, TypedRow> project = readPathEntity(sc, getInputPath(), "project");
|
||||||
JavaPairRDD<String, TypedRow> dataset = readPathEntity(sc, inputPath, "dataset");
|
JavaPairRDD<String, TypedRow> dataset = readPathEntity(sc, getInputPath(), "dataset");
|
||||||
JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(sc, inputPath, "otherresearchproduct");
|
JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(sc, getInputPath(), "otherresearchproduct");
|
||||||
JavaPairRDD<String, TypedRow> software = readPathEntity(sc, inputPath, "software");
|
JavaPairRDD<String, TypedRow> software = readPathEntity(sc, getInputPath(), "software");
|
||||||
JavaPairRDD<String, TypedRow> publication = readPathEntity(sc, inputPath, "publication");
|
JavaPairRDD<String, TypedRow> publication = readPathEntity(sc, getInputPath(), "publication");
|
||||||
|
|
||||||
// create the union between all the entities
|
// create the union between all the entities
|
||||||
final String entitiesPath = outPath + "/entities";
|
final String entitiesPath = getOutPath() + "/0_entities";
|
||||||
datasource
|
datasource
|
||||||
.union(organization)
|
.union(organization)
|
||||||
.union(project)
|
.union(project)
|
||||||
|
@ -63,7 +77,7 @@ public class GraphJoiner implements Serializable {
|
||||||
.union(software)
|
.union(software)
|
||||||
.union(publication)
|
.union(publication)
|
||||||
.map(e -> new EntityRelEntity().setSource(e._2()))
|
.map(e -> new EntityRelEntity().setSource(e._2()))
|
||||||
.map(e -> new ObjectMapper().writeValueAsString(e))
|
.map(GraphMappingUtils::serialize)
|
||||||
.saveAsTextFile(entitiesPath, GzipCodec.class);
|
.saveAsTextFile(entitiesPath, GzipCodec.class);
|
||||||
|
|
||||||
JavaPairRDD<String, EntityRelEntity> entities = sc.textFile(entitiesPath)
|
JavaPairRDD<String, EntityRelEntity> entities = sc.textFile(entitiesPath)
|
||||||
|
@ -71,7 +85,7 @@ public class GraphJoiner implements Serializable {
|
||||||
.mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t));
|
.mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t));
|
||||||
|
|
||||||
// reads the relationships
|
// reads the relationships
|
||||||
final JavaPairRDD<String, EntityRelEntity> relation = readPathRelation(sc, inputPath)
|
final JavaPairRDD<String, EntityRelEntity> relation = readPathRelation(sc, getInputPath())
|
||||||
.filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted
|
.filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted
|
||||||
.map(p -> new EntityRelEntity().setRelation(p))
|
.map(p -> new EntityRelEntity().setRelation(p))
|
||||||
.mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p))
|
.mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p))
|
||||||
|
@ -80,45 +94,156 @@ public class GraphJoiner implements Serializable {
|
||||||
.flatMap(p -> p.iterator())
|
.flatMap(p -> p.iterator())
|
||||||
.mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p));
|
.mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p));
|
||||||
|
|
||||||
final String joinByTargetPath = outPath + "/join_by_target";
|
final String joinByTargetPath = getOutPath() + "/1_join_by_target";
|
||||||
relation
|
relation
|
||||||
.join(entities
|
.join(entities
|
||||||
.filter(e -> !e._2().getSource().getDeleted())
|
.filter(e -> !e._2().getSource().getDeleted())
|
||||||
/*.mapToPair(e -> new Tuple2<>(e._1(), new MappingUtils().pruneModel(e._2())))*/)
|
.mapToPair(e -> new Tuple2<>(e._1(), new GraphMappingUtils().pruneModel(e._2()))))
|
||||||
.map(s -> new EntityRelEntity()
|
.map(s -> new EntityRelEntity()
|
||||||
.setRelation(s._2()._1().getRelation())
|
.setRelation(s._2()._1().getRelation())
|
||||||
.setTarget(s._2()._2().getSource()))
|
.setTarget(s._2()._2().getSource()))
|
||||||
.map(e -> new ObjectMapper().writeValueAsString(e))
|
.map(GraphMappingUtils::serialize)
|
||||||
.saveAsTextFile(joinByTargetPath, GzipCodec.class);
|
.saveAsTextFile(joinByTargetPath, GzipCodec.class);
|
||||||
|
|
||||||
JavaPairRDD<String, EntityRelEntity> bySource = sc.textFile(joinByTargetPath)
|
JavaPairRDD<String, EntityRelEntity> bySource = sc.textFile(joinByTargetPath)
|
||||||
.map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
|
.map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
|
||||||
.mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t));
|
.mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t));
|
||||||
|
|
||||||
|
final String linkedEntityPath = getOutPath() + "/2_linked_entities";
|
||||||
entities
|
entities
|
||||||
.union(bySource)
|
.union(bySource)
|
||||||
.groupByKey() // by source id
|
.groupByKey() // by source id
|
||||||
.map(p -> {
|
.map(p -> toLinkedEntity(p))
|
||||||
final LinkedEntity e = new LinkedEntity();
|
.map(e -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL).writeValueAsString(e))
|
||||||
final List<Tuple> links = Lists.newArrayList();
|
.saveAsTextFile(linkedEntityPath, GzipCodec.class);
|
||||||
for(EntityRelEntity rel : p._2()) {
|
|
||||||
if (rel.hasMainEntity() & e.getEntity() == null) {
|
final String joinedEntitiesPath = getOutPath() + "/3_joined_entities";
|
||||||
e.setEntity(rel.getSource());
|
sc.textFile(linkedEntityPath)
|
||||||
}
|
.map(s -> new ObjectMapper().readValue(s, LinkedEntity.class))
|
||||||
if (rel.hasRelatedEntity()) {
|
.map(l -> toJoinedEntity(l))
|
||||||
links.add(new Tuple()
|
.map(j -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL).writeValueAsString(j))
|
||||||
.setRelation(rel.getRelation())
|
.saveAsTextFile(joinedEntitiesPath);
|
||||||
.setTarget(rel.getTarget()));
|
|
||||||
}
|
return this;
|
||||||
}
|
}
|
||||||
e.setLinks(links);
|
|
||||||
if (e.getEntity() == null) {
|
public GraphJoiner asXML() {
|
||||||
throw new IllegalStateException("missing main entity on '" + p._1() + "'");
|
final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
|
||||||
}
|
|
||||||
return e;
|
final String joinedEntitiesPath = getOutPath() + "/3_joined_entities";
|
||||||
})
|
sc.textFile(joinedEntitiesPath)
|
||||||
.map(e -> new ObjectMapper().writeValueAsString(e))
|
.map(s -> new ObjectMapper().readValue(s, LinkedEntity.class))
|
||||||
.saveAsTextFile(outPath + "/linked_entities", GzipCodec.class);
|
.map(l -> toXML(l))
|
||||||
|
.saveAsTextFile(getOutPath() + "/4_xml");
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String toXML(LinkedEntity l) {
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SparkSession getSpark() {
|
||||||
|
return spark;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GraphJoiner setSpark(SparkSession spark) {
|
||||||
|
this.spark = spark;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getInputPath() {
|
||||||
|
return inputPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GraphJoiner setInputPath(String inputPath) {
|
||||||
|
this.inputPath = inputPath;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOutPath() {
|
||||||
|
return outPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GraphJoiner setOutPath(String outPath) {
|
||||||
|
this.outPath = outPath;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
// HELPERS
|
||||||
|
|
||||||
|
private OafEntity parseOaf(final String json, final String type) {
|
||||||
|
final ObjectMapper o = new ObjectMapper();
|
||||||
|
try {
|
||||||
|
switch (type) {
|
||||||
|
case "publication":
|
||||||
|
return o.readValue(json, Publication.class);
|
||||||
|
case "dataset":
|
||||||
|
return o.readValue(json, Dataset.class);
|
||||||
|
case "otherresearchproduct":
|
||||||
|
return o.readValue(json, OtherResearchProduct.class);
|
||||||
|
case "software":
|
||||||
|
return o.readValue(json, Software.class);
|
||||||
|
case "datasource":
|
||||||
|
return o.readValue(json, Datasource.class);
|
||||||
|
case "organization":
|
||||||
|
return o.readValue(json, Organization.class);
|
||||||
|
case "project":
|
||||||
|
return o.readValue(json, Project.class);
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("invalid type: " + type);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts the result of grouping <rel, target> pairs and the entities by source id to LinkedEntity
|
||||||
|
* @param p
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private LinkedEntity toLinkedEntity(Tuple2<String, Iterable<EntityRelEntity>> p) {
|
||||||
|
final LinkedEntity e = new LinkedEntity();
|
||||||
|
final List<Tuple> links = Lists.newArrayList();
|
||||||
|
for(EntityRelEntity rel : p._2()) {
|
||||||
|
if (rel.hasMainEntity() & e.getEntity() == null) {
|
||||||
|
e.setEntity(rel.getSource());
|
||||||
|
}
|
||||||
|
if (rel.hasRelatedEntity()) {
|
||||||
|
links.add(new Tuple()
|
||||||
|
.setRelation(rel.getRelation())
|
||||||
|
.setTarget(rel.getTarget()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e.setLinks(links);
|
||||||
|
if (e.getEntity() == null) {
|
||||||
|
throw new IllegalStateException("missing main entity on '" + p._1() + "'");
|
||||||
|
}
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a LinkedEntity to a JoinedEntity
|
||||||
|
* @param l
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private JoinedEntity toJoinedEntity(LinkedEntity l) {
|
||||||
|
return new JoinedEntity().setType(l.getEntity().getType())
|
||||||
|
.setEntity(parseOaf(l.getEntity().getOaf(), l.getEntity().getType()))
|
||||||
|
.setLinks(l.getLinks()
|
||||||
|
.stream()
|
||||||
|
.map(t -> {
|
||||||
|
final ObjectMapper o = new ObjectMapper();
|
||||||
|
try {
|
||||||
|
return new Tuple2<>(
|
||||||
|
o.readValue(t.getRelation().getOaf(), Relation.class),
|
||||||
|
o.readValue(t.getTarget().getOaf(), RelatedEntity.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException(e);
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -132,14 +257,14 @@ public class GraphJoiner implements Serializable {
|
||||||
private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
|
private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
|
||||||
return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class)
|
return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class)
|
||||||
.mapToPair((PairFunction<Tuple2<Text, Text>, String, TypedRow>) item -> {
|
.mapToPair((PairFunction<Tuple2<Text, Text>, String, TypedRow>) item -> {
|
||||||
|
final String s = item._2().toString();
|
||||||
final String json = item._2().toString();
|
final DocumentContext json = JsonPath.parse(s);
|
||||||
final String id = JsonPath.read(json, "$.id");
|
final String id = json.read("$.id");
|
||||||
return new Tuple2<>(id, new TypedRow()
|
return new Tuple2<>(id, new TypedRow()
|
||||||
.setSourceId(id)
|
.setSourceId(id)
|
||||||
.setDeleted(JsonPath.read(json, "$.dataInfo.deletedbyinference"))
|
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
|
||||||
.setType(type)
|
.setType(type)
|
||||||
.setOaf(json));
|
.setOaf(s));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,13 +278,14 @@ public class GraphJoiner implements Serializable {
|
||||||
private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
|
private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
|
||||||
return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
||||||
.map(item -> {
|
.map(item -> {
|
||||||
final String json = item._2().toString();
|
final String s = item._2().toString();
|
||||||
|
final DocumentContext json = JsonPath.parse(s);
|
||||||
return new TypedRow()
|
return new TypedRow()
|
||||||
.setSourceId(JsonPath.read(json, "$.source"))
|
.setSourceId(json.read("$.source"))
|
||||||
.setTargetId(JsonPath.read(json, "$.target"))
|
.setTargetId(json.read("$.target"))
|
||||||
.setDeleted(JsonPath.read(json, "$.dataInfo.deletedbyinference"))
|
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
|
||||||
.setType("relation")
|
.setType("relation")
|
||||||
.setOaf(json);
|
.setOaf(s);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,18 @@
|
||||||
package eu.dnetlib.dhp.graph;
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
import net.minidev.json.JSONArray;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class GraphMappingUtils {
|
public class GraphMappingUtils {
|
||||||
|
|
||||||
|
@ -20,4 +29,132 @@ public class GraphMappingUtils {
|
||||||
types.put("relation", Relation.class);
|
types.put("relation", Relation.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static EntityRelEntity pruneModel(EntityRelEntity e) {
|
||||||
|
|
||||||
|
final DocumentContext j = JsonPath.parse(e.getSource().getOaf());
|
||||||
|
final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType());
|
||||||
|
|
||||||
|
switch (e.getSource().getType()) {
|
||||||
|
case "publication":
|
||||||
|
case "dataset":
|
||||||
|
case "otherresearchproduct":
|
||||||
|
case "software":
|
||||||
|
mapTitle(j, re);
|
||||||
|
re.setDateofacceptance(j.read("$.dateofacceptance.value"));
|
||||||
|
re.setPublisher(j.read("$.publisher.value"));
|
||||||
|
|
||||||
|
JSONArray pids = j.read("$.pid");
|
||||||
|
re.setPid(pids.stream()
|
||||||
|
.map(p -> asStructuredProperty((LinkedHashMap<String, Object>) p))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
|
||||||
|
re.setResulttype(asQualifier(j.read("$.resulttype")));
|
||||||
|
|
||||||
|
JSONArray collfrom = j.read("$.collectedfrom");
|
||||||
|
re.setCollectedfrom(collfrom.stream()
|
||||||
|
.map(c -> asKV((LinkedHashMap<String, Object>)c))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
|
||||||
|
//TODO still to be mapped
|
||||||
|
//re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
|
||||||
|
|
||||||
|
break;
|
||||||
|
case "datasource":
|
||||||
|
re.setOfficialname(j.read("$.officialname.value"));
|
||||||
|
re.setWebsiteurl(j.read("$.websiteurl.value"));
|
||||||
|
re.setDatasourcetype(asQualifier(j.read("$.datasourcetype")));
|
||||||
|
re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility")));
|
||||||
|
|
||||||
|
break;
|
||||||
|
case "organization":
|
||||||
|
re.setLegalname(j.read("$.legalname.value"));
|
||||||
|
re.setLegalshortname(j.read("$.legalshortname.value"));
|
||||||
|
re.setCountry(asQualifier(j.read("$.country")));
|
||||||
|
|
||||||
|
break;
|
||||||
|
case "project":
|
||||||
|
re.setProjectTitle(j.read("$.title.value"));
|
||||||
|
re.setCode(j.read("$.code.value"));
|
||||||
|
re.setAcronym(j.read("$.acronym.value"));
|
||||||
|
re.setContracttype(asQualifier(j.read("$.contracttype")));
|
||||||
|
|
||||||
|
JSONArray f = j.read("$.fundingtree");
|
||||||
|
if (!f.isEmpty()) {
|
||||||
|
re.setFundingtree(f.stream()
|
||||||
|
.map(s -> s.toString())
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return new EntityRelEntity().setSource(
|
||||||
|
new TypedRow()
|
||||||
|
.setSourceId(e.getSource().getSourceId())
|
||||||
|
.setDeleted(e.getSource().getDeleted())
|
||||||
|
.setType(e.getSource().getType())
|
||||||
|
.setOaf(serialize(re)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static KeyValue asKV(LinkedHashMap<String, Object> j) {
|
||||||
|
final KeyValue kv = new KeyValue();
|
||||||
|
kv.setKey((String) j.get("key"));
|
||||||
|
kv.setValue((String) j.get("value"));
|
||||||
|
return kv;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void mapTitle(DocumentContext j, RelatedEntity re) {
|
||||||
|
final JSONArray a = j.read("$.title");
|
||||||
|
if (!a.isEmpty()) {
|
||||||
|
final StructuredProperty sp = asStructuredProperty((LinkedHashMap<String, Object>) a.get(0));
|
||||||
|
if(StringUtils.isNotBlank(sp.getValue())) {
|
||||||
|
re.setTitle(sp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StructuredProperty asStructuredProperty(LinkedHashMap<String, Object> j) {
|
||||||
|
final StructuredProperty sp = new StructuredProperty();
|
||||||
|
final String value = (String) j.get("value");
|
||||||
|
if (StringUtils.isNotBlank(value)) {
|
||||||
|
sp.setValue((String) j.get("value"));
|
||||||
|
sp.setQualifier(asQualifier((LinkedHashMap<String, String>) j.get("qualifier")));
|
||||||
|
}
|
||||||
|
return sp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Qualifier asQualifier(LinkedHashMap<String, String> j) {
|
||||||
|
final Qualifier q = new Qualifier();
|
||||||
|
|
||||||
|
final String classid = j.get("classid");
|
||||||
|
if (StringUtils.isNotBlank(classid)) {
|
||||||
|
q.setClassid(classid);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String classname = j.get("classname");
|
||||||
|
if (StringUtils.isNotBlank(classname)) {
|
||||||
|
q.setClassname(classname);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String schemeid = j.get("schemeid");
|
||||||
|
if (StringUtils.isNotBlank(schemeid)) {
|
||||||
|
q.setSchemeid(schemeid);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String schemename = j.get("schemename");
|
||||||
|
if (StringUtils.isNotBlank(schemename)) {
|
||||||
|
q.setSchemename(schemename);
|
||||||
|
}
|
||||||
|
return q;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String serialize(final Object o) {
|
||||||
|
try {
|
||||||
|
return new ObjectMapper()
|
||||||
|
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
|
||||||
|
.writeValueAsString(o);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new IllegalArgumentException("unable to serialize: " + o.toString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class JoinedEntity implements Serializable {
|
||||||
|
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
private OafEntity entity;
|
||||||
|
|
||||||
|
private List<Tuple2<Relation, RelatedEntity>> links;
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JoinedEntity setType(String type) {
|
||||||
|
this.type = type;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public OafEntity getEntity() {
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JoinedEntity setEntity(OafEntity entity) {
|
||||||
|
this.entity = entity;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Tuple2<Relation, RelatedEntity>> getLinks() {
|
||||||
|
return links;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JoinedEntity setLinks(List<Tuple2<Relation, RelatedEntity>> links) {
|
||||||
|
this.links = links;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,103 +0,0 @@
|
||||||
package eu.dnetlib.dhp.graph;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
|
||||||
import com.jayway.jsonpath.JsonPath;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
|
||||||
import net.minidev.json.JSONArray;
|
|
||||||
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class MappingUtils {
|
|
||||||
|
|
||||||
public EntityRelEntity pruneModel(EntityRelEntity e) throws JsonProcessingException {
|
|
||||||
|
|
||||||
final DocumentContext j = JsonPath.parse(e.getSource().getOaf());
|
|
||||||
final RelatedEntity re = new RelatedEntity();
|
|
||||||
|
|
||||||
switch (e.getSource().getType()) {
|
|
||||||
case "publication":
|
|
||||||
case "dataset":
|
|
||||||
case "otherresearchproduct":
|
|
||||||
case "software":
|
|
||||||
|
|
||||||
mapTitle(j, re);
|
|
||||||
re.setDateofacceptance(j.read("$.dateofacceptance.value"));
|
|
||||||
re.setPublisher(j.read("$.publisher.value"));
|
|
||||||
|
|
||||||
JSONArray pids = j.read("$.pid");
|
|
||||||
re.setPid(pids.stream()
|
|
||||||
.map(p -> asStructuredProperty((LinkedHashMap<String, Object>) p))
|
|
||||||
.collect(Collectors.toList()));
|
|
||||||
|
|
||||||
re.setResulttype(asQualifier(j.read("$.resulttype")));
|
|
||||||
|
|
||||||
JSONArray collfrom = j.read("$.collectedfrom");
|
|
||||||
re.setCollectedfrom(collfrom.stream()
|
|
||||||
.map(c -> asKV((LinkedHashMap<String, Object>)c))
|
|
||||||
.collect(Collectors.toList()));
|
|
||||||
|
|
||||||
//TODO still to be mapped
|
|
||||||
//re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
|
|
||||||
|
|
||||||
break;
|
|
||||||
case "datasource":
|
|
||||||
re.setOfficialname(j.read("$.officialname.value"));
|
|
||||||
re.setWebsiteurl(j.read("$.websiteurl.value"));
|
|
||||||
|
|
||||||
re.setDatasourcetype(asQualifier(j.read("$.datasourcetype")));
|
|
||||||
re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility")));
|
|
||||||
|
|
||||||
break;
|
|
||||||
case "organization":
|
|
||||||
|
|
||||||
break;
|
|
||||||
case "project":
|
|
||||||
mapTitle(j, re);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return new EntityRelEntity().setSource(
|
|
||||||
new TypedRow()
|
|
||||||
.setSourceId(e.getSource().getSourceId())
|
|
||||||
.setDeleted(e.getSource().getDeleted())
|
|
||||||
.setType(e.getSource().getType())
|
|
||||||
.setOaf(new ObjectMapper().writeValueAsString(re)));
|
|
||||||
}
|
|
||||||
|
|
||||||
private KeyValue asKV(LinkedHashMap<String, Object> j) {
|
|
||||||
final KeyValue kv = new KeyValue();
|
|
||||||
kv.setKey((String) j.get("key"));
|
|
||||||
kv.setValue((String) j.get("value"));
|
|
||||||
return kv;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void mapTitle(DocumentContext j, RelatedEntity re) {
|
|
||||||
JSONArray a = j.read("$.title");
|
|
||||||
if (!a.isEmpty()) {
|
|
||||||
re.setTitle(asStructuredProperty((LinkedHashMap<String, Object>) a.get(0)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private StructuredProperty asStructuredProperty(LinkedHashMap<String, Object> j) {
|
|
||||||
final StructuredProperty sp = new StructuredProperty();
|
|
||||||
sp.setValue((String) j.get("value"));
|
|
||||||
sp.setQualifier(asQualifier((LinkedHashMap<String, String>) j.get("qualifier")));
|
|
||||||
return sp;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public Qualifier asQualifier(LinkedHashMap<String, String> j) {
|
|
||||||
Qualifier q = new Qualifier();
|
|
||||||
q.setClassid(j.get("classid"));
|
|
||||||
q.setClassname(j.get("classname"));
|
|
||||||
q.setSchemeid(j.get("schemeid"));
|
|
||||||
q.setSchemename(j.get("schemename"));
|
|
||||||
return q;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -3,14 +3,22 @@ package eu.dnetlib.dhp.graph;
|
||||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class RelatedEntity implements Serializable {
|
public class RelatedEntity implements Serializable {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
// common fields
|
||||||
|
private StructuredProperty title;
|
||||||
|
private String websiteurl; // datasource, organizations, projects
|
||||||
|
|
||||||
// results
|
// results
|
||||||
private StructuredProperty title; // also for projects
|
|
||||||
private String dateofacceptance;
|
private String dateofacceptance;
|
||||||
private String publisher;
|
private String publisher;
|
||||||
private List<StructuredProperty> pid;
|
private List<StructuredProperty> pid;
|
||||||
|
@ -20,11 +28,10 @@ public class RelatedEntity implements Serializable {
|
||||||
|
|
||||||
// datasource
|
// datasource
|
||||||
private String officialname;
|
private String officialname;
|
||||||
private String websiteurl; // also for organizations, projects
|
|
||||||
private Qualifier datasourcetype;
|
private Qualifier datasourcetype;
|
||||||
private Qualifier datasourcetypeui;
|
private Qualifier datasourcetypeui;
|
||||||
//private String aggregatortype;
|
|
||||||
private Qualifier openairecompatibility;
|
private Qualifier openairecompatibility;
|
||||||
|
//private String aggregatortype;
|
||||||
|
|
||||||
// organization
|
// organization
|
||||||
private String legalname;
|
private String legalname;
|
||||||
|
@ -32,10 +39,28 @@ public class RelatedEntity implements Serializable {
|
||||||
private Qualifier country;
|
private Qualifier country;
|
||||||
|
|
||||||
// project
|
// project
|
||||||
|
private String projectTitle;
|
||||||
private String code;
|
private String code;
|
||||||
private String acronym;
|
private String acronym;
|
||||||
private Qualifier contracttype;
|
private Qualifier contracttype;
|
||||||
private String fundingtree;
|
private List<String> fundingtree;
|
||||||
|
|
||||||
|
public static RelatedEntity parse(final String json) {
|
||||||
|
try {
|
||||||
|
return new ObjectMapper().readValue(json, RelatedEntity.class);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException("invalid RelatedEntity, cannot parse: " + json);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RelatedEntity setId(String id) {
|
||||||
|
this.id = id;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public StructuredProperty getTitle() {
|
public StructuredProperty getTitle() {
|
||||||
return title;
|
return title;
|
||||||
|
@ -199,12 +224,30 @@ public class RelatedEntity implements Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getFundingtree() {
|
public List<String> getFundingtree() {
|
||||||
return fundingtree;
|
return fundingtree;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RelatedEntity setFundingtree(String fundingtree) {
|
public RelatedEntity setFundingtree(List<String> fundingtree) {
|
||||||
this.fundingtree = fundingtree;
|
this.fundingtree = fundingtree;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getProjectTitle() {
|
||||||
|
return projectTitle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RelatedEntity setProjectTitle(String projectTitle) {
|
||||||
|
this.projectTitle = projectTitle;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RelatedEntity setType(String type) {
|
||||||
|
this.type = type;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -7,37 +7,34 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
|
||||||
public class SparkGraphIndexingJob {
|
public class SparkXmlRecordBuilderJob {
|
||||||
|
|
||||||
private final static String OUTPUT_BASE_PATH = "/tmp/openaire_provision";
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf()
|
final SparkConf conf = new SparkConf()
|
||||||
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
|
||||||
|
|
||||||
final SparkSession spark = SparkSession
|
final SparkSession spark = SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.config(conf)
|
.config(conf)
|
||||||
.appName(SparkGraphIndexingJob.class.getSimpleName())
|
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
|
||||||
.master(parser.get("master"))
|
.master(parser.get("master"))
|
||||||
.enableHiveSupport()
|
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
|
|
||||||
final String inputPath = parser.get("sourcePath");
|
final String inputPath = parser.get("sourcePath");
|
||||||
final String hiveDbName = parser.get("hive_db_name");
|
final String outputPath = parser.get("outputPath");
|
||||||
|
|
||||||
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
|
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
|
||||||
if (fs.exists(new Path(OUTPUT_BASE_PATH))) {
|
if (fs.exists(new Path(outputPath))) {
|
||||||
fs.delete(new Path(OUTPUT_BASE_PATH), true);
|
fs.delete(new Path(outputPath), true);
|
||||||
fs.mkdirs(new Path(OUTPUT_BASE_PATH));
|
fs.mkdirs(new Path(outputPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
new GraphJoiner().join(spark, inputPath, hiveDbName, OUTPUT_BASE_PATH);
|
new GraphJoiner(spark, inputPath, outputPath)
|
||||||
|
.adjacencyLists();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
[
|
[
|
||||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
||||||
{"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true},
|
{"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true},
|
||||||
{"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true},
|
{"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true},
|
||||||
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
|
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
|
||||||
]
|
]
|
|
@ -26,20 +26,20 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="GraphJoinEntities"/>
|
<start to="adjancency_lists"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
<action name="GraphJoinEntities">
|
<action name="adjancency_lists">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<job-tracker>${jobTracker}</job-tracker>
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
<name-node>${nameNode}</name-node>
|
<name-node>${nameNode}</name-node>
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>GraphIndexing</name>
|
<name>build_adjacency_lists</name>
|
||||||
<class>eu.dnetlib.dhp.graph.SparkGraphIndexingJob</class>
|
<class>eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob</class>
|
||||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory ${sparkExecutorMemory}
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
@ -53,8 +53,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||||
<arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -9,11 +9,11 @@ import java.io.InputStreamReader;
|
||||||
|
|
||||||
public class MappingUtilsTest {
|
public class MappingUtilsTest {
|
||||||
|
|
||||||
private MappingUtils utils;
|
private GraphMappingUtils utils;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
utils = new MappingUtils();
|
utils = new GraphMappingUtils();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue