From 9d342a47da489d71c3a739b06943a4f09a1225ee Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 18 Dec 2023 11:48:57 +0100 Subject: [PATCH] updated the transformation Baseline workflow to include mdstore rollback/commit action --- .../dhp/sx/bio/pubmed/oozie_app/workflow.xml | 69 +++++++++++++++++-- .../ebi/SparkCreateBaselineDataFrame.scala | 18 +++-- 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml index 8915a090b..30eb41469 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + baselineWorkingPath @@ -9,8 +9,12 @@ The IS lookUp service endopoint - targetPath - The target path + mdStoreOutputId + the identifier of the cleaned MDStore + + + mdStoreManagerURI + the path of the cleaned mdstore skipUpdate @@ -19,12 +23,31 @@ - + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionNEW_VERSION + --mdStoreID${mdStoreOutputId} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + yarn @@ -43,16 +66,52 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --workingPath${baselineWorkingPath} - --targetPath${targetPath} + --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']} --masteryarn --isLookupUrl${isLookupUrl} --hdfsServerUri${nameNode} --skipUpdate${skipUpdate} + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionCOMMIT + --namenode${nameNode} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionROLLBACK + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index 8ac8b00bf..639918151 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -2,9 +2,12 @@ package eu.dnetlib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.collection.CollectionUtils +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup +import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} import eu.dnetlib.dhp.sx.bio.pubmed._ +import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile} import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration @@ -164,11 +167,15 @@ object SparkCreateBaselineDataFrame { val workingPath = parser.get("workingPath") log.info("workingPath: {}", workingPath) - val targetPath = parser.get("targetPath") - log.info("targetPath: {}", targetPath) + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion) + + val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + log.info("outputBasePath: {}", outputBasePath) val hdfsServerUri = parser.get("hdfsServerUri") - log.info("hdfsServerUri: {}", targetPath) + log.info("hdfsServerUri: {}", hdfsServerUri) val skipUpdate = parser.get("skipUpdate") log.info("skipUpdate: {}", skipUpdate) @@ -216,8 +223,11 @@ object SparkCreateBaselineDataFrame { .map(a => PubMedToOaf.convert(a, vocabularies)) .as[Oaf] .filter(p => p != null), - targetPath + s"$outputBasePath/$MDSTORE_DATA_PATH" ) + val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH") + val mdStoreSize = df.count + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH") } }