From ca9414b737a1841eacf5cc7cea6c48f065ab3afc Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 21 May 2024 09:11:13 +0200 Subject: [PATCH 1/2] Implement multiple node name splitter on GZipCollectorPlugin and all nodes that use XMLIterator. If the splitter name contains is a comma separated values it splits for all the values --- .../plugin/gzip/GzipCollectorPlugin.java | 16 +++++ .../collection/plugin/utils/XMLIterator.java | 47 ++++++++++--- .../plugin/file/FileGZipMultipleNodeTest.java | 63 ++++++++++++++++++ .../dhp/collection/plugin/file/dblp.gz | Bin 0 -> 1097 bytes 4 files changed, 117 insertions(+), 9 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipMultipleNodeTest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/dblp.gz diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java new file mode 100644 index 000000000..44b1eeb18 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java @@ -0,0 +1,16 @@ +package eu.dnetlib.dhp.collection.plugin.gzip; + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; +import eu.dnetlib.dhp.common.collection.CollectorException; + +import java.util.stream.Stream; + +public class GzipCollectorPlugin implements CollectorPlugin { + + @Override + public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { + return Stream.empty(); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java index e05fe263a..ca351346c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java @@ -8,7 +8,10 @@ import java.io.StringWriter; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CodingErrorAction; +import java.util.Arrays; import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; import javax.xml.stream.XMLEventFactory; import javax.xml.stream.XMLEventReader; @@ -19,6 +22,7 @@ import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.StartElement; import javax.xml.stream.events.XMLEvent; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,13 +62,22 @@ public class XMLIterator implements Iterator { private String element; + private List elements; + private InputStream inputStream; public XMLIterator(final String element, final InputStream inputStream) { super(); this.element = element; + if (element.contains(",")) { + elements= Arrays.stream(element.split(",")) + .filter(StringUtils::isNoneBlank) + .map(String::toLowerCase) + .collect(Collectors.toList()); + } this.inputStream = inputStream; this.parser = getParser(); + try { this.current = findElement(parser); } catch (XMLStreamException e) { @@ -113,7 +126,7 @@ public class XMLIterator implements Iterator { final XMLEvent event = parser.nextEvent(); // TODO: replace with depth tracking instead of close tag tracking. - if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) { + if (event.isEndElement() && isCheckTag(event.asEndElement().getName().getLocalPart())) { writer.add(event); break; } @@ -142,18 +155,16 @@ public class XMLIterator implements Iterator { XMLEvent peek = parser.peek(); if (peek != null && peek.isStartElement()) { String name = peek.asStartElement().getName().getLocalPart(); - if (element.equals(name)) { - return peek; - } + if( isCheckTag(name)) + return peek; } while (parser.hasNext()) { - final XMLEvent event = parser.nextEvent(); + XMLEvent event= parser.nextEvent(); if (event != null && event.isStartElement()) { String name = event.asStartElement().getName().getLocalPart(); - if (element.equals(name)) { - return event; - } + if( isCheckTag(name)) + return event; } } return null; @@ -161,12 +172,30 @@ public class XMLIterator implements Iterator { private XMLEventReader getParser() { try { - return inputFactory.get().createXMLEventReader(sanitize(inputStream)); + XMLInputFactory xif = inputFactory.get(); + xif.setProperty(XMLInputFactory.SUPPORT_DTD, false); + return xif.createXMLEventReader(sanitize(inputStream)); } catch (XMLStreamException e) { throw new RuntimeException(e); } } + private boolean isCheckTag(final String tagName) { + if (elements!= null) { + final String found =elements.stream() + .filter(e -> e.equalsIgnoreCase(tagName)) + .findFirst() + .orElse(null); + if (found!= null) + return true; + } else { + if (element.equalsIgnoreCase(tagName)) { + return true; + } + } + return false; + } + private Reader sanitize(final InputStream in) { final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder(); charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipMultipleNodeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipMultipleNodeTest.java new file mode 100644 index 000000000..2ed199156 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipMultipleNodeTest.java @@ -0,0 +1,63 @@ +package eu.dnetlib.dhp.collection.plugin.file; + + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; +import eu.dnetlib.dhp.common.collection.CollectorException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Objects; +import java.util.stream.Stream; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@ExtendWith(MockitoExtension.class) +public class FileGZipMultipleNodeTest { + + private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class); + + private final ApiDescriptor api = new ApiDescriptor(); + + private FileGZipCollectorPlugin plugin; + + private static final String SPLIT_ON_ELEMENT = "incollection,article"; + + @BeforeEach + public void setUp() throws IOException { + + final String gzipFile = Objects + .requireNonNull( + this + .getClass() + .getResource("/eu/dnetlib/dhp/collection/plugin/file/dblp.gz")) + .getFile(); + + api.setBaseUrl(gzipFile); + + HashMap params = new HashMap<>(); + params.put("splitOnElement", SPLIT_ON_ELEMENT); + + api.setParams(params); + + FileSystem fs = FileSystem.get(new Configuration()); + plugin = new FileGZipCollectorPlugin(fs); + } + + @Test + void test() throws CollectorException { + + final Stream stream = plugin.collect(api, new AggregatorReport()); + + stream.limit(10).forEach(s -> { + Assertions.assertTrue(s.length() > 0); + log.info(s); + }); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/dblp.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/file/dblp.gz new file mode 100644 index 0000000000000000000000000000000000000000..979bcbed2845c7c7995db4f1fdc10069f89f1bcb GIT binary patch literal 1097 zcmV-P1h)GhiwFotJWOT)17u=sZ~)DhT~pgQ6hPnmE41#+&h8AhtbCBTYutfADHIAr zQrJH2$hKmu$dX5vv(5hcy|P0?0RyFF1|EVfTUW|CS652-_G=-~g6WDYx#bYgcMy|_ zO1aFooP+6!J01@MmpE^uSK)8FCzILd(>;{NqD0fr)7joJa_o(ln$&?e>~-`C!C%NB ziV!R@5EvCHHEhf2`DEn!BbW3Xbjg-mj%IVFVLXNx+|B_1O$Kr5pt6pIS(eZR9xfB1 zN|ti!w4s~uh8w}6Nv#(wm8`0~5VuY1S4x_(Dk8%m#O>w!(UMUe!E6R0ZZ6kPVx=xu zu~kvlhG}%(Ol<>gqqD2&qWzkuEX3=#*AW zWr~^xQ75*c)8m*7-7$H| zE{8VLo+NTb`-v(aUQ(Pteo0MhJ*PnBL~|w$ZN_@fE2A<^3-re>uM*7+L+5SHKx&Tm z_7DE-^fnrk7win0d%dS>YsfyEwBqs3w#yPIQVV99P$?muI(_svU$zc1{s|q$^Vqmh zGW(%oID@t0&MGdE?b=xh4+;?B?$C;*=!j>zxmTR!6!0{HE{|AKlFcPRX#YAvaV&R7MCk{sxlN*jdH*&%!!>>R`AluC4{DzMnaHaL~bR-LW#Oy(b-|pABK2!w<5m6`y|-E zj`cOb5RuBPhHOXgR4r4$4~y314B&}oW1_;6N@`Kxb^dz>=7+Ud-sb&{>Nx0sPqkae zgS-K@J)!k)@P1E)7!B@VdrFPw$tBwM(7A`~Yn_at7{FJsfZ5kl!{ON$!-EvIdp_q$ zyX8aa+Gmn!K)fw77Hi5AG+kDP75A~8u_TueLs-}z`t)Hsfy3aYztQ0SB)I{LAE3;o zs(`(Uuf{|ZxX|opl6~+)bnnUL&EOYiGt+!QV=lN^ZUB-L(wJTydnYg=9ITpSFr1>U zlDdM@qBLLv1xHjv_OI52ynOa7|CY(aAh;uwTlU*^R#&Fe#Aa}->?EmHMa1J`c82vA zi0zf>Jj8ADB4Pi%Y84Mrk>zF4wWdcE;ZecsW)1v(A=+svAi;D}i!(+ki z*|*G>w)|KZF_fpOQFx>>NZwQ39F7ZaxB4zW%?>P7hCZ@?E5VGh$$Z Date: Tue, 21 May 2024 13:45:29 +0200 Subject: [PATCH 2/2] removed plugin, use only FileGZip plugin --- .../plugin/gzip/GzipCollectorPlugin.java | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java deleted file mode 100644 index 44b1eeb18..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gzip/GzipCollectorPlugin.java +++ /dev/null @@ -1,16 +0,0 @@ -package eu.dnetlib.dhp.collection.plugin.gzip; - -import eu.dnetlib.dhp.collection.ApiDescriptor; -import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; -import eu.dnetlib.dhp.common.aggregation.AggregatorReport; -import eu.dnetlib.dhp.common.collection.CollectorException; - -import java.util.stream.Stream; - -public class GzipCollectorPlugin implements CollectorPlugin { - - @Override - public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { - return Stream.empty(); - } -}