1
0
Fork 0

Filter out blocks that contain only one record

This commit is contained in:
Claudio Atzori 2020-07-15 11:22:20 +02:00
parent 7d6e269b40
commit 4e6f46e8fa
2 changed files with 12 additions and 9 deletions

View File

@ -51,6 +51,7 @@ public class Deduper implements Serializable {
.map(it -> Block.from(it, a)) .map(it -> Block.from(it, a))
.collect(Collectors.toList()) .collect(Collectors.toList())
.iterator()) .iterator())
.filter(b -> b.getDocuments().size() > 1)
.mapToPair(block -> new Tuple2<>(block.getKey(), block)) .mapToPair(block -> new Tuple2<>(block.getKey(), block))
.reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize)); .reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize));
} }

View File

@ -9,6 +9,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@ -100,16 +101,9 @@ public class SparkBlockStats extends AbstractSparkAction {
}); });
// create blocks for deduplication // create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf); JavaRDD<BlockStats> blockStats = Deduper.createSortedBlocks(mapDocuments, dedupConf)
JavaRDD<BlockStats> blockStats = blocks
.repartition(numPartitions) .repartition(numPartitions)
.map( .map(b -> asBlockStats(dedupConf, b));
b -> new BlockStats(
b._1(),
(long) b._2().getDocuments().size(),
computeComparisons(
(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize())));
// save the blockstats in the workingdir // save the blockstats in the workingdir
spark spark
@ -120,4 +114,12 @@ public class SparkBlockStats extends AbstractSparkAction {
} }
} }
/**
 * Builds the {@code BlockStats} summary for a keyed block: the block key, the number of
 * documents it holds, and the number of comparisons the sliding window will perform on it.
 *
 * @param dedupConf dedup configuration; only the workflow sliding-window size is read
 * @param b pair of (block key, block)
 * @return the stats record for this block
 */
private BlockStats asBlockStats(DedupConfig dedupConf, Tuple2<String, Block> b) {
	final String key = b._1();
	final long docCount = b._2().getDocuments().size();
	final long windowSize = dedupConf.getWf().getSlidingWindowSize();
	return new BlockStats(key, docCount, computeComparisons(docCount, windowSize));
}
} }