diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java index 96d1f150a..110649522 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java @@ -1,11 +1,14 @@ package eu.dnetlib.dhp.graph; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.*; +import net.minidev.json.JSONArray; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaPairRDD; @@ -15,8 +18,10 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.SparkSession; import scala.Tuple2; +import java.io.IOException; import java.io.Serializable; import java.util.List; +import java.util.stream.Collectors; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. @@ -38,7 +43,7 @@ import java.util.List; */ public class GraphJoiner implements Serializable { - public static final int MAX_RELS = 10; + public static final int MAX_RELS = 100; public void join(final SparkSession spark, final String inputPath, final String hiveDbName, final String outPath) { @@ -63,7 +68,7 @@ public class GraphJoiner implements Serializable { .union(software) .union(publication) .map(e -> new EntityRelEntity().setSource(e._2())) - .map(e -> new ObjectMapper().writeValueAsString(e)) + .map(MappingUtils::serialize) .saveAsTextFile(entitiesPath, GzipCodec.class); JavaPairRDD entities = sc.textFile(entitiesPath) @@ -84,41 +89,24 @@ public class GraphJoiner implements Serializable { relation .join(entities .filter(e -> !e._2().getSource().getDeleted()) - /*.mapToPair(e -> new Tuple2<>(e._1(), new MappingUtils().pruneModel(e._2())))*/) + .mapToPair(e -> new Tuple2<>(e._1(), MappingUtils.pruneModel(e._2())))) .map(s -> new EntityRelEntity() .setRelation(s._2()._1().getRelation()) .setTarget(s._2()._2().getSource())) - .map(e -> new ObjectMapper().writeValueAsString(e)) + .map(MappingUtils::serialize) .saveAsTextFile(joinByTargetPath, GzipCodec.class); JavaPairRDD bySource = sc.textFile(joinByTargetPath) .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) .mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t)); + final String linkedEntitiesPath = outPath + "/linked_entities"; entities .union(bySource) .groupByKey() // by source id - .map(p -> { - final LinkedEntity e = new LinkedEntity(); - final List links = Lists.newArrayList(); - for(EntityRelEntity rel : p._2()) { - if (rel.hasMainEntity() & e.getEntity() == null) { - e.setEntity(rel.getSource()); - } - if (rel.hasRelatedEntity()) { - links.add(new Tuple() - .setRelation(rel.getRelation()) - .setTarget(rel.getTarget())); - } - } - e.setLinks(links); - if (e.getEntity() == null) { - throw new IllegalStateException("missing main entity on '" + p._1() + "'"); - } - return e; - }) - .map(e -> new ObjectMapper().writeValueAsString(e)) - .saveAsTextFile(outPath + "/linked_entities", GzipCodec.class); + .map(GraphJoiner::asLinkedEntityWrapper) + .map(MappingUtils::serialize) + .saveAsTextFile(linkedEntitiesPath, GzipCodec.class); } /** @@ -153,14 +141,35 @@ public class GraphJoiner implements Serializable { private JavaRDD readPathRelation(final JavaSparkContext sc, final String inputPath) { return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) .map(item -> { - final String json = item._2().toString(); + final String s = item._2().toString(); + final DocumentContext json = JsonPath.parse(s); return new TypedRow() - .setSourceId(JsonPath.read(json, "$.source")) - .setTargetId(JsonPath.read(json, "$.target")) - .setDeleted(JsonPath.read(json, "$.dataInfo.deletedbyinference")) + .setSourceId(json.read("$.source")) + .setTargetId(json.read("$.target")) + .setDeleted(json.read("$.dataInfo.deletedbyinference")) .setType("relation") - .setOaf(json); + .setOaf(s); }); } + private static LinkedEntityWrapper asLinkedEntityWrapper(Tuple2> p) { + final LinkedEntityWrapper e = new LinkedEntityWrapper(); + final List links = Lists.newArrayList(); + for (EntityRelEntity rel : p._2()) { + if (rel.hasMainEntity() & e.getEntity() == null) { + e.setEntity(rel.getSource()); + } + if (rel.hasRelatedEntity()) { + links.add(new TupleWrapper() + .setRelation(rel.getRelation()) + .setTarget(rel.getTarget())); + } + } + e.setLinks(links); + if (e.getEntity() == null) { + throw new IllegalStateException("missing main entity on '" + p._1() + "'"); + } + return e; + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java new file mode 100644 index 000000000..bdfea7979 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java @@ -0,0 +1,77 @@ +package eu.dnetlib.dhp.graph; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.stream.Collectors; + +public class GraphMapper { + + + public void map(final SparkSession spark, final String outPath) { + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + sc.textFile(outPath + "/linked_entities") + .map(LinkedEntityWrapper::parse) + .map(GraphMapper::asLinkedEntity) + .map(e -> new ObjectMapper().writeValueAsString(e)) + .saveAsTextFile(outPath + "/linked_entities_types"); + } + + private static LinkedEntity asLinkedEntity(final LinkedEntityWrapper lw) throws JsonProcessingException { + final LinkedEntity le = new LinkedEntity(); + + try { + le.setType(lw.getEntity().getType()); + le.setEntity(parseEntity(lw.getEntity().getOaf(), le.getType())); + le.setLinks(lw.getLinks() + .stream() + .map(l -> new Link() + .setRelation(parseRelation(l.getRelation().getOaf())) + .setRelatedEntity(RelatedEntity.parse(l.getTarget().getOaf()))) + .collect(Collectors.toList())); + return le; + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(new ObjectMapper().writeValueAsString(lw), e); + } + } + + private static Relation parseRelation(final String s) { + try { + return new ObjectMapper().readValue(s, Relation.class); + } catch (IOException e) { + throw new IllegalArgumentException("unable to decode Relation: " + s); + } + } + + private static OafEntity parseEntity(final String json, final String type) { + final ObjectMapper o = new ObjectMapper(); + try { + switch (type) { + case "publication": + return o.readValue(json, Publication.class); + case "dataset": + return o.readValue(json, Dataset.class); + case "otherresearchproduct": + return o.readValue(json, OtherResearchProduct.class); + case "software": + return o.readValue(json, Software.class); + case "datasource": + return o.readValue(json, Datasource.class); + case "project": + return o.readValue(json, Project.class); + case "organization": + return o.readValue(json, Organization.class); + default: + throw new IllegalArgumentException("invalid entity type: " + type); + } + } catch (IOException e) { + throw new IllegalArgumentException("unable to decode oaf entity: " + json); + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java new file mode 100644 index 000000000..8426fbd12 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java @@ -0,0 +1,30 @@ +package eu.dnetlib.dhp.graph; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +import java.io.Serializable; + +public class Link implements Serializable { + + private Relation relation; + + private RelatedEntity relatedEntity; + + public Relation getRelation() { + return relation; + } + + public Link setRelation(Relation relation) { + this.relation = relation; + return this; + } + + public RelatedEntity getRelatedEntity() { + return relatedEntity; + } + + public Link setRelatedEntity(RelatedEntity relatedEntity) { + this.relatedEntity = relatedEntity; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java index 9e6fc0d38..c7c2d1892 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java @@ -1,28 +1,41 @@ package eu.dnetlib.dhp.graph; +import eu.dnetlib.dhp.schema.oaf.OafEntity; + import java.io.Serializable; import java.util.List; public class LinkedEntity implements Serializable { - private TypedRow entity; + private String type; - private List links; + private OafEntity entity; - public TypedRow getEntity() { + private List links; + + public String getType() { + return type; + } + + public LinkedEntity setType(String type) { + this.type = type; + return this; + } + + public OafEntity getEntity() { return entity; } - public LinkedEntity setEntity(TypedRow entity) { + public LinkedEntity setEntity(OafEntity entity) { this.entity = entity; return this; } - public List getLinks() { + public List getLinks() { return links; } - public LinkedEntity setLinks(List links) { + public LinkedEntity setLinks(List links) { this.links = links; return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java new file mode 100644 index 000000000..17853208c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java @@ -0,0 +1,40 @@ +package eu.dnetlib.dhp.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +public class LinkedEntityWrapper implements Serializable { + + private TypedRow entity; + + private List links; + + public static LinkedEntityWrapper parse(final String s) { + try { + return new ObjectMapper().readValue(s, LinkedEntityWrapper.class); + } catch (IOException e) { + throw new IllegalArgumentException("unable to decode LinkedEntityWrapper: " + s); + } + } + + public TypedRow getEntity() { + return entity; + } + + public LinkedEntityWrapper setEntity(TypedRow entity) { + this.entity = entity; + return this; + } + + public List getLinks() { + return links; + } + + public LinkedEntityWrapper setLinks(List links) { + this.links = links; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java index 756506c12..9f7ca4d0b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.graph; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.jayway.jsonpath.DocumentContext; @@ -14,17 +15,16 @@ import java.util.stream.Collectors; public class MappingUtils { - public EntityRelEntity pruneModel(EntityRelEntity e) throws JsonProcessingException { + public static EntityRelEntity pruneModel(EntityRelEntity e) { final DocumentContext j = JsonPath.parse(e.getSource().getOaf()); - final RelatedEntity re = new RelatedEntity(); + final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType()); switch (e.getSource().getType()) { case "publication": case "dataset": case "otherresearchproduct": case "software": - mapTitle(j, re); re.setDateofacceptance(j.read("$.dateofacceptance.value")); re.setPublisher(j.read("$.publisher.value")); @@ -48,51 +48,62 @@ public class MappingUtils { case "datasource": re.setOfficialname(j.read("$.officialname.value")); re.setWebsiteurl(j.read("$.websiteurl.value")); - re.setDatasourcetype(asQualifier(j.read("$.datasourcetype"))); re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility"))); break; case "organization": + re.setLegalname(j.read("$.legalname.value")); + re.setLegalshortname(j.read("$.legalshortname.value")); + re.setCountry(asQualifier(j.read("$.country"))); break; case "project": - mapTitle(j, re); + re.setProjectTitle(j.read("$.title.value")); + re.setCode(j.read("$.code.value")); + re.setAcronym(j.read("$.acronym.value")); + re.setContracttype(asQualifier(j.read("$.contracttype"))); + + JSONArray f = j.read("$.fundingtree"); + if (!f.isEmpty()) { + re.setFundingtree(f.stream() + .map(s -> s.toString()) + .collect(Collectors.toList())); + } + break; } - return new EntityRelEntity().setSource( new TypedRow() .setSourceId(e.getSource().getSourceId()) .setDeleted(e.getSource().getDeleted()) .setType(e.getSource().getType()) - .setOaf(new ObjectMapper().writeValueAsString(re))); + .setOaf(serialize(re))); } - private KeyValue asKV(LinkedHashMap j) { + private static KeyValue asKV(LinkedHashMap j) { final KeyValue kv = new KeyValue(); kv.setKey((String) j.get("key")); kv.setValue((String) j.get("value")); return kv; } - private void mapTitle(DocumentContext j, RelatedEntity re) { - JSONArray a = j.read("$.title"); + private static void mapTitle(DocumentContext j, RelatedEntity re) { + final JSONArray a = j.read("$.title"); if (!a.isEmpty()) { re.setTitle(asStructuredProperty((LinkedHashMap) a.get(0))); } } - private StructuredProperty asStructuredProperty(LinkedHashMap j) { + private static StructuredProperty asStructuredProperty(LinkedHashMap j) { final StructuredProperty sp = new StructuredProperty(); sp.setValue((String) j.get("value")); sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier"))); return sp; - } - public Qualifier asQualifier(LinkedHashMap j) { - Qualifier q = new Qualifier(); + public static Qualifier asQualifier(LinkedHashMap j) { + final Qualifier q = new Qualifier(); q.setClassid(j.get("classid")); q.setClassname(j.get("classname")); q.setSchemeid(j.get("schemeid")); @@ -100,4 +111,14 @@ public class MappingUtils { return q; } + public static String serialize(final Object o) { + try { + return new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .writeValueAsString(o); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("unable to serialize: " + o.toString(), e); + } + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java index a441392b2..afd6e310b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java @@ -3,14 +3,22 @@ package eu.dnetlib.dhp.graph; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import org.codehaus.jackson.map.ObjectMapper; +import java.io.IOException; import java.io.Serializable; import java.util.List; public class RelatedEntity implements Serializable { + private String id; + private String type; + + // common fields + private StructuredProperty title; + private String websiteurl; // datasource, organizations, projects + // results - private StructuredProperty title; // also for projects private String dateofacceptance; private String publisher; private List pid; @@ -20,11 +28,10 @@ public class RelatedEntity implements Serializable { // datasource private String officialname; - private String websiteurl; // also for organizations, projects private Qualifier datasourcetype; private Qualifier datasourcetypeui; - //private String aggregatortype; private Qualifier openairecompatibility; + //private String aggregatortype; // organization private String legalname; @@ -32,10 +39,28 @@ public class RelatedEntity implements Serializable { private Qualifier country; // project + private String projectTitle; private String code; private String acronym; private Qualifier contracttype; - private String fundingtree; + private List fundingtree; + + public static RelatedEntity parse(final String json) { + try { + return new ObjectMapper().readValue(json, RelatedEntity.class); + } catch (IOException e) { + throw new IllegalArgumentException("invalid RelatedEntity, cannot parse: " + json); + } + } + + public String getId() { + return id; + } + + public RelatedEntity setId(String id) { + this.id = id; + return this; + } public StructuredProperty getTitle() { return title; @@ -199,12 +224,30 @@ public class RelatedEntity implements Serializable { return this; } - public String getFundingtree() { + public List getFundingtree() { return fundingtree; } - public RelatedEntity setFundingtree(String fundingtree) { + public RelatedEntity setFundingtree(List fundingtree) { this.fundingtree = fundingtree; return this; } + + public String getProjectTitle() { + return projectTitle; + } + + public RelatedEntity setProjectTitle(String projectTitle) { + this.projectTitle = projectTitle; + return this; + } + + public String getType() { + return type; + } + + public RelatedEntity setType(String type) { + this.type = type; + return this; + } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java index 1d55dda89..3915bef08 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java @@ -38,6 +38,7 @@ public class SparkGraphIndexingJob { } new GraphJoiner().join(spark, inputPath, hiveDbName, OUTPUT_BASE_PATH); + new GraphMapper().map(spark, OUTPUT_BASE_PATH); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java similarity index 70% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java index 1eb0491a7..eb60e1474 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java @@ -2,7 +2,7 @@ package eu.dnetlib.dhp.graph; import java.io.Serializable; -public class Tuple implements Serializable { +public class TupleWrapper implements Serializable { private TypedRow relation; @@ -13,7 +13,7 @@ public class Tuple implements Serializable { return relation; } - public Tuple setRelation(TypedRow relation) { + public TupleWrapper setRelation(TypedRow relation) { this.relation = relation; return this; } @@ -22,7 +22,7 @@ public class Tuple implements Serializable { return target; } - public Tuple setTarget(TypedRow target) { + public TupleWrapper setTarget(TypedRow target) { this.target = target; return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java index 2edb0aa70..199d12132 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java @@ -39,4 +39,14 @@ public class MappingUtilsTest { System.out.println(out); } + + @Test + public void testParseRelatedEntity() throws IOException { + + final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("related_entity.json")); + final RelatedEntity e = new ObjectMapper().readValue(in, RelatedEntity.class); + + System.out.println(e); + + } } diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/related_entity.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/related_entity.json new file mode 100644 index 000000000..25c92baa3 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/related_entity.json @@ -0,0 +1,5 @@ +{ + "id": "20|nih_________::6b8108b6d6399f7163a6a7ccdd0efc2d", + "type": "organization", + "legalname": "MCGILL UNIVERSITY" +} \ No newline at end of file