diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/PangaeaUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/PangaeaUtils.scala
new file mode 100644
index 000000000..a8a737c23
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/PangaeaUtils.scala
@@ -0,0 +1,85 @@
+package eu.dnetlib.sx.pangaea
+
+
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods.parse
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+
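+/** One Pangaea record: the harvest datestamp, the metadata link used as identifier, and the raw XML payload. */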
+case class PangaeaDataModel(datestamp: String, identifier: String, xml: String)
+
+
+
+object PangaeaUtils {
+
+
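+  /**
+   * Parses one harvested JSON record into a PangaeaDataModel. The datestamp
+   * defaults to the current time when the record carries no internal-datestamp.
+   */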
+  def toDataset(input: String): PangaeaDataModel = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: json4s.JValue = parse(input)
+
+    // Fallback datestamp: the current time rendered as ISO-8601. The "Z" is
+    // appended literally; the formatter does not convert to UTC.
+    val now = new Date()
+    val fallback: String = s"${new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(now)}Z"
+
+    val datestamp = (json \ "internal-datestamp").extractOrElse[String](fallback)
+    // metadatalink may be absent: default to null rather than failing the parse
+    val identifier = (json \ "metadatalink").extractOrElse[String](null)
+    val xml = (json \ "xml").extract[String]
+    PangaeaDataModel(datestamp, identifier, xml)
+  }
+
+
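+  /**
+   * Aggregator that deduplicates records grouped by identifier, keeping the
+   * one with the most recent datestamp. Datestamps are ISO-8601 strings, so
+   * lexicographic comparison orders them chronologically.
+   */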
+  def getDatasetAggregator(): Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] = new Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] {
+
+    override def zero: PangaeaDataModel = null
+
+    override def reduce(b: PangaeaDataModel, a: (String, PangaeaDataModel)): PangaeaDataModel = {
+      if (b == null) a._2
+      else if (a == null || a._2 == null) b
+      // lexicographic comparison of ISO-8601 datestamps is chronological
+      else if (b.datestamp > a._2.datestamp) b
+      else a._2
+    }
+
+    override def merge(b1: PangaeaDataModel, b2: PangaeaDataModel): PangaeaDataModel = {
+      if (b1 == null) b2
+      else if (b2 == null) b1
+      else if (b1.datestamp > b2.datestamp) b1
+      else b2
+    }
+ override def finish(reduction: PangaeaDataModel): PangaeaDataModel = reduction
+
+ override def bufferEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
+
+ override def outputEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
+ }
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/SparkGeneratePanagaeaDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/SparkGeneratePanagaeaDataset.scala
new file mode 100644
index 000000000..17b286a7e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/SparkGeneratePanagaeaDataset.scala
@@ -0,0 +1,53 @@
+package eu.dnetlib.sx.pangaea
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.io.Source
+
+object SparkGeneratePanagaeaDataset {
+
+
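+  /**
+   * Reads the harvested Pangaea records (one JSON document per line) from
+   * $workingPath/update, deduplicates them by identifier keeping the record
+   * with the most recent datestamp, and writes the result, kryo-encoded, to
+   * $workingPath/dataset_updated.
+   */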
+ def main(args: Array[String]): Unit = {
+ val logger: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+    val parser = new ArgumentApplicationParser(
+      Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json")).mkString
+    )
+ parser.parseArgument(args)
+
+
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+        .appName(SparkGeneratePanagaeaDataset.getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+ parser.getObjectMap.asScala.foreach(s => logger.info(s"${s._1} -> ${s._2}"))
+ logger.info("Converting sequential file into Dataset")
+    val sc: SparkContext = spark.sparkContext
+
+    val workingPath: String = parser.get("workingPath")
+
+ implicit val pangaeaEncoders: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
+
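+    // Each line under $workingPath/update holds one JSON record harvested from Pangaea.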
+    val inputRDD: RDD[PangaeaDataModel] = sc.textFile(s"$workingPath/update").map(s => PangaeaUtils.toDataset(s))
+
+    spark.createDataset(inputRDD).as[PangaeaDataModel]
+      .map(s => (s.identifier, s))(Encoders.tuple(Encoders.STRING, pangaeaEncoders))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(PangaeaUtils.getDatasetAggregator().toColumn)
+ .map(s => s._2)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset_updated")
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/config-default.xml
new file mode 100644
index 000000000..bdd48b0ab
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/config-default.xml
@@ -0,0 +1,19 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/workflow.xml
new file mode 100644
index 000000000..60acee211
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/workflow.xml
@@ -0,0 +1,40 @@
+<workflow-app name="Pangaea_to_Dataset" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>pangaeaWorkingPath</name>
+            <description>the Pangaea Working Path</description>
+        </property>
+    </parameters>
+
+    <start to="convertDataset"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="convertDataset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Convert Pangaea to Dataset</name>
+            <class>eu.dnetlib.sx.pangaea.SparkGeneratePanagaeaDataset</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--workingPath</arg><arg>${pangaeaWorkingPath}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json
new file mode 100644
index 000000000..366f1426e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json
@@ -0,0 +1,4 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequential file to read", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala
new file mode 100644
index 000000000..55eb4ee98
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala
@@ -0,0 +1,25 @@
+package eu.dnetlib.dhp.sx.pangaea
+
+import org.junit.jupiter.api.Test
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+class PangaeaTransformTest {
+
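+  /**
+   * Sanity check for the fallback-datestamp format used by PangaeaUtils:
+   * ISO-8601 with a literal trailing "Z".
+   */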
+ @Test
+  def test_dateStamp(): Unit = {
+    val d = new Date()
+    val s: String = s"${new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(d)}Z"
+    println(s)
+  }
+
+}