1
0
Fork 0

bug fixed on zenodo plugin

This commit is contained in:
sandro.labruzzo 2024-12-13 10:43:27 +01:00
parent 29a2a29666
commit b039952d97
2 changed files with 29 additions and 17 deletions

View File

@ -51,6 +51,9 @@ public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin {
log.info("Response code is {}", responseCode);
if (responseCode >= 200 && responseCode < 400) {
IOUtils.copy(response.getEntity().getContent(), fsDataOutputStream);
fsDataOutputStream.flush();
fsDataOutputStream.hflush();
fsDataOutputStream.close();
}
} catch (Throwable eu) {
throw new RuntimeException(eu);
@ -60,16 +63,30 @@ public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin {
}
}
public FileSystem initializeFileSystem(final String hdfsURI) {
try {
return FileSystem.get(getHadoopConfiguration(hdfsURI));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
try {
final String zenodoURL = api.getBaseUrl();
final String hdfsURI = api.getParams().get("hdfsURI");
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsURI));
downloadItem("zenodoDump.tar.gz", zenodoURL, "/tmp", fileSystem);
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
final FileSystem fileSystem = initializeFileSystem(hdfsURI);
return doStream(fileSystem, zenodoURL, "/tmp");
}
Path sourcePath = new Path("/tmp/zenodoDump.tar.gz");
public Stream<String> doStream(FileSystem fileSystem, String zenodoURL, String basePath) throws CollectorException {
try {
downloadItem("zenodoDump.tar.gz", zenodoURL, basePath, fileSystem);
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
Path sourcePath = new Path(basePath+"/zenodoDump.tar.gz");
CompressionCodec codec = factory.getCodec(sourcePath);
InputStream gzipInputStream = null;
try {
@ -78,15 +95,14 @@ public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin {
} catch (IOException e) {
throw new CollectorException(e);
} finally {
log.info("Closing gzip stream");
org.apache.hadoop.io.IOUtils.closeStream(gzipInputStream);
}
} catch (Exception e) {
throw new CollectorException(e);
}
}
private Stream<String> iterateTar(InputStream gzipInputStream) throws Exception {
Iterable<String> iterable = () -> new ZenodoTarIterator(gzipInputStream);

View File

@ -1,20 +1,15 @@
package eu.dnetlib.dhp.collection.plugin.zenodo;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.zip.GZIPInputStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class ZenodoPluginCollectionTest {
@Test
public void testZenodoIterator() throws Exception {
@ -32,4 +27,5 @@ public class ZenodoPluginCollectionTest {
}
}
}