diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java index c1e660eac..46401a5b5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java @@ -13,35 +13,39 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.CompressorInputStream; import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.dom4j.Document; import org.dom4j.DocumentHelper; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BaseCollectorIterator implements Iterator { +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; - private final InputStream origInputStream; - private boolean completed; +public class BaseCollectorIterator implements Iterator { private final BlockingQueue queue = new LinkedBlockingQueue<>(); private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class); - public BaseCollectorIterator(final InputStream origInputStream) { - this.origInputStream = origInputStream; - this.completed = false; + private boolean completed = false; - new Thread(this::importFile).start(); + public BaseCollectorIterator(final FileSystem fs, final Path filePath, final AggregatorReport report) { + new Thread(() -> importHadoopFile(fs, filePath, report)).start(); + } + + protected BaseCollectorIterator(final String resourcePath, final AggregatorReport report) { + new Thread(() -> importTestFile(resourcePath, report)).start(); } @Override - public boolean hasNext() { - return !this.completed || !this.queue.isEmpty(); + public synchronized boolean hasNext() { + return !this.queue.isEmpty() || !isCompleted(); } @Override - public Element next() { + public synchronized Element next() { try { return this.queue.take(); } catch (final InterruptedException e) { @@ -49,46 +53,73 @@ public class BaseCollectorIterator implements Iterator { } } - private void importFile() { + private void importHadoopFile(final FileSystem fs, final Path filePath, final AggregatorReport report) { log.info("I start to read the TAR stream"); - final long count = 0; - try (final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(this.origInputStream)) { + try (InputStream origInputStream = fs.open(filePath); + final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) { + importTarStream(tarInputStream); + } catch (final Throwable e) { + log.error("Error processing BASE records", e); + report.put(e.getClass().getName(), e.getMessage()); + throw new RuntimeException("Error processing BASE records", e); + } finally { + setCompleted(true); + } + } - TarArchiveEntry entry; - while ((entry = (TarArchiveEntry) tarInputStream.getNextEntry()) != null) { - final String name = entry.getName(); + private void importTestFile(final String resourcePath, final AggregatorReport report) { + try (final InputStream origInputStream = BaseCollectorIterator.class.getResourceAsStream(resourcePath); + final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) { + importTarStream(tarInputStream); + } catch (final Throwable e) { + log.error("Error processing BASE records", e); + report.put(e.getClass().getName(), e.getMessage()); + throw new RuntimeException("Error processing BASE records", e); + } finally { + setCompleted(true); + } + } - if (!entry.isDirectory() && name.contains("ListRecords") && name.endsWith(".bz2")) { + private void importTarStream(final TarArchiveInputStream tarInputStream) throws Exception { + TarArchiveEntry entry; + long count = 0; + while ((entry = (TarArchiveEntry) tarInputStream.getNextEntry()) != null) { + final String name = entry.getName(); - log.info("Processing file (BZIP): " + name); + if (!entry.isDirectory() && name.contains("ListRecords") && name.endsWith(".bz2")) { - final byte[] bzipData = new byte[(int) entry.getSize()]; - IOUtils.readFully(tarInputStream, bzipData); + log.info("Processing file (BZIP): " + name); - try (InputStream bzipIs = new ByteArrayInputStream(bzipData); - final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs); - final CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream(bzipBis)) { + final byte[] bzipData = new byte[(int) entry.getSize()]; + IOUtils.readFully(tarInputStream, bzipData); - final String xml = IOUtils.toString(new InputStreamReader(bzipInput)); + try (InputStream bzipIs = new ByteArrayInputStream(bzipData); + final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs); + final CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream(bzipBis)) { - final Document doc = DocumentHelper.parseText(xml); + final String xml = IOUtils.toString(new InputStreamReader(bzipInput)); - for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) { - if (o instanceof Element) { - this.queue.add((Element) o); - } + final Document doc = DocumentHelper.parseText(xml); + + for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) { + if (o instanceof Element) { + this.queue.add((Element) o); + count++; } } } } - } catch (final Throwable e) { - log.error("Error processing BASE records", e); - throw new RuntimeException("Error processing BASE records", e); - } finally { - this.completed = true; - log.info("Total records (written in queue): " + count); } + log.info("Total records (written in queue): " + count); + } + + private synchronized boolean isCompleted() { + return this.completed; + } + + private synchronized void setCompleted(final boolean completed) { + this.completed = completed; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java index 7d4c90e31..d80144932 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java @@ -1,7 +1,6 @@ package eu.dnetlib.dhp.collection.plugin.base; import java.io.IOException; -import java.io.InputStream; import java.io.StringWriter; import java.util.Iterator; import java.util.Optional; @@ -52,16 +51,11 @@ public class BaseCollectorPlugin implements CollectorPlugin { throw new CollectorException(e); } - try (InputStream is = this.fs.open(filePath)) { - final Iterator iterator = new BaseCollectorIterator(is); - final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); - return StreamSupport.stream(spliterator, false) - .filter(elem -> filterXml(elem, report)) - .map(elem -> xmlToString(report, elem)); - } catch (final Throwable e) { - report.put(e.getClass().getName(), e.getMessage()); - throw new CollectorException(e); - } + final Iterator iterator = new BaseCollectorIterator(this.fs, filePath, report); + final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); + return StreamSupport.stream(spliterator, false) + .filter(elem -> filterXml(elem, report)) + .map(elem -> xmlToString(report, elem)); } private boolean filterXml(final Element elem, final AggregatorReport report) { diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java index e4dee8d8c..dfefa082f 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java @@ -2,27 +2,28 @@ package eu.dnetlib.dhp.collection.plugin.base; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.io.InputStream; -import java.util.Iterator; - import org.dom4j.Element; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; + +@ExtendWith(MockitoExtension.class) public class BaseCollectorIteratorTest { @Test void testImportFile() throws Exception { + long count = 0; - try (final InputStream is = getClass().getResourceAsStream("base-sample.tar")) { - final Iterator iterator = new BaseCollectorIterator(is); - while (iterator.hasNext()) { - final Element record = iterator.next(); - System.out.println(record.asXML()); - count++; - } - } + final BaseCollectorIterator iterator = new BaseCollectorIterator("base-sample.tar", new AggregatorReport()); + while (iterator.hasNext()) { + final Element record = iterator.next(); + // System.out.println(record.asXML()); + count++; + } assertEquals(30000, count); }