diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 5168085116..e65eb7ab57 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -7,6 +7,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +96,49 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite); + save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); + } + + private Dataset distinctRelations(Dataset rels) { + return rels + .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) + .agg(new RelationAggregator().toColumn()) + .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); + } + + class RelationAggregator extends Aggregator { + + @Override + public Relation zero() { + return new Relation(); + } + + @Override + public Relation reduce(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation merge(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation finish(Relation r) { + return r; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(Relation.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(Relation.class); + } } private static Dataset processDataset(