make the relations produced by the dedup SparkPropagateRelation jon unique
This commit is contained in:
parent
3c728aaa0c
commit
752d28f8eb
|
@ -7,6 +7,7 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -95,7 +96,49 @@ public class SparkPropagateRelation extends AbstractSparkAction {
|
|||
FieldType.TARGET,
|
||||
getDeletedFn());
|
||||
|
||||
save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite);
|
||||
save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite);
|
||||
}
|
||||
|
||||
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
|
||||
return rels
|
||||
.groupByKey((MapFunction<Relation, String>) r -> ModelSupport.idFn().apply(r), Encoders.STRING())
|
||||
.agg(new RelationAggregator().toColumn())
|
||||
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
|
||||
}
|
||||
|
||||
class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
|
||||
|
||||
@Override
|
||||
public Relation zero() {
|
||||
return new Relation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Relation reduce(Relation b, Relation a) {
|
||||
b.mergeFrom(a);
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Relation merge(Relation b, Relation a) {
|
||||
b.mergeFrom(a);
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Relation finish(Relation r) {
|
||||
return r;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<Relation> bufferEncoder() {
|
||||
return Encoders.bean(Relation.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<Relation> outputEncoder() {
|
||||
return Encoders.bean(Relation.class);
|
||||
}
|
||||
}
|
||||
|
||||
private static Dataset<Relation> processDataset(
|
||||
|
|
Loading…
Reference in New Issue