From 4e6f46e8fa1f8cc419487d5f9423757129cfb3d7 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 15 Jul 2020 11:22:20 +0200
Subject: [PATCH] filter blocks with one record only

---
 .../java/eu/dnetlib/dhp/oa/dedup/Deduper.java |  1 +
 .../dnetlib/dhp/oa/dedup/SparkBlockStats.java | 20 +++++++++++---------
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
index 180f9f8460..5e8a50fccb 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
@@ -51,6 +51,7 @@ public class Deduper implements Serializable {
 				.map(it -> Block.from(it, a))
 				.collect(Collectors.toList())
 				.iterator())
+			.filter(b -> b.getDocuments().size() > 1)
 			.mapToPair(block -> new Tuple2<>(block.getKey(), block))
 			.reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize));
 	}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
index d5de309670..8016361dbb 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
@@ -9,6 +9,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -100,16 +101,9 @@ public class SparkBlockStats extends AbstractSparkAction {
 			});
 
 		// create blocks for deduplication
-		JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
-
-		JavaRDD<BlockStats> blockStats = blocks
+		JavaRDD<BlockStats> blockStats = Deduper.createSortedBlocks(mapDocuments, dedupConf)
 			.repartition(numPartitions)
-			.map(
-				b -> new BlockStats(
-					b._1(),
-					(long) b._2().getDocuments().size(),
-					computeComparisons(
-						(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize())));
+			.map(b -> asBlockStats(dedupConf, b));
 
 		// save the blockstats in the workingdir
 		spark
@@ -120,4 +114,12 @@
 	}
 
+	private BlockStats asBlockStats(DedupConfig dedupConf, Tuple2<String, Block> b) {
+		return new BlockStats(
+			b._1(),
+			(long) b._2().getDocuments().size(),
+			computeComparisons(
+				(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize()));
+	}
+
 }
 
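
Editor's sketch (not part of the patch): the functional change is the new
`.filter(b -> b.getDocuments().size() > 1)` in Deduper.createSortedBlocks -- a
block with n documents produces pairwise comparisons only when n > 1, so
singleton blocks add shuffle and reduce work without ever yielding a match.
The SparkBlockStats hunk is a behavior-preserving refactor that extracts the
BlockStats mapping into asBlockStats. The minimal Spark job below (class name
and test data are hypothetical, not part of the patch) applies the same size
predicate to stand-in blocks, assuming only spark-core on the classpath:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SingletonBlockFilterSketch {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
			.setAppName("singleton-block-filter-sketch")
			.setMaster("local[*]");
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			// each inner list stands in for the documents of one block
			List<List<String>> blocks = Arrays.asList(
				Arrays.asList("docA"), // singleton: no pairs to compare
				Arrays.asList("docB", "docC"), // 1 comparison
				Arrays.asList("docD", "docE", "docF")); // up to 3 comparisons

			JavaRDD<List<String>> comparable = sc
				.parallelize(blocks)
				.filter(b -> b.size() > 1); // same predicate the patch applies to Block

			// prints 2: only blocks able to produce at least one comparison survive
			System.out.println(comparable.count());
		}
	}
}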