From 886617afd0243075836815cb6cef59e9e2c096ae Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 13 Jul 2021 18:15:35 +0200 Subject: [PATCH] One result linked to more than one project is saved just once --- .../graph/dump/funderresults/SparkResultLinkedToProject.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java index 9c4c73d85..e0355d6d6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java @@ -97,6 +97,11 @@ public class SparkResultLinkedToProject implements Serializable { "on rel.target = p.id " + "") .as(Encoders.bean(inputClazz)) + .groupByKey( + (MapFunction< R, String>) value -> value + .getId(), + Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(inputClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip")