WIP aggregator to make relationships unique

This commit is contained in:
Claudio Atzori 2020-07-10 19:35:10 +02:00
parent ecf119f37a
commit 770adc26e9
1 changed file with 4 additions and 6 deletions

View File

@ -96,14 +96,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
FieldType.TARGET, FieldType.TARGET,
getDeletedFn()); getDeletedFn());
save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); save(newRels.union(updated).union(mergeRels).distinct(), outputRelationPath, SaveMode.Overwrite);
} }
// Collapses the input relations to one Relation per grouping key: rows are grouped by the
// String key produced by ModelSupport.idFn(), each group is folded with RelationAggregator,
// and the resulting (key, relation) pairs are mapped back to Relation beans.
// NOTE(review): this text is a two-column diff rendering, so each statement appears twice
// per line (old column, then new column); the two columns are identical for this method.
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) { private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels return rels
.groupByKey((MapFunction<Relation, String>) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) .groupByKey((MapFunction<Relation, String>) r -> ModelSupport.idFn().apply(r), Encoders.STRING())
.agg(new RelationAggregator().toColumn()) .agg(new RelationAggregator().toColumn())
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class)); .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
} }
class RelationAggregator extends Aggregator<Relation, Relation, Relation> { class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
@ -115,13 +115,11 @@ public class SparkPropagateRelation extends AbstractSparkAction {
// Aggregator reduce step: receives the running buffer b and an incoming relation a, and
// returns b.
// NOTE(review): the b.mergeFrom(a) statement appears in only one column of this diff, so the
// two revisions differ on it. In the revision without that call, a is never read and b is
// returned unchanged, i.e. the incoming relation is discarded. Confirm which revision is
// intended before relying on this aggregator.
@Override @Override
public Relation reduce(Relation b, Relation a) { public Relation reduce(Relation b, Relation a) {
b.mergeFrom(a);
return b; return b;
} }
// Aggregator merge step: receives two intermediate buffers b and a, and returns b.
// NOTE(review): the b.mergeFrom(a) statement appears in only one column of this diff, so the
// two revisions differ on it. In the revision without that call, the content of buffer a is
// dropped and b is returned unchanged. Confirm which revision is intended.
@Override @Override
public Relation merge(Relation b, Relation a) { public Relation merge(Relation b, Relation a) {
b.mergeFrom(a);
return b; return b;
} }