forked from D-Net/dnet-hadoop
implemented zenodo dump collector plugin
This commit is contained in:
parent
cc6bbbb804
commit
32e2a8b340
|
@ -0,0 +1,96 @@
|
|||
|
||||
package eu.dnetlib.dhp.collection.plugin.zenodo;
|
||||
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
|
||||
public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin {
|
||||
|
||||
final private Logger log = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private void downloadItem(final String name, final String itemURL, final String basePath,
|
||||
final FileSystem fileSystem) {
|
||||
try {
|
||||
final Path hdfsWritePath = new Path(String.format("%s/%s", basePath, name));
|
||||
final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
|
||||
final HttpGet request = new HttpGet(itemURL);
|
||||
final int timeout = 60; // seconds
|
||||
final RequestConfig config = RequestConfig
|
||||
.custom()
|
||||
.setConnectTimeout(timeout * 1000)
|
||||
.setConnectionRequestTimeout(timeout * 1000)
|
||||
.setSocketTimeout(timeout * 1000)
|
||||
.build();
|
||||
log.info("Downloading url {} into {}", itemURL, hdfsWritePath.getName());
|
||||
try (CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
|
||||
CloseableHttpResponse response = client.execute(request)) {
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
log.info("Response code is {}", responseCode);
|
||||
if (responseCode >= 200 && responseCode < 400) {
|
||||
IOUtils.copy(response.getEntity().getContent(), fsDataOutputStream);
|
||||
}
|
||||
} catch (Throwable eu) {
|
||||
throw new RuntimeException(eu);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
|
||||
try {
|
||||
final String zenodoURL = api.getBaseUrl();
|
||||
final String hdfsURI = api.getParams().get("hdfsURI");
|
||||
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsURI));
|
||||
downloadItem("zenodoDump.tar.gz", zenodoURL, "/tmp", fileSystem);
|
||||
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
|
||||
|
||||
Path sourcePath = new Path("/tmp/zenodoDump.tar.gz");
|
||||
CompressionCodec codec = factory.getCodec(sourcePath);
|
||||
InputStream gzipInputStream = null;
|
||||
try {
|
||||
gzipInputStream = codec.createInputStream(fileSystem.open(sourcePath));
|
||||
return iterateTar(gzipInputStream);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new CollectorException(e);
|
||||
} finally {
|
||||
log.info("Closing gzip stream");
|
||||
org.apache.hadoop.io.IOUtils.closeStream(gzipInputStream);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new CollectorException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Stream<String> iterateTar(InputStream gzipInputStream) throws Exception {
|
||||
|
||||
Iterable<String> iterable = () -> new ZenodoTarIterator(gzipInputStream);
|
||||
return StreamSupport.stream(iterable.spliterator(), false);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
|
||||
package eu.dnetlib.dhp.collection.plugin.zenodo;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
public class ZenodoTarIterator implements Iterator<String>, Closeable {
|
||||
|
||||
private final InputStream gzipInputStream;
|
||||
private final StringBuilder currentItem = new StringBuilder();
|
||||
private TarArchiveInputStream tais;
|
||||
private boolean hasNext;
|
||||
|
||||
public ZenodoTarIterator(InputStream gzipInputStream) {
|
||||
this.gzipInputStream = gzipInputStream;
|
||||
tais = new TarArchiveInputStream(gzipInputStream);
|
||||
hasNext = getNextItem();
|
||||
}
|
||||
|
||||
private boolean getNextItem() {
|
||||
try {
|
||||
TarArchiveEntry entry;
|
||||
while ((entry = tais.getNextTarEntry()) != null) {
|
||||
if (entry.isFile()) {
|
||||
currentItem.setLength(0);
|
||||
currentItem.append(IOUtils.toString(new InputStreamReader(tais)));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return hasNext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
final String data = currentItem.toString();
|
||||
hasNext = getNextItem();
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
gzipInputStream.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
|
||||
package eu.dnetlib.dhp.collection.plugin.zenodo;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
|
||||
public class ZenodoPluginCollectionTest {
|
||||
|
||||
@Test
|
||||
public void testZenodoIterator() throws Exception {
|
||||
|
||||
final GZIPInputStream gis = new GZIPInputStream(
|
||||
getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/zenodo/zenodo.tar.gz"));
|
||||
try (ZenodoTarIterator it = new ZenodoTarIterator(gis)) {
|
||||
Assertions.assertTrue(it.hasNext());
|
||||
int i = 0;
|
||||
while (it.hasNext()) {
|
||||
Assertions.assertNotNull(it.next());
|
||||
i++;
|
||||
}
|
||||
Assertions.assertEquals(10, i);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Binary file not shown.
Loading…
Reference in New Issue