From 752d28f8eb955128eb59c5f71e3f0b652f003319 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 15:09:50 +0200 Subject: [PATCH] make the relations produced by the dedup SparkPropagateRelation jon unique --- .../dhp/oa/dedup/SparkPropagateRelation.java | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 516808511..e65eb7ab5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -7,6 +7,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +96,49 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite); + save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); + } + + private Dataset distinctRelations(Dataset rels) { + return rels + .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) + .agg(new RelationAggregator().toColumn()) + .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); + } + + class RelationAggregator extends Aggregator { + + @Override + public Relation zero() { + return new Relation(); + } + + @Override + public Relation reduce(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation merge(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation finish(Relation r) { + return r; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(Relation.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(Relation.class); + } } private static Dataset processDataset(