From 1e423fdc0768ea1a3d1b9a7bee4bd50a77a8f9b7 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Mar 2021 13:39:24 +0100 Subject: [PATCH] [Actionmanager] remove invalid records from the input graph before groupGraphTableByIdAndMerge --- .../actionmanager/promote/PromoteActionPayloadFunctions.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java index 56c8dd05a..c0192cddb 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java @@ -111,7 +111,9 @@ public class PromoteActionPayloadFunctions { SerializableSupplier> isNotZeroFn, Class rowClazz) { TypedColumn aggregator = new TableAggregator<>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn(); + return rowDS + .filter((FilterFunction) o -> isNotZeroFn.get().apply(o)) .groupByKey((MapFunction) x -> rowIdFn.get().apply(x), Encoders.STRING()) .agg(aggregator) .map((MapFunction, G>) Tuple2::_2, Encoders.kryo(rowClazz));