Fix reading files from HDFS in FileCollector & FileGZipCollector plugins

This commit is contained in:
Serafeim Chatzopoulos 2022-04-28 16:31:11 +03:00
parent 81c4496d32
commit 623f7be26d
6 changed files with 86 additions and 32 deletions

View File

@ -117,9 +117,9 @@ public class CollectorWorker extends ReportingJob {
case rest_json2xml: case rest_json2xml:
return new RestCollectorPlugin(clientParams); return new RestCollectorPlugin(clientParams);
case file: case file:
return new FileCollectorPlugin(); return new FileCollectorPlugin(fileSystem);
case fileGZip: case fileGZip:
return new FileGZipCollectorPlugin(); return new FileGZipCollectorPlugin(fileSystem);
case other: case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(api.getParams().get("other_plugin_type")) .ofNullable(api.getParams().get("other_plugin_type"))

View File

@ -1,11 +1,11 @@
package eu.dnetlib.dhp.collection.plugin.file; package eu.dnetlib.dhp.collection.plugin.file;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
@ -15,6 +15,8 @@ import eu.dnetlib.dhp.collection.plugin.utils.XMLIterator;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -24,22 +26,40 @@ public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin {
public static final String SPLIT_ON_ELEMENT = "splitOnElement"; public static final String SPLIT_ON_ELEMENT = "splitOnElement";
private final FileSystem fileSystem;
public AbstractSplittedRecordPlugin(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
@Override @Override
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
final String baseUrl = Optional // get path to file
final Path filePath = Optional
.ofNullable(api.getBaseUrl()) .ofNullable(api.getBaseUrl())
.orElseThrow( () -> new CollectorException("missing baseUrl, required by the AbstractSplittedRecordPlugin")); .map(Path::new)
.orElseThrow( () -> new CollectorException("missing baseUrl"));
log.info("baseUrl: {}", baseUrl); log.info("baseUrl: {}", filePath);
// check that path to file exists
try {
if (!fileSystem.exists(filePath)) {
throw new CollectorException("path does not exist: " + filePath);
}
} catch (IOException e) {
throw new CollectorException(e);
}
// get split element
final String splitOnElement = Optional final String splitOnElement = Optional
.ofNullable(api.getParams().get(SPLIT_ON_ELEMENT)) .ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT))); .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
log.info("splitOnElement: {}", splitOnElement); log.info("splitOnElement: {}", splitOnElement);
final BufferedInputStream bis = getBufferedInputStream(baseUrl); final BufferedInputStream bis = getBufferedInputStream(filePath);
Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis); Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
@ -49,6 +69,9 @@ public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin {
); );
} }
abstract protected BufferedInputStream getBufferedInputStream(final String baseUrl) throws CollectorException; abstract protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException;
public FileSystem getFileSystem() {
return fileSystem;
}
} }

View File

@ -1,25 +1,31 @@
package eu.dnetlib.dhp.collection.plugin.file; package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.FileInputStream;
public class FileCollectorPlugin extends AbstractSplittedRecordPlugin { public class FileCollectorPlugin extends AbstractSplittedRecordPlugin {
private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class); private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class);
@Override public FileCollectorPlugin(FileSystem fileSystem) {
protected BufferedInputStream getBufferedInputStream(final String baseUrl) throws CollectorException { super(fileSystem);
}
log.info("baseUrl: {}", baseUrl); @Override
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
log.info("filePath: {}", filePath);
try { try {
return new BufferedInputStream(new FileInputStream(baseUrl)); FileSystem fs = super.getFileSystem();
return new BufferedInputStream(fs.open(filePath));
} catch (Exception e) { } catch (Exception e) {
throw new CollectorException("Error reading file " + baseUrl, e); throw new CollectorException("Error reading file " + filePath, e);
} }
} }
} }

View File

@ -1,28 +1,33 @@
package eu.dnetlib.dhp.collection.plugin.file; package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.*; import java.io.BufferedInputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin { public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class); private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
@Override public FileGZipCollectorPlugin(FileSystem fileSystem) {
protected BufferedInputStream getBufferedInputStream(String baseUrl) throws CollectorException { super(fileSystem);
}
log.info("baseUrl: {}", baseUrl); @Override
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
log.info("filePath: {}", filePath);
try { try {
GZIPInputStream stream = new GZIPInputStream(new FileInputStream(baseUrl)); FileSystem fs = super.getFileSystem();
GZIPInputStream stream = new GZIPInputStream(fs.open(filePath));
return new BufferedInputStream(stream); return new BufferedInputStream(stream);
} catch (Exception e) { } catch (Exception e) {
throw new CollectorException("Error reading file " + baseUrl, e); throw new CollectorException("Error reading file " + filePath, e);
} }
} }
} }

View File

@ -3,12 +3,17 @@ package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import net.bytebuddy.asm.Advice;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -17,12 +22,13 @@ public class FileCollectorPluginTest {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class); private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private final ApiDescriptor api = new ApiDescriptor(); private final ApiDescriptor api = new ApiDescriptor();
private FileCollectorPlugin plugin; private FileCollectorPlugin plugin;
private static final String SPLIT_ON_ELEMENT = "repository"; private static final String SPLIT_ON_ELEMENT = "repository";
@BeforeEach @BeforeEach
public void setUp() { public void setUp() throws IOException {
final String gzipFile = this final String gzipFile = this
.getClass() .getClass()
@ -36,7 +42,8 @@ public class FileCollectorPluginTest {
api.setParams(params); api.setParams(params);
plugin = new FileCollectorPlugin(); FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileCollectorPlugin(fs);
} }
@Test @Test

View File

@ -3,30 +3,42 @@ package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import org.junit.jupiter.api.Assertions; import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach; import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test; import org.apache.hadoop.fs.LocalFileSystem;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap; import java.util.HashMap;
import java.util.Objects;
import java.util.stream.Stream; import java.util.stream.Stream;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
public class FileGZipCollectorPluginTest { public class FileGZipCollectorPluginTest {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class); private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private final ApiDescriptor api = new ApiDescriptor(); private final ApiDescriptor api = new ApiDescriptor();
private FileGZipCollectorPlugin plugin; private FileGZipCollectorPlugin plugin;
private static final String SPLIT_ON_ELEMENT = "repository"; private static final String SPLIT_ON_ELEMENT = "repository";
@BeforeEach @BeforeEach
public void setUp() { public void setUp() throws IOException {
final String gzipFile = this final String gzipFile = Objects.requireNonNull(this
.getClass() .getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz") .getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz"))
.getFile(); .getFile();
api.setBaseUrl(gzipFile); api.setBaseUrl(gzipFile);
@ -36,7 +48,8 @@ public class FileGZipCollectorPluginTest {
api.setParams(params); api.setParams(params);
plugin = new FileGZipCollectorPlugin(); FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileGZipCollectorPlugin(fs);
} }
@Test @Test