From 799929c1e3e40534f89be315d1884eb135e63516 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Tue, 21 Jan 2020 16:35:44 +0100
Subject: [PATCH] joining entities using T x R x S method with groupByKey

---
 dhp-schemas/pom.xml                           |   2 +-
 .../eu/dnetlib/dhp/schema/dli/Entity.java     | 118 -----------
 .../java/eu/dnetlib/dhp/schema/dli/Pid.java   |  33 ----
 .../eu/dnetlib/dhp/schema/dli/Provenance.java |  35 ----
 .../eu/dnetlib/dhp/schema/dli/Relation.java   |  47 -----
 .../dhp/schema/dli/RelationSemantic.java      |  16 --
 .../eu/dnetlib/dhp/schema/dli/Subject.java    |  35 ----
 .../job-override.properties                   |   1 -
 dhp-workflows/dhp-graph-provision/pom.xml     |   4 +
 .../java/eu/dnetlib/dhp/graph/EntityNode.java |   4 -
 .../eu/dnetlib/dhp/graph/EntityRelEntity.java |  32 +--
 .../eu/dnetlib/dhp/graph/GraphJoiner.java     | 186 ++++++++----------
 .../eu/dnetlib/dhp/graph/LinkedEntity.java    |  29 +++
 .../eu/dnetlib/dhp/graph/RelatedEntity.java   |  69 -------
 .../dhp/graph/SparkGraphIndexingJob.java      |  17 +-
 .../main/java/eu/dnetlib/dhp/graph/Tuple.java |  31 +++
 .../java/eu/dnetlib/dhp/graph/TypedRow.java   |  52 +++--
 .../dhp/graph/oozie_app/config-default.xml    |   8 +
 .../dnetlib/dhp/graph/oozie_app/workflow.xml  |  19 +-
 19 files changed, 238 insertions(+), 500 deletions(-)
 delete mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Entity.java
 delete mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Pid.java
 delete mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Provenance.java
 delete mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java
 delete mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/RelationSemantic.java
 delete mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Subject.java
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java
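Note: the subject line's "T x R x S" names the three legs of the join built in GraphJoiner below: the main entities (T), the relations keyed by source id (R), and the related entities resolved via the relation's target id (S). A minimal, self-contained sketch of the two-hop shape, using plain String payloads in place of the patch's TypedRow/EntityRelEntity and cogroup where the patch unions same-typed rows before groupByKey; all names here are illustrative, not part of the patch:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class TxRxSSketch {
        public static void main(String[] args) {
            final JavaSparkContext sc = new JavaSparkContext("local", "TxRxSSketch");

            // T / S: entity payloads keyed by id; R: relations as (source id, target id).
            final JavaPairRDD<String, String> entities = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("p1", "publication payload"),
                    new Tuple2<>("d1", "datasource payload")));
            final JavaPairRDD<String, String> rels = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("p1", "d1")));

            // R x S: re-key each relation by target id, join to denormalize the target
            // payload, then re-key the (target id, target payload) pair by source id.
            final JavaPairRDD<String, Tuple2<String, String>> relWithTarget = rels
                    .mapToPair(r -> new Tuple2<>(r._2(), r._1()))     // target id -> source id
                    .join(entities)                                   // target id -> (source id, payload)
                    .mapToPair(j -> new Tuple2<>(j._2()._1(),
                            new Tuple2<>(j._1(), j._2()._2())));      // source id -> (target id, payload)

            // T x (R x S): group everything under the source id; each group now holds
            // the main entity plus its related entities, as in GraphJoiner's final groupByKey.
            entities.cogroup(relWithTarget)
                    .foreach(g -> System.out.println(g._1() + " -> " + g._2()));

            sc.stop();
        }
    }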
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index ec5af8d3c..491cbe668 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>eu.dnetlib.dhp</groupId>
         <artifactId>dhp</artifactId>
-        <version>1.1.5-SNAPSHOT</version>
+        <version>1.0.5-SNAPSHOT</version>
         <relativePath>../</relativePath>
     </parent>
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Entity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Entity.java
deleted file mode 100644
index 894d54eaf..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Entity.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class Entity implements Serializable {
-
-    private String identifier;
-
-    private List<Pid> pid;
-
-    private List<String> title;
-
-    private List<String> date;
-
-    private String typology;
-
-    private List<String> authors;
-
-    private List<Subject> subject;
-
-    private String description;
-
-    private String completionStatus;
-
-    private List<Provenance> collectedFrom;
-
-    private List<String> publisher;
-
-
-    public String getIdentifier() {
-        return identifier;
-    }
-
-    public void setIdentifier(String identifier) {
-        this.identifier = identifier;
-    }
-
-    public List<Pid> getPid() {
-        return pid;
-    }
-
-    public void setPid(List<Pid> pid) {
-        this.pid = pid;
-    }
-
-    public List<String> getTitle() {
-        return title;
-    }
-
-    public void setTitle(List<String> title) {
-        this.title = title;
-    }
-
-    public List<String> getDate() {
-        return date;
-    }
-
-    public void setDate(List<String> date) {
-        this.date = date;
-    }
-
-    public String getTypology() {
-        return typology;
-    }
-
-    public void setTypology(String typology) {
-        this.typology = typology;
-    }
-
-    public List<String> getAuthors() {
-        return authors;
-    }
-
-    public void setAuthors(List<String> authors) {
-        this.authors = authors;
-    }
-
-    public List<Subject> getSubject() {
-        return subject;
-    }
-
-    public void setSubject(List<Subject> subject) {
-        this.subject = subject;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public List<Provenance> getCollectedFrom() {
-        return collectedFrom;
-    }
-
-    public void setCollectedFrom(List<Provenance> collectedFrom) {
-        this.collectedFrom = collectedFrom;
-    }
-
-    public List<String> getPublisher() {
-        return publisher;
-    }
-
-    public void setPublisher(List<String> publisher) {
-        this.publisher = publisher;
-    }
-
-    public String getCompletionStatus() {
-        return completionStatus;
-    }
-
-    public void setCompletionStatus(String completionStatus) {
-        this.completionStatus = completionStatus;
-    }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Pid.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Pid.java
deleted file mode 100644
index 252245f45..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Pid.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import eu.dnetlib.dhp.utils.DHPUtils;
-import org.apache.commons.lang3.StringUtils;
-
-public class Pid {
-
-    private String pid;
-
-    private String pidType;
-
-    public String getPid() {
-        return pid;
-    }
-
-    public void setPid(String pid) {
-        this.pid = pid;
-    }
-
-    public String getPidType() {
-        return pidType;
-    }
-
-    public void setPidType(String pidType) {
-        this.pidType = pidType;
-    }
-
-    public String generateId() {
-        if(StringUtils.isEmpty(pid) || StringUtils.isEmpty(pidType))
-            return null;
-        return DHPUtils.md5(String.format("%s::%s", pid, pidType));
-    }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Provenance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Provenance.java
deleted file mode 100644
index 300b1134b..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Provenance.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-public class Provenance {
-
-    private String datasourceId;
-
-    private String datasourceName;
-
-    private String completionStatus;
-
-
-    public String getDatasourceId() {
-        return datasourceId;
-    }
-
-    public void setDatasourceId(String datasourceId) {
-        this.datasourceId = datasourceId;
-    }
-
-    public String getDatasourceName() {
-        return datasourceName;
-    }
-
-    public void setDatasourceName(String datasourceName) {
-        this.datasourceName = datasourceName;
-    }
-
-    public String getCompletionStatus() {
-        return completionStatus;
-    }
-
-    public void setCompletionStatus(String completionStatus) {
-        this.completionStatus = completionStatus;
-    }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java
deleted file mode 100644
index b83cccb73..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class Relation implements Serializable {
-
-    private String source;
-
-    private String target;
-
-    private List<Provenance> provenance;
-
-    private RelationSemantic semantic;
-
-    public String getSource() {
-        return source;
-    }
-
-    public void setSource(String source) {
-        this.source = source;
-    }
-
-    public String getTarget() {
-        return target;
-    }
-
-    public void setTarget(String target) {
-        this.target = target;
-    }
-
-    public List<Provenance> getProvenance() {
-        return provenance;
-    }
-
-    public void setProvenance(List<Provenance> provenance) {
-        this.provenance = provenance;
-    }
-
-    public RelationSemantic getSemantic() {
-        return semantic;
-    }
-
-    public void setSemantic(RelationSemantic semantic) {
-        this.semantic = semantic;
-    }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/RelationSemantic.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/RelationSemantic.java
deleted file mode 100644
index ff871ef2d..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/RelationSemantic.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-
-public class RelationSemantic extends Subject implements Serializable {
-
-    public String inverse;
-
-    public String getInverse() {
-        return inverse;
-    }
-
-    public void setInverse(String inverse) {
-        this.inverse = inverse;
-    }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Subject.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Subject.java
deleted file mode 100644
index bd89bc6dd..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Subject.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-
-public class Subject implements Serializable {
-
-    private String schema;
-
-    private String value;
-
-    public Subject() {
-
-    }
-
-    public Subject(String schema, String value) {
-        this.schema = schema;
-        this.value = value;
-    }
-
-    public String getSchema() {
-        return schema;
-    }
-
-    public void setSchema(String schema) {
-        this.schema = schema;
-    }
-
-    public String getValue() {
-        return value;
-    }
-
-    public void setValue(String value) {
-        this.value = value;
-    }
-}
diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties
index 882053c1a..1870b0e6e 100644
--- a/dhp-workflows/dhp-graph-provision/job-override.properties
+++ b/dhp-workflows/dhp-graph-provision/job-override.properties
@@ -1,5 +1,4 @@
 sparkDriverMemory=7G
 sparkExecutorMemory=7G
-sparkExecutorMemoryOverhead=5G
 hive_db_name=claudio
 sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index d47463774..62d8ac2ae 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -19,6 +19,10 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>eu.dnetlib.dhp</groupId>
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java
deleted file mode 100644
index be1babae2..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package eu.dnetlib.dhp.graph;
-
-public class EntityNode {
-}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
index ac89e4351..e8ecc2e30 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
@@ -1,20 +1,30 @@
 package eu.dnetlib.dhp.graph;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
 import java.io.Serializable;
 
 public class EntityRelEntity implements Serializable {
 
     private TypedRow source;
-    private Relation relation;
+    private TypedRow relation;
     private TypedRow target;
 
+    public EntityRelEntity() {
+    }
+
     public EntityRelEntity(TypedRow source) {
         this.source = source;
     }
+
+    //helpers
+    public Boolean hasMainEntity() {
+        return getSource() != null && getRelation() == null && getTarget() == null;
+    }
+
+    public Boolean hasRelatedEntity() {
+        return getSource() == null && getRelation() != null && getTarget() != null;
+    }
+
     public TypedRow getSource() {
         return source;
     }
@@ -24,11 +34,11 @@ public class EntityRelEntity implements Serializable {
         return this;
     }
 
-    public Relation getRelation() {
+    public TypedRow getRelation() {
         return relation;
     }
 
-    public EntityRelEntity setRelation(Relation relation) {
+    public EntityRelEntity setRelation(TypedRow relation) {
         this.relation = relation;
         return this;
     }
@@ -42,12 +52,4 @@ public class EntityRelEntity implements Serializable {
         return this;
     }
 
-    @Override
-    public String toString() {
-        try {
-            return new ObjectMapper().writeValueAsString(this);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
 }
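Note: with this change an EntityRelEntity row carries either a main entity (source only) or a related entity (relation plus target), and the two helper predicates above are what the grouping stage in GraphJoiner uses to tell them apart. A small illustration, e.g. in a unit test under eu.dnetlib.dhp.graph; identifiers and payloads are made up:

    TypedRow entity = new TypedRow("50|pub_1", "publication", "{...}");
    TypedRow rel    = new TypedRow("50|pub_1", "10|ds_1", "relation", "{...}");
    TypedRow target = new TypedRow("10|ds_1", "datasource", "{...}");

    EntityRelEntity main    = new EntityRelEntity(entity);                            // source only
    EntityRelEntity related = new EntityRelEntity().setRelation(rel).setTarget(target);

    assert main.hasMainEntity() && !main.hasRelatedEntity();
    assert related.hasRelatedEntity() && !related.hasMainEntity();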
inputPath, "organization"); + JavaPairRDD project = readPathEntity(sc, entityIdPath, inputPath, "project"); + JavaPairRDD dataset = readPathEntity(sc, entityIdPath, inputPath, "dataset"); + JavaPairRDD otherresearchproduct = readPathEntity(sc, entityIdPath, inputPath, "otherresearchproduct"); + JavaPairRDD software = readPathEntity(sc, entityIdPath, inputPath, "software"); + JavaPairRDD publication = readPathEntity(sc, entityIdPath, inputPath, "publication"); - JavaPairRDD entities = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class)) - .map(oaf -> new TypedRow("datasource", oaf)) - .mapToPair(toPair()) - .union(sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class)) - .map(oaf -> new TypedRow("organization", oaf)) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/project", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Project.class)) - .map(oaf -> new TypedRow("project", oaf)) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)) - .map(oaf -> new TypedRow("dataset", oaf)) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)) - .map(oaf -> new TypedRow("otherresearchproduct", oaf)) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)) - .map(oaf -> new TypedRow("software", oaf)) - .mapToPair(toPair())); - /* - .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)) - .map(oaf -> new TypedRow("publication", oaf)) - .mapToPair(toPair())); + final String entitiesPath = outPath + "/entities"; + datasource + .union(organization) + .union(project) + .union(dataset) + .union(otherresearchproduct) + .union(software) + .union(publication) + .map(e -> new EntityRelEntity().setSource(e._2())) + .map(e -> new ObjectMapper().writeValueAsString(e)) + .saveAsTextFile(entitiesPath, GzipCodec.class); - */ + JavaPairRDD entities = sc.textFile(entitiesPath) + .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) + .mapToPair(t -> new Tuple2<>(t.getSource().getSource(), t)); - /* - JavaRDD rels = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) - .map(oaf -> new TypedRow("relation", oaf)) - .mapToPair(toPair()) + final JavaPairRDD relation = readPathRelation(sc, inputPath) + .map(p -> new EntityRelEntity().setRelation(p)) + .mapToPair(p -> new Tuple2<>(p.getRelation().getSource(), p)) .groupByKey() - .map(t -> Iterables.limit(t._2(), MAX_RELS)) - .flatMap(t -> t.iterator()) - .map(t -> (Relation) t.getOaf()); + .map(p -> Iterables.limit(p._2(), MAX_RELS)) + .flatMap(p -> p.iterator()) + .mapToPair(p -> new Tuple2<>(p.getRelation().getTarget(), p)); - spark.createDataset(rels.rdd(), Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + ".relation_100"); - */ + final String joinByTargetPath = outPath + 
"/join_by_target"; + relation.join(entities) + .map(s -> new EntityRelEntity() + .setRelation(s._2()._1().getRelation()) + .setTarget(s._2()._2().getSource())) + .map(e -> new ObjectMapper().writeValueAsString(e)) + .saveAsTextFile(joinByTargetPath, GzipCodec.class); - JavaPairRDD bounded_rels = spark.table(hiveDbName + ".relation_" + MAX_RELS) - .as(Encoders.bean(Relation.class)) - .javaRDD() - .map(r -> new TypedRow("relation", r)) - .mapToPair(toPair()); - // build the adjacency list: e -> r - JavaPairRDD>> adjacency_list = entities.leftOuterJoin(bounded_rels); + JavaPairRDD bySource = sc.textFile(joinByTargetPath) + .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) + .mapToPair(t -> new Tuple2<>(t.getRelation().getSource(), t)); - JavaRDD linked_entities = adjacency_list - .mapToPair(toPairTarget()) // make rel.targetid explicit so that we can join it - .leftOuterJoin(entities) // again with the entities to get the target entity - .map(l -> toEntityRelEntity(l)); // and map it to a more readable representation - - spark.createDataFrame(linked_entities, EntityRelEntity.class) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + ".linked_entities"); + entities + .union(bySource) + .groupByKey() // by source id + .map(p -> { + final LinkedEntity e = new LinkedEntity(); + final List links = Lists.newArrayList(); + for(EntityRelEntity rel : p._2()) { + if (rel.hasMainEntity() & e.getEntity() == null) { + e.setEntity(rel.getSource()); + } + if (rel.hasRelatedEntity()) { + links.add(new Tuple() + .setRelation(rel.getRelation()) + .setTarget(rel.getTarget())); + } + } + e.setLinks(links); + if (e.getEntity() == null) { + throw new IllegalStateException("missing main entity on '" + p._1() + "'"); + } + return e; + }) + .map(e -> new ObjectMapper().writeValueAsString(e)) + .saveAsTextFile(outPath + "/linked_entities", GzipCodec.class); } - private EntityRelEntity toEntityRelEntity(Tuple2>>, Optional>> l) { - // extract the entity source - final EntityRelEntity res = new EntityRelEntity(l._2()._1()._2()._1()); - - if(l._2()._1()._2()._2().isPresent() && l._2()._2().isPresent()) { - - // extract the relationship - res.setRelation((Relation) l._2()._1()._2()._2().get().getOaf()); - - // extract the related entity - res.setTarget(l._2()._2().get()); - } - - return res; + private JavaPairRDD readPathEntity(final JavaSparkContext sc, final String idPath, final String inputPath, final String type) { + return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class) + .mapToPair((PairFunction, String, TypedRow>) item -> { + final String json = item._2().toString(); + final String id = JsonPath.read(json, idPath); + return new Tuple2<>(id, new TypedRow(id, type, json)); + }); } - private PairFunction>>, String, Tuple2>>> toPairTarget() { - return e -> { - Optional o = e._2()._2(); - if (o.isPresent()) { - return new Tuple2<>(((Relation) o.get().getOaf()).getTarget(), e); - } else { - return new Tuple2<>(null, e); - } - }; - } - - private PairFunction toPair() { - return e -> { - if (!"relation".equals(e.getType())) { - return new Tuple2<>( ((OafEntity) e.getOaf()).getId(), e); - } else { - return new Tuple2<>( ((Relation) e.getOaf()).getSource(), e); - } - }; + private JavaRDD readPathRelation(final JavaSparkContext sc, final String inputPath) { + return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> { + final String json = item._2().toString(); + final String source = JsonPath.read(json, "$.source"); + final String target = 
JsonPath.read(json, "$.target"); + return new TypedRow(source, target, "relation", json); + }); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java new file mode 100644 index 000000000..9e6fc0d38 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java @@ -0,0 +1,29 @@ +package eu.dnetlib.dhp.graph; + +import java.io.Serializable; +import java.util.List; + +public class LinkedEntity implements Serializable { + + private TypedRow entity; + + private List links; + + public TypedRow getEntity() { + return entity; + } + + public LinkedEntity setEntity(TypedRow entity) { + this.entity = entity; + return this; + } + + public List getLinks() { + return links; + } + + public LinkedEntity setLinks(List links) { + this.links = links; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java deleted file mode 100644 index dbab04f16..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java +++ /dev/null @@ -1,69 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import java.io.Serializable; - -public class RelatedEntity implements Serializable { - - private String relType; - - private String subRelType; - - private String relClass; - - private String type; - - private String payload; - - public RelatedEntity(String relType, String subRelType, String relClass, String type, String payload) { - this.relType = relType; - this.subRelType = subRelType; - this.relClass = relClass; - this.type = type; - this.payload = payload; - } - - public String getRelType() { - return relType; - } - - public RelatedEntity setRelType(String relType) { - this.relType = relType; - return this; - } - - public String getSubRelType() { - return subRelType; - } - - public RelatedEntity setSubRelType(String subRelType) { - this.subRelType = subRelType; - return this; - } - - public String getRelClass() { - return relClass; - } - - public RelatedEntity setRelClass(String relClass) { - this.relClass = relClass; - return this; - } - - public String getType() { - return type; - } - - public RelatedEntity setType(String type) { - this.type = type; - return this; - } - - public String getPayload() { - return payload; - } - - public RelatedEntity setPayload(String payload) { - this.payload = payload; - return this; - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java index ce8e7e396..1d55dda89 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java @@ -4,21 +4,27 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; public class SparkGraphIndexingJob { - private final static String ENTITY_NODES_PATH = "/tmp/entity_node"; + private final static String OUTPUT_BASE_PATH = "/tmp/openaire_provision"; public static void main(String[] args) throws Exception { 
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java
new file mode 100644
index 000000000..0b22a63a5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java
@@ -0,0 +1,31 @@
+package eu.dnetlib.dhp.graph;
+
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import java.io.Serializable;
+
+public class Tuple implements Serializable {
+
+    private TypedRow relation;
+
+    private TypedRow target;
+
+    public TypedRow getRelation() {
+        return relation;
+    }
+
+    public Tuple setRelation(TypedRow relation) {
+        this.relation = relation;
+        return this;
+    }
+
+    public TypedRow getTarget() {
+        return target;
+    }
+
+    public Tuple setTarget(TypedRow target) {
+        this.target = target;
+        return this;
+    }
+}
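Note: Tuple closes the output model: a LinkedEntity pairs one main entity with a list of Tuple(relation, target) links, and is serialized with Jackson exactly as in GraphJoiner. A hand-built example of one /linked_entities record, with abbreviated payloads and made-up identifiers, from inside a method that throws JsonProcessingException:

    LinkedEntity le = new LinkedEntity()
            .setEntity(new TypedRow("50|pub_1", "publication", "{...}"))
            .setLinks(Lists.newArrayList(
                    new Tuple()
                            .setRelation(new TypedRow("50|pub_1", "10|ds_1", "relation", "{...}"))
                            .setTarget(new TypedRow("10|ds_1", "datasource", "{...}"))));
    String json = new ObjectMapper().writeValueAsString(le);
    // -> {"entity":{"source":"50|pub_1","target":null,"type":"publication","oaf":"{...}"},"links":[...]}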
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
index 5c933ca80..60c3b64b2 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
@@ -1,20 +1,46 @@
 package eu.dnetlib.dhp.graph;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-
 import java.io.Serializable;
 
 public class TypedRow implements Serializable {
-    private String type;
-    private Oaf oaf;
 
-    public TypedRow(String type, Oaf oaf) {
+    private String source;
+    private String target;
+    private String type;
+    private String oaf;
+
+    public TypedRow() {
+    }
+
+    public TypedRow(String source, String type, String oaf) {
+        this.source = source;
         this.type = type;
         this.oaf = oaf;
     }
 
+    public TypedRow(String source, String target, String type, String oaf) {
+        this(source, type, oaf);
+        this.target = target;
+    }
+
+    public String getSource() {
+        return source;
+    }
+
+    public TypedRow setSource(String source) {
+        this.source = source;
+        return this;
+    }
+
+    public String getTarget() {
+        return target;
+    }
+
+    public TypedRow setTarget(String target) {
+        this.target = target;
+        return this;
+    }
+
     public String getType() {
         return type;
     }
@@ -24,21 +50,13 @@ public class TypedRow implements Serializable {
         return this;
     }
 
-    public Oaf getOaf() {
+    public String getOaf() {
         return oaf;
     }
 
-    public TypedRow setOaf(Oaf oaf) {
+    public TypedRow setOaf(String oaf) {
         this.oaf = oaf;
         return this;
     }
 
-    @Override
-    public String toString() {
-        try {
-            return new ObjectMapper().writeValueAsString(this);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
index fcab9dd00..624d3ea76 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
@@ -23,4 +23,12 @@
         <name>hive_db_name</name>
         <value>openaire</value>
     </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/applicationHistory</value>
+    </property>
 </configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
index 00a890268..a91759ade 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -16,6 +16,14 @@
             <name>sparkExecutorCores</name>
             <description>number of cores used by single executor</description>
         </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
     </parameters>
 
@@ -33,7 +41,16 @@
             <name>GraphIndexing</name>
             <class>eu.dnetlib.dhp.graph.SparkGraphIndexingJob</class>
             <jar>dhp-graph-provision-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.yarn.executor.memoryOverhead=${sparkExecutorMemoryOverhead}</spark-opts>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores ${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
+                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
+                --conf spark.sql.warehouse.dir="/user/hive/warehouse"
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
             <arg>-mt</arg> <arg>yarn-cluster</arg>
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
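Note: downstream consumers (e.g. the indexing step this job prepares for) can re-load the final output with text + Jackson, mirroring how the intermediate /entities and /join_by_target paths are re-read inside GraphJoiner. A minimal sketch, assuming the same sc and outPath used by GraphJoiner:

    JavaRDD<LinkedEntity> linked = sc.textFile(outPath + "/linked_entities")
            .map(s -> new ObjectMapper().readValue(s, LinkedEntity.class));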