diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java new file mode 100644 index 000000000..44b1eeb18 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java @@ -0,0 +1,16 @@ +package eu.dnetlib.dhp.collection.plugin.gzip; + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; +import eu.dnetlib.dhp.common.collection.CollectorException; + +import java.util.stream.Stream; + +public class GzipCollectorPlugin implements CollectorPlugin { + + @Override + public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { + return Stream.empty(); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java index e05fe263a..ca351346c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java @@ -8,7 +8,10 @@ import java.io.StringWriter; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CodingErrorAction; +import java.util.Arrays; import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; import javax.xml.stream.XMLEventFactory; import javax.xml.stream.XMLEventReader; @@ -19,6 +22,7 @@ import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.StartElement; import javax.xml.stream.events.XMLEvent; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,13 +62,22 @@ public class XMLIterator implements Iterator { private String element; + private List elements; + private InputStream inputStream; public XMLIterator(final String element, final InputStream inputStream) { super(); this.element = element; + if (element.contains(",")) { + elements= Arrays.stream(element.split(",")) + .filter(StringUtils::isNoneBlank) + .map(String::toLowerCase) + .collect(Collectors.toList()); + } this.inputStream = inputStream; this.parser = getParser(); + try { this.current = findElement(parser); } catch (XMLStreamException e) { @@ -113,7 +126,7 @@ public class XMLIterator implements Iterator { final XMLEvent event = parser.nextEvent(); // TODO: replace with depth tracking instead of close tag tracking. - if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) { + if (event.isEndElement() && isCheckTag(event.asEndElement().getName().getLocalPart())) { writer.add(event); break; } @@ -142,18 +155,16 @@ public class XMLIterator implements Iterator { XMLEvent peek = parser.peek(); if (peek != null && peek.isStartElement()) { String name = peek.asStartElement().getName().getLocalPart(); - if (element.equals(name)) { - return peek; - } + if( isCheckTag(name)) + return peek; } while (parser.hasNext()) { - final XMLEvent event = parser.nextEvent(); + XMLEvent event= parser.nextEvent(); if (event != null && event.isStartElement()) { String name = event.asStartElement().getName().getLocalPart(); - if (element.equals(name)) { - return event; - } + if( isCheckTag(name)) + return event; } } return null; @@ -161,12 +172,30 @@ public class XMLIterator implements Iterator { private XMLEventReader getParser() { try { - return inputFactory.get().createXMLEventReader(sanitize(inputStream)); + XMLInputFactory xif = inputFactory.get(); + xif.setProperty(XMLInputFactory.SUPPORT_DTD, false); + return xif.createXMLEventReader(sanitize(inputStream)); } catch (XMLStreamException e) { throw new RuntimeException(e); } } + private boolean isCheckTag(final String tagName) { + if (elements!= null) { + final String found =elements.stream() + .filter(e -> e.equalsIgnoreCase(tagName)) + .findFirst() + .orElse(null); + if (found!= null) + return true; + } else { + if (element.equalsIgnoreCase(tagName)) { + return true; + } + } + return false; + } + private Reader sanitize(final InputStream in) { final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder(); charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipMultipleNodeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipMultipleNodeTest.java new file mode 100644 index 000000000..2ed199156 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipMultipleNodeTest.java @@ -0,0 +1,63 @@ +package eu.dnetlib.dhp.collection.plugin.file; + + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; +import eu.dnetlib.dhp.common.collection.CollectorException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Objects; +import java.util.stream.Stream; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@ExtendWith(MockitoExtension.class) +public class FileGZipMultipleNodeTest { + + private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class); + + private final ApiDescriptor api = new ApiDescriptor(); + + private FileGZipCollectorPlugin plugin; + + private static final String SPLIT_ON_ELEMENT = "incollection,article"; + + @BeforeEach + public void setUp() throws IOException { + + final String gzipFile = Objects + .requireNonNull( + this + .getClass() + .getResource("/eu/dnetlib/dhp/collection/plugin/file/dblp.gz")) + .getFile(); + + api.setBaseUrl(gzipFile); + + HashMap params = new HashMap<>(); + params.put("splitOnElement", SPLIT_ON_ELEMENT); + + api.setParams(params); + + FileSystem fs = FileSystem.get(new Configuration()); + plugin = new FileGZipCollectorPlugin(fs); + } + + @Test + void test() throws CollectorException { + + final Stream stream = plugin.collect(api, new AggregatorReport()); + + stream.limit(10).forEach(s -> { + Assertions.assertTrue(s.length() > 0); + log.info(s); + }); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/dblp.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/dblp.gz new file mode 100644 index 000000000..979bcbed2 Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/dblp.gz differ