diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java deleted file mode 100644 index bdfea7979..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMapper.java +++ /dev/null @@ -1,77 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.schema.oaf.*; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; - -import java.io.IOException; -import java.util.stream.Collectors; - -public class GraphMapper { - - - public void map(final SparkSession spark, final String outPath) { - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - sc.textFile(outPath + "/linked_entities") - .map(LinkedEntityWrapper::parse) - .map(GraphMapper::asLinkedEntity) - .map(e -> new ObjectMapper().writeValueAsString(e)) - .saveAsTextFile(outPath + "/linked_entities_types"); - } - - private static LinkedEntity asLinkedEntity(final LinkedEntityWrapper lw) throws JsonProcessingException { - final LinkedEntity le = new LinkedEntity(); - - try { - le.setType(lw.getEntity().getType()); - le.setEntity(parseEntity(lw.getEntity().getOaf(), le.getType())); - le.setLinks(lw.getLinks() - .stream() - .map(l -> new Link() - .setRelation(parseRelation(l.getRelation().getOaf())) - .setRelatedEntity(RelatedEntity.parse(l.getTarget().getOaf()))) - .collect(Collectors.toList())); - return le; - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(new ObjectMapper().writeValueAsString(lw), e); - } - } - - private static Relation parseRelation(final String s) { - try { - return new ObjectMapper().readValue(s, Relation.class); - } catch (IOException e) { - throw new IllegalArgumentException("unable to decode Relation: " + s); - } - } - - private static OafEntity parseEntity(final String json, final String type) { - final ObjectMapper o = new ObjectMapper(); - try { - switch (type) { - case "publication": - return o.readValue(json, Publication.class); - case "dataset": - return o.readValue(json, Dataset.class); - case "otherresearchproduct": - return o.readValue(json, OtherResearchProduct.class); - case "software": - return o.readValue(json, Software.class); - case "datasource": - return o.readValue(json, Datasource.class); - case "project": - return o.readValue(json, Project.class); - case "organization": - return o.readValue(json, Organization.class); - default: - throw new IllegalArgumentException("invalid entity type: " + type); - } - } catch (IOException e) { - throw new IllegalArgumentException("unable to decode oaf entity: " + json); - } - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java deleted file mode 100644 index 8426fbd12..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Link.java +++ /dev/null @@ -1,30 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -import java.io.Serializable; - -public class Link implements Serializable { - - private Relation relation; - - private RelatedEntity relatedEntity; - - public Relation getRelation() { - return relation; - } - - public Link setRelation(Relation relation) { - this.relation = relation; - return this; - } - - public RelatedEntity getRelatedEntity() { - return relatedEntity; - } - - public Link setRelatedEntity(RelatedEntity relatedEntity) { - this.relatedEntity = relatedEntity; - return this; - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java index c7c2d1892..9e6fc0d38 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java @@ -1,41 +1,28 @@ package eu.dnetlib.dhp.graph; -import eu.dnetlib.dhp.schema.oaf.OafEntity; - import java.io.Serializable; import java.util.List; public class LinkedEntity implements Serializable { - private String type; + private TypedRow entity; - private OafEntity entity; + private List links; - private List links; - - public String getType() { - return type; - } - - public LinkedEntity setType(String type) { - this.type = type; - return this; - } - - public OafEntity getEntity() { + public TypedRow getEntity() { return entity; } - public LinkedEntity setEntity(OafEntity entity) { + public LinkedEntity setEntity(TypedRow entity) { this.entity = entity; return this; } - public List getLinks() { + public List getLinks() { return links; } - public LinkedEntity setLinks(List links) { + public LinkedEntity setLinks(List links) { this.links = links; return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java deleted file mode 100644 index 17853208c..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntityWrapper.java +++ /dev/null @@ -1,40 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -public class LinkedEntityWrapper implements Serializable { - - private TypedRow entity; - - private List links; - - public static LinkedEntityWrapper parse(final String s) { - try { - return new ObjectMapper().readValue(s, LinkedEntityWrapper.class); - } catch (IOException e) { - throw new IllegalArgumentException("unable to decode LinkedEntityWrapper: " + s); - } - } - - public TypedRow getEntity() { - return entity; - } - - public LinkedEntityWrapper setEntity(TypedRow entity) { - this.entity = entity; - return this; - } - - public List getLinks() { - return links; - } - - public LinkedEntityWrapper setLinks(List links) { - this.links = links; - return this; - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java index 38bc2bae2..2a518eb92 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java @@ -34,7 +34,8 @@ public class SparkXmlRecordBuilderJob { } new GraphJoiner(spark, inputPath, outputPath) - .adjacencyLists(); + .adjacencyLists() + .asXML(); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java new file mode 100644 index 000000000..1eb0491a7 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java @@ -0,0 +1,29 @@ +package eu.dnetlib.dhp.graph; + +import java.io.Serializable; + +public class Tuple implements Serializable { + + private TypedRow relation; + + private TypedRow target; + + + public TypedRow getRelation() { + return relation; + } + + public Tuple setRelation(TypedRow relation) { + this.relation = relation; + return this; + } + + public TypedRow getTarget() { + return target; + } + + public Tuple setTarget(TypedRow target) { + this.target = target; + return this; + } +}