From 67e5f9858b6ece0080deb4ab8a27f3be6c77b086 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 11 Oct 2018 15:19:20 +0200
Subject: [PATCH] Added Spark implementation of dedup

---
 dnet-dedup-test/pom.xml                        |   6 +
 .../main/java/eu/dnetlib/BlockProcessor.java   |  44 ++++---
 .../main/java/eu/dnetlib/DnetAccumulator.java  |  67 ++++++++++
 .../src/main/java/eu/dnetlib/Reporter.java     |   4 +-
 .../src/main/java/eu/dnetlib/SparkTest.java    | 116 ++++++++++--------
 .../eu/dnetlib/graph/GraphProcessor.scala      |  24 ++++
 .../eu/dnetlib/reporter/SparkCounter.java      |  36 ++++++
 .../eu/dnetlib/reporter/SparkReporter.java     |  44 +++++++
 .../eu/dnetlib/pace/organization.pace.conf     |   6 +-
 .../eu/dnetlib/pace/config/PaceConfig.java     |   3 +-
 .../java/eu/dnetlib/pace/config/WfConfig.java  |   3 +-
 .../eu/dnetlib/pace/model/ClusteringDef.java   |   3 +-
 .../java/eu/dnetlib/pace/model/CondDef.java    |   3 +-
 .../java/eu/dnetlib/pace/model/FieldDef.java   |   3 +-
 14 files changed, 285 insertions(+), 77 deletions(-)
 create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/DnetAccumulator.java
 create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/graph/GraphProcessor.scala
 create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkCounter.java
 create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkReporter.java

diff --git a/dnet-dedup-test/pom.xml b/dnet-dedup-test/pom.xml
index 885a497..2655fa7 100644
--- a/dnet-dedup-test/pom.xml
+++ b/dnet-dedup-test/pom.xml
@@ -58,6 +58,12 @@
             <artifactId>spark-core_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
     </dependencies>
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
index db212ba..4e75459 100644
--- a/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
@@ -12,38 +12,46 @@ import eu.dnetlib.pace.model.MapDocumentComparator;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import scala.Tuple2;
 
-import java.io.IOException;
 import java.util.*;
-import java.util.stream.Stream;
 
 public class BlockProcessor {
 
+    public static final List<String> accumulators = new ArrayList<>();
+
     private static final Log log = LogFactory.getLog(BlockProcessor.class);
 
     private DedupConfig dedupConf;
 
+    public static void constructAccumulator(final DedupConfig dedupConf) {
+        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1"));
+        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()));
+        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize())));
+        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "skip list"));
+        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"));
+        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
+    }
+
     public BlockProcessor(DedupConfig dedupConf) {
         this.dedupConf = dedupConf;
     }
 
-    public List<Tuple2<String, String>> process(final String key, final Stream<MapDocument> documents, final Reporter context) throws IOException, InterruptedException {
+    public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {
 
         final Queue<MapDocument> q = prepare(documents);
 
         if (q.size() > 1) {
             log.info("reducing key: '" + key + "' records: " + q.size());
             //process(q, context);
-            return process(simplifyQueue(q, key, context), context);
+            process(simplifyQueue(q, key, context), context);
         } else {
             context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
-            return new ArrayList<>();
         }
     }
 
-    private Queue<MapDocument> prepare(final Stream<MapDocument> documents) {
-        final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
+    private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
+        final Queue<MapDocument> queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
 
         final Set<String> seen = new HashSet<String>();
         final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
@@ -63,7 +71,7 @@ public class BlockProcessor {
     }
 
     private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Reporter context) {
-        final Queue<MapDocument> q = new LinkedList<MapDocument>();
+        final Queue<MapDocument> q = new LinkedList<>();
 
         String fieldRef = "";
         final List<MapDocument> tempResults = Lists.newArrayList();
@@ -106,10 +114,9 @@ public class BlockProcessor {
         }
     }
 
-    private List<Tuple2<String, String>> process(final Queue<MapDocument> queue, final Reporter context) throws IOException, InterruptedException {
+    private void process(final Queue<MapDocument> queue, final Reporter context) {
 
         final PaceDocumentDistance algo = new PaceDocumentDistance();
-        List<Tuple2<String, String>> resultEmit = new ArrayList<>();
 
         while (!queue.isEmpty()) {
@@ -144,21 +151,20 @@ public class BlockProcessor {
                     if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
 
                         final ScoreResult sr = similarity(algo, pivot, curr);
-                        emitOutput(sr, idPivot, idCurr, context, resultEmit);
+                        emitOutput(sr, idPivot, idCurr, context);
                         i++;
                     }
                 }
             }
         }
-        return resultEmit;
     }
 
-    private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context, List<Tuple2<String, String>> emitResult) throws IOException, InterruptedException {
+    private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context) {
         final double d = sr.getScore();
 
         if (d >= dedupConf.getWf().getThreshold()) {
-            writeSimilarity(idPivot, idCurr, emitResult);
+            writeSimilarity(context, idPivot, idCurr);
             context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
         } else {
             context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
@@ -182,9 +188,11 @@ public class BlockProcessor {
         return StringUtils.substringBetween(id, "|", "::");
     }
 
-    private void writeSimilarity(final String from, final String to, List<Tuple2<String, String>> emitResult) {
-        emitResult.add(new Tuple2<>(from, to));
-        emitResult.add(new Tuple2<>(to, from));
+    private void writeSimilarity(final Reporter context, final String from, final String to) {
+        final String type = dedupConf.getWf().getEntityType();
+
+        context.emit(type, from, to);
+        context.emit(type, to, from);
     }
 }
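
Usage sketch for the reworked BlockProcessor API (illustrative, not part of the patch): process() now consumes an Iterable<MapDocument> and pushes similarity pairs through the Reporter callback instead of returning them, so a caller only needs a Reporter implementation to collect the output. The in-memory Reporter below and the key/documents/json placeholders are hypothetical.

    final DedupConfig config = DedupConfig.load(json);   // json: a pace profile such as organization.pace.conf
    final List<Tuple2<String, String>> pairs = new ArrayList<>();
    new BlockProcessor(config).process(key, documents, new Reporter() {
        @Override
        public void incrementCounter(String group, String name, long delta) {
            // optional here; SparkReporter forwards these to Spark accumulators
        }

        @Override
        public void emit(String type, String from, String to) {
            pairs.add(new Tuple2<>(from, to));           // writeSimilarity emits both (from,to) and (to,from)
        }
    });
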
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/DnetAccumulator.java b/dnet-dedup-test/src/main/java/eu/dnetlib/DnetAccumulator.java
new file mode 100644
index 0000000..9683770
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/DnetAccumulator.java
@@ -0,0 +1,67 @@
+package eu.dnetlib;
+
+import org.apache.spark.util.AccumulatorV2;
+
+public class DnetAccumulator extends AccumulatorV2<Long, Long> {
+
+    private Long counter = 0L;
+
+    private String group;
+
+    private String name;
+
+    public DnetAccumulator(final String group, final String name) {
+        this.group = group;
+        this.name = name;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public boolean isZero() {
+        return counter == 0;
+    }
+
+    @Override
+    public AccumulatorV2<Long, Long> copy() {
+        final DnetAccumulator acc = new DnetAccumulator(group, name);
+        acc.add(counter);
+        return acc;
+    }
+
+    @Override
+    public void reset() {
+        counter = 0L;
+    }
+
+    @Override
+    public void add(Long aLong) {
+        counter += aLong;
+    }
+
+    @Override
+    public void merge(AccumulatorV2<Long, Long> accumulatorV2) {
+        add(accumulatorV2.value());
+    }
+
+    @Override
+    public Long value() {
+        return counter;
+    }
+}
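
Usage sketch for DnetAccumulator (illustrative, not part of the patch): like any AccumulatorV2 it must be registered on the driver before tasks call add(); Spark then merges the per-task copies produced by copy() back into the registered instance via merge().

    final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("acc-demo").setMaster("local[*]"));
    final DnetAccumulator acc = new DnetAccumulator("organization", "records per hash key = 1");
    sc.sc().register(acc, "organization::records per hash key = 1");  // register before any task uses it
    sc.parallelize(Arrays.asList(1L, 2L, 3L)).foreach(acc::add);
    System.out.println(acc.value());                                  // 6
    sc.stop();
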
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
index 6dc675a..74cf5c1 100644
--- a/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
@@ -1,13 +1,11 @@
 package eu.dnetlib;
 
-import java.io.IOException;
 import java.io.Serializable;
 
 public interface Reporter extends Serializable {
 
     void incrementCounter(String counterGroup, String counterName, long delta);
 
-    void emit(final String type, final String from, final String to) throws IOException, InterruptedException;
-
+    void emit(String type, String from, String to);
 }
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
index 14a7abf..d473d6a 100644
--- a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
@@ -1,63 +1,99 @@
 package eu.dnetlib;
 
 import com.google.common.collect.Sets;
+import eu.dnetlib.graph.GraphProcessor;
 import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.utils.PaceUtils;
+import eu.dnetlib.reporter.SparkCounter;
+import eu.dnetlib.reporter.SparkReporter;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.rdd.RDD;
 import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.*;
+import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 public class SparkTest {
 
-    class Results extends HashMap<String, Set<String>> {
-        public Results(Set<String> keys) {
-            super(keys.size());
-            keys.forEach(k -> put(k, new HashSet<>()));
-        }
-    }
-
+    public static SparkCounter counter;
+    private static final Log log = LogFactory.getLog(SparkTest.class);
 
     public static void main(String[] args) {
         final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Hello World").setMaster("local[*]"));
-        final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations.json");
+        final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations_complete.json");
 
-        final Counters c = new Counters();
+        counter = new SparkCounter(context);
 
-        long count = dataRDD.mapToPair(it -> {
-            final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
+        final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
+        BlockProcessor.constructAccumulator(config);
+
+        BlockProcessor.accumulators.forEach(acc -> {
+            final String[] values = acc.split("::");
+            counter.incrementCounter(values[0], values[1], 0);
+        });
+
+        JavaPairRDD<String, MapDocument> mapDocs = dataRDD.mapToPair(it -> {
             MapDocument mapDocument = PaceUtils.asMapDocument(config, it);
             return new Tuple2<>(mapDocument.getIdentifier(), mapDocument);
-        }).reduceByKey((a, b) -> a).flatMapToPair(a -> {
-            final MapDocument currentDocument = a._2();
-            final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
-            return getGroupingKeys(config, currentDocument).stream()
-                    .map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
-        }).groupByKey().flatMapToPair(it -> {
-            final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
-            return process(config, it, c).iterator();
-        }).count();
+        });
+
+        final JavaPairRDD<String, String> relationRDD = mapDocs.reduceByKey((a, b) -> a)
+                .flatMapToPair(a -> {
+                    final MapDocument currentDocument = a._2();
+                    return getGroupingKeys(config, currentDocument).stream()
+                            .map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
+                }).groupByKey().flatMapToPair(it -> {
+                    final SparkReporter reporter = new SparkReporter(counter);
+                    new BlockProcessor(config).process(it._1(), it._2(), reporter);
+                    return reporter.getReport().iterator();
+                });
+
+        RDD<Tuple2<Object, String>> vertexes = relationRDD.groupByKey().map(it -> {
+            Long id = (long) it._1().hashCode();
+            return new Tuple2<Object, String>(id, it._1());
+        }).rdd();
+
+        final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "similarTo")).rdd();
+
+        Tuple2<Object, RDD<String>> cc = GraphProcessor.findCCs(vertexes, edgeRdd, 20);
+
+        final Long total = (Long) cc._1();
+
+        final JavaRDD<String> map = mapDocs.map(Tuple2::_1);
+
+        final JavaRDD<String> duplicatesRDD = cc._2().toJavaRDD();
+
+        final JavaRDD<String> nonDuplicates = map.subtract(duplicatesRDD);
+
+        System.out.println("Non duplicates: " + nonDuplicates.count());
+        System.out.println("Connected Components: " + total);
 
-        System.out.println("total Element = " + count);
-//        final MapDocument resA = result(config, "A", "Recent results from CDF");
-//        final MapDocument resB = result(config, "B", "Recent results from CDF");
-//
-//        final ScoreResult sr = new PaceDocumentDistance().between(resA, resB, config);
-//        final double d = sr.getScore();
-//        System.out.println(String.format(" d ---> %s", d));
     }
 
@@ -78,24 +114,6 @@ public class SparkTest {
     }
 
-    static List<Tuple2<String, String>> process(DedupConfig conf, Tuple2<String, Iterable<MapDocument>> entry, Counters c) {
-        try {
-            return new BlockProcessor(conf).process(entry._1(), StreamSupport.stream(entry._2().spliterator(), false), new Reporter() {
-                @Override
-                public void incrementCounter(String counterGroup, String counterName, long delta) {
-                    c.get(counterGroup, counterName).addAndGet(delta);
-                }
-
-                @Override
-                public void emit(String type, String from, String to) {
-                }
-            });
-        } catch (IOException | InterruptedException e) {
-            e.printStackTrace();
-            return new ArrayList<>();
-        }
-    }
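
A note on the vertex ids built above: SparkTest uses String.hashCode() as the GraphX VertexId, so two distinct identifiers that happen to share a hash would collapse into a single vertex. A collision-free alternative (a sketch, not what this patch does) is to assign ids with zipWithUniqueId() and route both vertexes and edges through that mapping:

    // one distinct Long per identifier, safe to use as a GraphX VertexId
    final JavaPairRDD<String, Long> ids =
            relationRDD.keys().union(relationRDD.values()).distinct().zipWithUniqueId();
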
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/graph/GraphProcessor.scala b/dnet-dedup-test/src/main/java/eu/dnetlib/graph/GraphProcessor.scala
new file mode 100644
index 0000000..76d7571
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/graph/GraphProcessor.scala
@@ -0,0 +1,24 @@
+package eu.dnetlib.graph
+
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+
+object GraphProcessor {
+
+  def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): (Long, RDD[String]) = {
+    val graph: Graph[String, String] = Graph(vertexes, edges)
+    val cc = graph.connectedComponents(maxIterations).vertices
+
+    val totalCC = cc.map {
+      case (openaireId, ccId) => ccId
+    }.distinct().count()
+
+    val connectedComponents: RDD[String] = vertexes.join(cc).map {
+      case (id, (openaireId, ccId)) => openaireId
+    }.distinct()
+
+    (totalCC, connectedComponents)
+  }
+
+}
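
What findCCs returns, on a small example: with vertices {a, b, c, d} and edges a-b and c-d, connectedComponents() labels every vertex with the smallest VertexId of its component, so the first element of the pair is 2 (the number of distinct component labels) and the second is the RDD of all four original string ids, i.e. every id that takes part in some similarity component. Note that maxIterations caps the label propagation: a value smaller than a component's diameter can leave it only partially merged, so the 20 used above trades exactness for a bounded runtime.
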
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkCounter.java b/dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkCounter.java
new file mode 100644
index 0000000..3782f47
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkCounter.java
@@ -0,0 +1,36 @@
+package eu.dnetlib.reporter;
+
+import eu.dnetlib.DnetAccumulator;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkCounter {
+
+    final JavaSparkContext javaSparkContext;
+
+    final Map<String, DnetAccumulator> accumulators = new HashMap<>();
+
+    public SparkCounter(final JavaSparkContext context) {
+        this.javaSparkContext = context;
+    }
+
+    public void incrementCounter(String counterGroup, String counterName, long delta) {
+        final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
+        final DnetAccumulator currentAccumulator;
+        if (!accumulators.containsKey(accumulatorName)) {
+            currentAccumulator = new DnetAccumulator(counterGroup, counterName);
+            javaSparkContext.sc().register(currentAccumulator, accumulatorName);
+            accumulators.put(accumulatorName, currentAccumulator);
+        } else {
+            currentAccumulator = accumulators.get(accumulatorName);
+        }
+        currentAccumulator.add(delta);
+    }
+
+    public Map<String, DnetAccumulator> getAccumulators() {
+        return accumulators;
+    }
+}
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkReporter.java b/dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkReporter.java
new file mode 100644
index 0000000..7d100d8
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/reporter/SparkReporter.java
@@ -0,0 +1,44 @@
+package eu.dnetlib.reporter;
+
+import eu.dnetlib.Reporter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SparkReporter implements Reporter {
+
+    final SparkCounter counter;
+
+    final List<Tuple2<String, String>> report = new ArrayList<>();
+
+    private static final Log log = LogFactory.getLog(SparkReporter.class);
+
+    public SparkReporter(SparkCounter counter) {
+        this.counter = counter;
+    }
+
+    @Override
+    public void incrementCounter(String counterGroup, String counterName, long delta) {
+        counter.incrementCounter(counterGroup, counterName, delta);
+    }
+
+    @Override
+    public void emit(String type, String from, String to) {
+        report.add(new Tuple2<>(from, to));
+    }
+
+    public List<Tuple2<String, String>> getReport() {
+        return report;
+    }
+}
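
Design note (an inference from the flow above, not stated in the patch): SparkCounter registers new accumulators through the JavaSparkContext, which only exists on the driver. That is presumably why SparkTest primes every counter with incrementCounter(group, name, 0) before the job runs: it forces each DnetAccumulator to be created and registered driver-side, so executor-side calls made through SparkReporter only ever touch accumulators that are already registered.
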
diff --git a/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf
index 0dcfe51..e1f2687 100644
--- a/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf
+++ b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf
@@ -18,9 +18,11 @@
     { "name" : "spacetrimmingfieldvalue", "fields" : [ "legalshortname" ], "params" : { "randomLength" : "5" } },
     { "name" : "urlclustering", "fields" : [ "websiteurl" ], "params" : { } }
   ],
+  "strictConditions" : [
+    { "name" : "exactMatch", "fields" : [ "gridid" ] }
+  ],
   "conditions" : [
-    { "name" : "exactMatch", "fields" : [ "country" ] },
-    { "name" : "mustBeDifferent", "fields" : [ "gridid" ] }
+    { "name" : "exactMatch", "fields" : [ "country" ] }
   ],
   "model" : [
     { "name" : "legalname", "algo" : "LevensteinTitle", "type" : "String", "weight" : "0.2", "ignoreMissing" : "false", "path" : "organization/metadata/legalname/value" },
diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/PaceConfig.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/PaceConfig.java
index ffc67e7..37fea00 100644
--- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/PaceConfig.java
+++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/PaceConfig.java
@@ -1,5 +1,6 @@
 package eu.dnetlib.pace.config;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -12,7 +13,7 @@ import eu.dnetlib.pace.model.CondDef;
 import eu.dnetlib.pace.model.FieldDef;
 import org.apache.commons.collections.CollectionUtils;
 
-public class PaceConfig {
+public class PaceConfig implements Serializable {
 
     private List<FieldDef> model;
     private List<CondDef> strictConditions;
diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/WfConfig.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/WfConfig.java
index 9e836eb..bcf2d65 100644
--- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/WfConfig.java
+++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/WfConfig.java
@@ -1,5 +1,6 @@
 package eu.dnetlib.pace.config;
 
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -9,7 +10,7 @@ import com.google.common.collect.Sets;
 import com.google.gson.GsonBuilder;
 import org.apache.commons.lang.StringUtils;
 
-public class WfConfig {
+public class WfConfig implements Serializable {
 
     /**
      * Entity type.
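
Why the pace configuration classes gain Serializable (a sketch of the failure mode, not code from the patch): Spark java-serializes every object captured by a task closure. SparkTest now loads DedupConfig once on the driver and captures it in its mapToPair/flatMapToPair lambdas, so the whole object graph reachable from it (PaceConfig, WfConfig and the model definitions below) must be serializable, otherwise the job aborts with a NotSerializableException before any task runs.

    static long run(JavaRDD<String> lines, DedupConfig config) {
        // `config` is captured by the lambda and shipped with the closure, which
        // recursively serializes PaceConfig, WfConfig, FieldDef, CondDef, ClusteringDef...
        return lines.map(line -> config.getWf().getEntityType() + "::" + line).count();
    }
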
diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java
index 5909788..db7092b 100644
--- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java
+++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java
@@ -1,12 +1,13 @@
 package eu.dnetlib.pace.model;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
 import com.google.gson.Gson;
 import eu.dnetlib.pace.clustering.*;
 
-public class ClusteringDef {
+public class ClusteringDef implements Serializable {
 
     private Clustering name;
 
diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/CondDef.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/CondDef.java
index 33f30fa..747f6c1 100644
--- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/CondDef.java
+++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/CondDef.java
@@ -1,12 +1,13 @@
 package eu.dnetlib.pace.model;
 
+import java.io.Serializable;
 import java.util.List;
 
 import com.google.gson.Gson;
 import eu.dnetlib.pace.condition.*;
 import eu.dnetlib.pace.config.Cond;
 
-public class CondDef {
+public class CondDef implements Serializable {
 
     private Cond name;
 
diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java
index 776c203..8507b0c 100644
--- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java
+++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java
@@ -1,5 +1,6 @@
 package eu.dnetlib.pace.model;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -14,7 +15,7 @@ import eu.dnetlib.pace.distance.algo.*;
 
 /**
  * The schema is composed by field definitions (FieldDef). Each field has a type, a name, and an associated distance algorithm.
  */
-public class FieldDef {
+public class FieldDef implements Serializable {
 
     public final static String PATH_SEPARATOR = "/";