diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
index 509e464e5..e7de60e07 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
@@ -48,18 +48,16 @@ object CollectionUtils {
     List()
   }
 
-  def saveDataset(d: Dataset[Oaf], targetPath: String):Unit = {
+  def saveDataset(dataset: Dataset[Oaf], targetPath: String): Unit = {
     implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
-    val mapper = new ObjectMapper
-    d
+    dataset
       .flatMap(i => CollectionUtils.fixRelations(i))
       .filter(i => i != null)
-      .map(r => mapper.writeValueAsString(r))(Encoders.STRING)
       .write
       .mode(SaveMode.Overwrite)
       .option("compression", "gzip")
-      .text(targetPath)
+      .json(targetPath)
   }
 
 }
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
index d11c33fb4..a205edcf2 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
@@ -38,7 +38,7 @@ class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log
     val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
     val outputBasePath = cleanedMdStoreVersion.getHdfsPath
     log.info(s"outputBasePath is '$outputBasePath'")
-    val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"
+    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
     log.info(s"targetPath is '$targetPath'")
 
     generateDataciteDataset(sourcePath, exportLinks, vocabularies, targetPath, spark)
@@ -54,7 +54,7 @@ class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log
    * @param outputBasePath
    */
   def reportTotalSize( targetPath: String, outputBasePath: String ):Unit = {
-    val total_items = spark.read.load(targetPath).count()
+    val total_items = spark.read.text(targetPath).count()
     writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
   }
 
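
Review note: `saveDataset` previously serialized every record with Jackson's `ObjectMapper` and wrote the resulting strings via `.text()`; the patch delegates serialization to Spark's built-in JSON writer, which produces the same one-record-per-line layout without the per-row mapper call. Because the output stays line-oriented, `reportTotalSize` can count records with a plain `spark.read.text(...)` instead of `spark.read.load(...)`, which would have assumed the default Parquet source and failed on text output. A minimal standalone sketch of the write-then-count round trip — the `Record` case class, local master, and `/tmp` path are illustrative, not part of this patch:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Illustrative record type; the real job writes Oaf entities with a Kryo encoder.
case class Record(id: String, title: String)

object JsonWriteCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("json-write-count-sketch")
      .master("local[*]") // local session just for the sketch
      .getOrCreate()
    import spark.implicits._

    val targetPath = "/tmp/json-write-count-sketch" // hypothetical output dir

    // Write gzip-compressed JSON lines, as the new saveDataset does.
    Seq(Record("1", "a"), Record("2", "b"), Record("3", "c"))
      .toDS()
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .json(targetPath)

    // One JSON record per line, so a plain text read counts records
    // without schema inference; Spark decompresses the .gz parts itself.
    val totalItems = spark.read.text(targetPath).count()
    assert(totalItems == 3)

    spark.stop()
  }
}
```

Each `.json()` part file is a gzip-compressed text file of JSON lines, so `read.text` sees exactly one row per record and no schema inference is needed just to count them.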
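
On the `targetPath` change: the unchanged `writeHdfsFile(..., outputBasePath + MDSTORE_SIZE_PATH)` line concatenates without a separator, which suggests the mdstore path constants already carry a leading slash; the old interpolation therefore produced a double slash in the data path. A hedged sketch — the constant's value here is assumed for illustration only:

```scala
object TargetPathSketch {
  // Assumed shape of the constant (leading slash), mirroring how the
  // unchanged MDSTORE_SIZE_PATH concatenation works; illustrative only.
  val MDSTORE_DATA_PATH = "/store"

  def main(args: Array[String]): Unit = {
    val outputBasePath = "/data/mdstore/md-1234/ver-1" // hypothetical HDFS path
    // Before: the interpolation's '/' plus the constant's own slash.
    println(s"$outputBasePath/$MDSTORE_DATA_PATH") // .../ver-1//store
    // After: the constant supplies the single separator.
    println(s"$outputBasePath$MDSTORE_DATA_PATH")  // .../ver-1/store
  }
}
```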