forked from D-Net/dnet-hadoop
Allow processing of immutable sorted blocks in dedup
This commit is contained in:
parent
cb7ad9889c
commit
3129c1c48b
|
@ -58,7 +58,7 @@ public class BlockProcessor {
|
||||||
this.orderFieldPos = orderFieldPos;
|
this.orderFieldPos = orderFieldPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void processSortedRows(final Collection<Row> documents, final Reporter context) {
|
public void processSortedRows(final List<Row> documents, final Reporter context) {
|
||||||
if (documents.size() > 1) {
|
if (documents.size() > 1) {
|
||||||
// log.info("reducing key: '" + key + "' records: " + q.size());
|
// log.info("reducing key: '" + key + "' records: " + q.size());
|
||||||
processRows(documents, context);
|
processRows(documents, context);
|
||||||
|
@ -68,13 +68,10 @@ public class BlockProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processRows(final Collection<Row> queue, final Reporter context) {
|
private void processRows(final List<Row> queue, final Reporter context) {
|
||||||
|
|
||||||
Iterator<Row> it = queue.iterator();
|
for (int pivotPos = 0; pivotPos < queue.size(); pivotPos++) {
|
||||||
while (it.hasNext()) {
|
final Row pivot = queue.get(pivotPos);
|
||||||
|
|
||||||
final Row pivot = it.next();
|
|
||||||
it.remove();
|
|
||||||
|
|
||||||
final String idPivot = pivot.getString(identifierFieldPos); // identifier
|
final String idPivot = pivot.getString(identifierFieldPos); // identifier
|
||||||
final Object fieldsPivot = getJavaValue(pivot, orderFieldPos);
|
final Object fieldsPivot = getJavaValue(pivot, orderFieldPos);
|
||||||
|
@ -83,13 +80,12 @@ public class BlockProcessor {
|
||||||
|
|
||||||
if (fieldPivot != null) {
|
if (fieldPivot != null) {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (final Row curr : queue) {
|
for (int windowPos = pivotPos + 1; windowPos < queue.size(); windowPos++) {
|
||||||
|
final Row curr = queue.get(windowPos);
|
||||||
final String idCurr = curr.getString(identifierFieldPos); // identifier
|
final String idCurr = curr.getString(identifierFieldPos); // identifier
|
||||||
|
|
||||||
if (mustSkip(idCurr)) {
|
if (mustSkip(idCurr)) {
|
||||||
|
|
||||||
context.incrementCounter(wf.getEntityType(), "skip list", 1);
|
context.incrementCounter(wf.getEntityType(), "skip list", 1);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +101,6 @@ public class BlockProcessor {
|
||||||
final TreeProcessor treeProcessor = new TreeProcessor(dedupConf);
|
final TreeProcessor treeProcessor = new TreeProcessor(dedupConf);
|
||||||
|
|
||||||
emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context);
|
emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue