forked from D-Net/dnet-hadoop

Deduper class (utilities for dedup) cleaned. Useless methods removed.

parent 6a089ec287
commit 0eccbc318b
Deduper.java

@@ -4,8 +4,6 @@ import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.BlockProcessor;
 import eu.dnetlib.pace.util.MapDocumentUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -13,59 +11,32 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Serializable;
 import scala.Tuple2;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class Deduper implements Serializable {
 
-    private static final Log log = LogFactory.getLog(Deduper.class);
+    private static final Logger log = LoggerFactory.getLogger(Deduper.class);
 
-    /**
-     * @return the list of relations generated by the deduplication
-     * @param: the spark context
-     * @param: list of JSON entities to be deduped
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, String> dedup(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
-
+    public static JavaPairRDD<String, String> computeRelations(JavaSparkContext context, JavaPairRDD<String, List<MapDocument>> blocks, DedupConfig config) {
         Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
 
-        //create vertexes of the graph: <ID, MapDocument>
-        JavaPairRDD<String, MapDocument> mapDocs = mapToVertexes(context, entities, config);
-
-
-        //create blocks for deduplication
-        JavaPairRDD<String, Iterable<MapDocument>> blocks = createBlocks(context, mapDocs, config);
-
-        //create relations by comparing only elements in the same group
-        return computeRelations(context, blocks, config);
-
-//        final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "equalTo")).rdd();
-//
-//        RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs.mapToPair((PairFunction<Tuple2<String, MapDocument>, Object, MapDocument>) t -> new Tuple2<Object, MapDocument>((long) t._1().hashCode(), t._2())).rdd();
-//        accumulators.forEach((name, acc) -> log.info(name + " -> " + acc.value()));
-//
-//        return GraphProcessor.findCCs(vertexes, edgeRdd, 20).toJavaRDD();
-    }
-
-    /**
-     * @return the list of relations generated by the deduplication
-     * @param: the spark context
-     * @param: list of blocks
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, String> computeRelations(JavaSparkContext context, JavaPairRDD<String, Iterable<MapDocument>> blocks, DedupConfig config) {
-
-        Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
-
-        return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, Iterable<MapDocument>>, String, String>) it -> {
-            final SparkReporter reporter = new SparkReporter(accumulators);
-            new BlockProcessor(config).process(it._1(), it._2(), reporter);
-            return reporter.getRelations().iterator();
+        return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, List<MapDocument>>, String, String>) it -> {
+            try {
+                final SparkReporter reporter = new SparkReporter(accumulators);
+                new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
+                return reporter.getRelations().iterator();
+            } catch (Exception e) {
+                throw new RuntimeException(it._2().get(0).getIdentifier(), e);
+            }
         }).mapToPair(
                 (PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item ->
                         new Tuple2<String, Tuple2<String, String>>(item._1() + item._2(), item))
@@ -73,30 +44,7 @@ public class Deduper implements Serializable {
                 .mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
     }
 
-    /**
-     * @return the list of blocks based on clustering of dedup configuration
-     * @param: the spark context
-     * @param: list of entities: <id, entity>
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, Iterable<MapDocument>> createBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
-        return mapDocs
-                //the reduce is just to be sure that we haven't document with same id
-                .reduceByKey((a, b) -> a)
-                .map(Tuple2::_2)
-                //Clustering: from <id, doc> to List<groupkey,doc>
-                .flatMapToPair((PairFlatMapFunction<MapDocument, String, MapDocument>) a ->
-                        DedupUtility.getGroupingKeys(config, a)
-                                .stream()
-                                .map(it -> new Tuple2<>(it, a))
-                                .collect(Collectors.toList())
-                                .iterator())
-                .groupByKey();
-    }
-
-
-    public static JavaPairRDD<String, List<MapDocument>> createsortedBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
+    public static JavaPairRDD<String, List<MapDocument>> createSortedBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
         final String of = config.getWf().getOrderField();
         final int maxQueueSize = config.getWf().getGroupMaxSize();
         return mapDocs
@@ -123,39 +71,4 @@ public class Deduper implements Serializable {
                     return v1;
                 });
     }
-
-    /**
-     * @return the list of vertexes: <id, mapDocument>
-     * @param: the spark context
-     * @param: list of JSON entities
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, MapDocument> mapToVertexes(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
-
-        return entities.mapToPair((PairFunction<String, String, MapDocument>) s -> {
-
-            MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(config, s);
-            return new Tuple2<String, MapDocument>(mapDocument.getIdentifier(), mapDocument);
-
-
-        });
-    }
-
-    public static JavaPairRDD<String, String> computeRelations2(JavaSparkContext context, JavaPairRDD<String, List<MapDocument>> blocks, DedupConfig config) {
-        Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
-
-        return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, List<MapDocument>>, String, String>) it -> {
-            try {
-                final SparkReporter reporter = new SparkReporter(accumulators);
-                new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
-                return reporter.getRelations().iterator();
-            } catch (Exception e) {
-                throw new RuntimeException(it._2().get(0).getIdentifier(), e);
-            }
-        }).mapToPair(
-                (PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item ->
-                        new Tuple2<String, Tuple2<String, String>>(item._1() + item._2(), item))
-                .reduceByKey((a, b) -> a)
-                .mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
-    }
 }
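After this cleanup the Deduper surface is reduced to two steps: createSortedBlocks groups the parsed documents by clustering key, sorted on the configured order field and capped at the configured group size, and computeRelations hands each block to BlockProcessor.processSortedBlock. The fragment below is a minimal sketch, not code from this commit, of how a caller chains the two; the wrapper method name simRelsSketch, its parameters and the local variable names are invented for illustration, and the imports are the same ones shown in the hunks above.

// Illustrative only: glue code showing the intended call sequence after the cleanup.
public static JavaPairRDD<String, String> simRelsSketch(JavaSparkContext sc, String entitiesPath, DedupConfig dedupConf) {

    // parse each JSON entity into a MapDocument keyed by its identifier
    JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(entitiesPath)
            .mapToPair((PairFunction<String, String, MapDocument>) s -> {
                MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                return new Tuple2<>(d.getIdentifier(), d);
            });

    // group documents into sorted, size-bounded blocks, one list per clustering key
    JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createSortedBlocks(sc, mapDocument, dedupConf);

    // compare only documents that share a block; each emitted pair is a similarity relation
    return Deduper.computeRelations(sc, blocks, dedupConf);
}

The removed dedup, createBlocks, mapToVertexes and computeRelations2 helpers were earlier variants of this same path.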
SparkCreateDedupRecord.java

@@ -8,21 +8,20 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 public class SparkCreateDedupRecord extends AbstractSparkAction {
 
-    private static final Log log = LogFactory.getLog(SparkCreateDedupRecord.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
 
-    public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
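Besides dropping the throws Exception clause from the constructors, the edit that repeats in every class of this commit is the move from Apache Commons Logging to SLF4J. As a reference, the before/after pattern looks like this (a sketch, with SparkCreateDedupRecord used only as the example class):

// Before (Apache Commons Logging), as removed above:
//   private static final Log log = LogFactory.getLog(SparkCreateDedupRecord.class);

// After (SLF4J), as added above:
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);

Typical log.info("...") and log.error("...", e) call sites compile unchanged against the SLF4J Logger, and the parameterised log.info("... {}", value) placeholder style also becomes available.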
SparkCreateMergeRels.java

@@ -11,8 +11,6 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -24,6 +22,8 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
 import scala.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,9 +32,9 @@ import java.util.List;
 
 public class SparkCreateMergeRels extends AbstractSparkAction {
 
-    private static final Log log = LogFactory.getLog(SparkCreateMergeRels.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
 
-    public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
SparkCreateSimRels.java

@@ -12,33 +12,26 @@ import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
-import org.dom4j.Document;
 import org.dom4j.DocumentException;
-import org.dom4j.Element;
-import org.dom4j.io.SAXReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringReader;
-import java.util.ArrayList;
 import java.util.List;
 
 public class SparkCreateSimRels extends AbstractSparkAction {
 
-    private static final Log log = LogFactory.getLog(SparkCreateSimRels.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
 
-    public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
@@ -81,10 +74,10 @@ public class SparkCreateSimRels extends AbstractSparkAction {
             });
 
             //create blocks for deduplication
-            JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
+            JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createSortedBlocks(sc, mapDocument, dedupConf);
 
             //create relations by comparing only elements in the same group
-            final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
+            final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations(sc, blocks, dedupConf);
 
             JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
 
SparkUpdateEntity.java

@@ -22,6 +22,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.IOException;
@@ -29,11 +31,11 @@ import java.io.Serializable;
 
 public class SparkUpdateEntity extends AbstractSparkAction {
 
+    private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
+
     final String IDJSONPATH = "$.id";
 
-    private static final Log log = LogFactory.getLog(SparkUpdateEntity.class);
-
-    public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 