diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
index f6cba6fb81..9d94000682 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
@@ -7,8 +7,6 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
-import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -21,6 +19,8 @@ import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
 import eu.dnetlib.dhp.aggregation.common.ReportingJob;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java
index 97f61980aa..f2fa3d2bbd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java
@@ -1,3 +1,4 @@
+
 package eu.dnetlib.dhp.collection.plugin.file;
 
 import java.io.BufferedInputStream;
@@ -9,69 +10,71 @@ import java.util.Spliterators;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import eu.dnetlib.dhp.collection.ApiDescriptor;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.utils.XMLIterator;
 import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.collection.CollectorException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin {
 
-    private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
+	private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
 
-    public static final String SPLIT_ON_ELEMENT = "splitOnElement";
+	public static final String SPLIT_ON_ELEMENT = "splitOnElement";
 
-    private final FileSystem fileSystem;
+	private final FileSystem fileSystem;
 
-    public AbstractSplittedRecordPlugin(FileSystem fileSystem) {
-        this.fileSystem = fileSystem;
-    }
+	public AbstractSplittedRecordPlugin(FileSystem fileSystem) {
+		this.fileSystem = fileSystem;
+	}
 
-    @Override
-    public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
+	@Override
+	public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
 
-        // get path to file
-        final Path filePath = Optional
-            .ofNullable(api.getBaseUrl())
-            .map(Path::new)
-            .orElseThrow( () -> new CollectorException("missing baseUrl"));
+		// get path to file
+		final Path filePath = Optional
+			.ofNullable(api.getBaseUrl())
+			.map(Path::new)
+			.orElseThrow(() -> new CollectorException("missing baseUrl"));
 
-        log.info("baseUrl: {}", filePath);
+		log.info("baseUrl: {}", filePath);
 
-        // check that path to file exists
-        try {
-            if (!fileSystem.exists(filePath)) {
-                throw new CollectorException("path does not exist: " + filePath);
-            }
-        } catch (IOException e) {
-            throw new CollectorException(e);
-        }
+		// check that path to file exists
+		try {
+			if (!fileSystem.exists(filePath)) {
+				throw new CollectorException("path does not exist: " + filePath);
+			}
+		} catch (IOException e) {
+			throw new CollectorException(e);
+		}
 
-        // get split element
-        final String splitOnElement = Optional
-            .ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
-            .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
+		// get split element
+		final String splitOnElement = Optional
+			.ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
+			.orElseThrow(
+				() -> new CollectorException(String
+					.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
 
-        log.info("splitOnElement: {}", splitOnElement);
+		log.info("splitOnElement: {}", splitOnElement);
 
-        final BufferedInputStream bis = getBufferedInputStream(filePath);
+		final BufferedInputStream bis = getBufferedInputStream(filePath);
 
-        Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
+		Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
 
-        return StreamSupport.stream(
-            Spliterators.spliteratorUnknownSize(xmlIterator, Spliterator.ORDERED),
-            false
-        );
-    }
+		return StreamSupport
+			.stream(
+				Spliterators.spliteratorUnknownSize(xmlIterator, Spliterator.ORDERED),
+				false);
+	}
 
-    abstract protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException;
+	abstract protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException;
 
-    public FileSystem getFileSystem() {
-        return fileSystem;
-    }
-}
\ No newline at end of file
+	public FileSystem getFileSystem() {
+		return fileSystem;
+	}
+}
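Taken together, collect() resolves the input path from ApiDescriptor.getBaseUrl(), fails fast when the mandatory splitOnElement parameter is absent, and exposes the matched XML fragments as a lazy Stream<String>. A minimal usage sketch with the plain-file subclass introduced just below, mirroring the unit tests at the bottom of this changeset (the file path and element name are illustrative, not taken from the diff):

import java.util.HashMap;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;

public class FileCollectorPluginSketch {

	public static void main(String[] args) throws Exception {
		// baseUrl doubles as the path of the file to split
		ApiDescriptor api = new ApiDescriptor();
		api.setBaseUrl("/tmp/opendoar.xml"); // illustrative path

		// the element whose occurrences delimit individual records
		HashMap<String, String> params = new HashMap<>();
		params.put("splitOnElement", "repository");
		api.setParams(params);

		// a local FileSystem is enough for a smoke test; in production this is HDFS
		FileSystem fs = FileSystem.get(new Configuration());
		Stream<String> records = new FileCollectorPlugin(fs).collect(api, new AggregatorReport());

		records.limit(5).forEach(System.out::println);
	}
}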
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java
index d7a992038b..f771def930 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java
@@ -1,31 +1,33 @@
+
 package eu.dnetlib.dhp.collection.plugin.file;
 
-import eu.dnetlib.dhp.common.collection.CollectorException;
+import java.io.BufferedInputStream;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
+import eu.dnetlib.dhp.common.collection.CollectorException;
 
 public class FileCollectorPlugin extends AbstractSplittedRecordPlugin {
 
-    private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class);
+	private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class);
 
-    public FileCollectorPlugin(FileSystem fileSystem) {
-        super(fileSystem);
-    }
+	public FileCollectorPlugin(FileSystem fileSystem) {
+		super(fileSystem);
+	}
 
-    @Override
-    protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
+	@Override
+	protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
 
-        log.info("filePath: {}", filePath);
+		log.info("filePath: {}", filePath);
 
-        try {
-            FileSystem fs = super.getFileSystem();
-            return new BufferedInputStream(fs.open(filePath));
-        } catch (Exception e) {
-            throw new CollectorException("Error reading file " + filePath, e);
-        }
-    }
-}
\ No newline at end of file
+		try {
+			FileSystem fs = super.getFileSystem();
+			return new BufferedInputStream(fs.open(filePath));
+		} catch (Exception e) {
+			throw new CollectorException("Error reading file " + filePath, e);
+		}
+	}
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java
index b724cfefdd..91a6e9f166 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java
@@ -1,33 +1,35 @@
+
 package eu.dnetlib.dhp.collection.plugin.file;
 
-import eu.dnetlib.dhp.common.collection.CollectorException;
+import java.io.BufferedInputStream;
+import java.util.zip.GZIPInputStream;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.util.zip.GZIPInputStream;
+import eu.dnetlib.dhp.common.collection.CollectorException;
 
 public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin {
 
-    private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
+	private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
 
-    public FileGZipCollectorPlugin(FileSystem fileSystem) {
-        super(fileSystem);
-    }
+	public FileGZipCollectorPlugin(FileSystem fileSystem) {
+		super(fileSystem);
+	}
 
-    @Override
-    protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
+	@Override
+	protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
 
-        log.info("filePath: {}", filePath);
+		log.info("filePath: {}", filePath);
 
-        try {
-            FileSystem fs = super.getFileSystem();
-            GZIPInputStream stream = new GZIPInputStream(fs.open(filePath));
-            return new BufferedInputStream(stream);
-        } catch (Exception e) {
-            throw new CollectorException("Error reading file " + filePath, e);
-        }
-    }
+		try {
+			FileSystem fs = super.getFileSystem();
+			GZIPInputStream stream = new GZIPInputStream(fs.open(filePath));
+			return new BufferedInputStream(stream);
+		} catch (Exception e) {
+			throw new CollectorException("Error reading file " + filePath, e);
+		}
+	}
 }
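Note the decoration order in the GZip variant: the HDFS input is inflated first and buffering is applied to the decompressed side, so the XML parser reads buffered, already-decompressed bytes. The CollectorWorker hunk at the top of this changeset only re-sorts the two new imports into the eu.dnetlib block; the protocol-to-plugin dispatch itself is not shown here. Assuming the worker picks a plugin based on the protocol declared on the API, the wiring could look roughly like this sketch (the protocol names and helper class are hypothetical, not taken from this diff):

import org.apache.hadoop.fs.FileSystem;

import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;

// Hypothetical sketch: one plausible way a worker could map an API protocol
// string onto the new file-based plugins. Assumes ApiDescriptor exposes the
// protocol of the datasource API; the "fileXML"/"fileGzip" names are guesses.
public class FilePluginDispatchSketch {

	public static CollectorPlugin pluginFor(ApiDescriptor api, FileSystem fs) {
		switch (api.getProtocol().toLowerCase()) {
			case "filexml":
				return new FileCollectorPlugin(fs);
			case "filegzip":
				return new FileGZipCollectorPlugin(fs);
			default:
				throw new IllegalArgumentException("unsupported protocol: " + api.getProtocol());
		}
	}
}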
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java
index 853973ebbb..e05fe263a6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/utils/XMLIterator.java
@@ -1,3 +1,4 @@
+
 package eu.dnetlib.dhp.collection.plugin.utils;
 
 import java.io.InputStream;
@@ -23,148 +24,154 @@ import org.apache.commons.logging.LogFactory;
 
 public class XMLIterator implements Iterator<String> {
 
-    private static final Log log = LogFactory.getLog(XMLIterator.class);
+	private static final Log log = LogFactory.getLog(XMLIterator.class);
 
-    private ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
+	private ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
 
-        @Override
-        protected XMLInputFactory initialValue() {
-            return XMLInputFactory.newInstance();
-        }
-    };
+		@Override
+		protected XMLInputFactory initialValue() {
+			return XMLInputFactory.newInstance();
+		}
+	};
 
-    private ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
+	private ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
 
-        @Override
-        protected XMLOutputFactory initialValue() {
-            return XMLOutputFactory.newInstance();
-        }
-    };
+		@Override
+		protected XMLOutputFactory initialValue() {
+			return XMLOutputFactory.newInstance();
+		}
+	};
 
-    private ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
+	private ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
 
-        @Override
-        protected XMLEventFactory initialValue() {
-            return XMLEventFactory.newInstance();
-        }
-    };
+		@Override
+		protected XMLEventFactory initialValue() {
+			return XMLEventFactory.newInstance();
+		}
+	};
 
-    public static final String UTF_8 = "UTF-8";
+	public static final String UTF_8 = "UTF-8";
 
-    final XMLEventReader parser;
+	final XMLEventReader parser;
 
-    private XMLEvent current = null;
+	private XMLEvent current = null;
 
-    private String element;
+	private String element;
 
-    private InputStream inputStream;
+	private InputStream inputStream;
 
-    public XMLIterator(final String element, final InputStream inputStream) {
-        super();
-        this.element = element;
-        this.inputStream = inputStream;
-        this.parser = getParser();
-        try {
-            this.current = findElement(parser);
-        } catch (XMLStreamException e) {
-            log.warn("cannot init parser position. No element found: " + element);
-            current = null;
-        }
-    }
+	public XMLIterator(final String element, final InputStream inputStream) {
+		super();
+		this.element = element;
+		this.inputStream = inputStream;
+		this.parser = getParser();
+		try {
+			this.current = findElement(parser);
+		} catch (XMLStreamException e) {
+			log.warn("cannot init parser position. No element found: " + element);
+			current = null;
+		}
+	}
 
-    @Override
-    public boolean hasNext() {
-        return current != null;
-    }
+	@Override
+	public boolean hasNext() {
+		return current != null;
+	}
 
-    @Override
-    public String next() {
-        String result = null;
-        try {
-            result = copy(parser);
-            current = findElement(parser);
-            return result;
-        } catch (XMLStreamException e) {
-            throw new RuntimeException(String.format("error copying xml, built so far: '%s'", result), e);
-        }
-    }
+	@Override
+	public String next() {
+		String result = null;
+		try {
+			result = copy(parser);
+			current = findElement(parser);
+			return result;
+		} catch (XMLStreamException e) {
+			throw new RuntimeException(String.format("error copying xml, built so far: '%s'", result), e);
+		}
+	}
 
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
 
-    @SuppressWarnings("finally")
-    private String copy(final XMLEventReader parser) throws XMLStreamException {
-        final StringWriter result = new StringWriter();
-        try {
-            final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(result);
-            final StartElement start = current.asStartElement();
-            final StartElement newRecord = eventFactory.get().createStartElement(start.getName(), start.getAttributes(), start.getNamespaces());
+	@SuppressWarnings("finally")
+	private String copy(final XMLEventReader parser) throws XMLStreamException {
+		final StringWriter result = new StringWriter();
+		try {
+			final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(result);
+			final StartElement start = current.asStartElement();
+			final StartElement newRecord = eventFactory
+				.get()
+				.createStartElement(start.getName(), start.getAttributes(), start.getNamespaces());
 
-            // new root record
-            writer.add(newRecord);
+			// new root record
+			writer.add(newRecord);
 
-            // copy the rest as it is
-            while (parser.hasNext()) {
-                final XMLEvent event = parser.nextEvent();
+			// copy the rest as it is
+			while (parser.hasNext()) {
+				final XMLEvent event = parser.nextEvent();
 
-                // TODO: replace with depth tracking instead of close tag tracking.
-                if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
-                    writer.add(event);
-                    break;
-                }
+				// TODO: replace with depth tracking instead of close tag tracking.
+				if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
+					writer.add(event);
+					break;
+				}
 
-                writer.add(event);
-            }
-            writer.close();
-        } finally {
-            return result.toString();
-        }
-    }
+				writer.add(event);
+			}
+			writer.close();
+		} finally {
+			return result.toString();
+		}
+	}
 
-    /**
-     * Looks for the next occurrence of the splitter element.
-     *
-     * @param parser
-     * @return
-     * @throws XMLStreamException
-     */
-    private XMLEvent findElement(final XMLEventReader parser) throws XMLStreamException {
+	/**
+	 * Looks for the next occurrence of the splitter element.
+	 *
+	 * @param parser
+	 * @return
+	 * @throws XMLStreamException
+	 */
+	private XMLEvent findElement(final XMLEventReader parser) throws XMLStreamException {
 
-        /*
-         * if (current != null && element.equals(current.asStartElement().getName().getLocalPart())) { return current; }
-         */
+		/*
+		 * if (current != null && element.equals(current.asStartElement().getName().getLocalPart())) { return current; }
+		 */
 
-        XMLEvent peek = parser.peek();
-        if (peek != null && peek.isStartElement()) {
-            String name = peek.asStartElement().getName().getLocalPart();
-            if (element.equals(name)) { return peek; }
-        }
+		XMLEvent peek = parser.peek();
+		if (peek != null && peek.isStartElement()) {
+			String name = peek.asStartElement().getName().getLocalPart();
+			if (element.equals(name)) {
+				return peek;
+			}
+		}
 
-        while (parser.hasNext()) {
-            final XMLEvent event = parser.nextEvent();
-            if (event != null && event.isStartElement()) {
-                String name = event.asStartElement().getName().getLocalPart();
-                if (element.equals(name)) { return event; }
-            }
-        }
-        return null;
-    }
+		while (parser.hasNext()) {
+			final XMLEvent event = parser.nextEvent();
+			if (event != null && event.isStartElement()) {
+				String name = event.asStartElement().getName().getLocalPart();
+				if (element.equals(name)) {
+					return event;
+				}
+			}
+		}
+		return null;
+	}
 
-    private XMLEventReader getParser() {
-        try {
-            return inputFactory.get().createXMLEventReader(sanitize(inputStream));
-        } catch (XMLStreamException e) {
-            throw new RuntimeException(e);
-        }
-    }
+	private XMLEventReader getParser() {
+		try {
+			return inputFactory.get().createXMLEventReader(sanitize(inputStream));
+		} catch (XMLStreamException e) {
+			throw new RuntimeException(e);
+		}
+	}
 
-    private Reader sanitize(final InputStream in) {
-        final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
-        charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
-        charsetDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
-        return new InputStreamReader(in, charsetDecoder);
-    }
+	private Reader sanitize(final InputStream in) {
+		final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
+		charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
+		charsetDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+		return new InputStreamReader(in, charsetDecoder);
+	}
 
-}
\ No newline at end of file
+}
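The TODO in copy() flags a real limitation: the loop stops at the first matching end tag, so a record whose split element nests inside itself would be truncated. A depth-tracking replacement for that inner loop could look like the following sketch (only the copy loop is shown; the writer/parser setup stays as in copy() above):

import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.XMLEvent;

// Sketch of the depth-tracking alternative mentioned in the TODO: count how
// deeply the split element is nested instead of stopping at the first close
// tag, so self-nested elements are copied whole.
public class DepthTrackingCopySketch {

	public static void copyRecord(XMLEventReader parser, XMLEventWriter writer, String element)
		throws XMLStreamException {
		int depth = 1; // the opening start element was already written by the caller
		while (parser.hasNext() && depth > 0) {
			XMLEvent event = parser.nextEvent();
			if (event.isStartElement()
				&& element.equals(event.asStartElement().getName().getLocalPart())) {
				depth++;
			} else if (event.isEndElement()
				&& element.equals(event.asEndElement().getName().getLocalPart())) {
				depth--;
			}
			// the final end element is written before the loop exits at depth 0
			writer.add(event);
		}
	}
}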
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java
index ed0578059c..6fd101634a 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java
@@ -1,9 +1,10 @@
+
 package eu.dnetlib.dhp.collection.plugin.file;
 
-import eu.dnetlib.dhp.collection.ApiDescriptor;
-import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
-import eu.dnetlib.dhp.common.collection.CollectorException;
-import net.bytebuddy.asm.Advice;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -13,48 +14,48 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.stream.Stream;
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+import net.bytebuddy.asm.Advice;
 
 public class FileCollectorPluginTest {
 
-    private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
+	private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
 
-    private final ApiDescriptor api = new ApiDescriptor();
+	private final ApiDescriptor api = new ApiDescriptor();
 
-    private FileCollectorPlugin plugin;
+	private FileCollectorPlugin plugin;
 
-    private static final String SPLIT_ON_ELEMENT = "repository";
+	private static final String SPLIT_ON_ELEMENT = "repository";
 
-    @BeforeEach
-    public void setUp() throws IOException {
+	@BeforeEach
+	public void setUp() throws IOException {
 
-        final String gzipFile = this
-            .getClass()
-            .getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml")
-            .getFile();
+		final String gzipFile = this
+			.getClass()
+			.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml")
+			.getFile();
 
-        api.setBaseUrl(gzipFile);
+		api.setBaseUrl(gzipFile);
 
-        HashMap<String, String> params = new HashMap<>();
-        params.put("splitOnElement", SPLIT_ON_ELEMENT);
+		HashMap<String, String> params = new HashMap<>();
+		params.put("splitOnElement", SPLIT_ON_ELEMENT);
 
-        api.setParams(params);
+		api.setParams(params);
 
-        FileSystem fs = FileSystem.get(new Configuration());
-        plugin = new FileCollectorPlugin(fs);
-    }
+		FileSystem fs = FileSystem.get(new Configuration());
+		plugin = new FileCollectorPlugin(fs);
+	}
 
-    @Test
-    void test() throws CollectorException {
+	@Test
+	void test() throws CollectorException {
 
-        final Stream<String> stream = plugin.collect(api, new AggregatorReport());
+		final Stream<String> stream = plugin.collect(api, new AggregatorReport());
 
-        stream.limit(10).forEach(s -> {
-            Assertions.assertTrue(s.length() > 0);
-            log.info(s);
-        });
-    }
+		stream.limit(10).forEach(s -> {
+			Assertions.assertTrue(s.length() > 0);
+			log.info(s);
+		});
+	}
 }
-
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java
index 23f08e6ea9..dc24d6f137 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java
@@ -1,8 +1,13 @@
+
 package eu.dnetlib.dhp.collection.plugin.file;
 
-import eu.dnetlib.dhp.collection.ApiDescriptor;
-import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
-import eu.dnetlib.dhp.common.collection.CollectorException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -13,53 +18,51 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Objects;
-import java.util.stream.Stream;
-
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @ExtendWith(MockitoExtension.class)
 public class FileGZipCollectorPluginTest {
 
-    private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
+	private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
 
-    private final ApiDescriptor api = new ApiDescriptor();
+	private final ApiDescriptor api = new ApiDescriptor();
 
-    private FileGZipCollectorPlugin plugin;
+	private FileGZipCollectorPlugin plugin;
 
-    private static final String SPLIT_ON_ELEMENT = "repository";
+	private static final String SPLIT_ON_ELEMENT = "repository";
 
-    @BeforeEach
-    public void setUp() throws IOException {
+	@BeforeEach
+	public void setUp() throws IOException {
 
-        final String gzipFile = Objects.requireNonNull(this
-            .getClass()
-            .getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz"))
-            .getFile();
+		final String gzipFile = Objects
+			.requireNonNull(
+				this
+					.getClass()
+					.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz"))
+			.getFile();
 
-        api.setBaseUrl(gzipFile);
+		api.setBaseUrl(gzipFile);
 
-        HashMap<String, String> params = new HashMap<>();
-        params.put("splitOnElement", SPLIT_ON_ELEMENT);
+		HashMap<String, String> params = new HashMap<>();
+		params.put("splitOnElement", SPLIT_ON_ELEMENT);
 
-        api.setParams(params);
+		api.setParams(params);
 
-        FileSystem fs = FileSystem.get(new Configuration());
-        plugin = new FileGZipCollectorPlugin(fs);
-    }
+		FileSystem fs = FileSystem.get(new Configuration());
+		plugin = new FileGZipCollectorPlugin(fs);
+	}
 
-    @Test
-    void test() throws CollectorException {
+	@Test
+	void test() throws CollectorException {
 
-        final Stream<String> stream = plugin.collect(api, new AggregatorReport());
+		final Stream<String> stream = plugin.collect(api, new AggregatorReport());
 
-        stream.limit(10).forEach(s -> {
-            Assertions.assertTrue(s.length() > 0);
-            log.info(s);
-        });
-    }
+		stream.limit(10).forEach(s -> {
+			Assertions.assertTrue(s.length() > 0);
+			log.info(s);
+		});
+	}
 }
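For completeness: the opendoar.xml.gz fixture exercised above is simply the plain opendoar.xml fixture compressed with gzip. A throwaway helper along these lines regenerates an equivalent fixture (paths are illustrative, and InputStream.transferTo requires JDK 9+):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.GZIPOutputStream;

// Helper sketch, not part of the changeset: gzip an XML fixture so it can be
// fed to FileGZipCollectorPlugin in a test.
public class GzipFixtureSketch {

	public static void main(String[] args) throws IOException {
		try (InputStream in = Files.newInputStream(Paths.get("src/test/resources/opendoar.xml"));
			OutputStream out = new GZIPOutputStream(
				Files.newOutputStream(Paths.get("src/test/resources/opendoar.xml.gz")))) {
			in.transferTo(out); // copies the uncompressed bytes into the gzip stream
		}
	}
}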