OAF-store-graph mdstores: save them in text format

This commit is contained in:
Claudio Atzori 2022-01-04 14:40:16 +01:00
parent 045d767013
commit 174c3037e1
2 changed files with 5 additions and 7 deletions

View File

@ -48,18 +48,16 @@ object CollectionUtils {
List() List()
} }
def saveDataset(d: Dataset[Oaf], targetPath: String):Unit = { def saveDataset(dataset: Dataset[Oaf], targetPath: String): Unit = {
implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
val mapper = new ObjectMapper
d dataset
.flatMap(i => CollectionUtils.fixRelations(i)) .flatMap(i => CollectionUtils.fixRelations(i))
.filter(i => i != null) .filter(i => i != null)
.map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write .write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.text(targetPath) .json(targetPath)
} }
} }

View File

@ -38,7 +38,7 @@ class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info(s"outputBasePath is '$outputBasePath'") log.info(s"outputBasePath is '$outputBasePath'")
val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH" val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
log.info(s"targetPath is '$targetPath'") log.info(s"targetPath is '$targetPath'")
generateDataciteDataset(sourcePath, exportLinks, vocabularies, targetPath, spark) generateDataciteDataset(sourcePath, exportLinks, vocabularies, targetPath, spark)
@ -54,7 +54,7 @@ class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log
* @param outputBasePath * @param outputBasePath
*/ */
def reportTotalSize( targetPath: String, outputBasePath: String ):Unit = { def reportTotalSize( targetPath: String, outputBasePath: String ):Unit = {
val total_items = spark.read.load(targetPath).count() val total_items = spark.read.text(targetPath).count()
writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
} }