diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/CollectZenodoDumpCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/CollectZenodoDumpCollectorPlugin.java
new file mode 100644
index 0000000000..3ea29a9b03
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/CollectZenodoDumpCollectorPlugin.java
@@ -0,0 +1,96 @@

package eu.dnetlib.dhp.collection.plugin.zenodo;

import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;

import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;

/**
 * Collector plugin that downloads a Zenodo dump (a gzipped tar of record files)
 * to HDFS and exposes the content of each archive entry as a {@link Stream} of
 * Strings.
 */
public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin {

	private static final Logger log = LoggerFactory.getLogger(CollectZenodoDumpCollectorPlugin.class);

	/** HTTP connect/request/socket timeout, in seconds. */
	private static final int TIMEOUT_SECONDS = 60;

	/** Staging location of the downloaded dump on HDFS. */
	private static final String BASE_PATH = "/tmp";
	private static final String DUMP_NAME = "zenodoDump.tar.gz";

	/**
	 * Downloads the content at {@code itemURL} into HDFS at {@code basePath/name},
	 * overwriting any existing file.
	 *
	 * @throws CollectorException when the HTTP request fails or returns an error
	 *             status, or when writing to HDFS fails. Failing loudly avoids
	 *             leaving a silently empty/truncated file on HDFS (the previous
	 *             behaviour for responses >= 400).
	 */
	private void downloadItem(final String name, final String itemURL, final String basePath,
		final FileSystem fileSystem) throws CollectorException {
		final Path hdfsWritePath = new Path(String.format("%s/%s", basePath, name));
		final RequestConfig config = RequestConfig
			.custom()
			.setConnectTimeout(TIMEOUT_SECONDS * 1000)
			.setConnectionRequestTimeout(TIMEOUT_SECONDS * 1000)
			.setSocketTimeout(TIMEOUT_SECONDS * 1000)
			.build();
		log.info("Downloading url {} into {}", itemURL, hdfsWritePath.getName());
		// The output stream joins the try-with-resources so it is always closed
		// (the original leaked it on every call).
		try (CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
			CloseableHttpResponse response = client.execute(new HttpGet(itemURL));
			FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true)) {
			final int responseCode = response.getStatusLine().getStatusCode();
			log.info("Response code is {}", responseCode);
			if (responseCode < 200 || responseCode >= 400) {
				throw new CollectorException(
					String.format("Error %d downloading %s", responseCode, itemURL));
			}
			IOUtils.copy(response.getEntity().getContent(), fsDataOutputStream);
		} catch (final IOException e) {
			throw new CollectorException("Error downloading " + itemURL, e);
		}
	}

	/**
	 * Downloads the dump pointed to by the API base URL into HDFS, then streams
	 * the textual content of every file contained in the tar archive.
	 * <p>
	 * The returned Stream is lazy: the underlying gzip stream must stay open
	 * until the caller closes the Stream. The close is therefore registered via
	 * {@link Stream#onClose} instead of a {@code finally} block — the previous
	 * implementation closed the stream before it was consumed, breaking every
	 * read after the first archive entry.
	 */
	@Override
	public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
		try {
			final String zenodoURL = api.getBaseUrl();
			final String hdfsURI = api.getParams().get("hdfsURI");
			final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsURI));
			downloadItem(DUMP_NAME, zenodoURL, BASE_PATH, fileSystem);

			final Path sourcePath = new Path(String.format("%s/%s", BASE_PATH, DUMP_NAME));
			final CompressionCodec codec = new CompressionCodecFactory(fileSystem.getConf()).getCodec(sourcePath);
			final InputStream gzipInputStream = codec.createInputStream(fileSystem.open(sourcePath));
			return iterateTar(gzipInputStream)
				.onClose(() -> org.apache.hadoop.io.IOUtils.closeStream(gzipInputStream));
		} catch (final CollectorException e) {
			throw e; // already meaningful, do not re-wrap
		} catch (final Exception e) {
			throw new CollectorException(e);
		}
	}

	/** Wraps the tar iterator into a sequential, lazily-evaluated Stream. */
	private Stream<String> iterateTar(final InputStream gzipInputStream) {
		final Iterable<String> iterable = () -> new ZenodoTarIterator(gzipInputStream);
		return StreamSupport.stream(iterable.spliterator(), false);
	}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoTarIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoTarIterator.java
new file mode 100644
index 0000000000..8e627683e6
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoTarIterator.java
@@ -0,0 +1,59 @@

package eu.dnetlib.dhp.collection.plugin.zenodo;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;

/**
 * Iterates over the regular-file entries of a (already gunzipped) tar stream,
 * yielding each entry's content as a UTF-8 String. Not thread-safe.
 */
public class ZenodoTarIterator implements Iterator<String>, Closeable {

	private final TarArchiveInputStream tais;
	// Content of the entry that the next call to next() will return.
	private String currentItem;
	private boolean hasNext;

	public ZenodoTarIterator(final InputStream gzipInputStream) {
		this.tais = new TarArchiveInputStream(gzipInputStream);
		// Eagerly position on the first file entry so hasNext() is accurate.
		this.hasNext = getNextItem();
	}

	/**
	 * Advances to the next regular-file entry (skipping directories etc.) and
	 * buffers its content.
	 *
	 * @return true if an entry was read, false when the archive is exhausted.
	 */
	private boolean getNextItem() {
		try {
			TarArchiveEntry entry;
			while ((entry = tais.getNextTarEntry()) != null) {
				if (entry.isFile()) {
					// tais is positioned on the current entry; read it as UTF-8
					// (the previous code used the platform default charset).
					currentItem = IOUtils.toString(new InputStreamReader(tais, StandardCharsets.UTF_8));
					return true;
				}
			}
			return false;
		} catch (final IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	@Override
	public boolean hasNext() {
		return hasNext;
	}

	@Override
	public String next() {
		// Honour the Iterator contract instead of returning stale data.
		if (!hasNext) {
			throw new NoSuchElementException("tar archive exhausted");
		}
		final String data = currentItem;
		hasNext = getNextItem();
		return data;
	}

	@Override
	public void close() throws IOException {
		// Closing the tar stream also closes the wrapped gzip stream.
		tais.close();
	}
}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoPluginCollectionTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoPluginCollectionTest.java
new file mode 100644
index 0000000000..9b5cf1850f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoPluginCollectionTest.java
@@ -0,0 +1,35 @@

package eu.dnetlib.dhp.collection.plugin.zenodo;

import java.util.zip.GZIPInputStream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Verifies that {@link ZenodoTarIterator} yields one non-null item per file
 * entry of the bundled test archive (10 entries expected).
 */
public class ZenodoPluginCollectionTest {

	@Test
	public void testZenodoIterator() throws Exception {

		final GZIPInputStream gis = new GZIPInputStream(
			getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/zenodo/zenodo.tar.gz"));
		// Closing the iterator closes the underlying gzip stream as well.
		try (ZenodoTarIterator it = new ZenodoTarIterator(gis)) {
			Assertions.assertTrue(it.hasNext());
			int count = 0;
			while (it.hasNext()) {
				Assertions.assertNotNull(it.next());
				count++;
			}
			Assertions.assertEquals(10, count);
		}
	}

}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/zenodo/zenodo.tar.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/zenodo/zenodo.tar.gz
new file mode 100644
index 0000000000..6c06bf4e5a
Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/zenodo/zenodo.tar.gz differ