diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java index 46401a5b5..579ef6127 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java @@ -25,32 +25,47 @@ import eu.dnetlib.dhp.common.aggregation.AggregatorReport; public class BaseCollectorIterator implements Iterator { - private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private Object nextElement; + + private final BlockingQueue queue = new LinkedBlockingQueue<>(); private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class); - private boolean completed = false; - public BaseCollectorIterator(final FileSystem fs, final Path filePath, final AggregatorReport report) { new Thread(() -> importHadoopFile(fs, filePath, report)).start(); + try { + this.nextElement = this.queue.take(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } } protected BaseCollectorIterator(final String resourcePath, final AggregatorReport report) { new Thread(() -> importTestFile(resourcePath, report)).start(); + try { + this.nextElement = this.queue.take(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } } @Override public synchronized boolean hasNext() { - return !this.queue.isEmpty() || !isCompleted(); + return (this.nextElement != null) && (this.nextElement instanceof Element); } @Override public synchronized Element next() { try { - return this.queue.take(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); + return this.nextElement instanceof Element ? (Element) this.nextElement : null; + } finally { + try { + this.nextElement = this.queue.take(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } } + } private void importHadoopFile(final FileSystem fs, final Path filePath, final AggregatorReport report) { @@ -64,7 +79,7 @@ public class BaseCollectorIterator implements Iterator { report.put(e.getClass().getName(), e.getMessage()); throw new RuntimeException("Error processing BASE records", e); } finally { - setCompleted(true); + this.queue.add("__END__"); // I ADD A NOT ELEMENT OBJECT TO INDICATE THE END OF THE QUEUE } } @@ -77,7 +92,7 @@ public class BaseCollectorIterator implements Iterator { report.put(e.getClass().getName(), e.getMessage()); throw new RuntimeException("Error processing BASE records", e); } finally { - setCompleted(true); + this.queue.add("__END__"); // I ADD A NOT ELEMENT OBJECT TO INDICATE THE END OF THE QUEUE } } @@ -104,7 +119,7 @@ public class BaseCollectorIterator implements Iterator { for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) { if (o instanceof Element) { - this.queue.add((Element) o); + this.queue.add(o); count++; } } @@ -114,12 +129,4 @@ public class BaseCollectorIterator implements Iterator { log.info("Total records (written in queue): " + count); } - private synchronized boolean isCompleted() { - return this.completed; - } - - private synchronized void setCompleted(final boolean completed) { - this.completed = completed; - } - }