From 3dc48c7ab51fcf843c615614db9e5ea9eec6cb4b Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Tue, 4 Jan 2022 14:40:16 +0100
Subject: [PATCH] OAF-store-graph mdstores: save them in text format

---
 .../java/eu/dnetlib/dhp/collection/CollectionUtils.scala | 8 +++++---
 .../dhp/datacite/GenerateDataciteDatasetSpark.scala      | 4 ++--
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala
index 509e464e5a..e7de60e077 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala
@@ -48,16 +48,18 @@ object CollectionUtils {
     List()
   }
 
-  def saveDataset(d: Dataset[Oaf], targetPath: String):Unit = {
+  def saveDataset(dataset: Dataset[Oaf], targetPath: String): Unit = {
     implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
 
-    d
+    val mapper = new ObjectMapper
+    dataset
       .flatMap(i => CollectionUtils.fixRelations(i))
       .filter(i => i != null)
+      .map(r => mapper.writeValueAsString(r))(Encoders.STRING)
       .write
       .mode(SaveMode.Overwrite)
       .option("compression", "gzip")
-      .json(targetPath)
+      .text(targetPath)
   }
 
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
index d11c33fb49..a205edcf28 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
@@ -38,7 +38,7 @@ class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log
     val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
     val outputBasePath = cleanedMdStoreVersion.getHdfsPath
     log.info(s"outputBasePath is '$outputBasePath'")
-    val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"
+    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
     log.info(s"targetPath is '$targetPath'")
 
     generateDataciteDataset(sourcePath, exportLinks, vocabularies, targetPath, spark)
@@ -54,7 +54,7 @@
    * @param outputBasePath
    */
   def reportTotalSize( targetPath: String, outputBasePath: String ):Unit = {
-    val total_items = spark.read.load(targetPath).count()
+    val total_items = spark.read.text(targetPath).count()
     writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
   }
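
Note (not part of the patch): a minimal, self-contained Scala sketch of the write/read round trip this change relies on. Because Dataset[Oaf] is kryo-encoded, DataFrameWriter.json would only dump the opaque binary "value" column; the patch instead serializes each record to a JSON string with Jackson and writes gzip-compressed text, which reportTotalSize can then count with spark.read.text. The Record class, the local Spark master and the /tmp path below are illustrative stand-ins for eu.dnetlib.dhp.schema.oaf.Oaf and the HDFS mdstore paths; they do not appear in the repository.

// Standalone sketch under the assumptions stated above.
import scala.beans.BeanProperty

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}

// Stand-in for the Oaf model: bean-style accessors so a plain ObjectMapper can serialize it.
class Record(@BeanProperty var id: String, @BeanProperty var resultType: String) extends Serializable {
  def this() = this(null, null)
}

object TextMdStoreSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("text-mdstore-sketch").getOrCreate()
    val storePath = "/tmp/mdstore-sketch/store" // placeholder for $outputBasePath$MDSTORE_DATA_PATH

    // Like Dataset[Oaf] in CollectionUtils, the records are kryo-encoded: a single binary
    // column, so writing .json(storePath) here would not yield readable JSON documents.
    val records = spark.createDataset(
      Seq(new Record("50|doi_1", "publication"), new Record("50|doi_2", "dataset"))
    )(Encoders.kryo(classOf[Record]))

    // The approach taken by the patch: serialize every record to a JSON string with Jackson
    // and store the result as gzip-compressed text files.
    val mapper = new ObjectMapper
    records
      .map(r => mapper.writeValueAsString(r))(Encoders.STRING)
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .text(storePath)

    // reportTotalSize counts items the same way: one text line per record.
    val totalItems = spark.read.text(storePath).count()
    println(s"total items: $totalItems")

    spark.stop()
  }
}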