forked from D-Net/dnet-hadoop
Merge branch 'provision_indexing' of https://code-repo.d4science.org/D-Net/dnet-hadoop into HEAD
Conflicts: dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java
This commit is contained in:
commit
7bacd6812e
|
@ -0,0 +1,77 @@
|
||||||
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class GraphMapper {
|
||||||
|
|
||||||
|
|
||||||
|
public void map(final SparkSession spark, final String outPath) {
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
sc.textFile(outPath + "/linked_entities")
|
||||||
|
.map(LinkedEntityWrapper::parse)
|
||||||
|
.map(GraphMapper::asLinkedEntity)
|
||||||
|
.map(e -> new ObjectMapper().writeValueAsString(e))
|
||||||
|
.saveAsTextFile(outPath + "/linked_entities_types");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static LinkedEntity asLinkedEntity(final LinkedEntityWrapper lw) throws JsonProcessingException {
|
||||||
|
final LinkedEntity le = new LinkedEntity();
|
||||||
|
|
||||||
|
try {
|
||||||
|
le.setType(lw.getEntity().getType());
|
||||||
|
le.setEntity(parseEntity(lw.getEntity().getOaf(), le.getType()));
|
||||||
|
le.setLinks(lw.getLinks()
|
||||||
|
.stream()
|
||||||
|
.map(l -> new Link()
|
||||||
|
.setRelation(parseRelation(l.getRelation().getOaf()))
|
||||||
|
.setRelatedEntity(RelatedEntity.parse(l.getTarget().getOaf())))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
return le;
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new IllegalArgumentException(new ObjectMapper().writeValueAsString(lw), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Relation parseRelation(final String s) {
|
||||||
|
try {
|
||||||
|
return new ObjectMapper().readValue(s, Relation.class);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException("unable to decode Relation: " + s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static OafEntity parseEntity(final String json, final String type) {
|
||||||
|
final ObjectMapper o = new ObjectMapper();
|
||||||
|
try {
|
||||||
|
switch (type) {
|
||||||
|
case "publication":
|
||||||
|
return o.readValue(json, Publication.class);
|
||||||
|
case "dataset":
|
||||||
|
return o.readValue(json, Dataset.class);
|
||||||
|
case "otherresearchproduct":
|
||||||
|
return o.readValue(json, OtherResearchProduct.class);
|
||||||
|
case "software":
|
||||||
|
return o.readValue(json, Software.class);
|
||||||
|
case "datasource":
|
||||||
|
return o.readValue(json, Datasource.class);
|
||||||
|
case "project":
|
||||||
|
return o.readValue(json, Project.class);
|
||||||
|
case "organization":
|
||||||
|
return o.readValue(json, Organization.class);
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("invalid entity type: " + type);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException("unable to decode oaf entity: " + json);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class Link implements Serializable {
|
||||||
|
|
||||||
|
private Relation relation;
|
||||||
|
|
||||||
|
private RelatedEntity relatedEntity;
|
||||||
|
|
||||||
|
public Relation getRelation() {
|
||||||
|
return relation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Link setRelation(Relation relation) {
|
||||||
|
this.relation = relation;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RelatedEntity getRelatedEntity() {
|
||||||
|
return relatedEntity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Link setRelatedEntity(RelatedEntity relatedEntity) {
|
||||||
|
this.relatedEntity = relatedEntity;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,28 +1,41 @@
|
||||||
package eu.dnetlib.dhp.graph;
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class LinkedEntity implements Serializable {
|
public class LinkedEntity implements Serializable {
|
||||||
|
|
||||||
private TypedRow entity;
|
private String type;
|
||||||
|
|
||||||
private List<Tuple> links;
|
private OafEntity entity;
|
||||||
|
|
||||||
public TypedRow getEntity() {
|
private List<Link> links;
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public LinkedEntity setType(String type) {
|
||||||
|
this.type = type;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public OafEntity getEntity() {
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public LinkedEntity setEntity(TypedRow entity) {
|
public LinkedEntity setEntity(OafEntity entity) {
|
||||||
this.entity = entity;
|
this.entity = entity;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Tuple> getLinks() {
|
public List<Link> getLinks() {
|
||||||
return links;
|
return links;
|
||||||
}
|
}
|
||||||
|
|
||||||
public LinkedEntity setLinks(List<Tuple> links) {
|
public LinkedEntity setLinks(List<Link> links) {
|
||||||
this.links = links;
|
this.links = links;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class LinkedEntityWrapper implements Serializable {
|
||||||
|
|
||||||
|
private TypedRow entity;
|
||||||
|
|
||||||
|
private List<TupleWrapper> links;
|
||||||
|
|
||||||
|
public static LinkedEntityWrapper parse(final String s) {
|
||||||
|
try {
|
||||||
|
return new ObjectMapper().readValue(s, LinkedEntityWrapper.class);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException("unable to decode LinkedEntityWrapper: " + s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public TypedRow getEntity() {
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public LinkedEntityWrapper setEntity(TypedRow entity) {
|
||||||
|
this.entity = entity;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<TupleWrapper> getLinks() {
|
||||||
|
return links;
|
||||||
|
}
|
||||||
|
|
||||||
|
public LinkedEntityWrapper setLinks(List<TupleWrapper> links) {
|
||||||
|
this.links = links;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,7 +2,7 @@ package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class Tuple implements Serializable {
|
public class TupleWrapper implements Serializable {
|
||||||
|
|
||||||
private TypedRow relation;
|
private TypedRow relation;
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ public class Tuple implements Serializable {
|
||||||
return relation;
|
return relation;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Tuple setRelation(TypedRow relation) {
|
public TupleWrapper setRelation(TypedRow relation) {
|
||||||
this.relation = relation;
|
this.relation = relation;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@ public class Tuple implements Serializable {
|
||||||
return target;
|
return target;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Tuple setTarget(TypedRow target) {
|
public TupleWrapper setTarget(TypedRow target) {
|
||||||
this.target = target;
|
this.target = target;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
|
@ -39,4 +39,14 @@ public class MappingUtilsTest {
|
||||||
System.out.println(out);
|
System.out.println(out);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseRelatedEntity() throws IOException {
|
||||||
|
|
||||||
|
final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("related_entity.json"));
|
||||||
|
final RelatedEntity e = new ObjectMapper().readValue(in, RelatedEntity.class);
|
||||||
|
|
||||||
|
System.out.println(e);
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
{
|
||||||
|
"id": "20|nih_________::6b8108b6d6399f7163a6a7ccdd0efc2d",
|
||||||
|
"type": "organization",
|
||||||
|
"legalname": "MCGILL UNIVERSITY"
|
||||||
|
}
|
Loading…
Reference in New Issue