forked from D-Net/dnet-hadoop
This commit is contained in:
commit
e124278934
7
:
7
:
|
@ -1,7 +0,0 @@
|
||||||
Merge remote-tracking branch 'upstream/master'
|
|
||||||
|
|
||||||
# Please enter a commit message to explain why this merge is necessary,
|
|
||||||
# especially if it merges an updated upstream into a topic branch.
|
|
||||||
#
|
|
||||||
# Lines starting with '#' will be ignored, and an empty message aborts
|
|
||||||
# the commit.
|
|
|
@ -71,6 +71,7 @@ public class SparkCountryPropagationJob3 {
|
||||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||||
|
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
conf,
|
conf,
|
||||||
|
@ -100,7 +101,7 @@ public class SparkCountryPropagationJob3 {
|
||||||
spark
|
spark
|
||||||
.read()
|
.read()
|
||||||
.json(inputPath)
|
.json(inputPath)
|
||||||
.as(Encoders.bean(resultClazz))
|
.as(Encoders.kryo(resultClazz))
|
||||||
.groupByKey((MapFunction<R, String>) result1 -> result1.getId(), Encoders.STRING())
|
.groupByKey((MapFunction<R, String>) result1 -> result1.getId(), Encoders.STRING())
|
||||||
.mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz))
|
.mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz))
|
||||||
.write()
|
.write()
|
||||||
|
|
Loading…
Reference in New Issue