diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java index fae749383..95918a7c7 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java @@ -58,7 +58,7 @@ public class BlockProcessor { this.orderFieldPos = orderFieldPos; } - public void processSortedRows(final Collection documents, final Reporter context) { + public void processSortedRows(final List documents, final Reporter context) { if (documents.size() > 1) { // log.info("reducing key: '" + key + "' records: " + q.size()); processRows(documents, context); @@ -68,13 +68,10 @@ public class BlockProcessor { } } - private void processRows(final Collection queue, final Reporter context) { + private void processRows(final List queue, final Reporter context) { - Iterator it = queue.iterator(); - while (it.hasNext()) { - - final Row pivot = it.next(); - it.remove(); + for (int pivotPos = 0; pivotPos < queue.size(); pivotPos++) { + final Row pivot = queue.get(pivotPos); final String idPivot = pivot.getString(identifierFieldPos); // identifier final Object fieldsPivot = getJavaValue(pivot, orderFieldPos); @@ -83,13 +80,12 @@ public class BlockProcessor { if (fieldPivot != null) { int i = 0; - for (final Row curr : queue) { + for (int windowPos = pivotPos + 1; windowPos < queue.size(); windowPos++) { + final Row curr = queue.get(windowPos); final String idCurr = curr.getString(identifierFieldPos); // identifier if (mustSkip(idCurr)) { - context.incrementCounter(wf.getEntityType(), "skip list", 1); - break; } @@ -105,7 +101,6 @@ public class BlockProcessor { final TreeProcessor treeProcessor = new TreeProcessor(dedupConf); emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context); - } } }