From 5f82c9849486c57c8b92cfcb11140b3c7bef4406 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 26 Jan 2023 18:03:14 +0100
Subject: [PATCH] Updated all aggregation workflows to use the MDStore manager,
 so that the commit/rollback mechanism is implemented.

---
 .../bio/baseline_oaf/oozie_app/workflow.xml   |   2 +-
 .../dhp/sx/bio/db/oozie_app/workflow.xml      | 137 ++++++++++++++----
 .../dhp/sx/bio/ebi/bio_to_oaf_params.json     |   2 +-
 .../transform/oozie_app/config-default.xml    |  68 +++++++++
 .../bio/ebi/transform/oozie_app/workflow.xml  | 118 +++++++++++++++
 .../bio/ebi/transform_ebi_to_df_params.json   |   5 +
 .../eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala    |   6 +-
 .../bio/SparkTransformBioDatabaseToOAF.scala  |  82 +++++++----
 .../bio/ebi/SparkTransformEBILinksToOaf.scala |  66 +++++++++
 9 files changed, 421 insertions(+), 65 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkTransformEBILinksToOaf.scala

diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml
index 4f9871a5c..05bc854d9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml
@@ -65,7 +65,7 @@
             <arg>--master</arg><arg>yarn</arg>
-
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
index 071d202b6..08693a987 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
@@ -8,44 +8,121 @@
         <property>
             <name>database</name>
             <description>the PDB Database Working Path</description>
         </property>
-        <property>
-            <name>targetPath</name>
-            <description>the Target Working dir path</description>
-        </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the output MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the URI of the MDStore manager service</description>
+        </property>
     </parameters>
 
-    <start to="ConvertDB"/>
+    <start to="StartTransaction"/>
 
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-    <action name="ConvertDB">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Convert Bio DB to OAF Dataset</name>
-            <class>eu.dnetlib.dhp.sx.bio.SparkTransformBioDatabaseToOAF</class>
-            <jar>dhp-aggregation-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.shuffle.partitions=2000
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--master</arg><arg>yarn</arg>
-            <arg>--dbPath</arg><arg>${sourcePath}</arg>
-            <arg>--database</arg><arg>${database}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
-    <end name="End"/>
-</workflow-app>
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="ConvertDB"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="ConvertDB">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Convert Bio DB to OAF Dataset</name>
+            <class>eu.dnetlib.dhp.sx.bio.SparkTransformBioDatabaseToOAF</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.shuffle.partitions=2000
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn</arg>
+            <arg>--dbPath</arg><arg>${sourcePath}</arg>
+            <arg>--database</arg><arg>${database}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+        </spark>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
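The workflow above encodes the transaction this commit is about: StartTransaction asks the MDStore manager for a NEW_VERSION and captures the serialized version descriptor, the Spark action writes into that version's HDFS path, and CommitVersion or RollBack finalizes or discards it. A minimal Scala sketch of that lifecycle, with stand-in stubs for the manager calls (the real client is MDStoreActionNode; its REST details are not part of this diff):

    import scala.util.{Failure, Success, Try}

    // Sketch of the commit/rollback contract the workflow encodes.
    // newVersion, commit and rollback stand in for the calls that
    // MDStoreActionNode performs against the MDStore manager; their
    // signatures here are assumptions for illustration, not the real API.
    object MdStoreTransactionSketch {

      final case class MdStoreVersion(id: String, hdfsPath: String)

      def newVersion(mdStoreId: String): MdStoreVersion =
        MdStoreVersion(s"$mdStoreId-${System.currentTimeMillis()}", s"/data/mdstores/$mdStoreId/tmp")

      def commit(v: MdStoreVersion): Unit = println(s"COMMIT ${v.id}")
      def rollback(v: MdStoreVersion): Unit = println(s"ROLLBACK ${v.id}")

      def main(args: Array[String]): Unit = {
        val version = newVersion("md-1234")    // StartTransaction (NEW_VERSION)
        Try {
          // ConvertDB: the Spark job writes OAF records under version.hdfsPath
          println(s"writing OAF records to ${version.hdfsPath}")
        } match {
          case Success(_) => commit(version)   // CommitVersion (COMMIT)
          case Failure(_) => rollback(version) // RollBack (ROLLBACK)
        }
      }
    }

The key property is that readers only ever see committed versions: a failed ConvertDB leaves behind an uncommitted temporary version, which RollBack discards, so a half-written store is never published.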
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
index 76d0bfd6d..ec4af6ac9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
@@ -2,5 +2,5 @@
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
   {"paramName":"db", "paramLongName":"database", "paramDescription": "should be PDB or UNIPROT", "paramRequired": true},
   {"paramName":"p", "paramLongName":"dbPath", "paramDescription": "the path of the database to transform", "paramRequired": true},
-  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the OAF target path ", "paramRequired": true}
+  {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore Output Version", "paramRequired": true}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/config-default.xml
new file mode 100644
index 000000000..17cd6c9a3
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/config-default.xml
@@ -0,0 +1,68 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/spark2ApplicationHistory</value>
+    </property>
+    <property>
+        <name>spark2ExtraListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
+    </property>
+    <property>
+        <name>spark2SqlQueryExecutionListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/workflow.xml
new file mode 100644
index 000000000..502ac0149
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/workflow.xml
@@ -0,0 +1,118 @@
+<workflow-app name="Transform_EBI_Links_Workflow" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the EBI links working path</description>
+        </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the output MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the URI of the MDStore manager service</description>
+        </property>
+    </parameters>
+
+    <start to="StartTransaction"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="TransformEBILinks"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="TransformEBILinks">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Create OAF DataSet</name>
+            <class>eu.dnetlib.dhp.sx.bio.ebi.SparkTransformEBILinksToOaf</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=2000
+                ${sparkExtraOPT}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json
new file mode 100644
index 000000000..9aa5b178c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json
@@ -0,0 +1,5 @@
+[
+  {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true},
+  {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore Output Version", "paramRequired": true}
+]
\ No newline at end of file
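Note that mdstoreOutputVersion is not a plain output path: the workflows pass the serialized MDStoreVersion captured from StartTransaction, and the Spark jobs below deserialize it to derive their output directory. A toy illustration of that handshake, with an assumed JSON shape (only getHdfsPath is actually relied on in this patch; the other fields are guesses):

    import com.fasterxml.jackson.databind.ObjectMapper

    // Sketch of what the Spark job receives in --mdstoreOutputVersion and how
    // the output path is derived. The real bean is
    // eu.dnetlib.dhp.schema.mdstore.MDStoreVersion; this payload is made up.
    object MdStoreVersionArgSketch {
      def main(args: Array[String]): Unit = {
        val mdstoreOutputVersion =
          """{"id":"md-1234-20230126","mdstore":"md-1234","writing":true,"hdfsPath":"/data/mdstores/md-1234/md-1234-20230126"}"""

        val json = new ObjectMapper().readTree(mdstoreOutputVersion)
        val outputBasePath = json.get("hdfsPath").asText()

        // Assumption: MDSTORE_DATA_PATH resolves to a "/store" suffix, so the
        // job writes its OAF dataset inside the version directory and the
        // COMMIT step later publishes exactly that version.
        val targetPath = s"$outputBasePath/store"
        println(targetPath)
      }
    }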
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
index ffdab1799..ceee14155 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
@@ -82,8 +82,8 @@ object BioDBToOAF {
     "Springer Nature"
   )
   val EBICollectedFrom: KeyValue = OafMapperUtils.keyValue(
-    "10|opendoar____::83e60e09c222f206c725385f53d7e567c",
-    "EMBL-EBIs Protein Data Bank in Europe (PDBe)"
+    "10|fairsharing_::96c5c28becf18e71190460a9955aa4d8",
+    "Protein Data Bank in Europe"
   )
   val pubmedCollectedFrom: KeyValue =
     OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
@@ -120,7 +120,7 @@
     val relation_semantic = (json \ "RelationshipType" \ "Name").extract[String]
 
-    val date = GraphCleaningFunctions.cleanDate((json \ "LinkedPublicationDate").extract[String])
+    val date = GraphCleaningFunctions.cleanDate((json \ "LinkPublicationDate").extract[String])
 
     createRelation(
       target_pid,
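The second hunk of this diff is a field-name fix: the EBI links payload carries LinkPublicationDate, so the old lookup of LinkedPublicationDate found nothing and the extraction failed. A self-contained json4s check with a made-up record:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    // Toy check of the renamed field; the payload is trimmed down from the
    // EBI links records this mapper consumes (values are invented).
    object LinkDateFieldSketch {
      implicit val formats: Formats = DefaultFormats

      def main(args: Array[String]): Unit = {
        val json = parse("""{"RelationshipType":{"Name":"IsRelatedTo"},"LinkPublicationDate":"2020-05-01"}""")

        // Works: the field exists under its real name.
        println((json \ "LinkPublicationDate").extract[String])

        // The pre-patch lookup yields JNothing, and extract[String] on
        // JNothing throws a MappingException at runtime.
        println((json \ "LinkedPublicationDate") == JNothing) // true
      }
    }

With json4s, a missing key silently yields JNothing rather than a compile-time error, which is why the bug only surfaced when extract[String] was applied to the relation date.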
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
index 96075b4f3..10bfdce3c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
@@ -1,42 +1,26 @@
 package eu.dnetlib.dhp.sx.bio
 
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.AbstractScalaApplication
 import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
 import eu.dnetlib.dhp.schema.oaf.Oaf
 import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
-import org.apache.commons.io.IOUtils
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
-object SparkTransformBioDatabaseToOAF {
+class SparkTransformBioDatabaseToOAF(propertyPath: String, args: Array[String], log: Logger)
+  extends AbstractScalaApplication(propertyPath, args, log: Logger) {
 
-  def main(args: Array[String]): Unit = {
-    val conf: SparkConf = new SparkConf()
-    val log: Logger = LoggerFactory.getLogger(getClass)
-    val parser = new ArgumentApplicationParser(
-      IOUtils.toString(
-        getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json")
-      )
-    )
-    parser.parseArgument(args)
-    val database: String = parser.get("database")
-    log.info("database: {}", database)
-
-    val dbPath: String = parser.get("dbPath")
-    log.info("dbPath: {}", database)
-    val targetPath: String = parser.get("targetPath")
-    log.info("targetPath: {}", database)
-
-    val spark: SparkSession =
-      SparkSession
-        .builder()
-        .config(conf)
-        .appName(getClass.getSimpleName)
-        .master(parser.get("master"))
-        .getOrCreate()
+  private def convertDatabase(
+    spark: SparkSession,
+    dbPath: String,
+    database: String,
+    targetPath: String,
+    outputBasePath: String
+  ): Unit = {
     val sc = spark.sparkContext
-
     implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
     import spark.implicits._
     database.toUpperCase() match {
@@ -45,22 +29,60 @@ object SparkTransformBioDatabaseToOAF {
           spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))),
           targetPath
         )
+        reportTotalSize(targetPath, outputBasePath)
       case "PDB" =>
         CollectionUtils.saveDataset(
          spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))),
           targetPath
         )
+        reportTotalSize(targetPath, outputBasePath)
       case "SCHOLIX" =>
         CollectionUtils.saveDataset(
           spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)),
           targetPath
         )
+        reportTotalSize(targetPath, outputBasePath)
       case "CROSSREF_LINKS" =>
         CollectionUtils.saveDataset(
-          spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))),
+          spark.read.load(dbPath).select("json").as[String].map(i => BioDBToOAF.crossrefLinksToOaf(i)),
           targetPath
         )
+        reportTotalSize(targetPath, outputBasePath)
     }
   }
 
+  /** All Spark applications run this method; the whole logic of the Spark
+    * node is defined here.
+    */
+  override def run(): Unit = {
+    val database: String = parser.get("database")
+    log.info("database: {}", database)
+
+    val dbPath: String = parser.get("dbPath")
+    log.info("dbPath: {}", dbPath)
+
+    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+
+    val mapper = new ObjectMapper()
+    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+    log.info(s"outputBasePath is '$outputBasePath'")
+
+    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
+    log.info(s"targetPath is '$targetPath'")
+    convertDatabase(spark, dbPath, database, targetPath, outputBasePath)
+  }
+}
+
+object SparkTransformBioDatabaseToOAF {
+  val log: Logger = LoggerFactory.getLogger(SparkTransformBioDatabaseToOAF.getClass)
+
+  def main(args: Array[String]): Unit = {
+    new SparkTransformBioDatabaseToOAF("/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json", args, log)
+      .initialize()
+      .run()
+  }
+}
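The refactoring above replaces the hand-rolled main (SparkConf, ArgumentApplicationParser, SparkSession builder) with the AbstractScalaApplication template, which hands parser, spark and reportTotalSize to run(). The base class lives in dhp-common and is not shown in this patch; a rough sketch of the shape implied by its usage here, with a simplified stand-in for the argument parser:

    import org.apache.spark.sql.SparkSession
    import org.slf4j.Logger

    // Minimal sketch of the template the refactored jobs rely on. The member
    // names follow their usage in this patch (parser, spark, initialize(),
    // run()); everything else is an assumption, not the real dhp-common code.
    abstract class ScalaApplicationSketch(propertyPath: String, args: Array[String], log: Logger) {

      // In the real class this is an ArgumentApplicationParser configured from
      // the JSON resource at propertyPath; a Map keeps the sketch small.
      protected lazy val parser: Map[String, String] =
        args.sliding(2, 2).collect { case Array(k, v) => k.stripPrefix("--") -> v }.toMap

      protected var spark: SparkSession = _

      def initialize(): this.type = {
        spark = SparkSession
          .builder()
          .appName(getClass.getSimpleName)
          .master(parser.getOrElse("master", "local[*]"))
          .getOrCreate()
        this
      }

      def run(): Unit // each job implements its whole logic here
    }

One consequence of this design is visible in the companion objects below each job: main() only instantiates the class with its parameter descriptor and calls initialize().run(), so the boilerplate no longer has to be copied into every aggregation job.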
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkTransformEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkTransformEBILinksToOaf.scala
new file mode 100644
index 000000000..ff6f0563a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkTransformEBILinksToOaf.scala
@@ -0,0 +1,66 @@
+package eu.dnetlib.dhp.sx.bio.ebi
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.schema.oaf.Oaf
+import eu.dnetlib.dhp.sx.bio.BioDBToOAF
+import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+class SparkTransformEBILinksToOaf(propertyPath: String, args: Array[String], log: Logger)
+  extends AbstractScalaApplication(propertyPath, args, log: Logger) {
+
+  def transformLinks(spark: SparkSession, sourcePath: String, outputBasePath: String, targetPath: String): Unit = {
+    implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+    import spark.implicits._
+    val ebLinks: Dataset[EBILinkItem] = spark.read
+      .load(sourcePath)
+      .as[EBILinkItem]
+      .filter(l => l.links != null && l.links.startsWith("{"))
+
+    CollectionUtils.saveDataset(
+      ebLinks
+        .flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
+        .filter(p => BioDBToOAF.EBITargetLinksFilter(p))
+        .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)),
+      targetPath
+    )
+    reportTotalSize(targetPath, outputBasePath)
+  }
+
+  /** All Spark applications run this method; the whole logic of the Spark
+    * node is defined here.
+    */
+  override def run(): Unit = {
+    val sourcePath = parser.get("sourcePath")
+    log.info(s"sourcePath is '$sourcePath'")
+
+    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+
+    val mapper = new ObjectMapper()
+    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+    log.info(s"outputBasePath is '$outputBasePath'")
+
+    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
+    log.info(s"targetPath is '$targetPath'")
+    transformLinks(spark, sourcePath, outputBasePath, targetPath)
+  }
+}
+
+object SparkTransformEBILinksToOaf {
+  val log: Logger = LoggerFactory.getLogger(SparkTransformEBILinksToOaf.getClass)
+
+  def main(args: Array[String]): Unit = {
+    new SparkTransformEBILinksToOaf("/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json", args, log)
+      .initialize()
+      .run()
+  }
+}
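Both converters end by calling reportTotalSize(targetPath, outputBasePath), inherited from the base class and likewise not visible in this diff. Presumably it counts the records just written and stores the figure inside the MDStore version directory so the committed version carries its size; a sketch under that assumption (the /size file name is a guess mirroring the /store data suffix):

    import java.nio.charset.StandardCharsets

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    // Sketch of what reportTotalSize presumably does: count the dataset just
    // written under targetPath and persist the total next to it, where the
    // COMMIT action can pick it up. Not the real dhp-common implementation.
    object ReportTotalSizeSketch {
      def reportTotalSize(spark: SparkSession, targetPath: String, outputBasePath: String): Unit = {
        val total = spark.read.load(targetPath).count()
        val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
        val out = fs.create(new Path(s"$outputBasePath/size"), true)
        try out.write(total.toString.getBytes(StandardCharsets.UTF_8))
        finally out.close()
      }
    }

Whatever the exact mechanics, the count is taken before CommitVersion runs, so a failed or empty write can still be rolled back without the version ever being published.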