1
0
Fork 0

Patch FCT relations - fixed issue with join

This commit is contained in:
Miriam Baglioni 2021-07-30 10:34:05 +02:00
parent 2fc89fc9b5
commit 9bc4fd3b69
1 changed files with 4 additions and 3 deletions

View File

@ -82,7 +82,7 @@ public class PatchRelationsApplication {
log.info("relations: {}", rels.count()); log.info("relations: {}", rels.count());
log.info("idMapping: {}", idMapping.count()); log.info("idMapping: {}", idMapping.count());
rels Dataset<Relation> fj = rels
.joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left") .joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left")
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> { .map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
final Relation r = t._1(); final Relation r = t._1();
@ -90,8 +90,9 @@ public class PatchRelationsApplication {
.map(RelationIdMapping::getNewId) .map(RelationIdMapping::getNewId)
.ifPresent(r::setSource); .ifPresent(r::setSource);
return r; return r;
}, Encoders.bean(Relation.class)) }, Encoders.bean(Relation.class));
.joinWith(idMapping, rels.col("target").equalTo(idMapping.col("oldId")), "left")
fj.joinWith(idMapping, fj.col("target").equalTo(idMapping.col("oldId")), "left")
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> { .map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
final Relation r = t._1(); final Relation r = t._1();
Optional.ofNullable(t._2()) Optional.ofNullable(t._2())