package eu.dnetlib.dedup;

import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.util.LongAccumulator;

import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.BlockProcessor;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Serializable;
import scala.Tuple2;

public class Deduper implements Serializable {

	private static final Log log = LogFactory.getLog(Deduper.class);

	/**
	 * @return the list of relations generated by the deduplication
	 * @param context the spark context
	 * @param entities list of JSON entities to be deduped
	 * @param config the dedup configuration
	 */
	public static JavaPairRDD<String, String> dedup(
		JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {

		Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());

		// create the vertexes of the graph: <id, MapDocument>
		JavaPairRDD<String, MapDocument> mapDocs = mapToVertexes(context, entities, config);

		// create the blocks for deduplication
		JavaPairRDD<String, Iterable<MapDocument>> blocks = createBlocks(context, mapDocs, config);

		// create relations by comparing only the elements within the same block
		return computeRelations(context, blocks, config);

		// final RDD<Edge<String>> edgeRdd = relationRDD
		// 	.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "equalTo"))
		// 	.rdd();
		//
		// RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs
		// 	.mapToPair((PairFunction<Tuple2<String, MapDocument>, Object, MapDocument>) t ->
		// 		new Tuple2<Object, MapDocument>((long) t._1().hashCode(), t._2()))
		// 	.rdd();
		//
		// accumulators.forEach((name, acc) -> log.info(name + " -> " + acc.value()));
		//
		// return GraphProcessor.findCCs(vertexes, edgeRdd, 20).toJavaRDD();
	}

	/**
	 * @return the list of relations generated by the deduplication
	 * @param context the spark context
	 * @param blocks list of blocks
	 * @param config the dedup configuration
	 */
	public static JavaPairRDD<String, String> computeRelations(
		JavaSparkContext context,
		JavaPairRDD<String, Iterable<MapDocument>> blocks,
		DedupConfig config) {

		Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());

		return blocks
			.flatMapToPair(
				(PairFlatMapFunction<Tuple2<String, Iterable<MapDocument>>, String, String>) it -> {
					final SparkReporter reporter = new SparkReporter(accumulators);
					new BlockProcessor(config).process(it._1(), it._2(), reporter);
					return reporter.getRelations().iterator();
				})
			.mapToPair(
				(PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item -> new Tuple2<String, Tuple2<String, String>>(
					item._1() + item._2(), item))
			// deduplicate the relations on the concatenated <source, target> key
			.reduceByKey((a, b) -> a)
			.mapToPair(
				(PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
	}

	/**
	 * @return the list of blocks based on the clustering functions of the dedup configuration
	 * @param context the spark context
	 * @param mapDocs list of entities keyed by identifier
	 * @param config the dedup configuration
	 */
	public static JavaPairRDD<String, Iterable<MapDocument>> createBlocks(
		JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
		return mapDocs
			// the reduce is just to be sure that we don't have documents with the same id
			.reduceByKey((a, b) -> a)
			.map(Tuple2::_2)
			// Clustering: from <id, doc> to <groupkey, doc>
			.flatMapToPair(
				(PairFlatMapFunction<MapDocument, String, MapDocument>) a -> DedupUtility
					.getGroupingKeys(config, a)
					.stream()
					.map(it -> new Tuple2<>(it, a))
					.collect(Collectors.toList())
					.iterator())
			.groupByKey();
	}
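	/*
	 * Illustrative sketch, not part of the original pipeline: computes the size of each block
	 * produced by createBlocks. Block sizes are a useful diagnostic when tuning the clustering
	 * functions of the dedup configuration. The method name is hypothetical.
	 */
	public static JavaPairRDD<String, Integer> countBlockSizes(
		JavaPairRDD<String, Iterable<MapDocument>> blocks) {
		return blocks
			.mapToPair(
				(PairFunction<Tuple2<String, Iterable<MapDocument>>, String, Integer>) b -> {
					// count the documents grouped under the same clustering key
					int size = 0;
					for (MapDocument doc : b._2()) {
						size++;
					}
					return new Tuple2<>(b._1(), size);
				});
	}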
	/**
	 * @return the list of blocks, each one sorted on the order field and capped at the configured maximum group size
	 * @param context the spark context
	 * @param mapDocs list of entities keyed by identifier
	 * @param config the dedup configuration
	 */
	public static JavaPairRDD<String, List<MapDocument>> createsortedBlocks(
		JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
		final String of = config.getWf().getOrderField();
		final int maxQueueSize = config.getWf().getGroupMaxSize();
		return mapDocs
			// the reduce is just to be sure that we don't have documents with the same id
			.reduceByKey((a, b) -> a)
			.map(Tuple2::_2)
			// Clustering: from <id, doc> to <groupkey, [doc]>
			.flatMapToPair(
				(PairFlatMapFunction<MapDocument, String, List<MapDocument>>) a -> DedupUtility
					.getGroupingKeys(config, a)
					.stream()
					.map(
						it -> {
							List<MapDocument> tmp = new ArrayList<>();
							tmp.add(a);
							return new Tuple2<>(it, tmp);
						})
					.collect(Collectors.toList())
					.iterator())
			// merge the singleton lists, keep them sorted on the order field and cap them at maxQueueSize
			.reduceByKey(
				(Function2<List<MapDocument>, List<MapDocument>, List<MapDocument>>) (v1, v2) -> {
					v1.addAll(v2);
					v1.sort(Comparator.comparing(a -> a.getFieldMap().get(of).stringValue()));
					if (v1.size() > maxQueueSize)
						return new ArrayList<>(v1.subList(0, maxQueueSize));
					return v1;
				});
	}

	/**
	 * @return the list of vertexes as (id, mapDocument) pairs
	 * @param context the spark context
	 * @param entities list of JSON entities
	 * @param config the dedup configuration
	 */
	public static JavaPairRDD<String, MapDocument> mapToVertexes(
		JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {

		return entities
			.mapToPair(
				(PairFunction<String, String, MapDocument>) s -> {
					MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(config, s);
					return new Tuple2<String, MapDocument>(mapDocument.getIdentifier(), mapDocument);
				});
	}

	/**
	 * Same as computeRelations, but operates on the sorted blocks produced by createsortedBlocks.
	 *
	 * @return the list of relations generated by the deduplication
	 * @param context the spark context
	 * @param blocks list of sorted blocks
	 * @param config the dedup configuration
	 */
	public static JavaPairRDD<String, String> computeRelations2(
		JavaSparkContext context, JavaPairRDD<String, List<MapDocument>> blocks, DedupConfig config) {
		Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());

		return blocks
			.flatMapToPair(
				(PairFlatMapFunction<Tuple2<String, List<MapDocument>>, String, String>) it -> {
					try {
						final SparkReporter reporter = new SparkReporter(accumulators);
						new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
						return reporter.getRelations().iterator();
					} catch (Exception e) {
						throw new RuntimeException(it._2().get(0).getIdentifier(), e);
					}
				})
			.mapToPair(
				(PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item -> new Tuple2<String, Tuple2<String, String>>(
					item._1() + item._2(), item))
			// deduplicate the relations on the concatenated <source, target> key
			.reduceByKey((a, b) -> a)
			.mapToPair(
				(PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
	}
}
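/*
 * Minimal usage sketch, not part of the original class: shows how the Deduper pipeline
 * could be driven from a local Spark context. The class name, the command-line arguments
 * (args[0] = path to the JSON entities, args[1] = dedup configuration as a JSON string)
 * and the DedupConfig.load(String) factory are assumptions made only for illustration.
 */
class DeduperUsageSketch {

	public static void main(String[] args) {
		try (JavaSparkContext context = new JavaSparkContext("local[*]", "deduper-usage-sketch")) {
			// one JSON entity per line
			JavaRDD<String> entities = context.textFile(args[0]);
			// assumed factory method parsing the JSON dedup configuration
			DedupConfig config = DedupConfig.load(args[1]);

			// full pipeline: vertexes -> blocks -> relations between candidate duplicates
			JavaPairRDD<String, String> relations = Deduper.dedup(context, entities, config);
			System.out.println("generated relations: " + relations.count());
		}
	}
}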