1
0
Fork 0

[BIP! Scores integration] Create updates as Result rather than subclasses; Result considers also metrics in the mergeFrom operation

This commit is contained in:
Claudio Atzori 2021-01-08 16:29:17 +01:00
parent 1361c9eb0c
commit 34229970e6
1 changed files with 4 additions and 3 deletions

View File

@ -125,16 +125,17 @@ public class SparkAtomicActionScoreJob implements Serializable {
return ret; return ret;
}, Encoders.bean(BipScore.class)) }, Encoders.bean(BipScore.class))
.groupByKey((MapFunction<BipScore, String>) value -> value.getId(), Encoders.STRING()) .groupByKey((MapFunction<BipScore, String>) value -> value.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, BipScore, I>) (k, it) -> { .mapGroups((MapGroupsFunction<String, BipScore, Result>) (k, it) -> {
Result ret = new Result(); Result ret = new Result();
ret.setDataInfo(getDataInfo());
BipScore first = it.next(); BipScore first = it.next();
ret.setId(first.getId()); ret.setId(first.getId());
ret.setMeasures(getMeasure(first)); ret.setMeasures(getMeasure(first));
it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value))); it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value)));
return (I) ret; return ret;
}, Encoders.bean(inputClazz)) }, Encoders.bean(Result.class))
.toJavaRDD() .toJavaRDD()
.map(p -> new AtomicAction(inputClazz, p)) .map(p -> new AtomicAction(inputClazz, p))
.mapToPair( .mapToPair(