code formatting

This commit is contained in:
Claudio Atzori 2021-04-16 17:28:25 +02:00
parent 7ad573d023
commit 45057440c1
1 changed files with 12 additions and 9 deletions

View File

@ -117,22 +117,25 @@ public class SparkPropagateRelation extends AbstractSparkAction {
private static Dataset<Relation> createNewRels( private static Dataset<Relation> createNewRels(
Dataset<Relation> rels, Dataset<Relation> rels,
Dataset<Tuple2<String,String>> mergedIds, Dataset<Tuple2<String, String>> mergedIds,
MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) { MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) {
Dataset<Tuple3<String, Relation, String>> mapped = rels Dataset<Tuple3<String, Relation, String>> mapped = rels
.map( .map(
(MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), r, getId(r, FieldType.TARGET)), (MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); r, getId(r, FieldType.TARGET)),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING()));
Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped
.joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer");
return relSource return relSource
.joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer") .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer")
.filter((FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r -> r._2() != null || r._1() != null) .filter(
.map(mapRel, Encoders.bean(Relation.class)) (FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r -> r
.distinct(); ._2() != null || r._1() != null)
.map(mapRel, Encoders.bean(Relation.class))
.distinct();
} }
private static Dataset<Relation> processDataset( private static Dataset<Relation> processDataset(