forked from D-Net/dnet-hadoop
Implement multiple node name splitter on GZipCollectorPlugin and all nodes that use XMLIterator. If the splitter name contains is a comma separated values it splits for all the values
This commit is contained in:
parent
92f018d196
commit
ca9414b737
|
@ -0,0 +1,16 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.gzip;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||||
|
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class GzipCollectorPlugin implements CollectorPlugin {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
|
||||||
|
return Stream.empty();
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,7 +8,10 @@ import java.io.StringWriter;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.nio.charset.CharsetDecoder;
|
import java.nio.charset.CharsetDecoder;
|
||||||
import java.nio.charset.CodingErrorAction;
|
import java.nio.charset.CodingErrorAction;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import javax.xml.stream.XMLEventFactory;
|
import javax.xml.stream.XMLEventFactory;
|
||||||
import javax.xml.stream.XMLEventReader;
|
import javax.xml.stream.XMLEventReader;
|
||||||
|
@ -19,6 +22,7 @@ import javax.xml.stream.XMLStreamException;
|
||||||
import javax.xml.stream.events.StartElement;
|
import javax.xml.stream.events.StartElement;
|
||||||
import javax.xml.stream.events.XMLEvent;
|
import javax.xml.stream.events.XMLEvent;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
@ -58,13 +62,22 @@ public class XMLIterator implements Iterator<String> {
|
||||||
|
|
||||||
private String element;
|
private String element;
|
||||||
|
|
||||||
|
private List<String> elements;
|
||||||
|
|
||||||
private InputStream inputStream;
|
private InputStream inputStream;
|
||||||
|
|
||||||
public XMLIterator(final String element, final InputStream inputStream) {
|
public XMLIterator(final String element, final InputStream inputStream) {
|
||||||
super();
|
super();
|
||||||
this.element = element;
|
this.element = element;
|
||||||
|
if (element.contains(",")) {
|
||||||
|
elements= Arrays.stream(element.split(","))
|
||||||
|
.filter(StringUtils::isNoneBlank)
|
||||||
|
.map(String::toLowerCase)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
this.inputStream = inputStream;
|
this.inputStream = inputStream;
|
||||||
this.parser = getParser();
|
this.parser = getParser();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.current = findElement(parser);
|
this.current = findElement(parser);
|
||||||
} catch (XMLStreamException e) {
|
} catch (XMLStreamException e) {
|
||||||
|
@ -113,7 +126,7 @@ public class XMLIterator implements Iterator<String> {
|
||||||
final XMLEvent event = parser.nextEvent();
|
final XMLEvent event = parser.nextEvent();
|
||||||
|
|
||||||
// TODO: replace with depth tracking instead of close tag tracking.
|
// TODO: replace with depth tracking instead of close tag tracking.
|
||||||
if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
|
if (event.isEndElement() && isCheckTag(event.asEndElement().getName().getLocalPart())) {
|
||||||
writer.add(event);
|
writer.add(event);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -142,18 +155,16 @@ public class XMLIterator implements Iterator<String> {
|
||||||
XMLEvent peek = parser.peek();
|
XMLEvent peek = parser.peek();
|
||||||
if (peek != null && peek.isStartElement()) {
|
if (peek != null && peek.isStartElement()) {
|
||||||
String name = peek.asStartElement().getName().getLocalPart();
|
String name = peek.asStartElement().getName().getLocalPart();
|
||||||
if (element.equals(name)) {
|
if( isCheckTag(name))
|
||||||
return peek;
|
return peek;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (parser.hasNext()) {
|
while (parser.hasNext()) {
|
||||||
final XMLEvent event = parser.nextEvent();
|
XMLEvent event= parser.nextEvent();
|
||||||
if (event != null && event.isStartElement()) {
|
if (event != null && event.isStartElement()) {
|
||||||
String name = event.asStartElement().getName().getLocalPart();
|
String name = event.asStartElement().getName().getLocalPart();
|
||||||
if (element.equals(name)) {
|
if( isCheckTag(name))
|
||||||
return event;
|
return event;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -161,12 +172,30 @@ public class XMLIterator implements Iterator<String> {
|
||||||
|
|
||||||
private XMLEventReader getParser() {
|
private XMLEventReader getParser() {
|
||||||
try {
|
try {
|
||||||
return inputFactory.get().createXMLEventReader(sanitize(inputStream));
|
XMLInputFactory xif = inputFactory.get();
|
||||||
|
xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
|
||||||
|
return xif.createXMLEventReader(sanitize(inputStream));
|
||||||
} catch (XMLStreamException e) {
|
} catch (XMLStreamException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isCheckTag(final String tagName) {
|
||||||
|
if (elements!= null) {
|
||||||
|
final String found =elements.stream()
|
||||||
|
.filter(e -> e.equalsIgnoreCase(tagName))
|
||||||
|
.findFirst()
|
||||||
|
.orElse(null);
|
||||||
|
if (found!= null)
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
if (element.equalsIgnoreCase(tagName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private Reader sanitize(final InputStream in) {
|
private Reader sanitize(final InputStream in) {
|
||||||
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
|
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
|
||||||
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
|
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.file;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.junit.jupiter.api.*;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class FileGZipMultipleNodeTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
|
||||||
|
|
||||||
|
private final ApiDescriptor api = new ApiDescriptor();
|
||||||
|
|
||||||
|
private FileGZipCollectorPlugin plugin;
|
||||||
|
|
||||||
|
private static final String SPLIT_ON_ELEMENT = "incollection,article";
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
|
||||||
|
final String gzipFile = Objects
|
||||||
|
.requireNonNull(
|
||||||
|
this
|
||||||
|
.getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/collection/plugin/file/dblp.gz"))
|
||||||
|
.getFile();
|
||||||
|
|
||||||
|
api.setBaseUrl(gzipFile);
|
||||||
|
|
||||||
|
HashMap<String, String> params = new HashMap<>();
|
||||||
|
params.put("splitOnElement", SPLIT_ON_ELEMENT);
|
||||||
|
|
||||||
|
api.setParams(params);
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(new Configuration());
|
||||||
|
plugin = new FileGZipCollectorPlugin(fs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test() throws CollectorException {
|
||||||
|
|
||||||
|
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
|
||||||
|
|
||||||
|
stream.limit(10).forEach(s -> {
|
||||||
|
Assertions.assertTrue(s.length() > 0);
|
||||||
|
log.info(s);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
Binary file not shown.
Loading…
Reference in New Issue