From bc1bf555076af7e3b7a15c0f292fa056d00b38d5 Mon Sep 17 00:00:00 2001
From: Serafeim Chatzopoulos
Date: Thu, 7 Apr 2022 14:31:22 +0300
Subject: [PATCH] Add AbstractSplittedRecordPlugin

---
 .../dhp/collection/CollectorWorker.java       |   2 +-
 .../file/AbstractSplittedRecordPlugin.java    |  54 ++++++
 .../plugin/file/FileGZipCollectorPlugin.java  |  29 +++
 .../fileGZip/FileGZipCollectorPlugin.java     |  49 -----
 .../collection/plugin/oai/OaiIterator.java    |   2 +-
 .../collection/plugin/rest/RestIterator.java  |   2 +-
 .../{ => plugin/utils}/JsonUtils.java         |   2 +-
 .../collection/plugin/utils/XMLIterator.java  | 170 ++++++++++++++++++
 .../{ => plugin/utils}/XmlCleaner.java        |   2 +-
 .../FileGZipCollectorPluginTest.java          |   7 +-
 .../{fileGZip => file/gzip}/opendoar.xml.gz   | Bin
 11 files changed, 260 insertions(+), 59 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPlugin.java
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{ => plugin/utils}/JsonUtils.java (98%)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{ => plugin/utils}/XmlCleaner.java (99%)
 rename dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/{fileGZip => file}/FileGZipCollectorPluginTest.java (87%)
 rename dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/{fileGZip => file/gzip}/opendoar.xml.gz (100%)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
index 03f592169c..5de4eb84b1 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
@@ -7,7 +7,7 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import eu.dnetlib.dhp.collection.plugin.fileGZip.FileGZipCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java
new file mode 100644
index 0000000000..c569926c08
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java
@@ -0,0 +1,54 @@
+package eu.dnetlib.dhp.collection.plugin.file;
+
+import java.io.BufferedInputStream;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.utils.XMLIterator;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin {
+
+	private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
+
+	public static final String SPLIT_ON_ELEMENT = "splitOnElement";
+
+	@Override
+	public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
+
+		final String baseUrl = Optional
+			.ofNullable(api.getBaseUrl())
+			.orElseThrow( () -> new CollectorException("missing baseUrl, required by the AbstractSplittedRecordPlugin"));
+
+		log.info("baseUrl: {}", baseUrl);
+
+		final String splitOnElement = Optional
+			.ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
+			.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
+
+		log.info("splitOnElement: {}", splitOnElement);
+
+		final BufferedInputStream bis = getBufferedInputStream(baseUrl);
+
+		Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
+
+		return StreamSupport.stream(
+			Spliterators.spliteratorUnknownSize(xmlIterator, Spliterator.ORDERED),
+			false
+		);
+	}
+
+	abstract protected BufferedInputStream getBufferedInputStream(final String baseUrl) throws CollectorException;
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java
new file mode 100644
index 0000000000..a4dd65d2af
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java
@@ -0,0 +1,29 @@
+package eu.dnetlib.dhp.collection.plugin.file;
+
+import eu.dnetlib.dhp.common.collection.CollectorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.zip.GZIPInputStream;
+
+public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin {
+
+	private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
+
+	@Override
+	protected BufferedInputStream getBufferedInputStream(String baseUrl) throws CollectorException {
+
+		log.info("baseUrl: {}", baseUrl);
+
+		try {
+			GZIPInputStream stream = new GZIPInputStream(new FileInputStream(baseUrl));
+			return new BufferedInputStream(stream);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new CollectorException(e);
+		}
+	}
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPlugin.java
deleted file mode 100644
index 38f3288904..0000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPlugin.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package eu.dnetlib.dhp.collection.plugin.fileGZip;
-
-import eu.dnetlib.dhp.collection.ApiDescriptor;
-import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
-import eu.dnetlib.dhp.common.collection.CollectorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.net.URL;
-import java.util.Optional;
-import java.util.stream.Stream;
-import java.util.zip.GZIPInputStream;
-
-public class FileGZipCollectorPlugin implements CollectorPlugin {
-
-	private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
-
-	public static final String ENCODING = "encoding";
-
-	@Override
-	public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
-
-		final String baseUrl = Optional
-			.ofNullable(api.getBaseUrl())
-			.orElseThrow( () -> new CollectorException("missing baseUrl, required by the fileGZip collector plugin"));
-
-		log.info("fileGZip.baseUrl: {}", baseUrl);
-
-		final String encoding = Optional
-			.ofNullable(api.getParams().get(ENCODING))
-			.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the fileGZip collector plugin", ENCODING)));
-
-		log.info("fileGZip.encoding: {}", encoding);
-
-		try {
-
-			InputStream gzipStream = new GZIPInputStream(new FileInputStream(baseUrl));
-			Reader decoder = new InputStreamReader(gzipStream, encoding);
-			BufferedReader reader = new BufferedReader(decoder);
-
-			return reader.lines();
-
-		} catch (Exception e) {
-			throw new CollectorException(e);
-		}
-	}
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
index 566c6b216c..28b2572fb4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
@@ -19,7 +19,7 @@ import org.dom4j.io.XMLWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import eu.dnetlib.dhp.collection.XmlCleaner;
+import eu.dnetlib.dhp.collection.plugin.utils.XmlCleaner;
 import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.collection.CollectorException;
 import eu.dnetlib.dhp.common.collection.HttpConnector2;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java
index 64a041fd4a..e4bad2f8d6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java
@@ -30,7 +30,7 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.InputSource;
 
-import eu.dnetlib.dhp.collection.JsonUtils;
+import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
 import eu.dnetlib.dhp.common.collection.CollectorException;
 import eu.dnetlib.dhp.common.collection.HttpClientParams;
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/JsonUtils.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/JsonUtils.java
similarity index 98%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/JsonUtils.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/JsonUtils.java
index da3768a4ac..15401e2230 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/JsonUtils.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/JsonUtils.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection;
+package eu.dnetlib.dhp.collection.plugin.utils;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java
new file mode 100644
index 0000000000..853973ebbb
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java
@@ -0,0 +1,170 @@
+package eu.dnetlib.dhp.collection.plugin.utils;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.util.Iterator;
+
+import javax.xml.stream.XMLEventFactory;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLEventWriter;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class XMLIterator implements Iterator<String> {
+
+	private static final Log log = LogFactory.getLog(XMLIterator.class);
+
+	private ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
+
+		@Override
+		protected XMLInputFactory initialValue() {
+			return XMLInputFactory.newInstance();
+		}
+	};
+
+	private ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
+
+		@Override
+		protected XMLOutputFactory initialValue() {
+			return XMLOutputFactory.newInstance();
+		}
+	};
+
+	private ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
+
+		@Override
+		protected XMLEventFactory initialValue() {
+			return XMLEventFactory.newInstance();
+		}
+	};
+
+	public static final String UTF_8 = "UTF-8";
+
+	final XMLEventReader parser;
+
+	private XMLEvent current = null;
+
+	private String element;
+
+	private InputStream inputStream;
+
+	public XMLIterator(final String element, final InputStream inputStream) {
+		super();
+		this.element = element;
+		this.inputStream = inputStream;
+		this.parser = getParser();
+		try {
+			this.current = findElement(parser);
+		} catch (XMLStreamException e) {
+			log.warn("cannot init parser position. No element found: " + element);
+			current = null;
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		return current != null;
+	}
+
+	@Override
+	public String next() {
+		String result = null;
+		try {
+			result = copy(parser);
+			current = findElement(parser);
+			return result;
+		} catch (XMLStreamException e) {
+			throw new RuntimeException(String.format("error copying xml, built so far: '%s'", result), e);
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	@SuppressWarnings("finally")
+	private String copy(final XMLEventReader parser) throws XMLStreamException {
+		final StringWriter result = new StringWriter();
+		try {
+			final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(result);
+			final StartElement start = current.asStartElement();
+			final StartElement newRecord = eventFactory.get().createStartElement(start.getName(), start.getAttributes(), start.getNamespaces());
+
+			// new root record
+			writer.add(newRecord);
+
+			// copy the rest as it is
+			while (parser.hasNext()) {
+				final XMLEvent event = parser.nextEvent();
+
+				// TODO: replace with depth tracking instead of close tag tracking.
+				if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
+					writer.add(event);
+					break;
+				}
+
+				writer.add(event);
+			}
+			writer.close();
+		} finally {
+			return result.toString();
+		}
+	}
+
+	/**
+	 * Looks for the next occurrence of the splitter element.
+	 *
+	 * @param parser
+	 * @return
+	 * @throws XMLStreamException
+	 */
+	private XMLEvent findElement(final XMLEventReader parser) throws XMLStreamException {
+
+		/*
+		 * if (current != null && element.equals(current.asStartElement().getName().getLocalPart())) { return current; }
+		 */
+
+		XMLEvent peek = parser.peek();
+		if (peek != null && peek.isStartElement()) {
+			String name = peek.asStartElement().getName().getLocalPart();
+			if (element.equals(name)) { return peek; }
+		}
+
+		while (parser.hasNext()) {
+			final XMLEvent event = parser.nextEvent();
+			if (event != null && event.isStartElement()) {
+				String name = event.asStartElement().getName().getLocalPart();
+				if (element.equals(name)) { return event; }
+			}
+		}
+		return null;
+	}
+
+	private XMLEventReader getParser() {
+		try {
+			return inputFactory.get().createXMLEventReader(sanitize(inputStream));
+		} catch (XMLStreamException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private Reader sanitize(final InputStream in) {
+		final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
+		charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
+		charsetDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+		return new InputStreamReader(in, charsetDecoder);
+	}
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XmlCleaner.java
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XmlCleaner.java
index c674031f62..95d1d24029 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XmlCleaner.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection;
+package eu.dnetlib.dhp.collection.plugin.utils;
 
 import java.util.HashMap;
 import java.util.HashSet;
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java
similarity index 87%
rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPluginTest.java
rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java
index 793a5b04cd..43f00928f2 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPluginTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.collection.plugin.fileGZip;
+package eu.dnetlib.dhp.collection.plugin.file;
 
 import eu.dnetlib.dhp.collection.ApiDescriptor;
 import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
@@ -20,22 +20,19 @@ public class FileGZipCollectorPluginTest {
 	private FileGZipCollectorPlugin plugin;
 
 	private static final String SPLIT_ON_ELEMENT = "repository";
-	private static final String ENCODING = "UTF-8";
 
 	@BeforeEach
 	public void setUp() {
 		final String gzipFile = this
 			.getClass()
-			.getResource("/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz")
+			.getResource("/eu/dnetlib/dhp/collection/plugin/file/gzip/opendoar.xml.gz")
 			.getFile();
 
-		System.out.println(gzipFile);
 
 		api.setBaseUrl(gzipFile);
 
 		HashMap<String, String> params = new HashMap<>();
 		params.put("splitOnElement", SPLIT_ON_ELEMENT);
-		params.put("encoding", ENCODING);
 
 		api.setParams(params);
 
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/gzip/opendoar.xml.gz
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/gzip/opendoar.xml.gz
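
Usage note (outside the patch): a minimal sketch of how the new plugin is expected to be driven, mirroring FileGZipCollectorPluginTest above. The ApiDescriptor setters and the "splitOnElement" parameter are taken from this patch; the AggregatorReport no-arg constructor, the example class name and the file path are assumptions for illustration.

import java.util.HashMap;
import java.util.stream.Stream;

import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;

public class FileGZipCollectorPluginExample {

	public static void main(String[] args) throws Exception {
		// point the plugin at a gzip'd XML dump and name the element that delimits one record
		ApiDescriptor api = new ApiDescriptor();
		api.setBaseUrl("/tmp/opendoar.xml.gz");          // hypothetical local path
		HashMap<String, String> params = new HashMap<>();
		params.put("splitOnElement", "repository");      // as in FileGZipCollectorPluginTest
		api.setParams(params);

		// collect() streams one self-contained XML record per matching element
		FileGZipCollectorPlugin plugin = new FileGZipCollectorPlugin();
		Stream<String> records = plugin.collect(api, new AggregatorReport());
		records.limit(5).forEach(System.out::println);
	}
}

Subclasses of AbstractSplittedRecordPlugin only supply the input stream (here a GZIPInputStream over a local file), while XMLIterator performs the record splitting, so other compressed or plain-file sources can reuse the same splitting logic.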