From 3129c1c48b520c872cb228ada6556947dda596a7 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 26 Jun 2023 20:59:18 +0200 Subject: [PATCH] Allow processing of immutable sorted blocks in dedup --- .../eu/dnetlib/pace/util/BlockProcessor.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java index fae749383..95918a7c7 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java @@ -58,7 +58,7 @@ public class BlockProcessor { this.orderFieldPos = orderFieldPos; } - public void processSortedRows(final Collection documents, final Reporter context) { + public void processSortedRows(final List documents, final Reporter context) { if (documents.size() > 1) { // log.info("reducing key: '" + key + "' records: " + q.size()); processRows(documents, context); @@ -68,13 +68,10 @@ public class BlockProcessor { } } - private void processRows(final Collection queue, final Reporter context) { + private void processRows(final List queue, final Reporter context) { - Iterator it = queue.iterator(); - while (it.hasNext()) { - - final Row pivot = it.next(); - it.remove(); + for (int pivotPos = 0; pivotPos < queue.size(); pivotPos++) { + final Row pivot = queue.get(pivotPos); final String idPivot = pivot.getString(identifierFieldPos); // identifier final Object fieldsPivot = getJavaValue(pivot, orderFieldPos); @@ -83,13 +80,12 @@ public class BlockProcessor { if (fieldPivot != null) { int i = 0; - for (final Row curr : queue) { + for (int windowPos = pivotPos + 1; windowPos < queue.size(); windowPos++) { + final Row curr = queue.get(windowPos); final String idCurr = curr.getString(identifierFieldPos); // identifier if (mustSkip(idCurr)) { - context.incrementCounter(wf.getEntityType(), "skip list", 1); - break; } @@ -105,7 +101,6 @@ public class BlockProcessor { final TreeProcessor treeProcessor = new TreeProcessor(dedupConf); emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context); - } } }