Merge pull request 'Updated XMLIterator for splitting on different nodes' (#436) from dblp_collection_plugin into beta

Reviewed-on: D-Net/dnet-hadoop#436
This commit is contained in:
Alessia Bardi 2024-05-29 16:05:06 +02:00
commit eadfd8d71d
3 changed files with 101 additions and 9 deletions

View File

@ -8,7 +8,10 @@ import java.io.StringWriter;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction; import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import javax.xml.stream.XMLEventFactory; import javax.xml.stream.XMLEventFactory;
import javax.xml.stream.XMLEventReader; import javax.xml.stream.XMLEventReader;
@ -19,6 +22,7 @@ import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.StartElement; import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent; import javax.xml.stream.events.XMLEvent;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -58,13 +62,22 @@ public class XMLIterator implements Iterator<String> {
private String element; private String element;
private List<String> elements;
private InputStream inputStream; private InputStream inputStream;
public XMLIterator(final String element, final InputStream inputStream) { public XMLIterator(final String element, final InputStream inputStream) {
super(); super();
this.element = element; this.element = element;
if (element.contains(",")) {
elements= Arrays.stream(element.split(","))
.filter(StringUtils::isNoneBlank)
.map(String::toLowerCase)
.collect(Collectors.toList());
}
this.inputStream = inputStream; this.inputStream = inputStream;
this.parser = getParser(); this.parser = getParser();
try { try {
this.current = findElement(parser); this.current = findElement(parser);
} catch (XMLStreamException e) { } catch (XMLStreamException e) {
@ -113,7 +126,7 @@ public class XMLIterator implements Iterator<String> {
final XMLEvent event = parser.nextEvent(); final XMLEvent event = parser.nextEvent();
// TODO: replace with depth tracking instead of close tag tracking. // TODO: replace with depth tracking instead of close tag tracking.
if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) { if (event.isEndElement() && isCheckTag(event.asEndElement().getName().getLocalPart())) {
writer.add(event); writer.add(event);
break; break;
} }
@ -142,18 +155,16 @@ public class XMLIterator implements Iterator<String> {
XMLEvent peek = parser.peek(); XMLEvent peek = parser.peek();
if (peek != null && peek.isStartElement()) { if (peek != null && peek.isStartElement()) {
String name = peek.asStartElement().getName().getLocalPart(); String name = peek.asStartElement().getName().getLocalPart();
if (element.equals(name)) { if( isCheckTag(name))
return peek; return peek;
}
} }
while (parser.hasNext()) { while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent(); XMLEvent event= parser.nextEvent();
if (event != null && event.isStartElement()) { if (event != null && event.isStartElement()) {
String name = event.asStartElement().getName().getLocalPart(); String name = event.asStartElement().getName().getLocalPart();
if (element.equals(name)) { if( isCheckTag(name))
return event; return event;
}
} }
} }
return null; return null;
@ -161,12 +172,30 @@ public class XMLIterator implements Iterator<String> {
private XMLEventReader getParser() { private XMLEventReader getParser() {
try { try {
return inputFactory.get().createXMLEventReader(sanitize(inputStream)); XMLInputFactory xif = inputFactory.get();
xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
return xif.createXMLEventReader(sanitize(inputStream));
} catch (XMLStreamException e) { } catch (XMLStreamException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private boolean isCheckTag(final String tagName) {
if (elements!= null) {
final String found =elements.stream()
.filter(e -> e.equalsIgnoreCase(tagName))
.findFirst()
.orElse(null);
if (found!= null)
return true;
} else {
if (element.equalsIgnoreCase(tagName)) {
return true;
}
}
return false;
}
private Reader sanitize(final InputStream in) { private Reader sanitize(final InputStream in) {
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder(); final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE); charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);

View File

@ -0,0 +1,63 @@
package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.stream.Stream;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
public class FileGZipMultipleNodeTest {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private final ApiDescriptor api = new ApiDescriptor();
private FileGZipCollectorPlugin plugin;
private static final String SPLIT_ON_ELEMENT = "incollection,article";
@BeforeEach
public void setUp() throws IOException {
final String gzipFile = Objects
.requireNonNull(
this
.getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/file/dblp.gz"))
.getFile();
api.setBaseUrl(gzipFile);
HashMap<String, String> params = new HashMap<>();
params.put("splitOnElement", SPLIT_ON_ELEMENT);
api.setParams(params);
FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileGZipCollectorPlugin(fs);
}
@Test
void test() throws CollectorException {
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
stream.limit(10).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
log.info(s);
});
}
}