dedup relation aggregator moved into dedicated class

This commit is contained in:
Claudio Atzori 2020-07-13 10:11:36 +02:00
parent 770adc26e9
commit 7a3fd9f54c
2 changed files with 47 additions and 35 deletions

View File

@@ -0,0 +1,46 @@
package eu.dnetlib.dhp.oa.dedup;

import java.util.Objects;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

import eu.dnetlib.dhp.schema.oaf.Relation;

/**
 * Spark SQL {@link Aggregator} that folds a group of {@link Relation} rows into a single
 * {@link Relation}. Used by the dedup propagation job to collapse duplicate relations
 * produced by the union of new, updated and merge relations.
 *
 * <p>Buffer and output are both {@code Relation}, encoded with the bean encoder.
 */
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {

	// Sentinel "empty" relation acting as the aggregation zero element.
	// Declared final so the reference cannot be reassigned; it must also never be
	// mutated in place (see the guard in merge below).
	private static final Relation ZERO = new Relation();

	@Override
	public Relation zero() {
		return ZERO;
	}

	@Override
	public Relation reduce(Relation b, Relation a) {
		// Keep the current buffer when the incoming row equals the zero element,
		// otherwise let the incoming relation replace it (last non-empty wins).
		return Objects.equals(a, ZERO) ? b : a;
	}

	@Override
	public Relation merge(Relation b, Relation a) {
		// Fix: do not call mergeFrom on a zero buffer. mergeFrom is invoked for its
		// side effect on the receiver, so merging into the shared ZERO instance would
		// mutate the zero element and corrupt subsequent aggregations. A buffer that
		// still equals ZERO carries no information: just take the other side.
		if (Objects.equals(b, ZERO)) {
			return a;
		}
		b.mergeFrom(a);
		return b;
	}

	@Override
	public Relation finish(Relation r) {
		return r;
	}

	@Override
	public Encoder<Relation> bufferEncoder() {
		return Encoders.bean(Relation.class);
	}

	@Override
	public Encoder<Relation> outputEncoder() {
		return Encoders.bean(Relation.class);
	}
}

View File

@@ -7,7 +7,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -96,7 +95,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
FieldType.TARGET, FieldType.TARGET,
getDeletedFn()); getDeletedFn());
save(newRels.union(updated).union(mergeRels).distinct(), outputRelationPath, SaveMode.Overwrite); save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite);
} }
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) { private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
@@ -106,39 +105,6 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class)); .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
} }
class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
@Override
public Relation zero() {
return new Relation();
}
@Override
public Relation reduce(Relation b, Relation a) {
return b;
}
@Override
public Relation merge(Relation b, Relation a) {
return b;
}
@Override
public Relation finish(Relation r) {
return r;
}
@Override
public Encoder<Relation> bufferEncoder() {
return Encoders.bean(Relation.class);
}
@Override
public Encoder<Relation> outputEncoder() {
return Encoders.bean(Relation.class);
}
}
private static Dataset<Relation> processDataset( private static Dataset<Relation> processDataset(
Dataset<Relation> rels, Dataset<Relation> rels,
Dataset<Tuple2<String, String>> mergedIds, Dataset<Tuple2<String, String>> mergedIds,