Added Spark implementation of dedup

Sandro La Bruzzo 2018-10-11 15:19:20 +02:00
parent d0edb7b773
commit 67e5f9858b
14 changed files with 285 additions and 77 deletions

View File: pom.xml

@@ -58,6 +58,12 @@
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--<dependency>-->

View File: BlockProcessor.java

@@ -12,38 +12,46 @@ import eu.dnetlib.pace.model.MapDocumentComparator;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
import java.util.stream.Stream;
public class BlockProcessor {
public static final List<String> accumulators= new ArrayList<>();
private static final Log log = LogFactory.getLog(BlockProcessor.class);
private DedupConfig dedupConf;
public static void constructAccumulator( final DedupConfig dedupConf) {
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "records per hash key = 1"));
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()));
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize())));
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "skip list"));
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"));
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
}
public BlockProcessor(DedupConfig dedupConf) {
this.dedupConf = dedupConf;
}
public List<Tuple2<String, String>> process(final String key, final Stream<MapDocument> documents, final Reporter context) throws IOException, InterruptedException {
public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {
final Queue<MapDocument> q = prepare(documents);
if (q.size() > 1) {
log.info("reducing key: '" + key + "' records: " + q.size());
//process(q, context);
return process(simplifyQueue(q, key, context), context);
process(simplifyQueue(q, key, context), context);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
return new ArrayList<>();
}
}
private Queue<MapDocument> prepare(final Stream<MapDocument> documents) {
final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
final Queue<MapDocument> queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
final Set<String> seen = new HashSet<String>();
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
@@ -63,7 +71,7 @@ public class BlockProcessor {
}
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Reporter context) {
final Queue<MapDocument> q = new LinkedList<MapDocument>();
final Queue<MapDocument> q = new LinkedList<>();
String fieldRef = "";
final List<MapDocument> tempResults = Lists.newArrayList();
@@ -106,10 +114,9 @@ public class BlockProcessor {
}
}
private List<Tuple2<String, String>> process(final Queue<MapDocument> queue, final Reporter context) throws IOException, InterruptedException {
private void process(final Queue<MapDocument> queue, final Reporter context) {
final PaceDocumentDistance algo = new PaceDocumentDistance();
List<Tuple2<String, String>> resultEmit = new ArrayList<>();
while (!queue.isEmpty()) {
@@ -144,21 +151,20 @@
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
final ScoreResult sr = similarity(algo, pivot, curr);
emitOutput(sr, idPivot, idCurr,context, resultEmit);
emitOutput(sr, idPivot, idCurr, context);
i++;
}
}
}
}
return resultEmit;
}
private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context,List<Tuple2<String, String>> emitResult) throws IOException, InterruptedException {
private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context) {
final double d = sr.getScore();
if (d >= dedupConf.getWf().getThreshold()) {
writeSimilarity(idPivot, idCurr, emitResult);
writeSimilarity(context, idPivot, idCurr);
context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
@@ -182,9 +188,11 @@ public class BlockProcessor {
return StringUtils.substringBetween(id, "|", "::");
}
private void writeSimilarity( final String from, final String to, List<Tuple2<String, String>> emitResult){
emitResult.add(new Tuple2<>(from, to));
emitResult.add(new Tuple2<>( to, from));
private void writeSimilarity(final Reporter context, final String from, final String to) {
final String type = dedupConf.getWf().getEntityType();
context.emit(type, from, to);
context.emit(type, to, from);
}
}
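The refactored process() above no longer returns the similarity pairs; it pushes them through the Reporter passed in as context, which is what lets the Spark job further down gather them via SparkReporter. A minimal sketch of driving the new signature with an in-memory Reporter (the sketch class and variable names are illustrative, not part of the commit):
package eu.dnetlib;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class BlockProcessorSketch {
    // Collects every pair emitted by BlockProcessor for one blocking key.
    static List<Tuple2<String, String>> collectPairs(final DedupConfig dedupConf,
                                                     final String blockingKey,
                                                     final Iterable<MapDocument> documents) {
        final List<Tuple2<String, String>> pairs = new ArrayList<>();
        new BlockProcessor(dedupConf).process(blockingKey, documents, new Reporter() {
            @Override
            public void incrementCounter(String counterGroup, String counterName, long delta) {
                // counters are not needed for this sketch
            }
            @Override
            public void emit(String type, String from, String to) {
                // writeSimilarity() emits both (from, to) and (to, from)
                pairs.add(new Tuple2<>(from, to));
            }
        });
        return pairs;
    }
}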

View File: DnetAccumulator.java

@@ -0,0 +1,67 @@
package eu.dnetlib;
import org.apache.spark.util.AccumulatorV2;
public class DnetAccumulator extends AccumulatorV2<Long, Long> {
private Long counter= 0L;
private String group;
private String name;
public DnetAccumulator(final String group, final String name){
this.group = group;
this.name = name;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public boolean isZero() {
return counter == 0;
}
@Override
public AccumulatorV2<Long, Long> copy() {
final DnetAccumulator acc = new DnetAccumulator(group, name);
acc.add(counter);
return acc;
}
@Override
public void reset() {
counter = 0L;
}
@Override
public void add(Long aLong) {
counter += aLong;
}
@Override
public void merge(AccumulatorV2<Long, Long> accumulatorV2) {
add(accumulatorV2.value());
}
@Override
public Long value() {
return counter;
}
}
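DnetAccumulator is a thin AccumulatorV2<Long, Long> whose group/name pair becomes the label the counter is registered under. A rough usage sketch in local mode (app name, data and counter label are made up for illustration):
package eu.dnetlib;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
public class DnetAccumulatorSketch {
    public static void main(String[] args) {
        final JavaSparkContext sc =
                new JavaSparkContext(new SparkConf().setAppName("accumulator-sketch").setMaster("local[*]"));
        // Register under "group::name", the same convention SparkCounter uses below.
        final DnetAccumulator missing = new DnetAccumulator("organization", "missing legalname");
        sc.sc().register(missing, "organization::missing legalname");
        sc.parallelize(Arrays.asList("a", "", "b", "")).foreach(s -> {
            if (s.isEmpty()) {
                missing.add(1L); // incremented inside tasks, combined on the driver via merge()
            }
        });
        System.out.println(missing.value()); // prints 2 once the action has completed
        sc.stop();
    }
}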

View File: Reporter.java

@@ -1,13 +1,11 @@
package eu.dnetlib;
import java.io.IOException;
import java.io.Serializable;
public interface Reporter extends Serializable {
void incrementCounter(String counterGroup, String counterName, long delta);
void emit(final String type, final String from, final String to) throws IOException, InterruptedException;
void emit(String type, String from, String to);
}

View File: SparkTest.java

@@ -1,63 +1,99 @@
package eu.dnetlib;
import com.google.common.collect.Sets;
import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.utils.PaceUtils;
import eu.dnetlib.reporter.SparkCounter;
import eu.dnetlib.reporter.SparkReporter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import java.io.IOException;
import java.io.StringWriter;
import java.util.*;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class SparkTest {
class Results extends HashMap<String, Set<String>> {
public Results(Set<String> keys) {
super(keys.size());
keys.forEach(k -> put(k, new HashSet<>()));
}
}
public static SparkCounter counter ;
private static final Log log = LogFactory.getLog(SparkTest.class);
public static void main(String[] args) {
final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Hello World").setMaster("local[*]"));
final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations.json");
final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations_complete.json");
final Counters c = new Counters();
counter = new SparkCounter(context);
long count = dataRDD.mapToPair(it -> {
final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
BlockProcessor.constructAccumulator(config);
BlockProcessor.accumulators.forEach(acc -> {
final String[] values = acc.split("::");
counter.incrementCounter(values[0], values[1], 0);
});
JavaPairRDD<String, MapDocument> mapDocs = dataRDD.mapToPair(it -> {
MapDocument mapDocument = PaceUtils.asMapDocument(config, it);
return new Tuple2<>(mapDocument.getIdentifier(), mapDocument);
}).reduceByKey((a, b) -> a).flatMapToPair(a -> {
final MapDocument currentDocument = a._2();
final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
return getGroupingKeys(config, currentDocument).stream()
.map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
}).groupByKey().flatMapToPair(it -> {
final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
return process(config, it, c).iterator();
}).count();
});
final JavaPairRDD<String, String> relationRDD = mapDocs.reduceByKey((a, b) -> a)
.flatMapToPair(a -> {
final MapDocument currentDocument = a._2();
return getGroupingKeys(config, currentDocument).stream()
.map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
}).groupByKey().flatMapToPair(it -> {
final SparkReporter reporter = new SparkReporter(counter);
new BlockProcessor(config).process(it._1(), it._2(), reporter);
return reporter.getReport().iterator();
});
RDD<Tuple2<Object, String>> vertexes = relationRDD.groupByKey().map(it -> {
Long id = (long) it._1().hashCode();
return new Tuple2<Object, String>(id, it._1());
}).rdd();
final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "similarTo")).rdd();
Tuple2<Object, RDD<String>> cc = GraphProcessor.findCCs(vertexes, edgeRdd, 20);
final Long total = (Long) cc._1();
final JavaRDD<String> map = mapDocs.map(Tuple2::_1);
final JavaRDD<String> duplicatesRDD = cc._2().toJavaRDD();
final JavaRDD<String> nonDuplicates = map.subtract(duplicatesRDD);
System.out.println("Non duplicates: "+ nonDuplicates.count());
System.out.println("Connected Components: "+ total);
System.out.println("total Element = " + count);
// final MapDocument resA = result(config, "A", "Recent results from CDF");
// final MapDocument resB = result(config, "B", "Recent results from CDF");
//
// final ScoreResult sr = new PaceDocumentDistance().between(resA, resB, config);
// final double d = sr.getScore();
// System.out.println(String.format(" d ---> %s", d));
}
@@ -78,24 +114,6 @@ public class SparkTest {
}
static List<Tuple2<String, String>> process(DedupConfig conf, Tuple2<String, Iterable<MapDocument>> entry, Counters c) {
try {
return new BlockProcessor(conf).process(entry._1(), StreamSupport.stream(entry._2().spliterator(),false), new Reporter() {
@Override
public void incrementCounter(String counterGroup, String counterName, long delta) {
c.get(counterGroup, counterName).addAndGet(delta);
}
@Override
public void emit(String type, String from, String to) {
}
});
} catch (IOException | InterruptedException e) {
e.printStackTrace();
return new ArrayList<>();
}
}

View File: GraphProcessor.scala

@@ -0,0 +1,24 @@
package eu.dnetlib.graph
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
;
object GraphProcessor {
def findCCs(vertexes: RDD[(VertexId,String)], edges:RDD[Edge[String]], maxIterations: Int): (Long, RDD[String]) = {
val graph: Graph[String, String] = Graph(vertexes, edges)
val cc = graph.connectedComponents(maxIterations).vertices
val totalCC =cc.map{
case (openaireId, ccId) =>ccId
}.distinct().count()
val connectedComponents: RDD[String] = vertexes.join(cc).map {
case (id, (openaireId, ccId)) => openaireId
}.distinct()
(totalCC, connectedComponents)
}
}
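findCCs returns a Scala (Long, RDD[String]) pair: the number of distinct connected components and the openaireId of every vertex that ended up in one. From the Java driver (as in SparkTest above) the result arrives as a scala.Tuple2, so consuming it looks roughly like the sketch below (class and variable names are illustrative):
package eu.dnetlib;
import eu.dnetlib.graph.GraphProcessor;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
public class ConnectedComponentsSketch {
    static void printCcSummary(final RDD<Tuple2<Object, String>> vertexes, final RDD<Edge<String>> edges) {
        // 20 is the maxIterations value used by SparkTest above.
        final Tuple2<Object, RDD<String>> cc = GraphProcessor.findCCs(vertexes, edges, 20);
        final Long components = (Long) cc._1();                    // distinct connected components
        final JavaRDD<String> duplicates = cc._2().toJavaRDD();    // ids belonging to some component
        System.out.println("Connected Components: " + components);
        System.out.println("Records involved in a component: " + duplicates.count());
    }
}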

View File: SparkCounter.java

@@ -0,0 +1,36 @@
package eu.dnetlib.reporter;
import eu.dnetlib.DnetAccumulator;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.HashMap;
import java.util.Map;
public class SparkCounter {
final JavaSparkContext javaSparkContext;
public SparkCounter(final JavaSparkContext context){
this.javaSparkContext = context;
}
final Map<String, DnetAccumulator> accumulators = new HashMap<>();
public void incrementCounter(String counterGroup, String counterName, long delta) {
final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
DnetAccumulator currentAccumulator = null;
if (!accumulators.containsKey(accumulatorName)) {
currentAccumulator = new DnetAccumulator(counterGroup, counterName);
javaSparkContext.sc().register(currentAccumulator,accumulatorName);
accumulators.put(accumulatorName, currentAccumulator);
} else {
currentAccumulator = accumulators.get(accumulatorName);
}
currentAccumulator.add(delta);
}
public Map<String, DnetAccumulator> getAccumulators() {
return accumulators;
}
}
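SparkCounter lazily creates and registers one DnetAccumulator per group::name label the first time it is incremented, which is why SparkTest pre-registers every accumulator with a delta of 0 before the job runs. Reading the totals back on the driver after an action could look like this (the helper is only a sketch, not part of the commit):
package eu.dnetlib.reporter;
public class CounterReportSketch {
    // Prints every counter registered so far; meant to be called on the driver,
    // after the Spark action that increments them has completed.
    static void printReport(final SparkCounter counter) {
        counter.getAccumulators()
               .forEach((label, acc) -> System.out.println(label + " = " + acc.value()));
    }
}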

View File: SparkReporter.java

@@ -0,0 +1,44 @@
package eu.dnetlib.reporter;
import eu.dnetlib.DnetAccumulator;
import eu.dnetlib.Reporter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaSparkContext;
import org.glassfish.jersey.internal.util.collection.StringIgnoreCaseKeyComparator;
import scala.Tuple2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SparkReporter implements Reporter {
final SparkCounter counter;
final List<Tuple2<String, String>> report = new ArrayList<>();
private static final Log log = LogFactory.getLog(SparkReporter.class);
public SparkReporter(SparkCounter counter){
this.counter = counter;
}
@Override
public void incrementCounter(String counterGroup, String counterName, long delta) {
counter.incrementCounter(counterGroup, counterName, delta);
}
@Override
public void emit(String type, String from, String to) {
report.add(new Tuple2<>(from, to));
}
public List<Tuple2<String, String>> getReport() {
return report;
}
}

View File: organization.pace.conf

@@ -18,9 +18,11 @@
{ "name" : "spacetrimmingfieldvalue", "fields" : [ "legalshortname" ], "params" : { "randomLength" : "5" } },
{ "name" : "urlclustering", "fields" : [ "websiteurl" ], "params" : { } }
],
"strictConditions":[
{ "name" : "exactMatch", "fields" : [ "gridid" ] }
],
"conditions" : [
{ "name" : "exactMatch", "fields" : [ "country" ] },
{ "name" : "mustBeDifferent", "fields" : [ "gridid" ] }
{ "name" : "exactMatch", "fields" : [ "country" ] }
],
"model" : [
{ "name" : "legalname", "algo" : "LevensteinTitle", "type" : "String", "weight" : "0.2", "ignoreMissing" : "false", "path" : "organization/metadata/legalname/value" },

View File: PaceConfig.java

@@ -1,5 +1,6 @@
package eu.dnetlib.pace.config;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -12,7 +13,7 @@ import eu.dnetlib.pace.model.CondDef;
import eu.dnetlib.pace.model.FieldDef;
import org.apache.commons.collections.CollectionUtils;
public class PaceConfig {
public class PaceConfig implements Serializable {
private List<FieldDef> model;
private List<CondDef> strictConditions;

View File: WfConfig.java

@@ -1,5 +1,6 @@
package eu.dnetlib.pace.config;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -9,7 +10,7 @@ import com.google.common.collect.Sets;
import com.google.gson.GsonBuilder;
import org.apache.commons.lang.StringUtils;
public class WfConfig {
public class WfConfig implements Serializable {
/**
* Entity type.

View File: ClusteringDef.java

@@ -1,12 +1,13 @@
package eu.dnetlib.pace.model;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import com.google.gson.Gson;
import eu.dnetlib.pace.clustering.*;
public class ClusteringDef {
public class ClusteringDef implements Serializable {
private Clustering name;

View File: CondDef.java

@@ -1,12 +1,13 @@
package eu.dnetlib.pace.model;
import java.io.Serializable;
import java.util.List;
import com.google.gson.Gson;
import eu.dnetlib.pace.condition.*;
import eu.dnetlib.pace.config.Cond;
public class CondDef {
public class CondDef implements Serializable {
private Cond name;

View File: FieldDef.java

@@ -1,5 +1,6 @@
package eu.dnetlib.pace.model;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -14,7 +15,7 @@ import eu.dnetlib.pace.distance.algo.*;
/**
* The schema is composed by field definitions (FieldDef). Each field has a type, a name, and an associated distance algorithm.
*/
public class FieldDef {
public class FieldDef implements Serializable {
public final static String PATH_SEPARATOR = "/";
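The remaining hunks all do the same thing: PaceConfig, WfConfig, ClusteringDef, CondDef and FieldDef now implement Serializable. That is needed because the dedup configuration is captured inside Spark closures (see SparkTest above), and Spark must serialize everything a task closure references before shipping it to the executors; otherwise the job fails with a Task not serializable error. A minimal illustration of the pattern, assuming the whole configuration object graph is serializable (the sketch class and method are not part of the commit):
package eu.dnetlib;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.spark.api.java.JavaRDD;
public class ClosureSketch {
    // 'config' is captured by the lambda, so Spark serializes it together with the task.
    // This only works once DedupConfig and the model classes it contains are Serializable.
    static JavaRDD<String> orderFieldPerRecord(final JavaRDD<String> jsonRecords, final DedupConfig config) {
        return jsonRecords.map(json -> config.getWf().getOrderField());
    }
}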