diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index 46c29494e..191870d3b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -213,7 +213,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 				.join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
 				.withColumn("pivot", functions.first("id").over(w))
 				.withColumn("position", functions.row_number().over(w))
-				.filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) // apply cut after choosing pivot
 				.flatMap(
 					(FlatMapFunction<Row, Tuple3<String, String, String>>) (Row r) -> {
 						String id = r.getAs("id");
@@ -249,7 +248,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 					}
 					// add merge relations
-					res.add(new Tuple3<>(id, pivotDedupId, pivot));
+					if (cut <= 0 || r.<Integer> getAs("position") <= cut) {
+						res.add(new Tuple3<>(id, pivotDedupId, pivot));
+					}
 					return res.iterator();
 				},
 				Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))
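
Below is a minimal, self-contained sketch of the pattern this diff moves to: pivot and position are computed with a window over the whole group, all rows keep flowing into the flatMap, and the cut only suppresses the merge-relation tuple for rows beyond the limit instead of dropping the rows up front. This is not the project's class; the class name CutAfterPivotSketch, the toy group/id columns, the local-mode session, and the Tuple2 output are illustrative assumptions, while the emit-time condition mirrors the one introduced by the diff.

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import scala.Tuple2;

public class CutAfterPivotSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("cut-after-pivot")
			.getOrCreate();

		// toy dedup group: four ids that all belong to group "g1"
		Dataset<Row> df = spark
			.createDataset(
				Arrays.asList(
					new Tuple2<>("g1", "a"),
					new Tuple2<>("g1", "b"),
					new Tuple2<>("g1", "c"),
					new Tuple2<>("g1", "d")),
				Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
			.toDF("group", "id");

		final int cut = 2; // hypothetical cut size; <= 0 means "no cut"

		WindowSpec w = Window.partitionBy("group").orderBy("id");

		Dataset<Tuple2<String, String>> mergeRels = df
			// pivot and position are computed over the WHOLE group, before any cut
			.withColumn("pivot", first("id").over(w))
			.withColumn("position", row_number().over(w))
			.flatMap(
				(FlatMapFunction<Row, Tuple2<String, String>>) r -> {
					List<Tuple2<String, String>> res = new ArrayList<>();
					String id = r.getAs("id");
					String pivot = r.getAs("pivot");
					// the cut is applied only here, once the pivot is already fixed,
					// so rows beyond the cut are still seen by the flatMap body
					if (cut <= 0 || r.<Integer> getAs("position") <= cut) {
						res.add(new Tuple2<>(id, pivot));
					}
					return res.iterator();
				},
				Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

		// only "a" and "b" survive the cut, both pointing at pivot "a"
		mergeRels.show(false);
		spark.stop();
	}
}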