Merge pull request 'Add Collector Plugin for Zenodo Dumps' (#516) from zenodo_dump_collection into beta

Reviewed-on: #516
This commit is contained in:
Claudio Atzori 2024-12-06 13:51:10 +01:00
commit 5c7f7fb3b8
6 changed files with 194 additions and 1 deletions

View File

@ -7,6 +7,7 @@ import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import eu.dnetlib.dhp.collection.plugin.zenodo.CollectZenodoDumpCollectorPlugin;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@ -129,6 +130,8 @@ public class CollectorWorker extends ReportingJob {
return new Gtr2PublicationsCollectorPlugin(this.clientParams);
case osfPreprints:
return new OsfPreprintsCollectorPlugin(this.clientParams);
case zenodoDump:
return new CollectZenodoDumpCollectorPlugin();
case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(this.api.getParams().get("other_plugin_type"))

View File

@ -11,7 +11,7 @@ public interface CollectorPlugin {
enum NAME {
oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications, osfPreprints;
oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications, osfPreprints, zenodoDump;
public enum OTHER_NAME {
mdstore_mongodb_dump, mdstore_mongodb

View File

@ -0,0 +1,96 @@
package eu.dnetlib.dhp.collection.plugin.zenodo;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin {
final private Logger log = LoggerFactory.getLogger(getClass());
private void downloadItem(final String name, final String itemURL, final String basePath,
final FileSystem fileSystem) {
try {
final Path hdfsWritePath = new Path(String.format("%s/%s", basePath, name));
final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
final HttpGet request = new HttpGet(itemURL);
final int timeout = 60; // seconds
final RequestConfig config = RequestConfig
.custom()
.setConnectTimeout(timeout * 1000)
.setConnectionRequestTimeout(timeout * 1000)
.setSocketTimeout(timeout * 1000)
.build();
log.info("Downloading url {} into {}", itemURL, hdfsWritePath.getName());
try (CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
CloseableHttpResponse response = client.execute(request)) {
int responseCode = response.getStatusLine().getStatusCode();
log.info("Response code is {}", responseCode);
if (responseCode >= 200 && responseCode < 400) {
IOUtils.copy(response.getEntity().getContent(), fsDataOutputStream);
}
} catch (Throwable eu) {
throw new RuntimeException(eu);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
@Override
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
try {
final String zenodoURL = api.getBaseUrl();
final String hdfsURI = api.getParams().get("hdfsURI");
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsURI));
downloadItem("zenodoDump.tar.gz", zenodoURL, "/tmp", fileSystem);
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
Path sourcePath = new Path("/tmp/zenodoDump.tar.gz");
CompressionCodec codec = factory.getCodec(sourcePath);
InputStream gzipInputStream = null;
try {
gzipInputStream = codec.createInputStream(fileSystem.open(sourcePath));
return iterateTar(gzipInputStream);
} catch (IOException e) {
throw new CollectorException(e);
} finally {
log.info("Closing gzip stream");
org.apache.hadoop.io.IOUtils.closeStream(gzipInputStream);
}
} catch (Exception e) {
throw new CollectorException(e);
}
}
private Stream<String> iterateTar(InputStream gzipInputStream) throws Exception {
Iterable<String> iterable = () -> new ZenodoTarIterator(gzipInputStream);
return StreamSupport.stream(iterable.spliterator(), false);
}
}

View File

@ -0,0 +1,59 @@
package eu.dnetlib.dhp.collection.plugin.zenodo;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
public class ZenodoTarIterator implements Iterator<String>, Closeable {
private final InputStream gzipInputStream;
private final StringBuilder currentItem = new StringBuilder();
private TarArchiveInputStream tais;
private boolean hasNext;
public ZenodoTarIterator(InputStream gzipInputStream) {
this.gzipInputStream = gzipInputStream;
tais = new TarArchiveInputStream(gzipInputStream);
hasNext = getNextItem();
}
private boolean getNextItem() {
try {
TarArchiveEntry entry;
while ((entry = tais.getNextTarEntry()) != null) {
if (entry.isFile()) {
currentItem.setLength(0);
currentItem.append(IOUtils.toString(new InputStreamReader(tais)));
return true;
}
}
return false;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public String next() {
final String data = currentItem.toString();
hasNext = getNextItem();
return data;
}
@Override
public void close() throws IOException {
gzipInputStream.close();
}
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.collection.plugin.zenodo;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.zip.GZIPInputStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class ZenodoPluginCollectionTest {
@Test
public void testZenodoIterator() throws Exception {
final GZIPInputStream gis = new GZIPInputStream(
getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/zenodo/zenodo.tar.gz"));
try (ZenodoTarIterator it = new ZenodoTarIterator(gis)) {
Assertions.assertTrue(it.hasNext());
int i = 0;
while (it.hasNext()) {
Assertions.assertNotNull(it.next());
i++;
}
Assertions.assertEquals(10, i);
}
}
}