diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java new file mode 100644 index 000000000..bdfea7979 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java @@ -0,0 +1,77 @@ +package eu.dnetlib.dhp.graph; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.stream.Collectors; + +public class GraphMapper { + + + public void map(final SparkSession spark, final String outPath) { + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + sc.textFile(outPath + "/linked_entities") + .map(LinkedEntityWrapper::parse) + .map(GraphMapper::asLinkedEntity) + .map(e -> new ObjectMapper().writeValueAsString(e)) + .saveAsTextFile(outPath + "/linked_entities_types"); + } + + private static LinkedEntity asLinkedEntity(final LinkedEntityWrapper lw) throws JsonProcessingException { + final LinkedEntity le = new LinkedEntity(); + + try { + le.setType(lw.getEntity().getType()); + le.setEntity(parseEntity(lw.getEntity().getOaf(), le.getType())); + le.setLinks(lw.getLinks() + .stream() + .map(l -> new Link() + .setRelation(parseRelation(l.getRelation().getOaf())) + .setRelatedEntity(RelatedEntity.parse(l.getTarget().getOaf()))) + .collect(Collectors.toList())); + return le; + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(new ObjectMapper().writeValueAsString(lw), e); + } + } + + private static Relation parseRelation(final String s) { + try { + return new ObjectMapper().readValue(s, Relation.class); + } catch (IOException e) { + throw new IllegalArgumentException("unable to decode Relation: " + s); + } + } + + private static OafEntity parseEntity(final String json, final String type) { + final ObjectMapper o = new ObjectMapper(); + try { + switch (type) { + case "publication": + return o.readValue(json, Publication.class); + case "dataset": + return o.readValue(json, Dataset.class); + case "otherresearchproduct": + return o.readValue(json, OtherResearchProduct.class); + case "software": + return o.readValue(json, Software.class); + case "datasource": + return o.readValue(json, Datasource.class); + case "project": + return o.readValue(json, Project.class); + case "organization": + return o.readValue(json, Organization.class); + default: + throw new IllegalArgumentException("invalid entity type: " + type); + } + } catch (IOException e) { + throw new IllegalArgumentException("unable to decode oaf entity: " + json); + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java new file mode 100644 index 000000000..8426fbd12 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java @@ -0,0 +1,30 @@ +package eu.dnetlib.dhp.graph; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +import java.io.Serializable; + +public class Link implements Serializable { + + private Relation relation; + + private RelatedEntity relatedEntity; + + public Relation getRelation() { + return relation; + } + + public Link setRelation(Relation relation) { + this.relation = relation; + return this; + } + + public RelatedEntity getRelatedEntity() { + return relatedEntity; + } + + public Link setRelatedEntity(RelatedEntity relatedEntity) { + this.relatedEntity = relatedEntity; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java index 9e6fc0d38..c7c2d1892 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java @@ -1,28 +1,41 @@ package eu.dnetlib.dhp.graph; +import eu.dnetlib.dhp.schema.oaf.OafEntity; + import java.io.Serializable; import java.util.List; public class LinkedEntity implements Serializable { - private TypedRow entity; + private String type; - private List links; + private OafEntity entity; - public TypedRow getEntity() { + private List links; + + public String getType() { + return type; + } + + public LinkedEntity setType(String type) { + this.type = type; + return this; + } + + public OafEntity getEntity() { return entity; } - public LinkedEntity setEntity(TypedRow entity) { + public LinkedEntity setEntity(OafEntity entity) { this.entity = entity; return this; } - public List getLinks() { + public List getLinks() { return links; } - public LinkedEntity setLinks(List links) { + public LinkedEntity setLinks(List links) { this.links = links; return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java new file mode 100644 index 000000000..17853208c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java @@ -0,0 +1,40 @@ +package eu.dnetlib.dhp.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +public class LinkedEntityWrapper implements Serializable { + + private TypedRow entity; + + private List links; + + public static LinkedEntityWrapper parse(final String s) { + try { + return new ObjectMapper().readValue(s, LinkedEntityWrapper.class); + } catch (IOException e) { + throw new IllegalArgumentException("unable to decode LinkedEntityWrapper: " + s); + } + } + + public TypedRow getEntity() { + return entity; + } + + public LinkedEntityWrapper setEntity(TypedRow entity) { + this.entity = entity; + return this; + } + + public List getLinks() { + return links; + } + + public LinkedEntityWrapper setLinks(List links) { + this.links = links; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java similarity index 70% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java index 1eb0491a7..eb60e1474 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java @@ -2,7 +2,7 @@ package eu.dnetlib.dhp.graph; import java.io.Serializable; -public class Tuple implements Serializable { +public class TupleWrapper implements Serializable { private TypedRow relation; @@ -13,7 +13,7 @@ public class Tuple implements Serializable { return relation; } - public Tuple setRelation(TypedRow relation) { + public TupleWrapper setRelation(TypedRow relation) { this.relation = relation; return this; } @@ -22,7 +22,7 @@ public class Tuple implements Serializable { return target; } - public Tuple setTarget(TypedRow target) { + public TupleWrapper setTarget(TypedRow target) { this.target = target; return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java index 0deb3d81a..fdff4d984 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java @@ -39,4 +39,14 @@ public class MappingUtilsTest { System.out.println(out); } + + @Test + public void testParseRelatedEntity() throws IOException { + + final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("related_entity.json")); + final RelatedEntity e = new ObjectMapper().readValue(in, RelatedEntity.class); + + System.out.println(e); + + } } diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/related_entity.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/related_entity.json new file mode 100644 index 000000000..25c92baa3 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/related_entity.json @@ -0,0 +1,5 @@ +{ + "id": "20|nih_________::6b8108b6d6399f7163a6a7ccdd0efc2d", + "type": "organization", + "legalname": "MCGILL UNIVERSITY" +} \ No newline at end of file