forked from antonis.lempesis/dnet-hadoop
merged with master
This commit is contained in:
commit
fb405275f7
|
@ -71,20 +71,15 @@ public class SparkCountryPropagationJob3 {
|
|||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
|
||||
execPropagation(
|
||||
spark,
|
||||
inputPath,
|
||||
outputPath,
|
||||
resultClazz,
|
||||
saveGraph);
|
||||
});
|
||||
spark -> execPropagation(
|
||||
spark,
|
||||
inputPath,
|
||||
outputPath,
|
||||
resultClazz,
|
||||
saveGraph));
|
||||
}
|
||||
|
||||
private static <R extends Result> void execPropagation(
|
||||
|
@ -101,8 +96,8 @@ public class SparkCountryPropagationJob3 {
|
|||
spark
|
||||
.read()
|
||||
.json(inputPath)
|
||||
.as(Encoders.kryo(resultClazz))
|
||||
.groupByKey((MapFunction<R, String>) result1 -> result1.getId(), Encoders.STRING())
|
||||
.as(Encoders.bean(resultClazz))
|
||||
.groupByKey((MapFunction<R, String>) r -> r.getId(), Encoders.STRING())
|
||||
.mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
|
|
Loading…
Reference in New Issue