diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties
new file mode 100644
index 000000000..31f7f88f5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/job-override.properties
@@ -0,0 +1,3 @@
+sparkDriverMemory=16G
+sparkExecutorMemory=16G
+hive_db_name=claudio
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
new file mode 100644
index 000000000..d47463774
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dhp-graph-provision</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java
new file mode 100644
index 000000000..be1babae2
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java
@@ -0,0 +1,64 @@
+package eu.dnetlib.dhp.graph;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class EntityNode implements Serializable {
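+
+    // NOTE: the original WIP commit left this class empty; the fields and fluent
+    // accessors below are inferred from the usage in SparkGraphIndexingJob
+    // (three-arg constructor plus setRelatedEntities), so treat them as a sketch.
+
+    private String id;
+
+    private String type;
+
+    private String payload;
+
+    private List<RelatedEntity> relatedEntities;
+
+    public EntityNode() {
+    }
+
+    public EntityNode(String id, String type, String payload) {
+        this.id = id;
+        this.type = type;
+        this.payload = payload;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public EntityNode setId(String id) {
+        this.id = id;
+        return this;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public EntityNode setType(String type) {
+        this.type = type;
+        return this;
+    }
+
+    public String getPayload() {
+        return payload;
+    }
+
+    public EntityNode setPayload(String payload) {
+        this.payload = payload;
+        return this;
+    }
+
+    public List<RelatedEntity> getRelatedEntities() {
+        return relatedEntities;
+    }
+
+    public EntityNode setRelatedEntities(List<RelatedEntity> relatedEntities) {
+        this.relatedEntities = relatedEntities;
+        return this;
+    }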
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
new file mode 100644
index 000000000..ab19ff2b5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
@@ -0,0 +1,24 @@
+package eu.dnetlib.dhp.graph;
+
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.oaf.*;
+
+import java.util.Map;
+
+public class GraphMappingUtils {
+
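+    // maps the type names used in the hive entity tables to the corresponding OAF model classes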
+    public final static Map<String, Class> types = Maps.newHashMap();
+
+ static {
+ types.put("datasource", Datasource.class);
+ types.put("organization", Organization.class);
+ types.put("project", Project.class);
+ types.put("dataset", Dataset.class);
+ types.put("otherresearchproduct", OtherResearchProduct.class);
+ types.put("software", Software.class);
+ types.put("publication", Publication.class);
+ types.put("relation", Relation.class);
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java
new file mode 100644
index 000000000..dbab04f16
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java
@@ -0,0 +1,73 @@
+package eu.dnetlib.dhp.graph;
+
+import java.io.Serializable;
+
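+/**
+ * Describes a relationship from the perspective of a source entity: the relation typing
+ * triple (relType, subRelType, relClass), plus the target entity type and its serialised payload.
+ */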
+public class RelatedEntity implements Serializable {
+
+ private String relType;
+
+ private String subRelType;
+
+ private String relClass;
+
+ private String type;
+
+ private String payload;
+
+ public RelatedEntity(String relType, String subRelType, String relClass, String type, String payload) {
+ this.relType = relType;
+ this.subRelType = subRelType;
+ this.relClass = relClass;
+ this.type = type;
+ this.payload = payload;
+ }
+
+ public String getRelType() {
+ return relType;
+ }
+
+ public RelatedEntity setRelType(String relType) {
+ this.relType = relType;
+ return this;
+ }
+
+ public String getSubRelType() {
+ return subRelType;
+ }
+
+ public RelatedEntity setSubRelType(String subRelType) {
+ this.subRelType = subRelType;
+ return this;
+ }
+
+ public String getRelClass() {
+ return relClass;
+ }
+
+ public RelatedEntity setRelClass(String relClass) {
+ this.relClass = relClass;
+ return this;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public RelatedEntity setType(String type) {
+ this.type = type;
+ return this;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ public RelatedEntity setPayload(String payload) {
+ this.payload = payload;
+ return this;
+ }
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
new file mode 100644
index 000000000..04711efbd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java
@@ -0,0 +1,114 @@
+package eu.dnetlib.dhp.graph;
+
+import com.google.common.collect.Sets;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityPayload;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.substringAfter;
+import static org.apache.commons.lang3.StringUtils.substringBefore;
+import static org.apache.spark.sql.Encoders.bean;
+
+public class SparkGraphIndexingJob {
+
+ private final static String ENTITY_NODES_PATH = "/tmp/entity_node";
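+    // upper bound on the number of related entities serialised per source entity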
+ private static final long LIMIT = 100;
+
+ public static void main(String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
+ parser.parseArgument(args);
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName(SparkGraphIndexingJob.class.getSimpleName())
+ .master(parser.get("master"))
+ .config("hive.metastore.uris", parser.get("hive_metastore_uris"))
+ .config("spark.driver.cores", 1)
+ .config("spark.executor.cores", 1)
+ .config("spark.yarn.executor.memoryOverhead", "4G")
+ .config("spark.yarn.driver.memoryOverhead", "4G")
+ .enableHiveSupport()
+ .getOrCreate();
+
+ final String hiveDbName = parser.get("hive_db_name");
+
+ final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
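+        // remove a pre-existing output location, as saveAsTextFile fails when the path exists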
+ if (fs.exists(new Path(ENTITY_NODES_PATH))) {
+ fs.delete(new Path(ENTITY_NODES_PATH), true);
+ }
+
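+        // denormalise the graph with a SQL join, map each resulting row to an
+        // EntityNode and store the dataset as gzip-compressed text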
+ spark
+ .sql(getJoinEntitiesSQL(hiveDbName))
+ .transform(toEntityNode())
+ /*
+            .map((MapFunction<Row, String>) r -> {
+ return null;
+ }, bean(String.class))
+ */
+            .rdd()
+            .saveAsTextFile(ENTITY_NODES_PATH, GzipCodec.class);
+ }
+
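+    /**
+     * Function passed to Dataset.transform: maps each joined row to an EntityNode
+     * carrying the entity payload and at most LIMIT related entities.
+     */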
+    private static AbstractFunction1<Dataset<Row>, Dataset<EntityNode>> toEntityNode() {
+        return new AbstractFunction1<Dataset<Row>, Dataset<EntityNode>>() {
+            @Override
+            public Dataset<EntityNode> apply(Dataset<Row> d) {
+                return d.map((MapFunction<Row, EntityNode>) r -> {
+
+                    final List<String> res = r.getList(r.fieldIndex("related_entity"));
+ final byte[] payload = r.getAs("payload");
+ return new EntityNode(r.getAs("id"), r.getAs("type"), new String(payload))
+ .setRelatedEntities(res
+ .stream()
+ .map(re -> new Tuple2<>(substringBefore(re, "@@"), substringAfter(re, "@@")))
+ .map(re -> new RelatedEntity(r.getAs("reltype"), r.getAs("subreltype"), r.getAs("relclass"), re._1(), re._2()))
+ .limit(LIMIT)
+ .collect(Collectors.toList()));
+
+ }, bean(EntityNode.class));
+ }
+ };
+ }
+
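+    /**
+     * Builds the SQL that denormalises the graph: every source entity is joined with its
+     * outgoing relations and the related target entities, collected as 'type@@payload'
+     * strings per (source entity, relation) group.
+     */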
+ private static String getJoinEntitiesSQL(String hiveDbName) {
+ return String.format(
+ "SELECT " +
+ "E_s.id AS id, " +
+ "E_s.type AS type, " +
+ "E_s.payload AS payload, " +
+ "r.reltype AS reltype, r.subreltype AS subreltype, r.relclass AS relclass, " +
+ "collect_list(concat(E_t.type, '@@', E_t.payload)) AS related_entity " +
+ "FROM %s.entities " + "" /*"TABLESAMPLE(0.1 PERCENT) "*/ + "E_s " +
+ "LEFT JOIN %s.relation r ON (r.source = E_s.id) " +
+ "JOIN %s.entities E_t ON (E_t.id = r.target) \n" +
+ "GROUP BY E_s.id, E_s.type, E_s.payload, r.reltype, r.subreltype, r.relclass", hiveDbName, hiveDbName, hiveDbName);
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
new file mode 100644
index 000000000..613389d79
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
@@ -0,0 +1,5 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true},
+ {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
new file mode 100644
index 000000000..fcab9dd00
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
@@ -0,0 +1,26 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hive_db_name</name>
+        <value>openaire</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
new file mode 100644
index 000000000..473b697cd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -0,0 +1,46 @@
+<workflow-app name="index_infospace_graph" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>hive_db_name</name>
+            <description>the target hive database name</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+    </parameters>
+
+    <start to="GraphIndexing"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="GraphIndexing">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>GraphIndexing</name>
+            <class>eu.dnetlib.dhp.graph.SparkGraphIndexingJob</class>
+            <jar>dhp-graph-provision-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg> <arg>yarn-cluster</arg>
+            <arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
+            <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file