diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
index 071d202b6..b2432ea7b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
     <parameters>
         <property>
             <name>sourcePath</name>
@@ -8,19 +8,40 @@
             <name>database</name>
             <description>the PDB Database Working Path</description>
         </property>
         <property>
-            <name>targetPath</name>
-            <description>the Target Working dir path</description>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
         </property>
     </parameters>

-    <start to="ConvertDB"/>
+    <start to="StartTransaction"/>

     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>

+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="ConvertDB"/>
+        <error to="RollBack"/>
+    </action>
+
     <action name="ConvertDB">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
@@ -41,11 +62,48 @@
             <arg>--master</arg><arg>yarn</arg>
             <arg>--dbPath</arg><arg>${sourcePath}</arg>
             <arg>--database</arg><arg>${database}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
         </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
     </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
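
Reviewer note: the workflow above adopts the MDStore transaction pattern used elsewhere in dhp-aggregation: StartTransaction asks the MDStore manager for a NEW_VERSION, the Spark action writes into that version's HDFS path, and CommitVersion or RollBack finalizes or discards it. The same three actions are wired, identically, into the EBI and PubMed workflows below. A minimal sketch of the consuming side, assuming the MDStoreVersion bean and the MAPPER from dhp-common as in the Scala changes further down (JSON field values are purely illustrative):

import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.utils.DHPUtils.MAPPER

object MdStoreVersionSketch {
  def main(args: Array[String]): Unit = {
    // JSON as injected by Oozie via ${wf:actionData('StartTransaction')['mdStoreVersion']};
    // the id and path below are made up for illustration.
    val mdstoreOutputVersion =
      """{"id":"md-1234-v1","hdfsPath":"/data/mdstore/md-1234/md-1234-v1"}"""
    val version = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
    // Each job writes its records under this base path (MDSTORE_DATA_PATH)
    // and the record count next to them (MDSTORE_SIZE_PATH).
    println(version.getHdfsPath)
  }
}
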
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
index 8dc8a2aae..3ba83764d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
@@ -2,7 +2,7 @@
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the oaf path ", "paramRequired": true},
+ {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore output version (JSON)", "paramRequired": true},
{"paramName":"s", "paramLongName":"skipUpdate", "paramDescription": "skip update ", "paramRequired": false},
{"paramName":"h", "paramLongName":"hdfsServerUri", "paramDescription": "the working path ", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
index 76d0bfd6d..e205926ec 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
@@ -2,5 +2,5 @@
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"db", "paramLongName":"database", "paramDescription": "should be PDB or UNIPROT", "paramRequired": true},
{"paramName":"p", "paramLongName":"dbPath", "paramDescription": "the path of the database to transform", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the OAF target path ", "paramRequired": true}
+ {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore output version (JSON)", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json
index 8039131b2..04d976a69 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json
@@ -1,5 +1,20 @@
[
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath","paramDescription": "the oaf path ", "paramRequired": true}
+ {
+ "paramName": "mt",
+ "paramLongName": "master",
+ "paramDescription": "should be local or yarn",
+ "paramRequired": true
+ },
+ {
+ "paramName": "s",
+ "paramLongName": "sourcePath",
+ "paramDescription": "the source Path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mo",
+ "paramLongName": "mdstoreOutputVersion",
+    "paramDescription": "the mdstore output version (JSON)",
+ "paramRequired": true
+ }
]
\ No newline at end of file
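
Reviewer note: these parameter files are consumed through ArgumentApplicationParser, which resolves arguments by paramLongName, so the rename to mdstoreOutputVersion must match the --mdstoreOutputVersion arguments the workflows now pass. A minimal sketch of the lookup pattern, following the convention used across these jobs:

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils

object ParamParsingSketch {
  def main(args: Array[String]): Unit = {
    // Load the parameter specification shipped with the module.
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json")))
    // e.g. args = Array("--master", "local", "--sourcePath", "/tmp/in",
    //                   "--mdstoreOutputVersion", "{...}")
    parser.parseArgument(args)
    // Lookup is by paramLongName, not by the short paramName.
    println(parser.get("mdstoreOutputVersion"))
  }
}
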
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml
index 4b47ae38e..047a037dd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml
@@ -9,34 +9,26 @@
             <description>the Working Path</description>
         </property>
         <property>
-            <name>targetPath</name>
-            <description>the OAF MDStore Path</description>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
         </property>
         <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
         </property>
         <property>
             <name>resumeFrom</name>
-            <value>DownloadEBILinks</value>
+            <value>CreateEBIDataSet</value>
             <description>node to start</description>
         </property>
     </parameters>
-
+
     <decision name="resume_from">
         <switch>
             <case to="DownloadEBILinks">${wf:conf('resumeFrom') eq 'DownloadEBILinks'}</case>
-            <case to="CreateEBIDataSet">${wf:conf('resumeFrom') eq 'CreateEBIDataSet'}</case>
+            <case to="StartTransaction">${wf:conf('resumeFrom') eq 'CreateEBIDataSet'}</case>
             <default to="DownloadEBILinks"/>
         </switch>
     </decision>
@@ -77,9 +69,29 @@
-        <ok to="CreateEBIDataSet"/>
+        <ok to="StartTransaction"/>
         <error to="Kill"/>
     </action>
+
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="CreateEBIDataSet"/>
+        <error to="RollBack"/>
+    </action>
+
     <action name="CreateEBIDataSet">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn-cluster</master>
@@ -95,11 +107,49 @@
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>--sourcePath</arg><arg>${sourcePath}/ebi_links_dataset</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
             <arg>--master</arg><arg>yarn</arg>
         </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
     </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
index 8915a090b..0e4e64842 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
@@ -9,8 +9,12 @@
             <description>The IS lookUp service endopoint</description>
         </property>
         <property>
-            <name>targetPath</name>
-            <description>The target path</description>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
         </property>
         <property>
             <name>skipUpdate</name>
@@ -19,12 +23,31 @@
-    <start to="ConvertDataset"/>
+    <start to="StartTransaction"/>
+
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>

+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="ConvertDataset"/>
+        <error to="RollBack"/>
+    </action>
+
     <action name="ConvertDataset">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
@@ -43,16 +66,52 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
             <arg>--master</arg><arg>yarn</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
             <arg>--skipUpdate</arg><arg>${skipUpdate}</arg>
         </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
     </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
index 96075b4f3..135713b9e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
@@ -2,13 +2,15 @@ package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
-
+import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile}
object SparkTransformBioDatabaseToOAF {
def main(args: Array[String]): Unit = {
@@ -25,8 +27,13 @@ object SparkTransformBioDatabaseToOAF {
val dbPath: String = parser.get("dbPath")
log.info("dbPath: {}", database)
- val targetPath: String = parser.get("targetPath")
- log.info("targetPath: {}", database)
+
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion)
+
+ val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info("outputBasePath: {}", outputBasePath)
val spark: SparkSession =
SparkSession
@@ -43,24 +50,28 @@ object SparkTransformBioDatabaseToOAF {
case "UNIPROT" =>
CollectionUtils.saveDataset(
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))),
- targetPath
+ s"$outputBasePath/$MDSTORE_DATA_PATH"
)
case "PDB" =>
CollectionUtils.saveDataset(
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))),
- targetPath
+ s"$outputBasePath/$MDSTORE_DATA_PATH"
)
case "SCHOLIX" =>
CollectionUtils.saveDataset(
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)),
- targetPath
+ s"$outputBasePath/$MDSTORE_DATA_PATH"
)
case "CROSSREF_LINKS" =>
CollectionUtils.saveDataset(
spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))),
- targetPath
+ s"$outputBasePath/$MDSTORE_DATA_PATH"
)
}
+
+ val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH")
+ val mdStoreSize = df.count
+ writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH")
}
}
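
Reviewer note: the closing count-and-write block added here reappears verbatim in SparkCreateBaselineDataFrame and SparkEBILinksToOaf below; a hypothetical helper (not part of this patch) could factor it out:

import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import org.apache.spark.sql.SparkSession

object MDStoreSizeSupport {
  // Counts the records written under outputBasePath/MDSTORE_DATA_PATH and
  // persists the total under outputBasePath/MDSTORE_SIZE_PATH, mirroring the
  // inline block each of the three jobs now ends with.
  def reportStoreSize(spark: SparkSession, outputBasePath: String): Unit = {
    val mdStoreSize = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH").count
    writeHdfsFile(
      spark.sparkContext.hadoopConfiguration,
      s"$mdStoreSize",
      s"$outputBasePath/$MDSTORE_SIZE_PATH")
  }
}

Each job could then end with reportStoreSize(spark, outputBasePath) instead of repeating the three lines.
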
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
index 8ac8b00bf..639918151 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
@@ -2,9 +2,12 @@ package eu.dnetlib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.sx.bio.pubmed._
+import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
@@ -164,11 +167,15 @@ object SparkCreateBaselineDataFrame {
val workingPath = parser.get("workingPath")
log.info("workingPath: {}", workingPath)
- val targetPath = parser.get("targetPath")
- log.info("targetPath: {}", targetPath)
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion)
+
+ val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info("outputBasePath: {}", outputBasePath)
val hdfsServerUri = parser.get("hdfsServerUri")
- log.info("hdfsServerUri: {}", targetPath)
+ log.info("hdfsServerUri: {}", hdfsServerUri)
val skipUpdate = parser.get("skipUpdate")
log.info("skipUpdate: {}", skipUpdate)
@@ -216,8 +223,11 @@ object SparkCreateBaselineDataFrame {
.map(a => PubMedToOaf.convert(a, vocabularies))
.as[Oaf]
.filter(p => p != null),
- targetPath
+ s"$outputBasePath/$MDSTORE_DATA_PATH"
)
+ val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH")
+ val mdStoreSize = df.count
+ writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH")
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
index 7cb6153ff..227dccf14 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
@@ -9,6 +9,9 @@ import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
+import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile}
object SparkEBILinksToOaf {
@@ -32,8 +35,13 @@ object SparkEBILinksToOaf {
import spark.implicits._
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
- val targetPath = parser.get("targetPath")
- log.info(s"targetPath -> $targetPath")
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion)
+
+ val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info("outputBasePath: {}", outputBasePath)
+
implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
val ebLinks: Dataset[EBILinkItem] = spark.read
@@ -46,7 +54,10 @@ object SparkEBILinksToOaf {
.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
.flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)),
- targetPath
+ s"$outputBasePath/$MDSTORE_DATA_PATH"
)
+ val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH")
+ val mdStoreSize = df.count
+ writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH")
}
}
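
Reviewer note: for reference, a hypothetical local invocation of the reworked EBI-links job; the --mdstoreOutputVersion value stands in for the JSON that Oozie injects from StartTransaction's captured output (all paths and ids illustrative):

import eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf

object RunEBILinksToOafLocally {
  def main(ignored: Array[String]): Unit =
    SparkEBILinksToOaf.main(
      Array(
        "--master", "local[*]",
        "--sourcePath", "/tmp/ebi_links_dataset",
        "--mdstoreOutputVersion",
        """{"id":"md-1234-v1","hdfsPath":"/tmp/mdstore/md-1234/md-1234-v1"}"""))
}
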