1
0
Fork 0

using Encoders.bean instead of kryo

This commit is contained in:
Claudio Atzori 2020-05-07 11:41:41 +02:00
parent 73243793b2
commit 5b3f8a0e90
1 changed files with 7 additions and 12 deletions

View File

@ -71,20 +71,15 @@ public class SparkCountryPropagationJob3 {
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> execPropagation(
spark,
execPropagation( inputPath,
spark, outputPath,
inputPath, resultClazz,
outputPath, saveGraph));
resultClazz,
saveGraph);
});
} }
private static <R extends Result> void execPropagation( private static <R extends Result> void execPropagation(
@ -101,7 +96,7 @@ public class SparkCountryPropagationJob3 {
spark spark
.read() .read()
.json(inputPath) .json(inputPath)
.as(Encoders.kryo(resultClazz)) .as(Encoders.bean(resultClazz))
.groupByKey((MapFunction<R, String>) result1 -> result1.getId(), Encoders.STRING()) .groupByKey((MapFunction<R, String>) result1 -> result1.getId(), Encoders.STRING())
.mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz)) .mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz))
.write() .write()