Add AbstractSplittedRecordPlugin

This commit is contained in:
Serafeim Chatzopoulos 2022-04-07 14:31:22 +03:00
parent e612489670
commit bc1bf55507
11 changed files with 260 additions and 59 deletions

View File

@ -7,7 +7,7 @@ import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import eu.dnetlib.dhp.collection.plugin.fileGZip.FileGZipCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.collection.plugin.file;
import java.io.BufferedInputStream;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.utils.XMLIterator;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin {
private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
public static final String SPLIT_ON_ELEMENT = "splitOnElement";
@Override
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
final String baseUrl = Optional
.ofNullable(api.getBaseUrl())
.orElseThrow( () -> new CollectorException("missing baseUrl, required by the AbstractSplittedRecordPlugin"));
log.info("baseUrl: {}", baseUrl);
final String splitOnElement = Optional
.ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
log.info("splitOnElement: {}", splitOnElement);
final BufferedInputStream bis = getBufferedInputStream(baseUrl);
Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(xmlIterator, Spliterator.ORDERED),
false
);
}
abstract protected BufferedInputStream getBufferedInputStream(final String baseUrl) throws CollectorException;
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.common.collection.CollectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.zip.GZIPInputStream;
public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
@Override
protected BufferedInputStream getBufferedInputStream(String baseUrl) throws CollectorException {
log.info("baseUrl: {}", baseUrl);
try {
GZIPInputStream stream = new GZIPInputStream(new FileInputStream(baseUrl));
return new BufferedInputStream(stream);
} catch (Exception e) {
e.printStackTrace();
throw new CollectorException(e);
}
}
}

View File

@ -1,49 +0,0 @@
package eu.dnetlib.dhp.collection.plugin.fileGZip;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URL;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
public class FileGZipCollectorPlugin implements CollectorPlugin {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
public static final String ENCODING = "encoding";
@Override
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
final String baseUrl = Optional
.ofNullable(api.getBaseUrl())
.orElseThrow( () -> new CollectorException("missing baseUrl, required by the fileGZip collector plugin"));
log.info("fileGZip.baseUrl: {}", baseUrl);
final String encoding = Optional
.ofNullable(api.getParams().get(ENCODING))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the fileGZip collector plugin", ENCODING)));
log.info("fileGZip.encoding: {}", encoding);
try {
InputStream gzipStream = new GZIPInputStream(new FileInputStream(baseUrl));
Reader decoder = new InputStreamReader(gzipStream, encoding);
BufferedReader reader = new BufferedReader(decoder);
return reader.lines();
} catch (Exception e) {
throw new CollectorException(e);
}
}
}

View File

@ -19,7 +19,7 @@ import org.dom4j.io.XMLWriter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.XmlCleaner; import eu.dnetlib.dhp.collection.plugin.utils.XmlCleaner;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpConnector2; import eu.dnetlib.dhp.common.collection.HttpConnector2;

View File

@ -30,7 +30,7 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList; import org.w3c.dom.NodeList;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
import eu.dnetlib.dhp.collection.JsonUtils; import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams; import eu.dnetlib.dhp.common.collection.HttpClientParams;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection; package eu.dnetlib.dhp.collection.plugin.utils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View File

@ -0,0 +1,170 @@
package eu.dnetlib.dhp.collection.plugin.utils;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import javax.xml.stream.XMLEventFactory;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class XMLIterator implements Iterator<String> {
private static final Log log = LogFactory.getLog(XMLIterator.class);
private ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
@Override
protected XMLInputFactory initialValue() {
return XMLInputFactory.newInstance();
}
};
private ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
@Override
protected XMLOutputFactory initialValue() {
return XMLOutputFactory.newInstance();
}
};
private ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
@Override
protected XMLEventFactory initialValue() {
return XMLEventFactory.newInstance();
}
};
public static final String UTF_8 = "UTF-8";
final XMLEventReader parser;
private XMLEvent current = null;
private String element;
private InputStream inputStream;
public XMLIterator(final String element, final InputStream inputStream) {
super();
this.element = element;
this.inputStream = inputStream;
this.parser = getParser();
try {
this.current = findElement(parser);
} catch (XMLStreamException e) {
log.warn("cannot init parser position. No element found: " + element);
current = null;
}
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public String next() {
String result = null;
try {
result = copy(parser);
current = findElement(parser);
return result;
} catch (XMLStreamException e) {
throw new RuntimeException(String.format("error copying xml, built so far: '%s'", result), e);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("finally")
private String copy(final XMLEventReader parser) throws XMLStreamException {
final StringWriter result = new StringWriter();
try {
final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(result);
final StartElement start = current.asStartElement();
final StartElement newRecord = eventFactory.get().createStartElement(start.getName(), start.getAttributes(), start.getNamespaces());
// new root record
writer.add(newRecord);
// copy the rest as it is
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
// TODO: replace with depth tracking instead of close tag tracking.
if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
writer.add(event);
break;
}
writer.add(event);
}
writer.close();
} finally {
return result.toString();
}
}
/**
* Looks for the next occurrence of the splitter element.
*
* @param parser
* @return
* @throws XMLStreamException
*/
private XMLEvent findElement(final XMLEventReader parser) throws XMLStreamException {
/*
* if (current != null && element.equals(current.asStartElement().getName().getLocalPart())) { return current; }
*/
XMLEvent peek = parser.peek();
if (peek != null && peek.isStartElement()) {
String name = peek.asStartElement().getName().getLocalPart();
if (element.equals(name)) { return peek; }
}
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
if (event != null && event.isStartElement()) {
String name = event.asStartElement().getName().getLocalPart();
if (element.equals(name)) { return event; }
}
}
return null;
}
private XMLEventReader getParser() {
try {
return inputFactory.get().createXMLEventReader(sanitize(inputStream));
} catch (XMLStreamException e) {
throw new RuntimeException(e);
}
}
private Reader sanitize(final InputStream in) {
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
charsetDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
return new InputStreamReader(in, charsetDecoder);
}
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection; package eu.dnetlib.dhp.collection.plugin.utils;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.fileGZip; package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
@ -20,22 +20,19 @@ public class FileGZipCollectorPluginTest {
private FileGZipCollectorPlugin plugin; private FileGZipCollectorPlugin plugin;
private static final String SPLIT_ON_ELEMENT = "repository"; private static final String SPLIT_ON_ELEMENT = "repository";
private static final String ENCODING = "UTF-8";
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
final String gzipFile = this final String gzipFile = this
.getClass() .getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz") .getResource("/eu/dnetlib/dhp/collection/plugin/file/gzip/opendoar.xml.gz")
.getFile(); .getFile();
System.out.println(gzipFile);
api.setBaseUrl(gzipFile); api.setBaseUrl(gzipFile);
HashMap<String, String> params = new HashMap<>(); HashMap<String, String> params = new HashMap<>();
params.put("splitOnElement", SPLIT_ON_ELEMENT); params.put("splitOnElement", SPLIT_ON_ELEMENT);
params.put("encoding", ENCODING);
api.setParams(params); api.setParams(params);