diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 20896a61d..491cbe668 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -26,6 +26,11 @@
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java
index 352ebbc6e..010633ec3 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java
@@ -1,5 +1,8 @@
package eu.dnetlib.dhp.schema.oaf;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.io.Serializable;
public abstract class Oaf implements Serializable {
@@ -23,4 +26,14 @@ public abstract class Oaf implements Serializable {
public void setLastupdatetimestamp(Long lastupdatetimestamp) {
this.lastupdatetimestamp = lastupdatetimestamp;
}
+
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties
index 31f7f88f5..882053c1a 100644
--- a/dhp-workflows/dhp-graph-provision/job-override.properties
+++ b/dhp-workflows/dhp-graph-provision/job-override.properties
@@ -1,3 +1,5 @@
-sparkDriverMemory=16G
-sparkExecutorMemory=16G
-hive_db_name=claudio
\ No newline at end of file
+sparkDriverMemory=7G
+sparkExecutorMemory=7G
+sparkExecutorMemoryOverhead=5G
+hive_db_name=claudio
+sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
new file mode 100644
index 000000000..ac89e4351
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java
@@ -0,0 +1,53 @@
+package eu.dnetlib.dhp.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import java.io.Serializable;
+
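+/**
+ * Adjacency list element: a source entity, an optional relation and the corresponding target entity.
+ */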
+public class EntityRelEntity implements Serializable {
+ private TypedRow source;
+ private Relation relation;
+ private TypedRow target;
+
+ public EntityRelEntity(TypedRow source) {
+ this.source = source;
+ }
+
+ public TypedRow getSource() {
+ return source;
+ }
+
+ public EntityRelEntity setSource(TypedRow source) {
+ this.source = source;
+ return this;
+ }
+
+ public Relation getRelation() {
+ return relation;
+ }
+
+ public EntityRelEntity setRelation(Relation relation) {
+ this.relation = relation;
+ return this;
+ }
+
+ public TypedRow getTarget() {
+ return target;
+ }
+
+ public EntityRelEntity setTarget(TypedRow target) {
+ this.target = target;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
new file mode 100644
index 000000000..5764642dc
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
@@ -0,0 +1,139 @@
+package eu.dnetlib.dhp.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.io.Serializable;
+
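+/**
+ * Joins the graph entities with the pre-computed (capped) relations and the related target entities,
+ * persisting the resulting adjacency list as the linked_entities table in the given hive database.
+ */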
+public class GraphJoiner implements Serializable {
+
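+ // maximum number of relations kept per source entity; the job reads the pre-computed relation_<MAX_RELS> hive table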
+ public static final int MAX_RELS = 100;
+
+ public void join(final SparkSession spark, final String inputPath, final String hiveDbName, final String outPath) {
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+ /*
+ JavaPairRDD<String, TypedRow> entities = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
+ .map(oaf -> new TypedRow("publication", oaf))
+ .mapToPair(toPair());
+
+ */
+
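+ // read each entity type from its sequence file, parse the JSON payload and key every record by entity id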
+ JavaPairRDD<String, TypedRow> entities = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
+ .map(oaf -> new TypedRow("datasource", oaf))
+ .mapToPair(toPair())
+ .union(sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class))
+ .map(oaf -> new TypedRow("organization", oaf))
+ .mapToPair(toPair()))
+ .union(sc.sequenceFile(inputPath + "/project", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Project.class))
+ .map(oaf -> new TypedRow("project", oaf))
+ .mapToPair(toPair()))
+ .union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
+ .map(oaf -> new TypedRow("dataset", oaf))
+ .mapToPair(toPair()))
+ .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
+ .map(oaf -> new TypedRow("otherresearchproduct", oaf))
+ .mapToPair(toPair()))
+ .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
+ .map(oaf -> new TypedRow("software", oaf))
+ .mapToPair(toPair()));
+ /*
+ .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
+ .map(oaf -> new TypedRow("publication", oaf))
+ .mapToPair(toPair()));
+
+ */
+
+ /*
+ JavaRDD<Relation> rels = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
+ .map(oaf -> new TypedRow("relation", oaf))
+ .mapToPair(toPair())
+ .groupByKey()
+ .map(t -> Iterables.limit(t._2(), MAX_RELS))
+ .flatMap(t -> t.iterator())
+ .map(t -> (Relation) t.getOaf());
+
+ spark.createDataset(rels.rdd(), Encoders.bean(Relation.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .saveAsTable(hiveDbName + ".relation_100");
+ */
+
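+ // relations, already capped at MAX_RELS per source, read from the hive table and keyed by source entity id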
+ JavaPairRDD<String, TypedRow> bounded_rels = spark.table(hiveDbName + ".relation_" + MAX_RELS)
+ .as(Encoders.bean(Relation.class))
+ .javaRDD()
+ .map(r -> new TypedRow("relation", r))
+ .mapToPair(toPair());
+
+ // build the adjacency list: e -> r
+ JavaPairRDD<String, Tuple2<TypedRow, Optional<TypedRow>>> adjacency_list = entities.leftOuterJoin(bounded_rels);
+
+ JavaRDD<EntityRelEntity> linked_entities = adjacency_list
+ .mapToPair(toPairTarget()) // make rel.targetid explicit so that we can join it
+ .leftOuterJoin(entities) // again with the entities to get the target entity
+ .map(l -> toEntityRelEntity(l)); // and map it to a more readable representation
+
+ spark.createDataFrame(linked_entities, EntityRelEntity.class)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .saveAsTable(hiveDbName + ".linked_entities");
+ }
+
+ private EntityRelEntity toEntityRelEntity(Tuple2<String, Tuple2<Tuple2<String, Tuple2<TypedRow, Optional<TypedRow>>>, Optional<TypedRow>>> l) {
+ // extract the entity source
+ final EntityRelEntity res = new EntityRelEntity(l._2()._1()._2()._1());
+
+ if(l._2()._1()._2()._2().isPresent() && l._2()._2().isPresent()) {
+
+ // extract the relationship
+ res.setRelation((Relation) l._2()._1()._2()._2().get().getOaf());
+
+ // extract the related entity
+ res.setTarget(l._2()._2().get());
+ }
+
+ return res;
+ }
+
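+ // re-keys each adjacency list entry by the relation target id (null key when no relation is present)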
+ private PairFunction<Tuple2<String, Tuple2<TypedRow, Optional<TypedRow>>>, String, Tuple2<String, Tuple2<TypedRow, Optional<TypedRow>>>> toPairTarget() {
+ return e -> {
+ Optional<TypedRow> o = e._2()._2();
+ if (o.isPresent()) {
+ return new Tuple2<>(((Relation) o.get().getOaf()).getTarget(), e);
+ } else {
+ return new Tuple2<>(null, e);
+ }
+ };
+ }
+
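+ // keys an entity by its id, a relation by its source entity id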
+ private PairFunction<TypedRow, String, TypedRow> toPair() {
+ return e -> {
+ if (!"relation".equals(e.getType())) {
+ return new Tuple2<>( ((OafEntity) e.getOaf()).getId(), e);
+ } else {
+ return new Tuple2<>( ((Relation) e.getOaf()).getSource(), e);
+ }
+ };
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
index 04711efbd..ce8e7e396 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
@@ -1,32 +1,14 @@
package eu.dnetlib.dhp.graph;
-import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.EntityPayload;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
-
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.commons.lang3.StringUtils.substringAfter;
-import static org.apache.commons.lang3.StringUtils.substringBefore;
-import static org.apache.spark.sql.Encoders.bean;
public class SparkGraphIndexingJob {
private final static String ENTITY_NODES_PATH = "/tmp/entity_node";
- private static final long LIMIT = 100;
public static void main(String[] args) throws Exception {
@@ -37,13 +19,10 @@ public class SparkGraphIndexingJob {
.appName(SparkGraphIndexingJob.class.getSimpleName())
.master(parser.get("master"))
.config("hive.metastore.uris", parser.get("hive_metastore_uris"))
- .config("spark.driver.cores", 1)
- .config("spark.executor.cores", 1)
- .config("spark.yarn.executor.memoryOverhead", "4G")
- .config("spark.yarn.driver.memoryOverhead", "4G")
.enableHiveSupport()
.getOrCreate();
+ final String inputPath = parser.get("sourcePath");
final String hiveDbName = parser.get("hive_db_name");
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
@@ -51,52 +30,7 @@ public class SparkGraphIndexingJob {
fs.delete(new Path(ENTITY_NODES_PATH), true);
}
- spark
- .sql(getJoinEntitiesSQL(hiveDbName))
- .transform(toEntityNode())
- /*
- .map((MapFunction<Row, String>) r -> {
- return null;
- }, bean(String.class))
- */
- .rdd()
-
- .saveAsTextFile(ENTITY_NODES_PATH, GzipCodec.class);
- }
-
- private static AbstractFunction1<Dataset<Row>, Dataset<EntityNode>> toEntityNode() {
- return new AbstractFunction1<Dataset<Row>, Dataset<EntityNode>>() {
- @Override
- public Dataset<EntityNode> apply(Dataset<Row> d) {
- return d.map((MapFunction<Row, EntityNode>) r -> {
-
- final List<String> res = r.getList(r.fieldIndex("related_entity"));
- final byte[] payload = r.getAs("payload");
- return new EntityNode(r.getAs("id"), r.getAs("type"), new String(payload))
- .setRelatedEntities(res
- .stream()
- .map(re -> new Tuple2<>(substringBefore(re, "@@"), substringAfter(re, "@@")))
- .map(re -> new RelatedEntity(r.getAs("reltype"), r.getAs("subreltype"), r.getAs("relclass"), re._1(), re._2()))
- .limit(LIMIT)
- .collect(Collectors.toList()));
-
- }, bean(EntityNode.class));
- }
- };
- }
-
- private static String getJoinEntitiesSQL(String hiveDbName) {
- return String.format(
- "SELECT " +
- "E_s.id AS id, " +
- "E_s.type AS type, " +
- "E_s.payload AS payload, " +
- "r.reltype AS reltype, r.subreltype AS subreltype, r.relclass AS relclass, " +
- "collect_list(concat(E_t.type, '@@', E_t.payload)) AS related_entity " +
- "FROM %s.entities " + "" /*"TABLESAMPLE(0.1 PERCENT) "*/ + "E_s " +
- "LEFT JOIN %s.relation r ON (r.source = E_s.id) " +
- "JOIN %s.entities E_t ON (E_t.id = r.target) \n" +
- "GROUP BY E_s.id, E_s.type, E_s.payload, r.reltype, r.subreltype, r.relclass", hiveDbName, hiveDbName, hiveDbName);
+ new GraphJoiner().join(spark, inputPath, hiveDbName, ENTITY_NODES_PATH);
}
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
new file mode 100644
index 000000000..5c933ca80
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java
@@ -0,0 +1,44 @@
+package eu.dnetlib.dhp.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
+import java.io.Serializable;
+
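+/**
+ * Wraps an Oaf payload together with a type label (e.g. "datasource", "relation"), so that
+ * heterogeneous records can be handled within a single RDD.
+ */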
+public class TypedRow implements Serializable {
+ private String type;
+ private Oaf oaf;
+
+ public TypedRow(String type, Oaf oaf) {
+ this.type = type;
+ this.oaf = oaf;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public TypedRow setType(String type) {
+ this.type = type;
+ return this;
+ }
+
+ public Oaf getOaf() {
+ return oaf;
+ }
+
+ public TypedRow setOaf(Oaf oaf) {
+ this.oaf = oaf;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
index 613389d79..a197abc78 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
@@ -1,5 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true},
- {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true}
+ {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
index 473b697cd..00a890268 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -33,8 +33,9 @@
<name>GraphIndexing</name>
<class>eu.dnetlib.dhp.graph.SparkGraphIndexingJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
- <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+ <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.yarn.executor.memoryOverhead=${sparkExecutorMemoryOverhead}</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
+ <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>