forked from D-Net/dnet-hadoop
Added workflow for transforming BaseLine in production
This commit is contained in:
parent
699736addc
commit
3d115d6912
|
@ -1,6 +1,8 @@
|
|||
package eu.dnetlib.dhp.application
|
||||
|
||||
import scala.io.Source
|
||||
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
|
||||
import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
|
||||
|
||||
/** This is the main Interface SparkApplication
|
||||
* where all the Spark Scala class should inherit
|
||||
|
@ -70,4 +72,13 @@ abstract class AbstractScalaApplication(
|
|||
.getOrCreate()
|
||||
}
|
||||
|
||||
def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
|
||||
val total_items = spark.read.text(targetPath).count()
|
||||
writeHdfsFile(
|
||||
spark.sparkContext.hadoopConfiguration,
|
||||
s"$total_items",
|
||||
outputBasePath + MDSTORE_SIZE_PATH
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
|
@ -0,0 +1,127 @@
|
|||
<workflow-app name="Download_Transform_Pubmed_Workflow" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>sourcePath</name>
|
||||
<description>the Baseline Source Path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>The IS lookUp service endopoint</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mdStoreOutputId</name>
|
||||
<description>the identifier of the cleaned MDStore</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mdStoreManagerURI</name>
|
||||
<description>the path of the cleaned mdstore</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="StartTransaction"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="StartTransaction">
|
||||
<java>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
||||
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
|
||||
<arg>--action</arg><arg>NEW_VERSION</arg>
|
||||
<arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
|
||||
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
|
||||
<capture-output/>
|
||||
</java>
|
||||
<ok to="TransformJob"/>
|
||||
<error to="EndReadRollBack"/>
|
||||
</action>
|
||||
|
||||
<action name="TransformJob">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Convert Baseline to OAF Dataset</name>
|
||||
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkTransformBaselineToOAF</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.shuffle.partitions=2000
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--master</arg><arg>yarn</arg>
|
||||
</spark>
|
||||
<ok to="CommitVersion"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CommitVersion">
|
||||
<java>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
||||
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
|
||||
<arg>--action</arg><arg>COMMIT</arg>
|
||||
<arg>--namenode</arg><arg>${nameNode}</arg>
|
||||
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="EndReadRollBack">
|
||||
<java>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
||||
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
|
||||
<arg>--action</arg><arg>READ_UNLOCK</arg>
|
||||
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
|
||||
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
|
||||
<capture-output/>
|
||||
</java>
|
||||
<ok to="RollBack"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="RollBack">
|
||||
<java>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
||||
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
|
||||
<arg>--action</arg><arg>ROLLBACK</arg>
|
||||
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
|
||||
</java>
|
||||
<ok to="Kill"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
</workflow-app>
|
|
@ -0,0 +1,6 @@
|
|||
[
|
||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
||||
{"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
|
||||
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequential file to read", "paramRequired": true},
|
||||
{"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore Output Version", "paramRequired": true}
|
||||
]
|
|
@ -3,11 +3,10 @@ package eu.dnetlib.dhp.datacite
|
|||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||
import eu.dnetlib.dhp.collection.CollectionUtils
|
||||
import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
|
||||
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
||||
import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf
|
||||
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory
|
||||
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
@ -46,20 +45,6 @@ class GenerateDataciteDatasetSpark(propertyPath: String, args: Array[String], lo
|
|||
reportTotalSize(targetPath, outputBasePath)
|
||||
}
|
||||
|
||||
/** For working with MDStore we need to store in a file on hdfs the size of
|
||||
* the current dataset
|
||||
* @param targetPath
|
||||
* @param outputBasePath
|
||||
*/
|
||||
def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
|
||||
val total_items = spark.read.text(targetPath).count()
|
||||
writeHdfsFile(
|
||||
spark.sparkContext.hadoopConfiguration,
|
||||
s"$total_items",
|
||||
outputBasePath + MDSTORE_SIZE_PATH
|
||||
)
|
||||
}
|
||||
|
||||
/** Generate the transformed and cleaned OAF Dataset from the native one
|
||||
*
|
||||
* @param sourcePath sourcePath of the native Dataset in format JSON/Datacite
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
package eu.dnetlib.dhp.sx.bio.ebi
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||
import eu.dnetlib.dhp.collection.CollectionUtils
|
||||
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
||||
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf
|
||||
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PubMedToOaf}
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory
|
||||
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
class SparkTransformBaselineToOAF(propertyPath: String, args: Array[String], log: Logger)
|
||||
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
|
||||
|
||||
private def convertBaseLineToOAF(
|
||||
spark: SparkSession,
|
||||
inputPath: String,
|
||||
targetPath: String,
|
||||
vocabularies: VocabularyGroup
|
||||
): Unit = {
|
||||
implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
|
||||
implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
|
||||
implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
|
||||
implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
|
||||
val exported_dataset = spark.read.load(inputPath).as[PMArticle]
|
||||
CollectionUtils.saveDataset(
|
||||
exported_dataset
|
||||
.map(a => PubMedToOaf.convert(a, vocabularies))
|
||||
.as[Oaf]
|
||||
.filter(p => p != null),
|
||||
targetPath
|
||||
)
|
||||
}
|
||||
|
||||
/** Here all the spark applications runs this method
|
||||
* where the whole logic of the spark node is defined
|
||||
*/
|
||||
override def run(): Unit = {
|
||||
val isLookupUrl: String = parser.get("isLookupUrl")
|
||||
log.info("isLookupUrl: {}", isLookupUrl)
|
||||
|
||||
val sourcePath = parser.get("sourcePath")
|
||||
log.info("sourcePath: {}", sourcePath)
|
||||
|
||||
val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
|
||||
log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
|
||||
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
|
||||
log.info(s"outputBasePath is '$outputBasePath'")
|
||||
|
||||
val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
|
||||
log.info(s"targetPath is '$targetPath'")
|
||||
|
||||
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
|
||||
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
|
||||
|
||||
convertBaseLineToOAF(spark, sourcePath, targetPath, vocabularies)
|
||||
reportTotalSize(targetPath, outputBasePath)
|
||||
}
|
||||
}
|
||||
|
||||
object SparkTransformBaselineToOAF {
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(SparkTransformBaselineToOAF.getClass)
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
new SparkTransformBaselineToOAF("/eu/dnetlib/dhp/sx/bio/ebi/pubmed_to_oaf_params.json", args, log)
|
||||
.initialize()
|
||||
.run()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue