forked from D-Net/dnet-hadoop
Merge pull request '7096-fileGZip-collector-plugin' (#211) from 7096-fileGZip-collector-plugin into beta
Reviewed-on: D-Net/dnet-hadoop#211
This commit is contained in:
commit
c76ff6c613
|
@ -7,6 +7,8 @@ import java.io.IOException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
|
||||||
|
import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
@ -114,6 +116,10 @@ public class CollectorWorker extends ReportingJob {
|
||||||
return new OaiCollectorPlugin(clientParams);
|
return new OaiCollectorPlugin(clientParams);
|
||||||
case rest_json2xml:
|
case rest_json2xml:
|
||||||
return new RestCollectorPlugin(clientParams);
|
return new RestCollectorPlugin(clientParams);
|
||||||
|
case file:
|
||||||
|
return new FileCollectorPlugin(fileSystem);
|
||||||
|
case fileGZip:
|
||||||
|
return new FileGZipCollectorPlugin(fileSystem);
|
||||||
case other:
|
case other:
|
||||||
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
|
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
|
||||||
.ofNullable(api.getParams().get("other_plugin_type"))
|
.ofNullable(api.getParams().get("other_plugin_type"))
|
||||||
|
|
|
@ -10,7 +10,7 @@ import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
public interface CollectorPlugin {
|
public interface CollectorPlugin {
|
||||||
|
|
||||||
enum NAME {
|
enum NAME {
|
||||||
oai, other, rest_json2xml;
|
oai, other, rest_json2xml, file, fileGZip;
|
||||||
|
|
||||||
public enum OTHER_NAME {
|
public enum OTHER_NAME {
|
||||||
mdstore_mongodb_dump, mdstore_mongodb
|
mdstore_mongodb_dump, mdstore_mongodb
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.file;
|
||||||
|
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Spliterator;
|
||||||
|
import java.util.Spliterators;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||||
|
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
||||||
|
import eu.dnetlib.dhp.collection.plugin.utils.XMLIterator;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
|
||||||
|
|
||||||
|
public static final String SPLIT_ON_ELEMENT = "splitOnElement";
|
||||||
|
|
||||||
|
private final FileSystem fileSystem;
|
||||||
|
|
||||||
|
public AbstractSplittedRecordPlugin(FileSystem fileSystem) {
|
||||||
|
this.fileSystem = fileSystem;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
|
||||||
|
|
||||||
|
// get path to file
|
||||||
|
final Path filePath = Optional
|
||||||
|
.ofNullable(api.getBaseUrl())
|
||||||
|
.map(Path::new)
|
||||||
|
.orElseThrow( () -> new CollectorException("missing baseUrl"));
|
||||||
|
|
||||||
|
log.info("baseUrl: {}", filePath);
|
||||||
|
|
||||||
|
// check that path to file exists
|
||||||
|
try {
|
||||||
|
if (!fileSystem.exists(filePath)) {
|
||||||
|
throw new CollectorException("path does not exist: " + filePath);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new CollectorException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// get split element
|
||||||
|
final String splitOnElement = Optional
|
||||||
|
.ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
|
||||||
|
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
|
||||||
|
|
||||||
|
log.info("splitOnElement: {}", splitOnElement);
|
||||||
|
|
||||||
|
final BufferedInputStream bis = getBufferedInputStream(filePath);
|
||||||
|
|
||||||
|
Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
|
||||||
|
|
||||||
|
return StreamSupport.stream(
|
||||||
|
Spliterators.spliteratorUnknownSize(xmlIterator, Spliterator.ORDERED),
|
||||||
|
false
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException;
|
||||||
|
|
||||||
|
public FileSystem getFileSystem() {
|
||||||
|
return fileSystem;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.file;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
|
||||||
|
public class FileCollectorPlugin extends AbstractSplittedRecordPlugin {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class);
|
||||||
|
|
||||||
|
public FileCollectorPlugin(FileSystem fileSystem) {
|
||||||
|
super(fileSystem);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
|
||||||
|
|
||||||
|
log.info("filePath: {}", filePath);
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileSystem fs = super.getFileSystem();
|
||||||
|
return new BufferedInputStream(fs.open(filePath));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new CollectorException("Error reading file " + filePath, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.file;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
|
public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
|
||||||
|
|
||||||
|
public FileGZipCollectorPlugin(FileSystem fileSystem) {
|
||||||
|
super(fileSystem);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
|
||||||
|
|
||||||
|
log.info("filePath: {}", filePath);
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileSystem fs = super.getFileSystem();
|
||||||
|
GZIPInputStream stream = new GZIPInputStream(fs.open(filePath));
|
||||||
|
return new BufferedInputStream(stream);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new CollectorException("Error reading file " + filePath, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,7 +19,7 @@ import org.dom4j.io.XMLWriter;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.collection.XmlCleaner;
|
import eu.dnetlib.dhp.collection.plugin.utils.XmlCleaner;
|
||||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
import eu.dnetlib.dhp.common.collection.HttpConnector2;
|
import eu.dnetlib.dhp.common.collection.HttpConnector2;
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.w3c.dom.Node;
|
||||||
import org.w3c.dom.NodeList;
|
import org.w3c.dom.NodeList;
|
||||||
import org.xml.sax.InputSource;
|
import org.xml.sax.InputSource;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.collection.JsonUtils;
|
import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
|
||||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.collection;
|
package eu.dnetlib.dhp.collection.plugin.utils;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
|
@ -0,0 +1,170 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.utils;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.Reader;
|
||||||
|
import java.io.StringWriter;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.charset.CharsetDecoder;
|
||||||
|
import java.nio.charset.CodingErrorAction;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import javax.xml.stream.XMLEventFactory;
|
||||||
|
import javax.xml.stream.XMLEventReader;
|
||||||
|
import javax.xml.stream.XMLEventWriter;
|
||||||
|
import javax.xml.stream.XMLInputFactory;
|
||||||
|
import javax.xml.stream.XMLOutputFactory;
|
||||||
|
import javax.xml.stream.XMLStreamException;
|
||||||
|
import javax.xml.stream.events.StartElement;
|
||||||
|
import javax.xml.stream.events.XMLEvent;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
public class XMLIterator implements Iterator<String> {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(XMLIterator.class);
|
||||||
|
|
||||||
|
private ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected XMLInputFactory initialValue() {
|
||||||
|
return XMLInputFactory.newInstance();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected XMLOutputFactory initialValue() {
|
||||||
|
return XMLOutputFactory.newInstance();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected XMLEventFactory initialValue() {
|
||||||
|
return XMLEventFactory.newInstance();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public static final String UTF_8 = "UTF-8";
|
||||||
|
|
||||||
|
final XMLEventReader parser;
|
||||||
|
|
||||||
|
private XMLEvent current = null;
|
||||||
|
|
||||||
|
private String element;
|
||||||
|
|
||||||
|
private InputStream inputStream;
|
||||||
|
|
||||||
|
public XMLIterator(final String element, final InputStream inputStream) {
|
||||||
|
super();
|
||||||
|
this.element = element;
|
||||||
|
this.inputStream = inputStream;
|
||||||
|
this.parser = getParser();
|
||||||
|
try {
|
||||||
|
this.current = findElement(parser);
|
||||||
|
} catch (XMLStreamException e) {
|
||||||
|
log.warn("cannot init parser position. No element found: " + element);
|
||||||
|
current = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return current != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String next() {
|
||||||
|
String result = null;
|
||||||
|
try {
|
||||||
|
result = copy(parser);
|
||||||
|
current = findElement(parser);
|
||||||
|
return result;
|
||||||
|
} catch (XMLStreamException e) {
|
||||||
|
throw new RuntimeException(String.format("error copying xml, built so far: '%s'", result), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("finally")
|
||||||
|
private String copy(final XMLEventReader parser) throws XMLStreamException {
|
||||||
|
final StringWriter result = new StringWriter();
|
||||||
|
try {
|
||||||
|
final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(result);
|
||||||
|
final StartElement start = current.asStartElement();
|
||||||
|
final StartElement newRecord = eventFactory.get().createStartElement(start.getName(), start.getAttributes(), start.getNamespaces());
|
||||||
|
|
||||||
|
// new root record
|
||||||
|
writer.add(newRecord);
|
||||||
|
|
||||||
|
// copy the rest as it is
|
||||||
|
while (parser.hasNext()) {
|
||||||
|
final XMLEvent event = parser.nextEvent();
|
||||||
|
|
||||||
|
// TODO: replace with depth tracking instead of close tag tracking.
|
||||||
|
if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
|
||||||
|
writer.add(event);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.add(event);
|
||||||
|
}
|
||||||
|
writer.close();
|
||||||
|
} finally {
|
||||||
|
return result.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Looks for the next occurrence of the splitter element.
|
||||||
|
*
|
||||||
|
* @param parser
|
||||||
|
* @return
|
||||||
|
* @throws XMLStreamException
|
||||||
|
*/
|
||||||
|
private XMLEvent findElement(final XMLEventReader parser) throws XMLStreamException {
|
||||||
|
|
||||||
|
/*
|
||||||
|
* if (current != null && element.equals(current.asStartElement().getName().getLocalPart())) { return current; }
|
||||||
|
*/
|
||||||
|
|
||||||
|
XMLEvent peek = parser.peek();
|
||||||
|
if (peek != null && peek.isStartElement()) {
|
||||||
|
String name = peek.asStartElement().getName().getLocalPart();
|
||||||
|
if (element.equals(name)) { return peek; }
|
||||||
|
}
|
||||||
|
|
||||||
|
while (parser.hasNext()) {
|
||||||
|
final XMLEvent event = parser.nextEvent();
|
||||||
|
if (event != null && event.isStartElement()) {
|
||||||
|
String name = event.asStartElement().getName().getLocalPart();
|
||||||
|
if (element.equals(name)) { return event; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private XMLEventReader getParser() {
|
||||||
|
try {
|
||||||
|
return inputFactory.get().createXMLEventReader(sanitize(inputStream));
|
||||||
|
} catch (XMLStreamException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Reader sanitize(final InputStream in) {
|
||||||
|
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
|
||||||
|
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
|
||||||
|
charsetDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
|
||||||
|
return new InputStreamReader(in, charsetDecoder);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.collection;
|
package eu.dnetlib.dhp.collection.plugin.utils;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
|
@ -0,0 +1,60 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.file;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
import net.bytebuddy.asm.Advice;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class FileCollectorPluginTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
|
||||||
|
|
||||||
|
private final ApiDescriptor api = new ApiDescriptor();
|
||||||
|
|
||||||
|
private FileCollectorPlugin plugin;
|
||||||
|
|
||||||
|
private static final String SPLIT_ON_ELEMENT = "repository";
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
|
||||||
|
final String gzipFile = this
|
||||||
|
.getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml")
|
||||||
|
.getFile();
|
||||||
|
|
||||||
|
api.setBaseUrl(gzipFile);
|
||||||
|
|
||||||
|
HashMap<String, String> params = new HashMap<>();
|
||||||
|
params.put("splitOnElement", SPLIT_ON_ELEMENT);
|
||||||
|
|
||||||
|
api.setParams(params);
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(new Configuration());
|
||||||
|
plugin = new FileCollectorPlugin(fs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test() throws CollectorException {
|
||||||
|
|
||||||
|
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
|
||||||
|
|
||||||
|
stream.limit(10).forEach(s -> {
|
||||||
|
Assertions.assertTrue(s.length() > 0);
|
||||||
|
log.info(s);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.file;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.junit.jupiter.api.*;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
|
||||||
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class FileGZipCollectorPluginTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
|
||||||
|
|
||||||
|
private final ApiDescriptor api = new ApiDescriptor();
|
||||||
|
|
||||||
|
private FileGZipCollectorPlugin plugin;
|
||||||
|
|
||||||
|
private static final String SPLIT_ON_ELEMENT = "repository";
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
|
||||||
|
final String gzipFile = Objects.requireNonNull(this
|
||||||
|
.getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz"))
|
||||||
|
.getFile();
|
||||||
|
|
||||||
|
api.setBaseUrl(gzipFile);
|
||||||
|
|
||||||
|
HashMap<String, String> params = new HashMap<>();
|
||||||
|
params.put("splitOnElement", SPLIT_ON_ELEMENT);
|
||||||
|
|
||||||
|
api.setParams(params);
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(new Configuration());
|
||||||
|
plugin = new FileGZipCollectorPlugin(fs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test() throws CollectorException {
|
||||||
|
|
||||||
|
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
|
||||||
|
|
||||||
|
stream.limit(10).forEach(s -> {
|
||||||
|
Assertions.assertTrue(s.length() > 0);
|
||||||
|
log.info(s);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Loading…
Reference in New Issue