diff --git a/dnet-dedup-test/pom.xml b/dnet-dedup-test/pom.xml
index 885a497..2655fa7 100644
--- a/dnet-dedup-test/pom.xml
+++ b/dnet-dedup-test/pom.xml
@@ -58,6 +58,12 @@
             <artifactId>spark-core_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
index db212ba..4e75459 100644
--- a/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
@@ -12,38 +12,46 @@ import eu.dnetlib.pace.model.MapDocumentComparator;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import scala.Tuple2;
-import java.io.IOException;
import java.util.*;
-import java.util.stream.Stream;
public class BlockProcessor {
+ public static final List<String> accumulators = new ArrayList<>();
+
private static final Log log = LogFactory.getLog(BlockProcessor.class);
private DedupConfig dedupConf;
+
+ public static void constructAccumulator( final DedupConfig dedupConf) {
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "records per hash key = 1"));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize())));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "skip list"));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
+ }
+
public BlockProcessor(DedupConfig dedupConf) {
this.dedupConf = dedupConf;
}
- public List<Tuple2<String, String>> process(final String key, final Stream<MapDocument> documents, final Reporter context) throws IOException, InterruptedException {
+ public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {
final Queue<MapDocument> q = prepare(documents);
if (q.size() > 1) {
log.info("reducing key: '" + key + "' records: " + q.size());
//process(q, context);
- return process(simplifyQueue(q, key, context), context);
+ process(simplifyQueue(q, key, context), context);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
- return new ArrayList<>();
}
}
- private Queue<MapDocument> prepare(final Stream<MapDocument> documents) {
- final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
+ private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
+ final Queue<MapDocument> queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
final Set<String> seen = new HashSet<String>();
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
@@ -63,7 +71,7 @@ public class BlockProcessor {
}
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Reporter context) {
- final Queue<MapDocument> q = new LinkedList<MapDocument>();
+ final Queue<MapDocument> q = new LinkedList<>();
String fieldRef = "";
final List<MapDocument> tempResults = Lists.newArrayList();
@@ -106,10 +114,9 @@ public class BlockProcessor {
}
}
- private List<Tuple2<String, String>> process(final Queue<MapDocument> queue, final Reporter context) throws IOException, InterruptedException {
+ private void process(final Queue<MapDocument> queue, final Reporter context) {
final PaceDocumentDistance algo = new PaceDocumentDistance();
- List<Tuple2<String, String>> resultEmit = new ArrayList<>();
while (!queue.isEmpty()) {
@@ -144,21 +151,20 @@ public class BlockProcessor {
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
final ScoreResult sr = similarity(algo, pivot, curr);
- emitOutput(sr, idPivot, idCurr,context, resultEmit);
+ emitOutput(sr, idPivot, idCurr, context);
i++;
}
}
}
}
- return resultEmit;
}
- private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context, List<Tuple2<String, String>> emitResult) throws IOException, InterruptedException {
+ private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context) {
final double d = sr.getScore();
if (d >= dedupConf.getWf().getThreshold()) {
- writeSimilarity(idPivot, idCurr, emitResult);
+ writeSimilarity(context, idPivot, idCurr);
context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
@@ -182,9 +188,11 @@ public class BlockProcessor {
return StringUtils.substringBetween(id, "|", "::");
}
- private void writeSimilarity( final String from, final String to, List<Tuple2<String, String>> emitResult){
- emitResult.add(new Tuple2<>(from, to));
- emitResult.add(new Tuple2<>( to, from));
+ private void writeSimilarity(final Reporter context, final String from, final String to) {
+ final String type = dedupConf.getWf().getEntityType();
+
+ context.emit(type, from, to);
+ context.emit(type, to, from);
}
}
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/DnetAccumulator.java b/dnet-dedup-test/src/main/java/eu/dnetlib/DnetAccumulator.java
new file mode 100644
index 0000000..9683770
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/DnetAccumulator.java
@@ -0,0 +1,67 @@
+package eu.dnetlib;
+
+import org.apache.spark.util.AccumulatorV2;
+
+public class DnetAccumulator extends AccumulatorV2<Long, Long> {
+
+ private Long counter = 0L;
+
+ private String group;
+
+ private String name;
+
+
+ public DnetAccumulator(final String group, final String name){
+ this.group = group;
+ this.name = name;
+ }
+
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean isZero() {
+ return counter == 0;
+ }
+
+ @Override
+ public AccumulatorV2<Long, Long> copy() {
+ final DnetAccumulator acc = new DnetAccumulator(group, name);
+ acc.add(counter);
+ return acc;
+ }
+
+ @Override
+ public void reset() {
+ counter = 0L;
+ }
+
+ @Override
+ public void add(Long aLong) {
+ counter += aLong;
+ }
+
+ @Override
+ public void merge(AccumulatorV2<Long, Long> accumulatorV2) {
+ add(accumulatorV2.value());
+ }
+
+ @Override
+ public Long value() {
+ return counter;
+ }
+}
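
Note: an AccumulatorV2 subclass like the one above only aggregates across tasks once it has been registered with the SparkContext. A minimal usage sketch (illustrative only, not part of this patch):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class DnetAccumulatorDemo {
    public static void main(String[] args) {
        final JavaSparkContext context = new JavaSparkContext(
                new SparkConf().setAppName("acc-demo").setMaster("local[*]"));

        // Register the accumulator with the SparkContext before any job uses it.
        final DnetAccumulator acc = new DnetAccumulator("organization", "records seen");
        context.sc().register(acc, acc.getGroup() + "::" + acc.getName());

        // Tasks update the accumulator on the executors...
        context.parallelize(Arrays.asList("a", "b", "c")).foreach(s -> acc.add(1L));

        // ...and the merged value is readable back on the driver.
        System.out.println(acc.value()); // 3
        context.close();
    }
}
```
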
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
index 6dc675a..74cf5c1 100644
--- a/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
@@ -1,13 +1,11 @@
package eu.dnetlib;
-import java.io.IOException;
import java.io.Serializable;
public interface Reporter extends Serializable {
void incrementCounter(String counterGroup, String counterName, long delta);
- void emit(final String type, final String from, final String to) throws IOException, InterruptedException;
-
+ void emit(String type, String from, String to);
}
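
The simplified emit contract is what lets SparkReporter (added under eu.dnetlib.reporter, not shown in this hunk) buffer similarity pairs instead of writing through a Hadoop context. A plausible sketch of such an implementation, assuming only the SparkCounter.incrementCounter signature already used in this patch:

```java
package eu.dnetlib.reporter;

import eu.dnetlib.Reporter;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class SparkReporterSketch implements Reporter {

    // Pairs emitted while processing one block; the caller flattens
    // these into the relation RDD.
    private final List<Tuple2<String, String>> report = new ArrayList<>();
    private final SparkCounter counter;

    public SparkReporterSketch(final SparkCounter counter) {
        this.counter = counter;
    }

    @Override
    public void incrementCounter(final String group, final String name, final long delta) {
        counter.incrementCounter(group, name, delta);
    }

    @Override
    public void emit(final String type, final String from, final String to) {
        report.add(new Tuple2<>(from, to));
    }

    public List<Tuple2<String, String>> getReport() {
        return report;
    }
}
```
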
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
index 14a7abf..d473d6a 100644
--- a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
@@ -1,63 +1,99 @@
package eu.dnetlib;
import com.google.common.collect.Sets;
+import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.utils.PaceUtils;
+import eu.dnetlib.reporter.SparkCounter;
+import eu.dnetlib.reporter.SparkReporter;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.rdd.RDD;
import scala.Tuple2;
-
import java.io.IOException;
import java.io.StringWriter;
-import java.util.*;
+import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
public class SparkTest {
-
- class Results extends HashMap<String, Set<String>> {
- public Results(Set<String> keys) {
- super(keys.size());
- keys.forEach(k -> put(k, new HashSet<>()));
- }
- }
-
+ public static SparkCounter counter;
+ private static final Log log = LogFactory.getLog(SparkTest.class);
public static void main(String[] args) {
final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Hello World").setMaster("local[*]"));
- final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations.json");
+ final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations_complete.json");
- final Counters c = new Counters();
+ counter = new SparkCounter(context);
- long count = dataRDD.mapToPair(it -> {
- final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
+ final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
+ BlockProcessor.constructAccumulator(config);
+
+ BlockProcessor.accumulators.forEach(acc -> {
+
+ final String[] values = acc.split("::");
+ counter.incrementCounter(values[0], values[1], 0);
+
+ });
+
+ JavaPairRDD<String, MapDocument> mapDocs = dataRDD.mapToPair(it -> {
MapDocument mapDocument = PaceUtils.asMapDocument(config, it);
return new Tuple2<>(mapDocument.getIdentifier(), mapDocument);
- }).reduceByKey((a, b) -> a).flatMapToPair(a -> {
- final MapDocument currentDocument = a._2();
- final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
- return getGroupingKeys(config, currentDocument).stream()
- .map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
- }).groupByKey().flatMapToPair(it -> {
- final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
- return process(config, it, c).iterator();
- }).count();
+ });
+
+
+ final JavaPairRDD<String, String> relationRDD = mapDocs.reduceByKey((a, b) -> a)
+ .flatMapToPair(a -> {
+ final MapDocument currentDocument = a._2();
+ return getGroupingKeys(config, currentDocument).stream()
+ .map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
+ }).groupByKey().flatMapToPair(it -> {
+
+ final SparkReporter reporter = new SparkReporter(counter);
+ new BlockProcessor(config).process(it._1(), it._2(), reporter);
+ return reporter.getReport().iterator();
+ });
+
+ RDD<Tuple2<Object, String>> vertexes = relationRDD.groupByKey().map(it -> {
+
+ Long id = (long) it._1().hashCode();
+ return new Tuple2
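
For reference, a minimal sketch of the GraphX connected-components step that eu.dnetlib.graph.GraphProcessor presumably wraps around these vertex and edge RDDs (only the GraphX calls are real API; the class and method names here are illustrative):

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.Graph;
import org.apache.spark.rdd.RDD;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class ConnectedComponentsSketch {

    // vertexes: (vertexId, original record id); edges: similarity relations.
    public static JavaRDD<Tuple2<Object, Object>> findCCs(
            final RDD<Tuple2<Object, String>> vertexes,
            final RDD<Edge<String>> edges) {

        final ClassTag<String> tag = ClassTag$.MODULE$.apply(String.class);

        // Build the graph; the empty string is the default vertex attribute.
        final Graph<String, String> graph = Graph.apply(
                vertexes, edges, "",
                StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),
                tag, tag);

        // connectedComponents() relabels every vertex with the smallest
        // vertex id reachable from it, i.e. its duplicate-group id.
        return graph.ops().connectedComponents().vertices().toJavaRDD();
    }
}
```
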