diff --git a/dnet-pace-core/pom.xml b/dnet-pace-core/pom.xml
index cb31ed689..d35fd68f1 100644
--- a/dnet-pace-core/pom.xml
+++ b/dnet-pace-core/pom.xml
@@ -1,70 +1,60 @@
+
+ 4.0.0
+
eu.dnetlib
- dnet45-parent
- 1.0.0
-
+ dnet-dedup
+ 3.0.0-SNAPSHOT
+ ../pom.xml
- 4.0.0
- eu.dnetlib
+
dnet-pace-core
- jar
- 2.6.8-SNAPSHOT
-
- scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-pace-core/trunk
-
+ jar
+
edu.cmu
secondstring
- 1.0.0
com.google.guava
guava
- 15.0
com.google.code.gson
gson
- ${google.gson.version}
commons-lang
commons-lang
- ${commons.lang.version}
commons-io
commons-io
- ${commons.io.version}
commons-collections
commons-collections
- ${commons.collections.version}
com.googlecode.protobuf-java-format
protobuf-java-format
- 1.2
org.antlr
stringtemplate
- 3.2
commons-logging
commons-logging
- ${commons.logging.version}
junit
junit
- ${junit.version}
test
+
diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java
new file mode 100644
index 000000000..a9979f5ed
--- /dev/null
+++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java
@@ -0,0 +1,199 @@
+package eu.dnetlib.pace.util;
+
+import com.google.common.collect.Lists;
+import eu.dnetlib.pace.clustering.NGramUtils;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.config.WfConfig;
+import eu.dnetlib.pace.distance.PaceDocumentDistance;
+import eu.dnetlib.pace.distance.eval.ScoreResult;
+import eu.dnetlib.pace.model.Field;
+import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.model.MapDocumentComparator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+
+public class BlockProcessor {
+
+ public static final List accumulators= new ArrayList<>();
+
+ private static final Log log = LogFactory.getLog(BlockProcessor.class);
+
+ private DedupConfig dedupConf;
+
+
+ public static void constructAccumulator( final DedupConfig dedupConf) {
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "records per hash key = 1"));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize())));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "skip list"));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"));
+ accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
+ }
+
+ public BlockProcessor(DedupConfig dedupConf) {
+ this.dedupConf = dedupConf;
+ }
+
+ public void process(final String key, final Iterable documents, final Reporter context) {
+
+ final Queue q = prepare(documents);
+
+ if (q.size() > 1) {
+ log.info("reducing key: '" + key + "' records: " + q.size());
+ //process(q, context);
+ process(simplifyQueue(q, key, context), context);
+ } else {
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
+ }
+ }
+
+ private Queue prepare(final Iterable documents) {
+ final Queue queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
+
+ final Set seen = new HashSet();
+ final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
+
+ documents.forEach(doc -> {
+ if (queue.size() <= queueMaxSize) {
+ final String id = doc.getIdentifier();
+
+ if (!seen.contains(id)) {
+ seen.add(id);
+ queue.add(doc);
+ }
+ }
+ });
+
+ return queue;
+ }
+
+ private Queue simplifyQueue(final Queue queue, final String ngram, final Reporter context) {
+ final Queue q = new LinkedList<>();
+
+ String fieldRef = "";
+ final List tempResults = Lists.newArrayList();
+
+ while (!queue.isEmpty()) {
+ final MapDocument result = queue.remove();
+
+ final String orderFieldName = dedupConf.getWf().getOrderField();
+ final Field orderFieldValue = result.values(orderFieldName);
+ if (!orderFieldValue.isEmpty()) {
+ final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
+ if (field.equals(fieldRef)) {
+ tempResults.add(result);
+ } else {
+ populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
+ tempResults.clear();
+ tempResults.add(result);
+ fieldRef = field;
+ }
+ } else {
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1);
+ }
+ }
+ populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
+
+ return q;
+ }
+
+ private void populateSimplifiedQueue(final Queue q,
+ final List tempResults,
+ final Reporter context,
+ final String fieldRef,
+ final String ngram) {
+ WfConfig wf = dedupConf.getWf();
+ if (tempResults.size() < wf.getGroupMaxSize()) {
+ q.addAll(tempResults);
+ } else {
+ context.incrementCounter(wf.getEntityType(), String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()), tempResults.size());
+ log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
+ }
+ }
+
+ private void process(final Queue queue, final Reporter context) {
+
+ final PaceDocumentDistance algo = new PaceDocumentDistance();
+
+ while (!queue.isEmpty()) {
+
+ final MapDocument pivot = queue.remove();
+ final String idPivot = pivot.getIdentifier();
+
+ WfConfig wf = dedupConf.getWf();
+ final Field fieldsPivot = pivot.values(wf.getOrderField());
+ final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
+
+ if (fieldPivot != null) {
+ // System.out.println(idPivot + " --> " + fieldPivot);
+
+ int i = 0;
+ for (final MapDocument curr : queue) {
+ final String idCurr = curr.getIdentifier();
+
+ if (mustSkip(idCurr)) {
+
+ context.incrementCounter(wf.getEntityType(), "skip list", 1);
+
+ break;
+ }
+
+ if (i > wf.getSlidingWindowSize()) {
+ break;
+ }
+
+ final Field fieldsCurr = curr.values(wf.getOrderField());
+ final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
+
+ if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
+
+ final ScoreResult sr = similarity(algo, pivot, curr);
+ log.info(sr.toString()+"SCORE "+ sr.getScore());
+ emitOutput(sr, idPivot, idCurr, context);
+ i++;
+ }
+ }
+ }
+ }
+ }
+
+ private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context) {
+ final double d = sr.getScore();
+
+ if (d >= dedupConf.getWf().getThreshold()) {
+
+ writeSimilarity(context, idPivot, idCurr);
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
+ } else {
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
+ }
+ }
+
+ private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
+ try {
+ return algo.between(a, b, dedupConf);
+ } catch(Throwable e) {
+ log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private boolean mustSkip(final String idPivot) {
+ return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
+ }
+
+ private String getNsPrefix(final String id) {
+ return StringUtils.substringBetween(id, "|", "::");
+ }
+
+ private void writeSimilarity(final Reporter context, final String from, final String to) {
+ final String type = dedupConf.getWf().getEntityType();
+
+ context.emit(type, from, to);
+ context.emit(type, to, from);
+ }
+
+}
diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/util/Reporter.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/util/Reporter.java
new file mode 100644
index 000000000..10c886cb5
--- /dev/null
+++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/util/Reporter.java
@@ -0,0 +1,11 @@
+package eu.dnetlib.pace.util;
+
+
+import java.io.Serializable;
+
/**
 * Callback used by {@link BlockProcessor} to report progress counters and to emit
 * similarity relations between pairs of record identifiers. Serializable so it can
 * travel with distributed processing tasks.
 */
public interface Reporter extends Serializable {

    /**
     * Increments the named counter within the given group by {@code delta}.
     */
    void incrementCounter(String counterGroup, String counterName, long delta);

    /**
     * Emits a directed similarity relation of the given entity type
     * from record {@code from} to record {@code to}.
     */
    void emit(String type, String from, String to);
}