diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java index b4a415bd75..dcdefaf102 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java @@ -100,7 +100,7 @@ public class SparkCountryPropagationJob2 { broadcast_datasourcecountryassoc).as(Encoders.bean(ResultCountrySet.class)); if(writeUpdates){ - writeUpdates(potentialUpdates.toJavaRDD(), outputPath + "/update_" + resultType); + writeUpdates(potentialUpdates, outputPath + "/update_" + resultType); } if(saveGraph){ @@ -147,9 +147,13 @@ public class SparkCountryPropagationJob2 { log.info("Saving graph table to path: {}", outputPath); //log.info("number of saved recordsa: {}", new_table.count()); new_table - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(outputPath , GzipCodec.class); + .toJSON() + .write() + .option("compression", "gzip") + .text(outputPath); +// .toJavaRDD() +// .map(r -> OBJECT_MAPPER.writeValueAsString(r)) +// .saveAsTextFile(outputPath , GzipCodec.class); } @@ -219,9 +223,14 @@ public class SparkCountryPropagationJob2 { .map(value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), Encoders.bean(DatasourceCountry.class)); } - private static void writeUpdates(JavaRDD potentialUpdates, String outputPath){ - potentialUpdates.map(u -> OBJECT_MAPPER.writeValueAsString(u)) - .saveAsTextFile(outputPath, GzipCodec.class); + private static void writeUpdates(Dataset potentialUpdates, String outputPath){ + potentialUpdates + .toJSON() + .write() + .option("compression", "gzip") + .text(outputPath); +// map(u -> OBJECT_MAPPER.writeValueAsString(u)) +// .saveAsTextFile(outputPath, GzipCodec.class); }