2020-04-27 14:52:31 +02:00
|
|
|
|
2020-03-27 10:42:17 +01:00
|
|
|
package eu.dnetlib.dhp.oa.dedup;
|
2019-12-06 13:38:00 +01:00
|
|
|
|
2020-04-18 12:42:58 +02:00
|
|
|
import java.util.Map;
|
|
|
|
import java.util.stream.Collectors;
|
2020-04-28 11:23:29 +02:00
|
|
|
|
2019-12-06 13:38:00 +01:00
|
|
|
import org.apache.spark.api.java.JavaPairRDD;
|
|
|
|
import org.apache.spark.api.java.JavaSparkContext;
|
|
|
|
import org.apache.spark.util.LongAccumulator;
|
2020-04-28 11:23:29 +02:00
|
|
|
|
|
|
|
import eu.dnetlib.dhp.oa.dedup.model.Block;
|
|
|
|
import eu.dnetlib.pace.config.DedupConfig;
|
|
|
|
import eu.dnetlib.pace.model.MapDocument;
|
|
|
|
import eu.dnetlib.pace.util.BlockProcessor;
|
2019-12-06 13:38:00 +01:00
|
|
|
import scala.Serializable;
|
|
|
|
import scala.Tuple2;
|
|
|
|
|
|
|
|
public class Deduper implements Serializable {
|
|
|
|
|
2020-04-27 14:52:31 +02:00
|
|
|
public static JavaPairRDD<String, String> computeRelations(
|
|
|
|
JavaSparkContext context, JavaPairRDD<String, Block> blocks, DedupConfig config) {
|
|
|
|
Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
|
2020-04-18 12:42:58 +02:00
|
|
|
|
2020-04-27 14:52:31 +02:00
|
|
|
return blocks
|
|
|
|
.flatMapToPair(
|
|
|
|
it -> {
|
|
|
|
final SparkReporter reporter = new SparkReporter(accumulators);
|
|
|
|
new BlockProcessor(config)
|
|
|
|
.processSortedBlock(it._1(), it._2().getDocuments(), reporter);
|
|
|
|
return reporter.getRelations().iterator();
|
|
|
|
})
|
|
|
|
.mapToPair(it -> new Tuple2<>(it._1() + it._2(), it))
|
|
|
|
.reduceByKey((a, b) -> a)
|
|
|
|
.mapToPair(Tuple2::_2);
|
|
|
|
}
|
2019-12-06 13:38:00 +01:00
|
|
|
|
2020-04-27 14:52:31 +02:00
|
|
|
public static JavaPairRDD<String, Block> createSortedBlocks(
|
|
|
|
JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
|
|
|
|
final String of = config.getWf().getOrderField();
|
2020-07-02 12:43:51 +02:00
|
|
|
final int maxQueueSize = config.getWf().getQueueMaxSize();
|
2020-04-18 12:06:23 +02:00
|
|
|
|
2020-04-27 14:52:31 +02:00
|
|
|
return mapDocs
|
|
|
|
// the reduce is just to be sure that we haven't document with same id
|
|
|
|
.reduceByKey((a, b) -> a)
|
|
|
|
.map(Tuple2::_2)
|
|
|
|
// Clustering: from <id, doc> to List<groupkey,doc>
|
|
|
|
.flatMap(
|
|
|
|
a -> DedupUtility
|
|
|
|
.getGroupingKeys(config, a)
|
|
|
|
.stream()
|
|
|
|
.map(it -> Block.from(it, a))
|
|
|
|
.collect(Collectors.toList())
|
|
|
|
.iterator())
|
|
|
|
.mapToPair(block -> new Tuple2<>(block.getKey(), block))
|
2020-07-16 10:11:32 +02:00
|
|
|
.reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize))
|
|
|
|
.filter(b -> b._2().getDocuments().size() > 1);
|
2020-04-27 14:52:31 +02:00
|
|
|
}
|
2020-04-18 12:42:58 +02:00
|
|
|
}
|