From 0d0904f4ec2b1d4424d5d8d5d6d782049c0544cd Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Mon, 11 Nov 2024 10:27:23 +0100
Subject: [PATCH] updated workflow baseline to direct transform on OAF

---
 .../sx/bio/ebi/baseline_to_oaf_params.json    |  8 ++---
 .../dhp/sx/bio/pubmed/oozie_app/workflow.xml  | 17 ++++-------
 .../sx/bio/ebi/SparkCreatePubmedDump.scala    | 29 ++++++++++++-------
 3 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
index 8326fab0f3..0fcc03266e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
@@ -1,7 +1,7 @@
 [
-  {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
-  {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
-  {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the baseline path", "paramRequired": true},
-  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the mdstore path to save", "paramRequired": true}
+  {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
+  {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the baseline path", "paramRequired": true},
+  {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore path to save", "paramRequired": true}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
index 30eb414698..0f4c5c2495 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
@@ -16,11 +16,6 @@
             mdStoreManagerURI
             the path of the cleaned mdstore
-
-            skipUpdate
-            false
-            The request block size
-
@@ -44,16 +39,16 @@
             --mdStoreManagerURI${mdStoreManagerURI}
-
+
-
+
             yarn
             cluster
-            Convert Baseline to OAF Dataset
-            eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame
+            Convert Baseline Pubmed to OAF Dataset
+            eu.dnetlib.dhp.sx.bio.ebi.SparkCreatePubmedDump
             dhp-aggregation-${projectVersion}.jar
             --executor-memory=${sparkExecutorMemory}
@@ -65,12 +60,10 @@
             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            --workingPath${baselineWorkingPath}
+            --sourcePath${baselineWorkingPath}
             --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
             --masteryarn
             --isLookupUrl${isLookupUrl}
-            --hdfsServerUri${nameNode}
-            --skipUpdate${skipUpdate}
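The substantive change in the workflow above is that the transform action no longer receives a plain target path: it now forwards the mdStoreVersion JSON produced by the StartTransaction action through the new --mdstoreOutputVersion argument, and the Spark job derives its output location from it. A minimal sketch of that resolution step, using only the getHdfsPath accessor seen in the patch; the JSON literal and its field values are illustrative placeholders, not the real payload emitted by the mdstore manager:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion

object MdStoreVersionResolutionSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative placeholder for the value of ${wf:actionData('StartTransaction')['mdStoreVersion']}
    val mdstoreOutputVersion =
      """{"id":"md-0000-MDSTORE_VERSION","hdfsPath":"/data/mdstore/md-0000/version-1"}"""

    // Same deserialization step the patched job performs
    val mapper  = new ObjectMapper()
    val version = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])

    // Base path under which the OAF records of the dump are written
    println(version.getHdfsPath) // /data/mdstore/md-0000/version-1
  }
}
```
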
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala
index c21bfd7c3b..1bdd2a4bcc 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala
@@ -1,18 +1,14 @@
 package eu.dnetlib.dhp.sx.bio.ebi

 import com.fasterxml.jackson.databind.ObjectMapper
-import eu.dnetlib.dhp.application.{AbstractScalaApplication, ArgumentApplicationParser}
+import eu.dnetlib.dhp.application.AbstractScalaApplication
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
-import eu.dnetlib.dhp.schema.oaf.Oaf
-import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PMParser2, PubMedToOaf}
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser2, PubMedToOaf}
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}

-import java.io.ByteArrayInputStream
-import javax.xml.stream.XMLInputFactory
-
 class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logger)
     extends AbstractScalaApplication(propertyPath, args, log: Logger) {

@@ -24,16 +20,26 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg
     log.info("isLookupUrl: {}", isLookupUrl)
     val sourcePath = parser.get("sourcePath")
     log.info(s"SourcePath is '$sourcePath'")
-    val targetPath = parser.get("targetPath")
-    log.info(s"TargetPath is '$targetPath'")
+    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+    val mapper = new ObjectMapper()
+    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+    log.info(s"outputBasePath is '$outputBasePath'")
     val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
     val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
-    createPubmedDump(spark, sourcePath, targetPath, vocabularies)
+    createPubmedDump(spark, sourcePath, outputBasePath, vocabularies)
   }

+  /** This method creates a dump of the pubmed articles
+    * @param spark the spark session
+    * @param sourcePath the path of the source file
+    * @param targetPath the path of the target file
+    * @param vocabularies the vocabularies
+    */
   def createPubmedDump(
     spark: SparkSession,
     sourcePath: String,
@@ -54,6 +60,7 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg
       })
      .filter(s => s != null)
      .map { i =>
+        //remove try catch
        try {
          new PMParser2().parse(i)
        } catch {
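For context, here is a hedged sketch of how the reworked application could be driven with the new parameter set. The companion object, the resource path of the params file, and the initialize().run() lifecycle are assumptions modelled on other Scala applications in the project rather than part of this patch, and every argument value is a placeholder:

```scala
import org.slf4j.{Logger, LoggerFactory}

// Hypothetical launcher, not included in the patch
object SparkCreatePubmedDumpApp {
  val log: Logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    // Placeholder values throughout; in production these come from the Oozie action
    val jobArgs = Array(
      "--master", "yarn",
      "--isLookupUrl", "http://localhost:8280/is/services/isLookUp?wsdl",
      "--sourcePath", "/data/pubmed/baseline",
      "--mdstoreOutputVersion", """{"id":"md-0000-MDSTORE_VERSION","hdfsPath":"/data/mdstore/md-0000/version-1"}"""
    )
    // Assumes the initialize()/run() pattern of AbstractScalaApplication used elsewhere in dnet-hadoop
    new SparkCreatePubmedDump("/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json", jobArgs, log)
      .initialize()
      .run()
  }
}
```

In the Oozie workflow these arguments are supplied by the spark action shown above, with --mdstoreOutputVersion wired to ${wf:actionData('StartTransaction')['mdStoreVersion']}.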