From 265bfd364dc5fa803bfaf7a25bb75b7c686f33a8 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 12 Feb 2024 15:35:36 +0100 Subject: [PATCH] refactoring --- .../plugin/base/BaseCollectorIterator.java | 57 ++++++++++--------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java index 80881acba..1ed1059f7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java @@ -73,10 +73,8 @@ public class BaseCollectorIterator implements Iterator { try (InputStream origInputStream = fs.open(filePath); final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) { - importTarStream(tarInputStream); + importTarStream(tarInputStream, report); } catch (final Throwable e) { - log.error("Error processing BASE records", e); - report.put(e.getClass().getName(), e.getMessage()); throw new RuntimeException("Error processing BASE records", e); } } @@ -84,48 +82,53 @@ public class BaseCollectorIterator implements Iterator { private void importTestFile(final String resourcePath, final AggregatorReport report) { try (final InputStream origInputStream = BaseCollectorIterator.class.getResourceAsStream(resourcePath); final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) { - importTarStream(tarInputStream); + importTarStream(tarInputStream, report); } catch (final Throwable e) { - log.error("Error processing BASE records", e); - report.put(e.getClass().getName(), e.getMessage()); throw new RuntimeException("Error processing BASE records", e); } } - private void importTarStream(final TarArchiveInputStream tarInputStream) throws 
Exception { - TarArchiveEntry entry; + private void importTarStream(final TarArchiveInputStream tarInputStream, final AggregatorReport report) { long count = 0; - while ((entry = (TarArchiveEntry) tarInputStream.getNextEntry()) != null) { - final String name = entry.getName(); - if (!entry.isDirectory() && name.contains("ListRecords") && name.endsWith(".bz2")) { + try { + TarArchiveEntry entry; + while ((entry = (TarArchiveEntry) tarInputStream.getNextEntry()) != null) { + final String name = entry.getName(); - log.info("Processing file (BZIP): " + name); + if (!entry.isDirectory() && name.contains("ListRecords") && name.endsWith(".bz2")) { - final byte[] bzipData = new byte[(int) entry.getSize()]; - IOUtils.readFully(tarInputStream, bzipData); + log.info("Processing file (BZIP): " + name); - try (InputStream bzipIs = new ByteArrayInputStream(bzipData); - final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs); - final CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream(bzipBis)) { + final byte[] bzipData = new byte[(int) entry.getSize()]; + IOUtils.readFully(tarInputStream, bzipData); - final String xml = IOUtils.toString(new InputStreamReader(bzipInput)); + try (InputStream bzipIs = new ByteArrayInputStream(bzipData); + final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs); + final CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream(bzipBis)) { - final Document doc = DocumentHelper.parseText(xml); + final String xml = IOUtils.toString(new InputStreamReader(bzipInput)); - for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) { - if (o instanceof Element) { - this.queue.put(((Element) o).detach()); - count++; + final Document doc = DocumentHelper.parseText(xml); + + for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) { + if (o instanceof Element) { + 
this.queue.put(((Element) o).detach()); + count++; + } } } } } + + this.queue.put("__END__"); // I ADD A NOT ELEMENT OBJECT TO INDICATE THE END OF THE QUEUE + } catch (final Throwable e) { + log.error("Error processing BASE records", e); + report.put(e.getClass().getName(), e.getMessage()); + throw new RuntimeException("Error processing BASE records", e); + } finally { + log.info("Total records (written in queue): " + count); } - - this.queue.put("__END__"); // I ADD A NOT ELEMENT OBJECT TO INDICATE THE END OF THE QUEUE - - log.info("Total records (written in queue): " + count); } }