[Actionmanager] remove invalid records from the input graph before groupGraphTableByIdAndMerge

This commit is contained in:
Claudio Atzori 2021-03-23 13:39:24 +01:00
parent e5ebb500cf
commit 1e423fdc07
1 changed files with 2 additions and 0 deletions

View File

@ -111,7 +111,9 @@ public class PromoteActionPayloadFunctions {
SerializableSupplier<Function<G, Boolean>> isNotZeroFn,
Class<G> rowClazz) {
TypedColumn<G, G> aggregator = new TableAggregator<>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn();
return rowDS
.filter((FilterFunction<G>) o -> isNotZeroFn.get().apply(o))
.groupByKey((MapFunction<G, String>) x -> rowIdFn.get().apply(x), Encoders.STRING())
.agg(aggregator)
.map((MapFunction<Tuple2<String, G>, G>) Tuple2::_2, Encoders.kryo(rowClazz));