diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
index 990e50abd..1023e2d19 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Consumer;
 
@@ -19,6 +20,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.rdd.SequenceFileRDDFunctions;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -122,7 +124,17 @@ public class SparkAtomicActionJob {
 					return null;
 				}, Encoders.bean(Project.class))
-			.filter(p -> !(p == null))
+			.filter(Objects::nonNull)
+			.groupByKey(
+				(MapFunction<Project, String>) p -> p.getId(),
+				Encoders.STRING())
+			.mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
+				Project first = it.next();
+				it.forEachRemaining(p -> {
+					first.mergeFrom(p);
+				});
+				return first;
+			}, Encoders.bean(Project.class))
 			.toJavaRDD()
 			.map(p -> new AtomicAction<>(Project.class, p))
 			.mapToPair(
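
For context (not part of the patch itself): the added stage deduplicates `Project` rows that share an `id` by grouping on the key and folding each group into a single record via `Project#mergeFrom`. Below is a minimal, self-contained sketch of the same `groupByKey`/`mapGroups` merge pattern. The `Record` bean, its `mergeFrom` policy, and the `GroupByKeyMergeSketch` class are hypothetical stand-ins for illustration, not code from this repository.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class GroupByKeyMergeSketch {

	// Hypothetical bean standing in for eu.dnetlib.dhp.schema.oaf.Project.
	// Encoders.bean(...) needs a public class with a no-arg constructor and getters/setters.
	public static class Record implements Serializable {
		private String id;
		private String title;

		public Record() {
		}

		public Record(String id, String title) {
			this.id = id;
			this.title = title;
		}

		public String getId() {
			return id;
		}

		public void setId(String id) {
			this.id = id;
		}

		public String getTitle() {
			return title;
		}

		public void setTitle(String title) {
			this.title = title;
		}

		// Hypothetical merge policy: keep the first non-null value of each field.
		public void mergeFrom(Record other) {
			if (title == null) {
				title = other.getTitle();
			}
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.appName("groupByKey-merge-sketch")
			.master("local[*]")
			.getOrCreate();

		// Two rows share the id "p1" and should collapse into one merged record.
		Dataset<Record> records = spark
			.createDataset(
				Arrays.asList(
					new Record("p1", null),
					new Record("p1", "funded project"),
					new Record("p2", "other project")),
				Encoders.bean(Record.class));

		// Same shape as the patched pipeline: drop nulls, group rows sharing an id,
		// then fold each group into a single record via mergeFrom.
		Dataset<Record> merged = records
			.filter((FilterFunction<Record>) Objects::nonNull)
			.groupByKey((MapFunction<Record, String>) Record::getId, Encoders.STRING())
			.mapGroups((MapGroupsFunction<String, Record, Record>) (id, it) -> {
				Record first = it.next();
				it.forEachRemaining(first::mergeFrom);
				return first;
			}, Encoders.bean(Record.class));

		merged.show(false); // expect one row per id
		spark.stop();
	}
}
```

`mapGroups` receives each key with an iterator over that key's rows, so the fold happens on the executors without collecting groups to the driver. The explicit `MapFunction`/`MapGroupsFunction` casts (also visible in the diff) resolve the overload ambiguity between Spark's Java functional interfaces and Scala's `Function1`/`Function2` on the `Dataset` API.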