From 1f1a6a5f5fce4643afae554c5b9045be9774b7c4 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 18 Dec 2023 11:47:00 +0100 Subject: [PATCH 1/3] updated the transformation Baseline workflow to include mdstore rollback/commit action --- .../dhp/sx/bio/pubmed/oozie_app/workflow.xml | 73 +++++++++++++++++-- .../ebi/SparkCreateBaselineDataFrame.scala | 18 ++++- 2 files changed, 80 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml index 8915a090b..0e4e64842 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml @@ -9,8 +9,12 @@ The IS lookUp service endopoint - targetPath - The target path + mdStoreOutputId + the identifier of the cleaned MDStore + + + mdStoreManagerURI + the path of the cleaned mdstore skipUpdate @@ -19,12 +23,31 @@ - + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionNEW_VERSION + --mdStoreID${mdStoreOutputId} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + yarn @@ -43,16 +66,52 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --workingPath${baselineWorkingPath} - --targetPath${targetPath} + --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']} --masteryarn --isLookupUrl${isLookupUrl} --hdfsServerUri${nameNode} --skipUpdate${skipUpdate} - - + + - + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionCOMMIT + --namenode${nameNode} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionROLLBACK + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index 8ac8b00bf..639918151 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -2,9 +2,12 @@ package eu.dnetlib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.collection.CollectionUtils +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup +import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} import eu.dnetlib.dhp.sx.bio.pubmed._ +import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile} import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration @@ -164,11 +167,15 @@ object SparkCreateBaselineDataFrame { val workingPath = parser.get("workingPath") log.info("workingPath: {}", workingPath) - val targetPath = parser.get("targetPath") - log.info("targetPath: {}", targetPath) + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion) + + val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + log.info("outputBasePath: {}", outputBasePath) val hdfsServerUri = parser.get("hdfsServerUri") - log.info("hdfsServerUri: {}", targetPath) + log.info("hdfsServerUri: {}", hdfsServerUri) val skipUpdate = parser.get("skipUpdate") log.info("skipUpdate: {}", skipUpdate) @@ -216,8 +223,11 @@ object SparkCreateBaselineDataFrame { .map(a => PubMedToOaf.convert(a, vocabularies)) .as[Oaf] .filter(p => p != null), - targetPath + s"$outputBasePath/$MDSTORE_DATA_PATH" ) + val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH") + val mdStoreSize = df.count + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH") } } From 9d39845d1f484e1df7bf0d4a49bd7802cf38117b Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 18 Dec 2023 12:23:12 +0100 Subject: [PATCH 2/3] uploaded input parameters on CreateBaseline WF --- .../eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json index 8dc8a2aae..3ba83764d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json @@ -2,7 +2,7 @@ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true}, {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the oaf path ", "paramRequired": true}, + {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the oaf path ", "paramRequired": true}, {"paramName":"s", "paramLongName":"skipUpdate", "paramDescription": "skip update ", "paramRequired": false}, {"paramName":"h", "paramLongName":"hdfsServerUri", "paramDescription": "the working path ", "paramRequired": true} ] \ No newline at end of file From 37e36baf7637e5b984f840b06d0805224d50f5b5 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 18 Dec 2023 16:05:35 +0100 Subject: [PATCH 3/3] updated workflow for generation of Scholix Datasource's to use mdstore transactions --- .../dhp/sx/bio/db/oozie_app/workflow.xml | 76 +++++++++++++++-- .../dhp/sx/bio/ebi/bio_to_oaf_params.json | 2 +- .../dhp/sx/bio/ebi/ebi_to_df_params.json | 21 ++++- .../dhp/sx/bio/ebi/oozie_app/workflow.xml | 84 +++++++++++++++---- .../bio/SparkTransformBioDatabaseToOAF.scala | 25 ++++-- .../dhp/sx/bio/ebi/SparkEBILinksToOaf.scala | 17 +++- 6 files changed, 185 insertions(+), 40 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml index 071d202b6..b2432ea7b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sourcePath @@ -8,19 +8,40 @@ database the PDB Database Working Path - - targetPath - the Target Working dir path + mdStoreOutputId + the identifier of the cleaned MDStore + + + mdStoreManagerURI + the path of the cleaned mdstore - + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionNEW_VERSION + --mdStoreID${mdStoreOutputId} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + yarn @@ -41,11 +62,48 @@ --masteryarn --dbPath${sourcePath} --database${database} - --targetPath${targetPath} + --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']} - - + + + - + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionCOMMIT + --namenode${nameNode} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionROLLBACK + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json index 76d0bfd6d..e205926ec 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json @@ -2,5 +2,5 @@ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"db", "paramLongName":"database", "paramDescription": "should be PDB or UNIPROT", "paramRequired": true}, {"paramName":"p", "paramLongName":"dbPath", "paramDescription": "the path of the database to transform", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the OAF target path ", "paramRequired": true} + {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the oaf path ", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json index 8039131b2..04d976a69 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json @@ -1,5 +1,20 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath","paramDescription": "the oaf path ", "paramRequired": true} + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source Path", + "paramRequired": true + }, + { + "paramName": "mo", + "paramLongName": "mdstoreOutputVersion", + "paramDescription": "the oaf path ", + "paramRequired": true + } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml index 4b47ae38e..047a037dd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml @@ -9,34 +9,26 @@ the Working Path - targetPath - the OAF MDStore Path + mdStoreOutputId + the identifier of the cleaned MDStore - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor + mdStoreManagerURI + the path of the cleaned mdstore resumeFrom - DownloadEBILinks + CreateEBIDataSet node to start - + ${wf:conf('resumeFrom') eq 'DownloadEBILinks'} - ${wf:conf('resumeFrom') eq 'CreateEBIDataSet'} + ${wf:conf('resumeFrom') eq 'CreateEBIDataSet'} @@ -77,9 +69,29 @@ - + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionNEW_VERSION + --mdStoreID${mdStoreOutputId} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + yarn-cluster @@ -95,11 +107,49 @@ ${sparkExtraOPT} --sourcePath${sourcePath}/ebi_links_dataset - --targetPath${targetPath} + --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']} --masteryarn + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionCOMMIT + --namenode${nameNode} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionROLLBACK + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala index 96075b4f3..135713b9e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala @@ -2,13 +2,15 @@ package eu.dnetlib.dhp.sx.bio import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.collection.CollectionUtils +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH,MDSTORE_SIZE_PATH} +import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.slf4j.{Logger, LoggerFactory} - +import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile} object SparkTransformBioDatabaseToOAF { def main(args: Array[String]): Unit = { @@ -25,8 +27,13 @@ object SparkTransformBioDatabaseToOAF { val dbPath: String = parser.get("dbPath") log.info("dbPath: {}", database) - val targetPath: String = parser.get("targetPath") - log.info("targetPath: {}", database) + + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion) + + val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + log.info("outputBasePath: {}", outputBasePath) val spark: SparkSession = SparkSession @@ -43,24 +50,28 @@ object SparkTransformBioDatabaseToOAF { case "UNIPROT" => CollectionUtils.saveDataset( spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))), - targetPath + s"$outputBasePath/$MDSTORE_DATA_PATH" ) case "PDB" => CollectionUtils.saveDataset( spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))), - targetPath + s"$outputBasePath/$MDSTORE_DATA_PATH" ) case "SCHOLIX" => CollectionUtils.saveDataset( spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)), - targetPath + s"$outputBasePath/$MDSTORE_DATA_PATH" ) case "CROSSREF_LINKS" => CollectionUtils.saveDataset( spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))), - targetPath + s"$outputBasePath/$MDSTORE_DATA_PATH" ) } + + val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH") + val mdStoreSize = df.count + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH") } } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala index 7cb6153ff..227dccf14 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala @@ -9,6 +9,9 @@ import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} +import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion +import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile} object SparkEBILinksToOaf { @@ -32,8 +35,13 @@ object SparkEBILinksToOaf { import spark.implicits._ val sourcePath = parser.get("sourcePath") log.info(s"sourcePath -> $sourcePath") - val targetPath = parser.get("targetPath") - log.info(s"targetPath -> $targetPath") + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + log.info("mdstoreOutputVersion: {}", mdstoreOutputVersion) + + val cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + log.info("outputBasePath: {}", outputBasePath) + implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) val ebLinks: Dataset[EBILinkItem] = spark.read @@ -46,7 +54,10 @@ object SparkEBILinksToOaf { .flatMap(j => BioDBToOAF.parse_ebi_links(j.links)) .filter(p => BioDBToOAF.EBITargetLinksFilter(p)) .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)), - targetPath + s"$outputBasePath/$MDSTORE_DATA_PATH" ) + val df = spark.read.text(s"$outputBasePath/$MDSTORE_DATA_PATH") + val mdStoreSize = df.count + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$mdStoreSize", s"$outputBasePath/$MDSTORE_SIZE_PATH") } }