From 831cc1fddececffc80701931ad9dab4d9926192b Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 14 Dec 2023 11:51:02 +0100 Subject: [PATCH] Generate "merged" dedup id relations also for records that are filtered out by the cut parameters --- .../java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 46c29494e..191870d3b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -213,7 +213,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") .withColumn("pivot", functions.first("id").over(w)) .withColumn("position", functions.row_number().over(w)) - .filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) // apply cut after choosing pivot .flatMap( (FlatMapFunction<Row, Tuple3<String, String, String>>) (Row r) -> { String id = r.getAs("id"); @@ -249,7 +248,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction { } // add merge relations - res.add(new Tuple3<>(id, pivotDedupId, pivot)); + if (cut <=0 || r.<Integer>getAs("position") <= cut) { + res.add(new Tuple3<>(id, pivotDedupId, pivot)); + } return res.iterator(); }, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))