1
0
Fork 0

code formatting

This commit is contained in:
Claudio Atzori 2021-04-27 10:09:31 +02:00
parent faa8f6f4e2
commit ef4bfd82e2
1 changed files with 11 additions and 10 deletions

View File

@ -73,7 +73,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(Encoders.bean(Relation.class));
//<mergedObjectID, dedupID>
// <mergedObjectID, dedupID>
Dataset<Tuple2<String, String>> mergedIds = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source"), col("target"))
@ -116,31 +116,32 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
}
//redirect the relations to the dedupID
// redirect the relations to the dedupID
private static Dataset<Relation> createNewRels(
Dataset<Relation> rels, //all the relations to be redirected
Dataset<Tuple2<String, String>> mergedIds, //merge rels: <mergedObjectID, dedupID>
Dataset<Relation> rels, // all the relations to be redirected
Dataset<Tuple2<String, String>> mergedIds, // merge rels: <mergedObjectID, dedupID>
MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) {
//<sourceID, relation, targetID>
// <sourceID, relation, targetID>
Dataset<Tuple3<String, Relation, String>> mapped = rels
.map(
(MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE),
r, getId(r, FieldType.TARGET)),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING()));
//< <sourceID, relation, target>, <sourceID, dedupID> >
// < <sourceID, relation, target>, <sourceID, dedupID> >
Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped
.joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer");
//< <<sourceID, relation, targetID>, <sourceID, dedupID>>, <targetID, dedupID> >
// < <<sourceID, relation, targetID>, <sourceID, dedupID>>, <targetID, dedupID> >
Dataset<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>> relSourceTarget = relSource
.joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer");
.joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer");
return relSourceTarget
.filter(
(FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>)
r -> r._1()._1() != null || r._2() != null)
(FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r -> r
._1()
._1() != null || r._2() != null)
.map(mapRel, Encoders.bean(Relation.class))
.distinct();
}