From 1143f426aaf2eadfa8b55dcb62fabf8b52bcb503 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 16:13:36 +0200 Subject: [PATCH] WIP SparkCreateMergeRels distinct relations --- .../dhp/oa/dedup/RelationAggregator.java | 17 ++++++++++++++--- .../dhp/oa/dedup/SparkPropagateRelation.java | 3 ++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java index 7935fe1ca..6fb7b844b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java @@ -20,13 +20,12 @@ public class RelationAggregator extends Aggregator @Override public Relation reduce(Relation b, Relation a) { - return Objects.equals(a, ZERO) ? b : a; + return mergeRel(b, a); } @Override public Relation merge(Relation b, Relation a) { - b.mergeFrom(a); - return b; + return mergeRel(b, a); } @Override @@ -34,6 +33,18 @@ public class RelationAggregator extends Aggregator return r; } + private Relation mergeRel(Relation b, Relation a) { + if (Objects.equals(b, ZERO)) { + return a; + } + if (Objects.equals(a, ZERO)) { + return b; + } + + b.mergeFrom(a); + return b; + } + @Override public Encoder bufferEncoder() { return Encoders.kryo(Relation.class); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index baba3bc87..1073adbea 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup; import static org.apache.spark.sql.functions.col; +import com.google.common.base.Joiner; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -108,7 +109,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { private Dataset distinctRelations(Dataset rels) { return rels .filter(getRelationFilterFunction()) - .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) + .groupByKey((MapFunction) r -> String.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), Encoders.STRING()) .agg(new RelationAggregator().toColumn()) .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); }