diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java index 56c8dd05a..c0192cddb 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java @@ -111,7 +111,9 @@ public class PromoteActionPayloadFunctions { SerializableSupplier> isNotZeroFn, Class rowClazz) { TypedColumn aggregator = new TableAggregator<>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn(); + return rowDS + .filter((FilterFunction) o -> isNotZeroFn.get().apply(o)) .groupByKey((MapFunction) x -> rowIdFn.get().apply(x), Encoders.STRING()) .agg(aggregator) .map((MapFunction, G>) Tuple2::_2, Encoders.kryo(rowClazz));