From 623f7be26d53063c88dc3f4946bd1d2cd1648561 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 28 Apr 2022 16:31:11 +0300 Subject: [PATCH] Fix reading files from HDFS in FileCollector & FileGZipCollector plugins --- .../dhp/collection/CollectorWorker.java | 4 +-- .../file/AbstractSplittedRecordPlugin.java | 35 +++++++++++++++---- .../plugin/file/FileCollectorPlugin.java | 18 ++++++---- .../plugin/file/FileGZipCollectorPlugin.java | 21 ++++++----- .../plugin/file/FileCollectorPluginTest.java | 11 ++++-- .../file/FileGZipCollectorPluginTest.java | 29 ++++++++++----- 6 files changed, 86 insertions(+), 32 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index a770af804..f6cba6fb8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -117,9 +117,9 @@ public class CollectorWorker extends ReportingJob { case rest_json2xml: return new RestCollectorPlugin(clientParams); case file: - return new FileCollectorPlugin(); + return new FileCollectorPlugin(fileSystem); case fileGZip: - return new FileGZipCollectorPlugin(); + return new FileGZipCollectorPlugin(fileSystem); case other: final CollectorPlugin.NAME.OTHER_NAME plugin = Optional .ofNullable(api.getParams().get("other_plugin_type")) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java index c569926c0..97f61980a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/AbstractSplittedRecordPlugin.java @@ -1,11 +1,11 @@ package eu.dnetlib.dhp.collection.plugin.file; import java.io.BufferedInputStream; +import java.io.IOException; import java.util.Iterator; import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; -import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -15,6 +15,8 @@ import eu.dnetlib.dhp.collection.plugin.utils.XMLIterator; import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.collection.CollectorException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,22 +26,40 @@ public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin { public static final String SPLIT_ON_ELEMENT = "splitOnElement"; + private final FileSystem fileSystem; + + public AbstractSplittedRecordPlugin(FileSystem fileSystem) { + this.fileSystem = fileSystem; + } + @Override public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { - final String baseUrl = Optional + // get path to file + final Path filePath = Optional .ofNullable(api.getBaseUrl()) - .orElseThrow( () -> new CollectorException("missing baseUrl, required by the AbstractSplittedRecordPlugin")); + .map(Path::new) + .orElseThrow( () -> new CollectorException("missing baseUrl")); - log.info("baseUrl: {}", baseUrl); + log.info("baseUrl: {}", filePath); + // check that path to file exists + try { + if (!fileSystem.exists(filePath)) { + throw new CollectorException("path does not exist: " + filePath); + } + } catch (IOException e) { + throw new CollectorException(e); + } + + // get split element final String splitOnElement = Optional .ofNullable(api.getParams().get(SPLIT_ON_ELEMENT)) .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT))); log.info("splitOnElement: {}", splitOnElement); - final BufferedInputStream bis = getBufferedInputStream(baseUrl); + final BufferedInputStream bis = getBufferedInputStream(filePath); Iterator xmlIterator = new XMLIterator(splitOnElement, bis); @@ -49,6 +69,9 @@ public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin { ); } - abstract protected BufferedInputStream getBufferedInputStream(final String baseUrl) throws CollectorException; + abstract protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException; + public FileSystem getFileSystem() { + return fileSystem; + } } \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java index 3a268eabb..d7a992038 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPlugin.java @@ -1,25 +1,31 @@ package eu.dnetlib.dhp.collection.plugin.file; import eu.dnetlib.dhp.common.collection.CollectorException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; -import java.io.FileInputStream; public class FileCollectorPlugin extends AbstractSplittedRecordPlugin { private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class); - @Override - protected BufferedInputStream getBufferedInputStream(final String baseUrl) throws CollectorException { + public FileCollectorPlugin(FileSystem fileSystem) { + super(fileSystem); + } - log.info("baseUrl: {}", baseUrl); + @Override + protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException { + + log.info("filePath: {}", filePath); try { - return new BufferedInputStream(new FileInputStream(baseUrl)); + FileSystem fs = super.getFileSystem(); + return new BufferedInputStream(fs.open(filePath)); } catch (Exception e) { - throw new CollectorException("Error reading file " + baseUrl, e); + throw new CollectorException("Error reading file " + filePath, e); } } } \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java index b2c77e8b6..b724cfefd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPlugin.java @@ -1,28 +1,33 @@ package eu.dnetlib.dhp.collection.plugin.file; import eu.dnetlib.dhp.common.collection.CollectorException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.net.MalformedURLException; -import java.net.URL; +import java.io.BufferedInputStream; import java.util.zip.GZIPInputStream; public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin { private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class); - @Override - protected BufferedInputStream getBufferedInputStream(String baseUrl) throws CollectorException { + public FileGZipCollectorPlugin(FileSystem fileSystem) { + super(fileSystem); + } - log.info("baseUrl: {}", baseUrl); + @Override + protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException { + + log.info("filePath: {}", filePath); try { - GZIPInputStream stream = new GZIPInputStream(new FileInputStream(baseUrl)); + FileSystem fs = super.getFileSystem(); + GZIPInputStream stream = new GZIPInputStream(fs.open(filePath)); return new BufferedInputStream(stream); } catch (Exception e) { - throw new CollectorException("Error reading file " + baseUrl, e); + throw new CollectorException("Error reading file " + filePath, e); } } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java index d9b5152fe..ed0578059 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileCollectorPluginTest.java @@ -3,12 +3,17 @@ package eu.dnetlib.dhp.collection.plugin.file; import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.collection.CollectorException; +import net.bytebuddy.asm.Advice; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.stream.Stream; @@ -17,12 +22,13 @@ public class FileCollectorPluginTest { private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class); private final ApiDescriptor api = new ApiDescriptor(); + private FileCollectorPlugin plugin; private static final String SPLIT_ON_ELEMENT = "repository"; @BeforeEach - public void setUp() { + public void setUp() throws IOException { final String gzipFile = this .getClass() @@ -36,7 +42,8 @@ public class FileCollectorPluginTest { api.setParams(params); - plugin = new FileCollectorPlugin(); + FileSystem fs = FileSystem.get(new Configuration()); + plugin = new FileCollectorPlugin(fs); } @Test diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java index 18caf1c45..23f08e6ea 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/file/FileGZipCollectorPluginTest.java @@ -3,30 +3,42 @@ package eu.dnetlib.dhp.collection.plugin.file; import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.collection.CollectorException; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.util.HashMap; +import java.util.Objects; import java.util.stream.Stream; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@ExtendWith(MockitoExtension.class) public class FileGZipCollectorPluginTest { private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class); private final ApiDescriptor api = new ApiDescriptor(); + private FileGZipCollectorPlugin plugin; private static final String SPLIT_ON_ELEMENT = "repository"; @BeforeEach - public void setUp() { + public void setUp() throws IOException { - final String gzipFile = this - .getClass() - .getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz") + final String gzipFile = Objects.requireNonNull(this + .getClass() + .getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz")) .getFile(); api.setBaseUrl(gzipFile); @@ -36,7 +48,8 @@ public class FileGZipCollectorPluginTest { api.setParams(params); - plugin = new FileGZipCollectorPlugin(); + FileSystem fs = FileSystem.get(new Configuration()); + plugin = new FileGZipCollectorPlugin(fs); } @Test