diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 8cecaffbed..c327262ae5 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -8,7 +8,7 @@
dhp-workflows
1.0.0-SNAPSHOT
- dhp-aggregations
+ dhp-aggregation
org.apache.spark
diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
new file mode 100644
index 0000000000..3af9b376ad
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+ dhp-workflows
+ eu.dnetlib.dhp
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ dhp-graph-mapper
+
+
+
+ org.apache.spark
+ spark-core_2.11
+
+
+
+
+ org.apache.spark
+ spark-sql_2.11
+
+
+
+ eu.dnetlib
+ dnet-openaire-data-protos
+
+
+
+ com.googlecode.protobuf-java-format
+ protobuf-java-format
+
+
+
+
+
+ eu.dnetlib.dhp
+ dhp-common
+ 1.0.0-SNAPSHOT
+
+
+ eu.dnetlib.dhp
+ dhp-schemas
+ 1.0.0-SNAPSHOT
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java
new file mode 100644
index 0000000000..9f22d7875e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java
@@ -0,0 +1,92 @@
+package eu.dnetlib.dhp.graph;
+
+
+import com.googlecode.protobuf.format.JsonFormat;
+import eu.dnetlib.data.proto.KindProtos;
+import eu.dnetlib.data.proto.OafProtos;
+import eu.dnetlib.dhp.schema.oaf.*;
+
+import java.io.Serializable;
+
+
+public class ProtoConverter implements Serializable {
+
+
+ public static Oaf convert(String s) {
+ try {
+ final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder();
+ JsonFormat.merge(s, builder);
+
+ if (builder.getKind() == KindProtos.Kind.entity)
+ return convertEntity(builder);
+ else {
+ return convertRelation(builder);
+ }
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static Relation convertRelation(OafProtos.Oaf.Builder oaf) {
+ return new Relation();
+ }
+
+ private static OafEntity convertEntity(OafProtos.Oaf.Builder oaf) {
+
+ switch (oaf.getEntity().getType()) {
+ case result:
+ return convertResult(oaf);
+ case project:
+ return convertProject(oaf);
+ case datasource:
+ return convertDataSource(oaf);
+ case organization:
+ return convertOrganization(oaf);
+ default:
+ throw new RuntimeException("received unknown type");
+ }
+ }
+
+ private static Organization convertOrganization(OafProtos.Oaf.Builder oaf) {
+ return new Organization();
+ }
+
+ private static Datasource convertDataSource(OafProtos.Oaf.Builder oaf) {
+ return new Datasource();
+ }
+
+ private static Project convertProject(OafProtos.Oaf.Builder oaf) {
+ return new Project();
+ }
+
+ private static Result convertResult(OafProtos.Oaf.Builder oaf) {
+ switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
+ case "dataset":
+ return createDataset(oaf);
+ case "publication":
+ return createPublication(oaf);
+ case "software":
+ return createSoftware(oaf);
+ case "orp":
+ return createORP(oaf);
+ default:
+ throw new RuntimeException("received unknown type :"+oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
+ }
+ }
+
+ private static Software createSoftware(OafProtos.Oaf.Builder oaf) {
+ return new Software();
+ }
+
+ private static OtherResearchProducts createORP(OafProtos.Oaf.Builder oaf) {
+ return new OtherResearchProducts();
+ }
+
+ private static Publication createPublication(OafProtos.Oaf.Builder oaf) {
+ return new Publication();
+ }
+
+ private static Dataset createDataset(OafProtos.Oaf.Builder oaf) {
+ return new Dataset();
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
new file mode 100644
index 0000000000..4ef046631b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
@@ -0,0 +1,44 @@
+package eu.dnetlib.dhp.graph;
+
+
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+public class SparkGraphImporterJob {
+
+
+ public static void main(String[] args) throws Exception{
+
+ //TODO add argument parser
+// final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/graph_importer_parameters.json")));
+// parser.parseArgument(args);
+
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName("ImportGraph")
+ //TODO replace with: master(parser.get("master"))
+ .master("local[16]")
+ .getOrCreate();
+
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+
+ final JavaRDD> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-02236", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
+
+ final long totalPublication = inputRDD
+ .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
+ .map(Tuple2::_2)
+ .map(ProtoConverter::convert)
+ .filter(s -> s instanceof Publication)
+ .count();
+
+ System.out.println(totalPublication);
+
+
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java
new file mode 100644
index 0000000000..1e561aa3eb
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java
@@ -0,0 +1,12 @@
+package eu.dnetlib.dhp.graph;
+
+import org.junit.Test;
+
+public class SparkGraphImporterJobTest {
+
+ @Test
+ public void testImport() throws Exception {
+ SparkGraphImporterJob.main(null);
+ }
+
+}
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index bb6a53feb0..df6119d3c8 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -17,6 +17,7 @@
dhp-aggregation
dhp-distcp
+ dhp-graph-mapper
diff --git a/pom.xml b/pom.xml
index a91114b2ec..5e00ada157 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,6 @@
dhp-schemas
dhp-common
dhp-workflows
- dhp-applications