WIP SparkCreateMergeRels distinct relations

This commit is contained in:
Claudio Atzori 2020-07-13 16:13:36 +02:00
parent 8c67938ad0
commit 1143f426aa
2 changed files with 16 additions and 4 deletions

View File

@ -20,13 +20,12 @@ public class RelationAggregator extends Aggregator<Relation, Relation, Relation>
@Override @Override
public Relation reduce(Relation b, Relation a) { public Relation reduce(Relation b, Relation a) {
return Objects.equals(a, ZERO) ? b : a; return mergeRel(b, a);
} }
@Override @Override
public Relation merge(Relation b, Relation a) { public Relation merge(Relation b, Relation a) {
b.mergeFrom(a); return mergeRel(b, a);
return b;
} }
@Override @Override
@ -34,6 +33,18 @@ public class RelationAggregator extends Aggregator<Relation, Relation, Relation>
return r; return r;
} }
private Relation mergeRel(Relation b, Relation a) {
if (Objects.equals(b, ZERO)) {
return a;
}
if (Objects.equals(a, ZERO)) {
return b;
}
b.mergeFrom(a);
return b;
}
@Override @Override
public Encoder<Relation> bufferEncoder() { public Encoder<Relation> bufferEncoder() {
return Encoders.kryo(Relation.class); return Encoders.kryo(Relation.class);

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.col;
import com.google.common.base.Joiner;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -108,7 +109,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) { private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels return rels
.filter(getRelationFilterFunction()) .filter(getRelationFilterFunction())
.groupByKey((MapFunction<Relation, String>) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) .groupByKey((MapFunction<Relation, String>) r -> String.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), Encoders.STRING())
.agg(new RelationAggregator().toColumn()) .agg(new RelationAggregator().toColumn())
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class)); .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
} }