Generate "merged" dedup id relations also for records that are filtered out by the cut parameters

This commit is contained in:
Giambattista Bloisi 2023-12-14 11:51:02 +01:00 committed by Giambattista Bloisi
parent 1287315ffb
commit 831cc1fdde
1 changed files with 3 additions and 2 deletions

View File

@ -213,7 +213,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
.join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") .join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.withColumn("pivot", functions.first("id").over(w)) .withColumn("pivot", functions.first("id").over(w))
.withColumn("position", functions.row_number().over(w)) .withColumn("position", functions.row_number().over(w))
.filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) // apply cut after choosing pivot
.flatMap( .flatMap(
(FlatMapFunction<Row, Tuple3<String, String, String>>) (Row r) -> { (FlatMapFunction<Row, Tuple3<String, String, String>>) (Row r) -> {
String id = r.getAs("id"); String id = r.getAs("id");
@ -249,7 +248,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
} }
// add merge relations // add merge relations
res.add(new Tuple3<>(id, pivotDedupId, pivot)); if (cut <=0 || r.<Integer>getAs("position") <= cut) {
res.add(new Tuple3<>(id, pivotDedupId, pivot));
}
return res.iterator(); return res.iterator();
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING())) }, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))