diff --git a/dnet-dedup-test/pom.xml b/dnet-dedup-test/pom.xml
index 0b01f96..885a497 100644
--- a/dnet-dedup-test/pom.xml
+++ b/dnet-dedup-test/pom.xml
@@ -15,15 +15,76 @@
 <version>1.0-SNAPSHOT</version>
+
 <dependencies>
+
+ <dependency>
+ <groupId>eu.dnetlib</groupId>
+ <artifactId>dnet-openaire-data-protos</artifactId>
+ <version>3.9.3-proto250</version>
+ </dependency>
+
 <dependency>
 <groupId>eu.dnetlib</groupId>
 <artifactId>dnet-pace-core</artifactId>
 <version>2.6.8-SNAPSHOT</version>
 </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
 </dependencies>
+
+ <properties>
+ <spark.version>2.2.0</spark.version>
+ </properties>
+
 </project>
\ No newline at end of file
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
new file mode 100644
index 0000000..db212ba
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java
@@ -0,0 +1,190 @@
+package eu.dnetlib;
+
+import com.google.common.collect.Lists;
+import eu.dnetlib.pace.clustering.NGramUtils;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.config.WfConfig;
+import eu.dnetlib.pace.distance.PaceDocumentDistance;
+import eu.dnetlib.pace.distance.eval.ScoreResult;
+import eu.dnetlib.pace.model.Field;
+import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.model.MapDocumentComparator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Stream;
+
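+/**
+ * Processes a block of candidate duplicates sharing the same clustering key:
+ * the records are queued in order-field order, then compared pairwise within a
+ * sliding window, and every pair scoring above the configured threshold is
+ * collected as a similarity relation.
+ */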
+public class BlockProcessor {
+
+ private static final Log log = LogFactory.getLog(BlockProcessor.class);
+
+ private DedupConfig dedupConf;
+
+ public BlockProcessor(DedupConfig dedupConf) {
+ this.dedupConf = dedupConf;
+ }
+
+ public List<Tuple2<String, String>> process(final String key, final Stream<MapDocument> documents, final Reporter context) throws IOException, InterruptedException {
+
+ final Queue<MapDocument> q = prepare(documents);
+
+ if (q.size() > 1) {
+ log.info("reducing key: '" + key + "' records: " + q.size());
+ //process(q, context);
+ return process(simplifyQueue(q, key, context), context);
+ } else {
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
+ return new ArrayList<>();
+ }
+ }
+
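+ // Builds a priority queue ordered by the workflow's order field, skipping
+ // duplicate identifiers and bounding the queue size with queueMaxSize.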
+ private Queue<MapDocument> prepare(final Stream<MapDocument> documents) {
+ final Queue<MapDocument> queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
+
+ final Set<String> seen = new HashSet<>();
+ final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
+
+ documents.forEach(doc -> {
+ if (queue.size() <= queueMaxSize) {
+ final String id = doc.getIdentifier();
+
+ if (!seen.contains(id)) {
+ seen.add(id);
+ queue.add(doc);
+ }
+ }
+ });
+
+ return queue;
+ }
+
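+ // Scans the ordered queue grouping runs of records that share the same
+ // normalized order-field value; oversized groups are dropped by
+ // populateSimplifiedQueue, records missing the order field are only counted.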
+ private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Reporter context) {
+ final Queue<MapDocument> q = new LinkedList<>();
+
+ String fieldRef = "";
+ final List<MapDocument> tempResults = Lists.newArrayList();
+
+ while (!queue.isEmpty()) {
+ final MapDocument result = queue.remove();
+
+ final String orderFieldName = dedupConf.getWf().getOrderField();
+ final Field orderFieldValue = result.values(orderFieldName);
+ if (!orderFieldValue.isEmpty()) {
+ final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
+ if (field.equals(fieldRef)) {
+ tempResults.add(result);
+ } else {
+ populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
+ tempResults.clear();
+ tempResults.add(result);
+ fieldRef = field;
+ }
+ } else {
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1);
+ }
+ }
+ populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
+
+ return q;
+ }
+
+ private void populateSimplifiedQueue(final Queue<MapDocument> q,
+ final List<MapDocument> tempResults,
+ final Reporter context,
+ final String fieldRef,
+ final String ngram) {
+ WfConfig wf = dedupConf.getWf();
+ if (tempResults.size() < wf.getGroupMaxSize()) {
+ q.addAll(tempResults);
+ } else {
+ context.incrementCounter(wf.getEntityType(), String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()), tempResults.size());
+ log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
+ }
+ }
+
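+ // Sliding-window pass: each document in turn becomes the pivot and is compared
+ // with at most slidingWindowSize documents following it in the ordered queue.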
+ private List<Tuple2<String, String>> process(final Queue<MapDocument> queue, final Reporter context) throws IOException, InterruptedException {
+
+ final PaceDocumentDistance algo = new PaceDocumentDistance();
+ List<Tuple2<String, String>> resultEmit = new ArrayList<>();
+
+ while (!queue.isEmpty()) {
+
+ final MapDocument pivot = queue.remove();
+ final String idPivot = pivot.getIdentifier();
+
+ WfConfig wf = dedupConf.getWf();
+ final Field fieldsPivot = pivot.values(wf.getOrderField());
+ final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
+
+ if (fieldPivot != null) {
+ // System.out.println(idPivot + " --> " + fieldPivot);
+
+ int i = 0;
+ for (final MapDocument curr : queue) {
+ final String idCurr = curr.getIdentifier();
+
+ if (mustSkip(idCurr)) {
+
+ context.incrementCounter(wf.getEntityType(), "skip list", 1);
+
+ break;
+ }
+
+ if (i > wf.getSlidingWindowSize()) {
+ break;
+ }
+
+ final Field fieldsCurr = curr.values(wf.getOrderField());
+ final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
+
+ if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
+
+ final ScoreResult sr = similarity(algo, pivot, curr);
+ emitOutput(sr, idPivot, idCurr, context, resultEmit);
+ i++;
+ }
+ }
+ }
+ }
+ return resultEmit;
+ }
+
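+ // Emits the pair (in both directions) when the score reaches the workflow
+ // threshold, updating the counters in either case.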
+ private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context, List<Tuple2<String, String>> emitResult) throws IOException, InterruptedException {
+ final double d = sr.getScore();
+
+ if (d >= dedupConf.getWf().getThreshold()) {
+
+ writeSimilarity(idPivot, idCurr, emitResult);
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
+ } else {
+ context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
+ }
+ }
+
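+ // Wraps the PACE distance computation, logging both documents on failure.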
+ private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
+ try {
+ return algo.between(a, b, dedupConf);
+ } catch(Throwable e) {
+ log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private boolean mustSkip(final String idPivot) {
+ return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
+ }
+
+ private String getNsPrefix(final String id) {
+ return StringUtils.substringBetween(id, "|", "::");
+ }
+
+ private void writeSimilarity(final String from, final String to, List<Tuple2<String, String>> emitResult) {
+ emitResult.add(new Tuple2<>(from, to));
+ emitResult.add(new Tuple2<>(to, from));
+ }
+
+}
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/Counters.java b/dnet-dedup-test/src/main/java/eu/dnetlib/Counters.java
new file mode 100644
index 0000000..f9ce0c6
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/Counters.java
@@ -0,0 +1,28 @@
+package eu.dnetlib;
+
+import org.apache.commons.logging.Log;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
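+/**
+ * Minimal in-memory counter registry (group -> name -> value), a lightweight
+ * stand-in for framework counters; entries are created lazily on first access.
+ */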
+class Counters extends HashMap<String, HashMap<String, AtomicLong>> implements Serializable {
+
+ public AtomicLong get(String counterGroup, String counterName) {
+ if (!super.containsKey(counterGroup)) {
+ super.put(counterGroup, new HashMap<>());
+ }
+ if (!super.get(counterGroup).containsKey(counterName)) {
+ super.get(counterGroup).put(counterName, new AtomicLong(0));
+ }
+ return super.get(counterGroup).get(counterName);
+ }
+
+ public void print(final Log log) {
+ entrySet().forEach(cg -> {
+ cg.getValue().entrySet().forEach(cn -> {
+ log.info(cg.getKey() + " " + cn.getKey() + " " + cn.getValue());
+ });
+ });
+ }
+}
\ No newline at end of file
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
new file mode 100644
index 0000000..6dc675a
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java
@@ -0,0 +1,13 @@
+package eu.dnetlib;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+
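+/**
+ * Callback through which BlockProcessor reports counter updates and similarity
+ * relations, keeping the block logic independent of the execution framework.
+ */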
+public interface Reporter extends Serializable {
+
+ void incrementCounter(String counterGroup, String counterName, long delta);
+
+ void emit(final String type, final String from, final String to) throws IOException, InterruptedException;
+
+}
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
new file mode 100644
index 0000000..14a7abf
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java
@@ -0,0 +1,108 @@
+package eu.dnetlib;
+
+import com.google.common.collect.Sets;
+import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.utils.PaceUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+
+public class SparkTest {
+
+ class Results extends HashMap<String, Set<String>> {
+ public Results(Set<String> keys) {
+ super(keys.size());
+ keys.forEach(k -> put(k, new HashSet<>()));
+ }
+ }
+
+
+ public static void main(String[] args) {
+ final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Hello World").setMaster("local[*]"));
+ final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations.json");
+
+ final Counters c = new Counters();
+
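+ // Pipeline: parse every JSON line into a MapDocument keyed by its identifier,
+ // keep a single document per identifier, fan each document out to its
+ // clustering keys, group by key, and run the block processor on each group,
+ // counting the emitted similarity pairs.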
+ long count = dataRDD.mapToPair(it -> {
+ final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
+ MapDocument mapDocument = PaceUtils.asMapDocument(config, it);
+ return new Tuple2<>(mapDocument.getIdentifier(), mapDocument);
+ }).reduceByKey((a, b) -> a).flatMapToPair(a -> {
+ final MapDocument currentDocument = a._2();
+ final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
+ return getGroupingKeys(config, currentDocument).stream()
+ .map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
+ }).groupByKey().flatMapToPair(it -> {
+ final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
+ return process(config, it, c).iterator();
+ }).count();
+
+
+ System.out.println("total Element = " + count);
+
+// final MapDocument resA = result(config, "A", "Recent results from CDF");
+// final MapDocument resB = result(config, "B", "Recent results from CDF");
+//
+// final ScoreResult sr = new PaceDocumentDistance().between(resA, resB, config);
+// final double d = sr.getScore();
+// System.out.println(String.format(" d ---> %s", d));
+
+ }
+
+
+ static String readFromClasspath(final String filename) {
+ final StringWriter sw = new StringWriter();
+ try {
+ IOUtils.copy(SparkTest.class.getResourceAsStream(filename), sw);
+ return sw.toString();
+ } catch (final IOException e) {
+ throw new RuntimeException("cannot load resource from classpath: " + filename, e);
+ }
+ }
+
+
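+ /** Derives the clustering (blocking) keys of a document, filtering out blacklisted values. */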
+ static Set<String> getGroupingKeys(DedupConfig conf, MapDocument doc) {
+ return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf));
+ }
+
+
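+ // Runs the BlockProcessor on a single group, bridging its Reporter callbacks to
+ // the shared Counters; emit() is a no-op since pairs are returned as a list.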
+ static List<Tuple2<String, String>> process(DedupConfig conf, Tuple2<String, Iterable<MapDocument>> entry, Counters c) {
+ try {
+ return new BlockProcessor(conf).process(entry._1(), StreamSupport.stream(entry._2().spliterator(),false), new Reporter() {
+ @Override
+ public void incrementCounter(String counterGroup, String counterName, long delta) {
+ c.get(counterGroup, counterName).addAndGet(delta);
+ }
+
+ @Override
+ public void emit(String type, String from, String to) {
+
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ return new ArrayList<>();
+ }
+ }
+
+
+}
diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/data/transform/AbstractProtoMapper.java b/dnet-dedup-test/src/main/java/eu/dnetlib/data/transform/AbstractProtoMapper.java
new file mode 100644
index 0000000..2d5a6e0
--- /dev/null
+++ b/dnet-dedup-test/src/main/java/eu/dnetlib/data/transform/AbstractProtoMapper.java
@@ -0,0 +1,150 @@
+package eu.dnetlib.data.transform;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.protobuf.Descriptors.EnumValueDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
+import com.googlecode.protobuf.format.JsonFormat;
+
+import eu.dnetlib.pace.config.Type;
+
+/**
+ * AbstractProtoMapper provides common navigation methods over protocol buffers Messages.
+ *
+ * @author claudio
+ */
+public abstract class AbstractProtoMapper {
+
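+ /** Delimiters of inline conditions embedded in a path: wrapped in braces, separated by '#'. */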
+ private static final String COND_WRAPPER = "\\{|\\}";
+ private static final String COND_SEPARATOR = "#";
+ /** Separator between the elements of a path expression. */
+ private static final String PATH_SEPARATOR = "/";
+
+ /**
+ * Process multi path.
+ *
+ * @param proto
+ * the proto
+ * @param paths
+ * the paths
+ * @return the list
+ */
+ protected List