diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index ec5af8d3c..491cbe668 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp</artifactId>
-		<version>1.1.5-SNAPSHOT</version>
+		<version>1.0.5-SNAPSHOT</version>
 		<relativePath>../</relativePath>
 	</parent>
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Entity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Entity.java
deleted file mode 100644
index 894d54eaf..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Entity.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class Entity implements Serializable {
-
- private String identifier;
-
- private List<Pid> pid;
-
- private List<String> title;
-
- private List<String> date;
-
- private String typology;
-
- private List<String> authors;
-
- private List<Subject> subject;
-
- private String description;
-
- private String completionStatus;
-
- private List<Provenance> collectedFrom;
-
- private List<String> publisher;
-
-
- public String getIdentifier() {
- return identifier;
- }
-
- public void setIdentifier(String identifier) {
- this.identifier = identifier;
- }
-
- public List<Pid> getPid() {
- return pid;
- }
-
- public void setPid(List<Pid> pid) {
- this.pid = pid;
- }
-
- public List<String> getTitle() {
- return title;
- }
-
- public void setTitle(List<String> title) {
- this.title = title;
- }
-
- public List<String> getDate() {
- return date;
- }
-
- public void setDate(List<String> date) {
- this.date = date;
- }
-
- public String getTypology() {
- return typology;
- }
-
- public void setTypology(String typology) {
- this.typology = typology;
- }
-
- public List<String> getAuthors() {
- return authors;
- }
-
- public void setAuthors(List<String> authors) {
- this.authors = authors;
- }
-
- public List<Subject> getSubject() {
- return subject;
- }
-
- public void setSubject(List<Subject> subject) {
- this.subject = subject;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public List<Provenance> getCollectedFrom() {
- return collectedFrom;
- }
-
- public void setCollectedFrom(List<Provenance> collectedFrom) {
- this.collectedFrom = collectedFrom;
- }
-
- public List<String> getPublisher() {
- return publisher;
- }
-
- public void setPublisher(List<String> publisher) {
- this.publisher = publisher;
- }
-
- public String getCompletionStatus() {
- return completionStatus;
- }
-
- public void setCompletionStatus(String completionStatus) {
- this.completionStatus = completionStatus;
- }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Pid.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Pid.java
deleted file mode 100644
index 252245f45..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Pid.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import eu.dnetlib.dhp.utils.DHPUtils;
-import org.apache.commons.lang3.StringUtils;
-
-public class Pid {
-
- private String pid;
-
- private String pidType;
-
- public String getPid() {
- return pid;
- }
-
- public void setPid(String pid) {
- this.pid = pid;
- }
-
- public String getPidType() {
- return pidType;
- }
-
- public void setPidType(String pidType) {
- this.pidType = pidType;
- }
-
- public String generateId() {
- if(StringUtils.isEmpty(pid) || StringUtils.isEmpty(pidType))
- return null;
- return DHPUtils.md5(String.format("%s::%s", pid, pidType));
- }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Provenance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Provenance.java
deleted file mode 100644
index 300b1134b..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Provenance.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-public class Provenance {
-
- private String datasourceId;
-
- private String datasourceName;
-
- private String completionStatus;
-
-
- public String getDatasourceId() {
- return datasourceId;
- }
-
- public void setDatasourceId(String datasourceId) {
- this.datasourceId = datasourceId;
- }
-
- public String getDatasourceName() {
- return datasourceName;
- }
-
- public void setDatasourceName(String datasourceName) {
- this.datasourceName = datasourceName;
- }
-
- public String getCompletionStatus() {
- return completionStatus;
- }
-
- public void setCompletionStatus(String completionStatus) {
- this.completionStatus = completionStatus;
- }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java
deleted file mode 100644
index b83cccb73..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class Relation implements Serializable {
-
- private String source;
-
- private String target;
-
- private List<Provenance> provenance;
-
- private RelationSemantic semantic;
-
- public String getSource() {
- return source;
- }
-
- public void setSource(String source) {
- this.source = source;
- }
-
- public String getTarget() {
- return target;
- }
-
- public void setTarget(String target) {
- this.target = target;
- }
-
- public List<Provenance> getProvenance() {
- return provenance;
- }
-
- public void setProvenance(List<Provenance> provenance) {
- this.provenance = provenance;
- }
-
- public RelationSemantic getSemantic() {
- return semantic;
- }
-
- public void setSemantic(RelationSemantic semantic) {
- this.semantic = semantic;
- }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/RelationSemantic.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/RelationSemantic.java
deleted file mode 100644
index ff871ef2d..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/RelationSemantic.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-
-public class RelationSemantic extends Subject implements Serializable {
-
- public String inverse;
-
- public String getInverse() {
- return inverse;
- }
-
- public void setInverse(String inverse) {
- this.inverse = inverse;
- }
-}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Subject.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Subject.java
deleted file mode 100644
index bd89bc6dd..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Subject.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package eu.dnetlib.dhp.schema.dli;
-
-import java.io.Serializable;
-
-public class Subject implements Serializable {
-
- private String schema;
-
- private String value;
-
- public Subject() {
-
- }
-
- public Subject(String schema, String value) {
- this.schema = schema;
- this.value = value;
- }
-
- public String getSchema() {
- return schema;
- }
-
- public void setSchema(String schema) {
- this.schema = schema;
- }
-
- public String getValue() {
- return value;
- }
-
- public void setValue(String value) {
- this.value = value;
- }
-}
diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties
index 882053c1a..1870b0e6e 100644
--- a/dhp-workflows/dhp-graph-provision/job-override.properties
+++ b/dhp-workflows/dhp-graph-provision/job-override.properties
@@ -1,5 +1,4 @@
sparkDriverMemory=7G
sparkExecutorMemory=7G
-sparkExecutorMemoryOverhead=5G
hive_db_name=claudio
sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index d47463774..62d8ac2ae 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -19,6 +19,10 @@
 		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-sql_2.11</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>com.jayway.jsonpath</groupId>
+			<artifactId>json-path</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>eu.dnetlib.dhp</groupId>
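The json-path dependency added above is what the new GraphJoiner uses to pull single fields out of the serialized records without binding the full OAF schema classes. A minimal sketch of the call pattern, with an invented payload (only the JSON paths are real):

import com.jayway.jsonpath.JsonPath;

public class JsonPathSketch {
    public static void main(String[] args) {
        // hypothetical record payload; GraphJoiner reads "$.id", "$.source", "$.target"
        final String json = "{\"id\":\"50|doi_________::ab12\",\"source\":\"50|x\",\"target\":\"40|y\"}";

        final String id = JsonPath.read(json, "$.id");         // "50|doi_________::ab12"
        final String source = JsonPath.read(json, "$.source"); // "50|x"

        System.out.println(id + " / " + source);
    }
}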
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java
deleted file mode 100644
index be1babae2..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package eu.dnetlib.dhp.graph;
-
-public class EntityNode {
-}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
index ac89e4351..e8ecc2e30 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
@@ -1,20 +1,30 @@
package eu.dnetlib.dhp.graph;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
import java.io.Serializable;
public class EntityRelEntity implements Serializable {
private TypedRow source;
- private Relation relation;
+ private TypedRow relation;
private TypedRow target;
+ public EntityRelEntity() {
+ }
+
public EntityRelEntity(TypedRow source) {
this.source = source;
}
+
+ // helpers
+ public Boolean hasMainEntity() {
+ return getSource() != null && getRelation() == null && getTarget() == null;
+ }
+
+ public Boolean hasRelatedEntity() {
+ return getSource() == null && getRelation() != null && getTarget() != null;
+ }
+
+
public TypedRow getSource() {
return source;
}
@@ -24,11 +34,11 @@ public class EntityRelEntity implements Serializable {
return this;
}
- public Relation getRelation() {
+ public TypedRow getRelation() {
return relation;
}
- public EntityRelEntity setRelation(Relation relation) {
+ public EntityRelEntity setRelation(TypedRow relation) {
this.relation = relation;
return this;
}
@@ -42,12 +52,4 @@ public class EntityRelEntity implements Serializable {
return this;
}
- @Override
- public String toString() {
- try {
- return new ObjectMapper().writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
}
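To see how the two helper predicates partition the mixed stream in GraphJoiner, here is a minimal sketch, assuming the TypedRow constructors introduced later in this diff and invented identifiers: a row carrying only a source is a main entity, a row carrying relation and target is a related entity.

public class EntityRelEntitySketch {
    public static void main(String[] args) {
        // main entity row: only the source side is populated
        final EntityRelEntity main = new EntityRelEntity(
                new TypedRow("50|a", "publication", "{}"));
        // related entity row: relation plus resolved target, no source
        final EntityRelEntity related = new EntityRelEntity()
                .setRelation(new TypedRow("50|a", "50|b", "relation", "{}"))
                .setTarget(new TypedRow("50|b", "dataset", "{}"));

        System.out.println(main.hasMainEntity());       // true
        System.out.println(related.hasRelatedEntity()); // true
    }
}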
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
index 5764642dc..aca436f52 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
@@ -1,139 +1,119 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.*;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.jayway.jsonpath.JsonPath;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.Serializable;
+import java.util.List;
public class GraphJoiner implements Serializable {
- public static final int MAX_RELS = 100;
+ public static final int MAX_RELS = 10;
public void join(final SparkSession spark, final String inputPath, final String hiveDbName, final String outPath) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- /*
- JavaPairRDD entities = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
- .map(oaf -> new TypedRow("publication", oaf))
- .mapToPair(toPair());
+ final String entityIdPath = "$.id";
- */
+ JavaPairRDD<String, TypedRow> datasource = readPathEntity(sc, entityIdPath, inputPath, "datasource");
+ JavaPairRDD<String, TypedRow> organization = readPathEntity(sc, entityIdPath, inputPath, "organization");
+ JavaPairRDD<String, TypedRow> project = readPathEntity(sc, entityIdPath, inputPath, "project");
+ JavaPairRDD<String, TypedRow> dataset = readPathEntity(sc, entityIdPath, inputPath, "dataset");
+ JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(sc, entityIdPath, inputPath, "otherresearchproduct");
+ JavaPairRDD<String, TypedRow> software = readPathEntity(sc, entityIdPath, inputPath, "software");
+ JavaPairRDD<String, TypedRow> publication = readPathEntity(sc, entityIdPath, inputPath, "publication");
- JavaPairRDD<String, TypedRow> entities = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
- .map(oaf -> new TypedRow("datasource", oaf))
- .mapToPair(toPair())
- .union(sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class))
- .map(oaf -> new TypedRow("organization", oaf))
- .mapToPair(toPair()))
- .union(sc.sequenceFile(inputPath + "/project", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Project.class))
- .map(oaf -> new TypedRow("project", oaf))
- .mapToPair(toPair()))
- .union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
- .map(oaf -> new TypedRow("dataset", oaf))
- .mapToPair(toPair()))
- .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
- .map(oaf -> new TypedRow("otherresearchproduct", oaf))
- .mapToPair(toPair()))
- .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
- .map(oaf -> new TypedRow("software", oaf))
- .mapToPair(toPair()));
- /*
- .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
- .map(oaf -> new TypedRow("publication", oaf))
- .mapToPair(toPair()));
+ final String entitiesPath = outPath + "/entities";
+ datasource
+ .union(organization)
+ .union(project)
+ .union(dataset)
+ .union(otherresearchproduct)
+ .union(software)
+ .union(publication)
+ .map(e -> new EntityRelEntity().setSource(e._2()))
+ .map(e -> new ObjectMapper().writeValueAsString(e))
+ .saveAsTextFile(entitiesPath, GzipCodec.class);
- */
+ JavaPairRDD<String, EntityRelEntity> entities = sc.textFile(entitiesPath)
+ .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
+ .mapToPair(t -> new Tuple2<>(t.getSource().getSource(), t));
- /*
- JavaRDD<Relation> rels = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
- .map(oaf -> new TypedRow("relation", oaf))
- .mapToPair(toPair())
+ final JavaPairRDD<String, EntityRelEntity> relation = readPathRelation(sc, inputPath)
+ .map(p -> new EntityRelEntity().setRelation(p))
+ .mapToPair(p -> new Tuple2<>(p.getRelation().getSource(), p))
.groupByKey()
- .map(t -> Iterables.limit(t._2(), MAX_RELS))
- .flatMap(t -> t.iterator())
- .map(t -> (Relation) t.getOaf());
+ .map(p -> Iterables.limit(p._2(), MAX_RELS))
+ .flatMap(p -> p.iterator())
+ .mapToPair(p -> new Tuple2<>(p.getRelation().getTarget(), p));
- spark.createDataset(rels.rdd(), Encoders.bean(Relation.class))
- .write()
- .mode(SaveMode.Overwrite)
- .saveAsTable(hiveDbName + ".relation_100");
- */
+ final String joinByTargetPath = outPath + "/join_by_target";
+ relation.join(entities)
+ .map(s -> new EntityRelEntity()
+ .setRelation(s._2()._1().getRelation())
+ .setTarget(s._2()._2().getSource()))
+ .map(e -> new ObjectMapper().writeValueAsString(e))
+ .saveAsTextFile(joinByTargetPath, GzipCodec.class);
- JavaPairRDD<String, TypedRow> bounded_rels = spark.table(hiveDbName + ".relation_" + MAX_RELS)
- .as(Encoders.bean(Relation.class))
- .javaRDD()
- .map(r -> new TypedRow("relation", r))
- .mapToPair(toPair());
- // build the adjacency list: e -> r
- JavaPairRDD>> adjacency_list = entities.leftOuterJoin(bounded_rels);
+ JavaPairRDD<String, EntityRelEntity> bySource = sc.textFile(joinByTargetPath)
+ .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
+ .mapToPair(t -> new Tuple2<>(t.getRelation().getSource(), t));
- JavaRDD<EntityRelEntity> linked_entities = adjacency_list
- .mapToPair(toPairTarget()) // make rel.targetid explicit so that we can join it
- .leftOuterJoin(entities) // again with the entities to get the target entity
- .map(l -> toEntityRelEntity(l)); // and map it to a more readable representation
-
- spark.createDataFrame(linked_entities, EntityRelEntity.class)
- .write()
- .mode(SaveMode.Overwrite)
- .saveAsTable(hiveDbName + ".linked_entities");
+ entities
+ .union(bySource)
+ .groupByKey() // by source id
+ .map(p -> {
+ final LinkedEntity e = new LinkedEntity();
+ final List<Tuple> links = Lists.newArrayList();
+ for (EntityRelEntity rel : p._2()) {
+ if (rel.hasMainEntity() && e.getEntity() == null) {
+ e.setEntity(rel.getSource());
+ }
+ if (rel.hasRelatedEntity()) {
+ links.add(new Tuple()
+ .setRelation(rel.getRelation())
+ .setTarget(rel.getTarget()));
+ }
+ }
+ e.setLinks(links);
+ if (e.getEntity() == null) {
+ throw new IllegalStateException("missing main entity on '" + p._1() + "'");
+ }
+ return e;
+ })
+ .map(e -> new ObjectMapper().writeValueAsString(e))
+ .saveAsTextFile(outPath + "/linked_entities", GzipCodec.class);
}
- private EntityRelEntity toEntityRelEntity(Tuple2>>, Optional>> l) {
- // extract the entity source
- final EntityRelEntity res = new EntityRelEntity(l._2()._1()._2()._1());
-
- if(l._2()._1()._2()._2().isPresent() && l._2()._2().isPresent()) {
-
- // extract the relationship
- res.setRelation((Relation) l._2()._1()._2()._2().get().getOaf());
-
- // extract the related entity
- res.setTarget(l._2()._2().get());
- }
-
- return res;
+ private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String idPath, final String inputPath, final String type) {
+ return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class)
+ .mapToPair((PairFunction<Tuple2<Text, Text>, String, TypedRow>) item -> {
+ final String json = item._2().toString();
+ final String id = JsonPath.read(json, idPath);
+ return new Tuple2<>(id, new TypedRow(id, type, json));
+ });
}
- private PairFunction>>, String, Tuple2>>> toPairTarget() {
- return e -> {
- Optional o = e._2()._2();
- if (o.isPresent()) {
- return new Tuple2<>(((Relation) o.get().getOaf()).getTarget(), e);
- } else {
- return new Tuple2<>(null, e);
- }
- };
- }
-
- private PairFunction<TypedRow, String, TypedRow> toPair() {
- return e -> {
- if (!"relation".equals(e.getType())) {
- return new Tuple2<>( ((OafEntity) e.getOaf()).getId(), e);
- } else {
- return new Tuple2<>( ((Relation) e.getOaf()).getSource(), e);
- }
- };
+ private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
+ return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
+ .map(item -> {
+ final String json = item._2().toString();
+ final String source = JsonPath.read(json, "$.source");
+ final String target = JsonPath.read(json, "$.target");
+ return new TypedRow(source, target, "relation", json);
+ });
}
}
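The relation side of the join above caps the fan-out per source entity at MAX_RELS before joining the target entities back in. A local, non-Spark sketch of that capping step, assuming Guava on the classpath and invented target ids:

import com.google.common.collect.Iterables;

import java.util.ArrayList;
import java.util.List;

public class MaxRelsSketch {
    public static void main(String[] args) {
        final int maxRels = 10;
        // pretend these are the grouped relation targets for one source id
        final List<String> targets = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            targets.add("50|target_" + i);
        }
        // groupByKey().map(p -> Iterables.limit(p._2(), MAX_RELS)) keeps at most
        // maxRels relations per source; the remaining ones are dropped
        final Iterable<String> capped = Iterables.limit(targets, maxRels);
        System.out.println(Iterables.size(capped)); // 10
    }
}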
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java
new file mode 100644
index 000000000..9e6fc0d38
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java
@@ -0,0 +1,29 @@
+package eu.dnetlib.dhp.graph;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class LinkedEntity implements Serializable {
+
+ private TypedRow entity;
+
+ private List<Tuple> links;
+
+ public TypedRow getEntity() {
+ return entity;
+ }
+
+ public LinkedEntity setEntity(TypedRow entity) {
+ this.entity = entity;
+ return this;
+ }
+
+ public List<Tuple> getLinks() {
+ return links;
+ }
+
+ public LinkedEntity setLinks(List<Tuple> links) {
+ this.links = links;
+ return this;
+ }
+}
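Putting LinkedEntity together with Tuple and TypedRow (both elsewhere in this diff), a hedged sketch of how one record of the linked_entities output gets assembled and serialized; the identifiers and payloads are invented:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;

public class LinkedEntitySketch {
    public static void main(String[] args) throws Exception {
        final LinkedEntity e = new LinkedEntity()
                .setEntity(new TypedRow("50|a", "publication", "{}"))
                .setLinks(Lists.newArrayList(
                        new Tuple()
                                .setRelation(new TypedRow("50|a", "50|b", "relation", "{}"))
                                .setTarget(new TypedRow("50|b", "dataset", "{}"))));

        // one JSON object per source entity, as written by GraphJoiner
        System.out.println(new ObjectMapper().writeValueAsString(e));
    }
}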
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java
deleted file mode 100644
index dbab04f16..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package eu.dnetlib.dhp.graph;
-
-import java.io.Serializable;
-
-public class RelatedEntity implements Serializable {
-
- private String relType;
-
- private String subRelType;
-
- private String relClass;
-
- private String type;
-
- private String payload;
-
- public RelatedEntity(String relType, String subRelType, String relClass, String type, String payload) {
- this.relType = relType;
- this.subRelType = subRelType;
- this.relClass = relClass;
- this.type = type;
- this.payload = payload;
- }
-
- public String getRelType() {
- return relType;
- }
-
- public RelatedEntity setRelType(String relType) {
- this.relType = relType;
- return this;
- }
-
- public String getSubRelType() {
- return subRelType;
- }
-
- public RelatedEntity setSubRelType(String subRelType) {
- this.subRelType = subRelType;
- return this;
- }
-
- public String getRelClass() {
- return relClass;
- }
-
- public RelatedEntity setRelClass(String relClass) {
- this.relClass = relClass;
- return this;
- }
-
- public String getType() {
- return type;
- }
-
- public RelatedEntity setType(String type) {
- this.type = type;
- return this;
- }
-
- public String getPayload() {
- return payload;
- }
-
- public RelatedEntity setPayload(String payload) {
- this.payload = payload;
- return this;
- }
-}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
index ce8e7e396..1d55dda89 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
@@ -4,21 +4,27 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class SparkGraphIndexingJob {
- private final static String ENTITY_NODES_PATH = "/tmp/entity_node";
+ private final static String OUTPUT_BASE_PATH = "/tmp/openaire_provision";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
parser.parseArgument(args);
+
+ final SparkConf conf = new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
final SparkSession spark = SparkSession
.builder()
+ .config(conf)
.appName(SparkGraphIndexingJob.class.getSimpleName())
.master(parser.get("master"))
- .config("hive.metastore.uris", parser.get("hive_metastore_uris"))
.enableHiveSupport()
.getOrCreate();
@@ -26,11 +32,12 @@ public class SparkGraphIndexingJob {
final String hiveDbName = parser.get("hive_db_name");
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
- if (fs.exists(new Path(ENTITY_NODES_PATH))) {
- fs.delete(new Path(ENTITY_NODES_PATH), true);
+ if (fs.exists(new Path(OUTPUT_BASE_PATH))) {
+ fs.delete(new Path(OUTPUT_BASE_PATH), true);
+ fs.mkdirs(new Path(OUTPUT_BASE_PATH));
}
- new GraphJoiner().join(spark, inputPath, hiveDbName, ENTITY_NODES_PATH);
+ new GraphJoiner().join(spark, inputPath, hiveDbName, OUTPUT_BASE_PATH);
}
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java
new file mode 100644
index 000000000..0b22a63a5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java
@@ -0,0 +1,31 @@
+package eu.dnetlib.dhp.graph;
+
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import java.io.Serializable;
+
+public class Tuple implements Serializable {
+
+ private TypedRow relation;
+
+ private TypedRow target;
+
+ public TypedRow getRelation() {
+ return relation;
+ }
+
+ public Tuple setRelation(TypedRow relation) {
+ this.relation = relation;
+ return this;
+ }
+
+ public TypedRow getTarget() {
+ return target;
+ }
+
+ public Tuple setTarget(TypedRow target) {
+ this.target = target;
+ return this;
+ }
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
index 5c933ca80..60c3b64b2 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
@@ -1,20 +1,46 @@
package eu.dnetlib.dhp.graph;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-
import java.io.Serializable;
public class TypedRow implements Serializable {
- private String type;
- private Oaf oaf;
- public TypedRow(String type, Oaf oaf) {
+ private String source;
+ private String target;
+ private String type;
+ private String oaf;
+
+ public TypedRow() {
+ }
+
+ public TypedRow(String source, String type, String oaf) {
+ this.source = source;
this.type = type;
this.oaf = oaf;
}
+ public TypedRow(String source, String target, String type, String oaf) {
+ this(source, type, oaf);
+ this.target = target;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public TypedRow setSource(String source) {
+ this.source = source;
+ return this;
+ }
+
+ public String getTarget() {
+ return target;
+ }
+
+ public TypedRow setTarget(String target) {
+ this.target = target;
+ return this;
+ }
+
public String getType() {
return type;
}
@@ -24,21 +50,13 @@ public class TypedRow implements Serializable {
return this;
}
- public Oaf getOaf() {
+ public String getOaf() {
return oaf;
}
- public TypedRow setOaf(Oaf oaf) {
+ public TypedRow setOaf(String oaf) {
this.oaf = oaf;
return this;
}
- @Override
- public String toString() {
- try {
- return new ObjectMapper().writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
}
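TypedRow now carries the raw JSON payload plus the identifiers needed for joining: the three-argument constructor serves entities (source id only), the four-argument one serves relations (source and target). A minimal sketch mirroring readPathEntity and readPathRelation, with invented ids:

public class TypedRowSketch {
    public static void main(String[] args) {
        // entity row: id, type, raw JSON payload
        final TypedRow entity = new TypedRow("50|a", "publication", "{\"id\":\"50|a\"}");
        // relation row: source id, target id, type, raw JSON payload
        final TypedRow rel = new TypedRow("50|a", "40|b", "relation", "{}");

        System.out.println(entity.getType() + " " + entity.getSource()); // publication 50|a
        System.out.println(rel.getSource() + " -> " + rel.getTarget());  // 50|a -> 40|b
    }
}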
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
index fcab9dd00..624d3ea76 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
@@ -23,4 +23,12 @@
 		<name>hive_db_name</name>
 		<value>openaire</value>
 	</property>
+	<property>
+		<name>spark2YarnHistoryServerAddress</name>
+		<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088</value>
+	</property>
+	<property>
+		<name>spark2EventLogDir</name>
+		<value>/user/spark/applicationHistory</value>
+	</property>
 </configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
index 00a890268..a91759ade 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -16,6 +16,14 @@
 			<name>sparkExecutorCores</name>
 			<description>number of cores used by single executor</description>
 		</property>
+		<property>
+			<name>spark2YarnHistoryServerAddress</name>
+			<description>spark 2.* yarn history server address</description>
+		</property>
+		<property>
+			<name>spark2EventLogDir</name>
+			<description>spark 2.* event log dir location</description>
+		</property>
@@ -33,7 +41,16 @@
 			<name>GraphIndexing</name>
 			<class>eu.dnetlib.dhp.graph.SparkGraphIndexingJob</class>
 			<jar>dhp-graph-provision-${projectVersion}.jar</jar>
-			<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.yarn.executor.memoryOverhead=${sparkExecutorMemoryOverhead}</spark-opts>
+			<spark-opts>
+				--executor-memory ${sparkExecutorMemory}
+				--executor-cores ${sparkExecutorCores}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
+				--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
+				--conf spark.sql.warehouse.dir="/user/hive/warehouse"
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			</spark-opts>
 			<arg>-mt</arg> <arg>yarn-cluster</arg>
 			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
 			<arg>--hive_db_name</arg><arg>${hive_db_name}</arg>