1
0
Fork 0

updated workflow baseline to direct transform on OAF

This commit is contained in:
Sandro La Bruzzo 2024-11-11 10:27:23 +01:00
parent c1cef5d685
commit 0d0904f4ec
3 changed files with 27 additions and 27 deletions

View File

@ -2,6 +2,6 @@
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the baseline path", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the mdstore path to save", "paramRequired": true}
{"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore path to save", "paramRequired": true}
]

View File

@ -16,11 +16,6 @@
<name>mdStoreManagerURI</name>
<description>the path of the cleaned mdstore</description>
</property>
<property>
<name>skipUpdate</name>
<value>false</value>
<description>The request block size</description>
</property>
</parameters>
<start to="StartTransaction"/>
@ -44,16 +39,16 @@
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="ConvertDataset"/>
<ok to="TransformPubMed"/>
<error to="RollBack"/>
</action>
<action name="ConvertDataset">
<action name="TransformPubMed">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Convert Baseline to OAF Dataset</name>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
<name>Convert Baseline Pubmed to OAF Dataset</name>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkCreatePubmedDump</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@ -65,12 +60,10 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
<arg>--sourcePath</arg><arg>${baselineWorkingPath}</arg>
<arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--master</arg><arg>yarn</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--skipUpdate</arg><arg>${skipUpdate}</arg>
</spark>
<ok to="CommitVersion"/>
<error to="RollBack"/>

View File

@ -1,18 +1,14 @@
package eu.dnetlib.dhp.sx.bio.ebi
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.{AbstractScalaApplication, ArgumentApplicationParser}
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PMParser2, PubMedToOaf}
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser2, PubMedToOaf}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import java.io.ByteArrayInputStream
import javax.xml.stream.XMLInputFactory
class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
@ -24,16 +20,26 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg
log.info("isLookupUrl: {}", isLookupUrl)
val sourcePath = parser.get("sourcePath")
log.info(s"SourcePath is '$sourcePath'")
val targetPath = parser.get("targetPath")
log.info(s"TargetPath is '$targetPath'")
val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info(s"outputBasePath is '$outputBasePath'")
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
createPubmedDump(spark, sourcePath, targetPath, vocabularies)
createPubmedDump(spark, sourcePath, outputBasePath, vocabularies)
}
/** This method creates a dump of the pubmed articles
* @param spark the spark session
* @param sourcePath the path of the source file
* @param targetPath the path of the target file
* @param vocabularies the vocabularies
*/
def createPubmedDump(
spark: SparkSession,
sourcePath: String,
@ -54,6 +60,7 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg
})
.filter(s => s != null)
.map { i =>
//remove try catch
try {
new PMParser2().parse(i)
} catch {