1
0
Fork 0

replaced full join + filtering with a left join

This commit is contained in:
Claudio Atzori 2021-07-29 11:36:20 +02:00
parent d267dce520
commit 1923c1ce21
1 changed files with 2 additions and 4 deletions

View File

@ -80,8 +80,7 @@ public class PatchRelationsApplication {
final Dataset<RelationIdMapping> idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class); final Dataset<RelationIdMapping> idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class);
rels rels
.joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "full") .joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left")
.filter((FilterFunction<Tuple2<Relation, RelationIdMapping>>) t -> Objects.nonNull(t._1()))
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> { .map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
final Relation r = t._1(); final Relation r = t._1();
Optional.ofNullable(t._2()) Optional.ofNullable(t._2())
@ -89,8 +88,7 @@ public class PatchRelationsApplication {
.ifPresent(r::setSource); .ifPresent(r::setSource);
return r; return r;
}, Encoders.bean(Relation.class)) }, Encoders.bean(Relation.class))
.joinWith(idMapping, rels.col("target").equalTo(idMapping.col("oldId")), "full") .joinWith(idMapping, rels.col("target").equalTo(idMapping.col("oldId")), "left")
.filter((FilterFunction<Tuple2<Relation, RelationIdMapping>>) t -> Objects.nonNull(t._1()))
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> { .map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
final Relation r = t._1(); final Relation r = t._1();
Optional.ofNullable(t._2()) Optional.ofNullable(t._2())