diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java index 9c4c73d85..e0355d6d6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java @@ -97,6 +97,11 @@ public class SparkResultLinkedToProject implements Serializable { "on rel.target = p.id " + "") .as(Encoders.bean(inputClazz)) + .groupByKey( + (MapFunction< R, String>) value -> value + .getId(), + Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(inputClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip")