diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index e65eb7ab5..13d2e4cd7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -96,14 +96,14 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); + save(newRels.union(updated).union(mergeRels).distinct(), outputRelationPath, SaveMode.Overwrite); } private Dataset distinctRelations(Dataset rels) { return rels - .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) - .agg(new RelationAggregator().toColumn()) - .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); + .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) + .agg(new RelationAggregator().toColumn()) + .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } class RelationAggregator extends Aggregator { @@ -115,13 +115,11 @@ public class SparkPropagateRelation extends AbstractSparkAction { @Override public Relation reduce(Relation b, Relation a) { - b.mergeFrom(a); return b; } @Override public Relation merge(Relation b, Relation a) { - b.mergeFrom(a); return b; }