diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index 2ea3f35ccb..03f592169c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import eu.dnetlib.dhp.collection.plugin.fileGZip.FileGZipCollectorPlugin; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -114,6 +115,8 @@ public class CollectorWorker extends ReportingJob { return new OaiCollectorPlugin(clientParams); case rest_json2xml: return new RestCollectorPlugin(clientParams); + case fileGZip: + return new FileGZipCollectorPlugin(); case other: final CollectorPlugin.NAME.OTHER_NAME plugin = Optional .ofNullable(api.getParams().get("other_plugin_type")) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java index 841d42fea8..7d6aaaff86 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java @@ -10,7 +10,7 @@ import eu.dnetlib.dhp.common.collection.CollectorException; public interface CollectorPlugin { enum NAME { - oai, other, rest_json2xml; + oai, other, rest_json2xml, fileGZip; public enum OTHER_NAME { mdstore_mongodb_dump, mdstore_mongodb diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPlugin.java new file mode 100644 index 0000000000..38f3288904 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPlugin.java @@ -0,0 +1,49 @@ +package eu.dnetlib.dhp.collection.plugin.fileGZip; + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; +import eu.dnetlib.dhp.common.collection.CollectorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.URL; +import java.util.Optional; +import java.util.stream.Stream; +import java.util.zip.GZIPInputStream; + +public class FileGZipCollectorPlugin implements CollectorPlugin { + + private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class); + + public static final String ENCODING = "encoding"; + + @Override + public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { + + final String baseUrl = Optional + .ofNullable(api.getBaseUrl()) + .orElseThrow( () -> new CollectorException("missing baseUrl, required by the fileGZip collector plugin")); + + log.info("fileGZip.baseUrl: {}", baseUrl); + + final String encoding = Optional + .ofNullable(api.getParams().get(ENCODING)) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the fileGZip collector plugin", ENCODING))); + + log.info("fileGZip.encoding: {}", encoding); + + try { + + InputStream gzipStream = new GZIPInputStream(new FileInputStream(baseUrl)); + Reader decoder = new InputStreamReader(gzipStream, encoding); + BufferedReader reader = new BufferedReader(decoder); + + return reader.lines(); + + } catch (Exception e) { + throw new CollectorException(e); + } + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPluginTest.java new file mode 100644 index 0000000000..793a5b04cd --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/fileGZip/FileGZipCollectorPluginTest.java @@ -0,0 +1,55 @@ +package eu.dnetlib.dhp.collection.plugin.fileGZip; + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; +import eu.dnetlib.dhp.common.collection.CollectorException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.stream.Stream; + +public class FileGZipCollectorPluginTest { + + private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class); + + private final ApiDescriptor api = new ApiDescriptor(); + private FileGZipCollectorPlugin plugin; + + private static final String SPLIT_ON_ELEMENT = "repository"; + private static final String ENCODING = "UTF-8"; + + @BeforeEach + public void setUp() { + + final String gzipFile = this + .getClass() + .getResource("/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz") + .getFile(); + + System.out.println(gzipFile); + api.setBaseUrl(gzipFile); + + HashMap params = new HashMap<>(); + params.put("splitOnElement", SPLIT_ON_ELEMENT); + params.put("encoding", ENCODING); + + api.setParams(params); + + plugin = new FileGZipCollectorPlugin(); + } + + @Test + void test() throws CollectorException { + + final Stream stream = plugin.collect(api, new AggregatorReport()); + + stream.limit(10).forEach(s -> { + Assertions.assertTrue(s.length() > 0); + log.info(s); + }); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz new file mode 100644 index 0000000000..f783b69e7b Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/fileGZip/opendoar.xml.gz differ