diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala
new file mode 100644
index 000000000..5860c50ac
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala
@@ -0,0 +1,46 @@
+package eu.dnetlib.dhp.actionmanager.datacite
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
+import eu.dnetlib.dhp.schema.mdstore.MetadataRecord
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
+import eu.dnetlib.dhp.utils.ISLookupClientFactory
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+
+object FilterCrossrefEntitiesSpark {
+
+ val log: Logger = LoggerFactory.getLogger(getClass.getClass)
+
+ def main(args: Array[String]): Unit = {
+ val conf = new SparkConf
+ val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json")).mkString)
+ parser.parseArgument(args)
+ val master = parser.get("master")
+ val sourcePath = parser.get("sourcePath")
+ log.info("sourcePath: {}", sourcePath)
+ val targetPath = parser.get("targetPath")
+ log.info("targetPath: {}", targetPath)
+
+
+
+ val spark: SparkSession = SparkSession.builder().config(conf)
+ .appName(getClass.getSimpleName)
+ .master(master)
+ .getOrCreate()
+
+
+
+ implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result]
+
+ val d:Dataset[Oaf]= spark.read.load(sourcePath).as[Oaf]
+
+ d.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]).write.mode(SaveMode.Overwrite).save(targetPath)
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json
new file mode 100644
index 000000000..63e080337
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json
@@ -0,0 +1,21 @@
+[
+ {
+ "paramName": "s",
+ "paramLongName": "sourcePath",
+ "paramDescription": "the source mdstore path",
+ "paramRequired": true
+ },
+
+ {
+ "paramName": "t",
+ "paramLongName": "targetPath",
+ "paramDescription": "the target mdstore path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "m",
+ "paramLongName": "master",
+ "paramDescription": "the master name",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml
new file mode 100644
index 000000000..dd3c32c62
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml
@@ -0,0 +1,23 @@
+
+
+ jobTracker
+ yarnRM
+
+
+ nameNode
+ hdfs://nameservice1
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml
new file mode 100644
index 000000000..397288c69
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml
@@ -0,0 +1,84 @@
+
+
+
+ datacitePath
+ the path of Datacite spark dataset
+
+
+ isLookupUrl
+ The IS lookUp service endopoint
+
+
+ crossrefPath
+ the path of Crossref spark dataset
+
+
+
+ targetPath
+ the path of Crossref spark dataset
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ yarn-cluster
+ cluster
+ ImportDatacite
+ eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${datacitePath}
+ --targetPath${targetPath}/datacite_oaf
+ --isLookupUrl${isLookupUrl}
+ --exportLinkstrue
+ --masteryarn-cluster
+
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ FilterCrossrefEntities
+ eu.dnetlib.dhp.actionmanager.datacite.FilterCrossrefEntitiesSpark
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${crossrefPath}
+ --targetPath${targetPath}/crossref_oaf
+ --masteryarn-cluster
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
index bac3687e2..1895ab741 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
@@ -10,7 +10,7 @@
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]