dnet-hadoop/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpa...


package eu.dnetlib.dhp.bioschema

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.CollectionUtils
import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.Logger
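
/** Spark application that reads native Bioschema JSON records from a source path,
  * transforms them into OAF entities via BioschemaToOAFTransformation, stores the
  * result in the data path of an MDStore version, and records the total number of
  * records alongside it.
  */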
class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log) {

  /** Every Spark application runs this method; it contains the whole logic
    * of the Spark node.
    */
  override def run(): Unit = {
    val sourcePath = parser.get("sourcePath")
    log.info(s"SourcePath is '$sourcePath'")
    val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
    log.info(s"exportLinks is '$exportLinks'")
    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
    val mapper = new ObjectMapper()
    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
    log.info(s"outputBasePath is '$outputBasePath'")
    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
    log.info(s"targetPath is '$targetPath'")
    generateBioschemaDataset(sourcePath, exportLinks, targetPath, spark)
    reportTotalSize(targetPath, outputBasePath)
  }

  /** Working with an MDStore requires storing the size of the current
    * dataset in a file on HDFS.
    *
    * @param targetPath     the path of the dataset whose records are counted
    * @param outputBasePath the base path of the MDStore version, where the size file is written
    */
  def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
    val totalItems = spark.read.text(targetPath).count()
    writeHdfsFile(
      spark.sparkContext.hadoopConfiguration,
      s"$totalItems",
      outputBasePath + MDSTORE_SIZE_PATH
    )
  }

  /** Generate the transformed and cleaned OAF Dataset from the native one.
    *
    * @param sourcePath  the sourcePath of the native Dataset, in JSON/Bioschema format
    * @param exportLinks if true, unresolved links are also generated
    * @param targetPath  the targetPath of the resulting Dataset
    * @param spark       the active SparkSession
    */
  def generateBioschemaDataset(
    sourcePath: String,
    exportLinks: Boolean,
    targetPath: String,
    spark: SparkSession
  ): Unit = {
    require(spark != null)
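    // Oaf is a plain Java class hierarchy rather than a case class, so an
    // explicit Kryo encoder is needed to build a typed Dataset[Oaf].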
    implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
    CollectionUtils.saveDataset(
      spark.createDataset(
        spark.sparkContext
          .textFile(sourcePath)
          .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks))
      ),
      targetPath
    )
  }
}
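
/** A minimal sketch of a companion-object entry point, following the
  * bootstrap pattern used by other AbstractScalaApplication subclasses in
  * this module. The parameter-descriptor path below is an assumption, not
  * taken from this file.
  */
object GenerateBioschemaDatasetSpark {

  def main(args: Array[String]): Unit = {
    val log: Logger = org.slf4j.LoggerFactory.getLogger(GenerateBioschemaDatasetSpark.getClass)
    new GenerateBioschemaDatasetSpark(
      "/eu/dnetlib/dhp/bioschema/generate_dataset_params.json", // assumed descriptor path
      args,
      log
    ).initialize().run()
  }
}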