From c6db6335b9b58b0e2cb5b23652a84ad89f9101f8 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Tue, 6 Feb 2024 15:10:29 +0100
Subject: [PATCH] prepare filtering for base import

---
Review notes (ignored by git-am, not part of the commit message):
- BaseCollectorPlugin.xmlToString() returns the StringWriter content
  (sw.toString()). XMLWriter does not override toString(), so the previous
  writer.toString() yielded an object identity string instead of the record.
  The blob id after ".." on that file's index line predates this one-line fix.
- Line breaks, indentation and the generic type arguments (<String> before,
  <Element> after) were restored; hunk line counts and the diffstat are
  unchanged.

 .../plugin/base/BaseCollectorIterator.java    | 33 +++++------------
 .../plugin/base/BaseCollectorPlugin.java      | 35 +++++++++++++++++--
 .../base/BaseCollectorIteratorTest.java       |  9 +++--
 3 files changed, 44 insertions(+), 33 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java
index b424d99ac..c1e660eac 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java
@@ -2,10 +2,8 @@ package eu.dnetlib.dhp.collection.plugin.base;
 
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.StringWriter;
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -17,28 +15,21 @@ import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.io.IOUtils;
 import org.dom4j.Document;
 import org.dom4j.DocumentHelper;
-import org.dom4j.Node;
-import org.dom4j.io.OutputFormat;
-import org.dom4j.io.XMLWriter;
+import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
-import eu.dnetlib.dhp.common.collection.CollectorException;
-
-public class BaseCollectorIterator implements Iterator<String> {
+public class BaseCollectorIterator implements Iterator<Element> {
 
 	private final InputStream origInputStream;
-	private final AggregatorReport report;
 	private boolean completed;
 
-	private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+	private final BlockingQueue<Element> queue = new LinkedBlockingQueue<>();
 
 	private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class);
 
-	public BaseCollectorIterator(final InputStream origInputStream, final AggregatorReport report) {
+	public BaseCollectorIterator(final InputStream origInputStream) {
 		this.origInputStream = origInputStream;
-		this.report = report;
 		this.completed = false;
 
 		new Thread(this::importFile).start();
@@ -50,7 +41,7 @@ public class BaseCollectorIterator implements Iterator<String> {
 	}
 
 	@Override
-	public String next() {
+	public Element next() {
 		try {
 			return this.queue.take();
 		} catch (final InterruptedException e) {
@@ -61,7 +52,7 @@ public class BaseCollectorIterator implements Iterator<String> {
 	private void importFile() {
 		log.info("I start to read the TAR stream");
 
-		long count = 0;
+		final long count = 0;
 		try (final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(this.origInputStream)) {
 			TarArchiveEntry entry;
 
@@ -84,16 +75,8 @@ public class BaseCollectorIterator implements Iterator<String> {
 						final Document doc = DocumentHelper.parseText(xml);
 
 						for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) {
-							try (final StringWriter sw = new StringWriter()) {
-								final XMLWriter writer = new XMLWriter(sw, OutputFormat.createPrettyPrint());
-								writer.write((Node) o);
-
-								this.queue.add(sw.toString());
-
-								count += 1;
-							} catch (final IOException e) {
-								this.report.put(e.getClass().getName(), e.getMessage());
-								throw new CollectorException("Error parsing XML record:\n" + ((Node) o).asXML(), e);
+							if (o instanceof Element) {
+								this.queue.add((Element) o);
 							}
 						}
 					}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
index 806657e2a..7d4c90e31 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
@@ -1,6 +1,8 @@
 package eu.dnetlib.dhp.collection.plugin.base;
 
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.StringWriter;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.Spliterator;
@@ -10,6 +12,9 @@ import java.util.stream.StreamSupport;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.dom4j.Element;
+import org.dom4j.io.OutputFormat;
+import org.dom4j.io.XMLWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +30,8 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 
 	private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
 
+	// MAPPING AND FILTERING ARE DEFINED HERE: https://docs.google.com/document/d/1Aj-ZAV11b44MCrAAUCPiS2TUlXb6PnJEu1utCMAcCOU/edit
+
 	public BaseCollectorPlugin(final FileSystem fs) {
 		this.fs = fs;
 	}
@@ -46,12 +53,34 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 		}
 
 		try (InputStream is = this.fs.open(filePath)) {
-			final Iterator<String> iterator = new BaseCollectorIterator(is, new AggregatorReport());
-			final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
-			return StreamSupport.stream(spliterator, false);
+			final Iterator<Element> iterator = new BaseCollectorIterator(is);
+			final Spliterator<Element> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
+			return StreamSupport.stream(spliterator, false)
+				.filter(elem -> filterXml(elem, report))
+				.map(elem -> xmlToString(report, elem));
 		} catch (final Throwable e) {
+			report.put(e.getClass().getName(), e.getMessage());
 			throw new CollectorException(e);
 		}
 	}
 
+	private boolean filterXml(final Element elem, final AggregatorReport report) {
+		// TODO Auto-generated method stub
+
+		// HERE THE FILTERS ACCORDING TO THE DOCUMENTATION
+
+		return true;
+	}
+
+	private String xmlToString(final AggregatorReport report, final Element elem) {
+		try (final StringWriter sw = new StringWriter()) {
+			final XMLWriter writer = new XMLWriter(sw, OutputFormat.createPrettyPrint());
+			writer.write(elem);
+			return sw.toString(); // the XML is in the StringWriter; XMLWriter has no toString() override
+		} catch (final IOException e) {
+			report.put(e.getClass().getName(), e.getMessage());
+			throw new RuntimeException("Error indenting XML record", e);
+		}
+	}
+
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
index e0e9815c2..e4dee8d8c 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
@@ -5,10 +5,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.io.InputStream;
 import java.util.Iterator;
 
+import org.dom4j.Element;
 import org.junit.jupiter.api.Test;
 
-import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
-
 public class BaseCollectorIteratorTest {
 
 	@Test
@@ -16,10 +15,10 @@ public class BaseCollectorIteratorTest {
 		long count = 0;
 
 		try (final InputStream is = getClass().getResourceAsStream("base-sample.tar")) {
-			final Iterator<String> iterator = new BaseCollectorIterator(is, new AggregatorReport());
+			final Iterator<Element> iterator = new BaseCollectorIterator(is);
 			while (iterator.hasNext()) {
-				final String record = iterator.next();
-				System.out.println(record);
+				final Element record = iterator.next();
+				System.out.println(record.asXML());
 				count++;
 			}
 		}