diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
index 8915a090b..0e4e64842 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
@@ -9,8 +9,12 @@
             <description>The IS lookUp service endpoint</description>
         </property>
-        <property>
-            <name>targetPath</name>
-            <description>The target path</description>
-        </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the output MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the URI of the MDStore Manager service</description>
+        </property>
         <property>
             <name>skipUpdate</name>
@@ -19,12 +23,31 @@
             <value>false</value>
             <description>skip the baseline update</description>
         </property>
     </parameters>
 
-    <start to="ConvertDataset"/>
+    <start to="StartTransaction"/>
+
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
+
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="ConvertDataset"/>
+        <error to="RollBack"/>
+    </action>
+
     <action name="ConvertDataset">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
@@ -43,16 +66,52 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
             <arg>--master</arg><arg>yarn</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
             <arg>--skipUpdate</arg><arg>${skipUpdate}</arg>
         </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
     </action>
 
-    <end name="End"/>
-</workflow-app>
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
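
The workflow change above wraps the Spark job in an MDStore transaction: StartTransaction opens a NEW_VERSION through the MDStore manager, ConvertDataset writes into that version's HDFS path, and CommitVersion or RollBack closes the transaction depending on the outcome. The glue is Oozie's <capture-output/>: the java action emits the new version as a JSON property named mdStoreVersion, which later actions dereference via ${wf:actionData('StartTransaction')['mdStoreVersion']}. A minimal sketch of what such a java action does, assuming the manager exposes a REST endpoint like /mdstore/{id}/newVersion (the endpoint path is illustrative, not taken from the diff; only the oozie.action.output.properties contract is standard Oozie):

    import java.net.URI
    import java.net.http.{HttpClient, HttpRequest, HttpResponse}

    object NewVersionSketch {
      def main(args: Array[String]): Unit = {
        val mdStoreManagerURI = args(0)
        val mdStoreID = args(1)

        // Ask the manager to open a new version of the MDStore.
        // NOTE: the endpoint path below is an assumption for illustration.
        val request = HttpRequest
          .newBuilder(URI.create(s"$mdStoreManagerURI/mdstore/$mdStoreID/newVersion"))
          .GET()
          .build()
        val versionJson = HttpClient
          .newHttpClient()
          .send(request, HttpResponse.BodyHandlers.ofString())
          .body()

        // Standard Oozie java-action contract: properties written to the file named
        // by oozie.action.output.properties become available to <capture-output/>.
        val props = new java.util.Properties()
        props.setProperty("mdStoreVersion", versionJson)
        val out = new java.io.FileOutputStream(System.getProperty("oozie.action.output.properties"))
        try props.store(out, "") finally out.close()
      }
    }
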
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
index 8ac8b00bf..639918151 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
@@ -2,9 +2,12 @@ package eu.dnetlib.dhp.sx.bio.ebi
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
 import eu.dnetlib.dhp.sx.bio.pubmed._
+import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile}
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
@@ -164,11 +167,15 @@ object SparkCreateBaselineDataFrame {
     val workingPath = parser.get("workingPath")
     log.info("workingPath: {}", workingPath)
 
-    val targetPath = parser.get("targetPath")
-    log.info("targetPath: {}", targetPath)
+    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+    log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion)
+
+    val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+    log.info("outputBasePath: {}", outputBasePath)
 
     val hdfsServerUri = parser.get("hdfsServerUri")
-    log.info("hdfsServerUri: {}", targetPath)
+    log.info("hdfsServerUri: {}", hdfsServerUri)
 
     val skipUpdate = parser.get("skipUpdate")
     log.info("skipUpdate: {}", skipUpdate)
@@ -216,8 +223,11 @@ object SparkCreateBaselineDataFrame {
         .map(a => PubMedToOaf.convert(a, vocabularies))
         .as[Oaf]
         .filter(p => p != null),
-      targetPath
+      s"$outputBasePath/$MDSTORE_DATA_PATH"
     )
+    val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH")
+    val mdStoreSize = df.count
+    writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH")
 
   }
 }
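
The three added lines at the end re-read the freshly written dataset, count its rows, and store the count as a small text file under the version's base path, where the COMMIT action can pick it up as the size of the new MDStore version. DHPUtils.writeHdfsFile itself is not part of the diff; under the assumption that it simply writes a string to an HDFS file, an equivalent would be:

    import java.nio.charset.StandardCharsets

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object WriteHdfsFileSketch {
      // Writes a small string (here: the record count) to an HDFS file,
      // overwriting it if it already exists.
      def writeHdfsFile(conf: Configuration, content: String, path: String): Unit = {
        val fs = FileSystem.get(conf)
        val out = fs.create(new Path(path), true)
        try out.write(content.getBytes(StandardCharsets.UTF_8))
        finally out.close()
      }
    }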