diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java
new file mode 100644
index 000000000..b424d99ac
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java
@@ -0,0 +1,111 @@
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.dom4j.Document;
+import org.dom4j.DocumentHelper;
+import org.dom4j.Node;
+import org.dom4j.io.OutputFormat;
+import org.dom4j.io.XMLWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+public class BaseCollectorIterator implements Iterator<String> {
+
+	private final InputStream origInputStream;
+	private final AggregatorReport report;
+
+	// written by the producer thread, read by the consumer thread
+	private volatile boolean completed;
+
+	private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+
+	private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class);
+
+	public BaseCollectorIterator(final InputStream origInputStream, final AggregatorReport report) {
+		this.origInputStream = origInputStream;
+		this.report = report;
+		this.completed = false;
+
+		new Thread(this::importFile).start();
+	}
+
+	@Override
+	public boolean hasNext() {
+		return !this.completed || !this.queue.isEmpty();
+	}
+
+	@Override
+	public String next() {
+		try {
+			// poll with a timeout instead of take(): the producer may complete after
+			// hasNext() has been evaluated, and a plain take() would then block forever
+			String res;
+			do {
+				res = this.queue.poll(1, TimeUnit.SECONDS);
+			} while (res == null && !this.completed);
+			return res;
+		} catch (final InterruptedException e) {
+			Thread.currentThread().interrupt();
+			throw new RuntimeException(e);
+		}
+	}
+
+	private void importFile() {
+		log.info("Reading the TAR stream");
+
+		long count = 0;
+		try (final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(this.origInputStream)) {
+
+			TarArchiveEntry entry;
+			while ((entry = (TarArchiveEntry) tarInputStream.getNextEntry()) != null) {
+				final String name = entry.getName();
+
+				if (!entry.isDirectory() && name.contains("ListRecords") && name.endsWith(".bz2")) {
+
+					log.info("Processing file (BZIP): {}", name);
+
+					final byte[] bzipData = new byte[(int) entry.getSize()];
+					IOUtils.readFully(tarInputStream, bzipData);
+
+					try (InputStream bzipIs = new ByteArrayInputStream(bzipData);
+						final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs);
+						final CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream(bzipBis)) {
+
+						final String xml = IOUtils.toString(new InputStreamReader(bzipInput));
+
+						final Document doc = DocumentHelper.parseText(xml);
+
+						for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) {
+							try (final StringWriter sw = new StringWriter()) {
+								final XMLWriter writer = new XMLWriter(sw, OutputFormat.createPrettyPrint());
+								writer.write((Node) o);
+
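+								// hand the serialized record to the consumer thread through the blocking queue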
+								this.queue.add(sw.toString());
+
+								count += 1;
+							} catch (final IOException e) {
+								this.report.put(e.getClass().getName(), e.getMessage());
+								throw new CollectorException("Error parsing XML record:\n" + ((Node) o).asXML(), e);
+							}
+						}
+					}
+				}
+			}
+		} catch (final Throwable e) {
+			log.error("Error processing BASE records", e);
+			throw new RuntimeException("Error processing BASE records", e);
+		} finally {
+			this.completed = true;
+			log.info("Total records (written in queue): {}", count);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
new file mode 100644
index 000000000..806657e2a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
@@ -0,0 +1,57 @@
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+public class BaseCollectorPlugin implements CollectorPlugin {
+
+	private final FileSystem fs;
+
+	private static final Logger log = LoggerFactory.getLogger(BaseCollectorPlugin.class);
+
+	public BaseCollectorPlugin(final FileSystem fs) {
+		this.fs = fs;
+	}
+
+	@Override
+	public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
+		// get path to file
+		final Path filePath = Optional
+			.ofNullable(api.getBaseUrl())
+			.map(Path::new)
+			.orElseThrow(() -> new CollectorException("missing baseUrl"));
+
+		log.info("baseUrl: {}", filePath);
+
+		try {
+			if (!this.fs.exists(filePath)) {
+				throw new CollectorException("path does not exist: " + filePath);
+			}
+		} catch (final IOException e) {
+			throw new CollectorException(e);
+		}
+
+		try {
+			// the stream is NOT opened with try-with-resources: it is consumed (and
+			// eventually closed) by the background thread inside BaseCollectorIterator,
+			// which may outlive this method
+			final InputStream is = this.fs.open(filePath);
+			final Iterator<String> iterator = new BaseCollectorIterator(is, report);
+			final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
+			return StreamSupport.stream(spliterator, false);
+		} catch (final IOException e) {
+			throw new CollectorException(e);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
new file mode 100644
index 000000000..e0e9815c2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
@@ -0,0 +1,30 @@
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.InputStream;
+import java.util.Iterator;
+
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+
+public class BaseCollectorIteratorTest {
+
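+	// consumes the bundled sample TAR and verifies the expected number of records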
+	@Test
+	void testImportFile() throws Exception {
+		long count = 0;
+
+		try (final InputStream is = getClass().getResourceAsStream("base-sample.tar")) {
+			final Iterator<String> iterator = new BaseCollectorIterator(is, new AggregatorReport());
+			while (iterator.hasNext()) {
+				final String record = iterator.next();
+				System.out.println(record);
+				count++;
+			}
+		}
+
+		assertEquals(30000, count);
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/base-sample.tar b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/base-sample.tar
new file mode 100644
index 000000000..c575fe147
Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/base-sample.tar differ