diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java new file mode 100644 index 0000000000..0a29aa51b2 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java @@ -0,0 +1,46 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import java.util.Objects; + +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class RelationAggregator extends Aggregator { + + private static Relation ZERO = new Relation(); + + @Override + public Relation zero() { + return ZERO; + } + + @Override + public Relation reduce(Relation b, Relation a) { + return Objects.equals(a, ZERO) ? b : a; + } + + @Override + public Relation merge(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation finish(Relation r) { + return r; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(Relation.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(Relation.class); + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 13d2e4cd77..c197697494 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -7,7 +7,6 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +95,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(newRels.union(updated).union(mergeRels).distinct(), outputRelationPath, SaveMode.Overwrite); + save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); } private Dataset distinctRelations(Dataset rels) { @@ -106,39 +105,6 @@ public class SparkPropagateRelation extends AbstractSparkAction { .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } - class RelationAggregator extends Aggregator { - - @Override - public Relation zero() { - return new Relation(); - } - - @Override - public Relation reduce(Relation b, Relation a) { - return b; - } - - @Override - public Relation merge(Relation b, Relation a) { - return b; - } - - @Override - public Relation finish(Relation r) { - return r; - } - - @Override - public Encoder bufferEncoder() { - return Encoders.bean(Relation.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.bean(Relation.class); - } - } - private static Dataset processDataset( Dataset rels, Dataset> mergedIds,