dnet-hadoop/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpa...

package eu.dnetlib.dhp.bioschema

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.CollectionUtils
import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log) {

  /** All Spark applications run this method,
    * where the whole logic of the Spark node is defined.
    */
  override def run(): Unit = {
    val sourcePath = parser.get("sourcePath")
    log.info(s"sourcePath is '$sourcePath'")
    val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
    log.info(s"exportLinks is '$exportLinks'")
    val datasourceKey = parser.get("datasourceKey").toLowerCase
    log.info(s"datasourceKey is '$datasourceKey'")
    // val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
    // log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
    // val mapper = new ObjectMapper()
    // val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
    // val outputBasePath = cleanedMdStoreVersion.getHdfsPath
    // log.info(s"outputBasePath is '$outputBasePath'")
    // val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
    // log.info(s"targetPath is '$targetPath'")
    val targetPath = parser.get("targetPath")
    generateBioschemaDataset(sourcePath, exportLinks, targetPath, datasourceKey, spark)
    // reportTotalSize(targetPath, outputBasePath)
  }

  /** For working with MDStore we need to store the size of
    * the current dataset in a file on HDFS.
    *
    * @param targetPath     the path of the written dataset whose records are counted
    * @param outputBasePath the base path of the MDStore version, where the size file is written
    */
  def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
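    // Read the stored dataset back as text and count its lines, assuming one record per line.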
    val totalItems = spark.read.text(targetPath).count()
    writeHdfsFile(
      spark.sparkContext.hadoopConfiguration,
      s"$totalItems",
      outputBasePath + MDSTORE_SIZE_PATH
    )
  }

  /** Generate the transformed and cleaned OAF Dataset from the native one.
    *
    * @param sourcePath    the sourcePath of the native Dataset, in JSON format
    * @param exportLinks   if true, unresolved links are also generated
    * @param targetPath    the targetPath of the resulting Dataset
    * @param datasourceKey the key identifying the source datasource, passed to the transformation
    * @param spark         the active SparkSession
    */
  def generateBioschemaDataset(
    sourcePath: String,
    exportLinks: Boolean,
    targetPath: String,
    datasourceKey: String,
    spark: SparkSession
  ): Unit = {
    require(spark != null)
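    // Oaf is a polymorphic Java class hierarchy, so Spark cannot derive a product
    // encoder for it; a kryo Encoder is supplied instead for createDataset.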
    implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
    CollectionUtils.saveDataset(
      spark.createDataset(
        spark.sparkContext
          .textFile(sourcePath)
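          // Each input record may yield zero or more OAF records, plus unresolved
          // links when exportLinks is enabled, hence the flatMap.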
          .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks, datasourceKey))
      ),
      targetPath
    )
  }
}

object GenerateBioschemaDatasetSpark {

  val log: Logger = LoggerFactory.getLogger(GenerateBioschemaDatasetSpark.getClass)
  def main(args: Array[String]): Unit = {
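    // initialize() parses args against the JSON parameter definition and creates
    // the SparkSession; run() then executes the job logic defined above.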
    new GenerateBioschemaDatasetSpark(
      "/eu/dnetlib/dhp/bioschema/generate_dataset_params.json",
      args,
      log
    ).initialize().run()
  }
}
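
// A minimal sketch of the expected shape of generate_dataset_params.json, assuming the
// standard dhp ArgumentApplicationParser definition format; the short paramName values
// are hypothetical, only the paramLongName values come from the parser.get calls above:
// [
//   {"paramName": "s",  "paramLongName": "sourcePath",    "paramDescription": "the source path",                     "paramRequired": true},
//   {"paramName": "t",  "paramLongName": "targetPath",    "paramDescription": "the target path",                     "paramRequired": true},
//   {"paramName": "el", "paramLongName": "exportLinks",   "paramDescription": "whether to generate unresolved links", "paramRequired": false},
//   {"paramName": "dk", "paramLongName": "datasourceKey", "paramDescription": "the datasource key",                  "paramRequired": true}
// ]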