changed the way to save the results on filesystem

This commit is contained in:
Miriam Baglioni 2020-04-11 16:47:34 +02:00
parent 87f802821e
commit 8f12292daa
1 changed files with 16 additions and 7 deletions

View File

@ -100,7 +100,7 @@ public class SparkCountryPropagationJob2 {
broadcast_datasourcecountryassoc).as(Encoders.bean(ResultCountrySet.class));
if(writeUpdates){
writeUpdates(potentialUpdates.toJavaRDD(), outputPath + "/update_" + resultType);
writeUpdates(potentialUpdates, outputPath + "/update_" + resultType);
}
if(saveGraph){
@ -147,9 +147,13 @@ public class SparkCountryPropagationJob2 {
log.info("Saving graph table to path: {}", outputPath);
//log.info("number of saved recordsa: {}", new_table.count());
new_table
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath , GzipCodec.class);
.toJSON()
.write()
.option("compression", "gzip")
.text(outputPath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(outputPath , GzipCodec.class);
}
@ -219,9 +223,14 @@ public class SparkCountryPropagationJob2 {
.map(value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), Encoders.bean(DatasourceCountry.class));
}
private static void writeUpdates(JavaRDD<ResultCountrySet> potentialUpdates, String outputPath){
potentialUpdates.map(u -> OBJECT_MAPPER.writeValueAsString(u))
.saveAsTextFile(outputPath, GzipCodec.class);
private static void writeUpdates(Dataset<ResultCountrySet> potentialUpdates, String outputPath){
potentialUpdates
.toJSON()
.write()
.option("compression", "gzip")
.text(outputPath);
// map(u -> OBJECT_MAPPER.writeValueAsString(u))
// .saveAsTextFile(outputPath, GzipCodec.class);
}