From b039952d97249b8abe04c0394cf48f83c249bd02 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Fri, 13 Dec 2024 10:43:27 +0100 Subject: [PATCH] bug fixed on zenodo plugin --- .../CollectZenodoDumpCollectorPlugin.java | 36 +++++++++++++------ .../zenodo/ZenodoPluginCollectionTest.java | 10 ++---- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/CollectZenodoDumpCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/CollectZenodoDumpCollectorPlugin.java index 3ea29a9b03..73e6aca3ec 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/CollectZenodoDumpCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/zenodo/CollectZenodoDumpCollectorPlugin.java @@ -51,6 +51,9 @@ public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin { log.info("Response code is {}", responseCode); if (responseCode >= 200 && responseCode < 400) { IOUtils.copy(response.getEntity().getContent(), fsDataOutputStream); + fsDataOutputStream.flush(); + fsDataOutputStream.hflush(); + fsDataOutputStream.close(); } } catch (Throwable eu) { throw new RuntimeException(eu); @@ -60,16 +63,30 @@ public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin { } } + public FileSystem initializeFileSystem(final String hdfsURI) { + try { + return FileSystem.get(getHadoopConfiguration(hdfsURI)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { - try { - final String zenodoURL = api.getBaseUrl(); - final String hdfsURI = api.getParams().get("hdfsURI"); - final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsURI)); - downloadItem("zenodoDump.tar.gz", zenodoURL, "/tmp", fileSystem); - CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf()); - Path sourcePath = new Path("/tmp/zenodoDump.tar.gz"); + final String zenodoURL = api.getBaseUrl(); + final String hdfsURI = api.getParams().get("hdfsURI"); + final FileSystem fileSystem = initializeFileSystem(hdfsURI); + return doStream(fileSystem, zenodoURL, "/tmp"); + } + + + public Stream doStream(FileSystem fileSystem, String zenodoURL, String basePath) throws CollectorException { + try { + + downloadItem("zenodoDump.tar.gz", zenodoURL, basePath, fileSystem); + CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf()); + Path sourcePath = new Path(basePath+"/zenodoDump.tar.gz"); CompressionCodec codec = factory.getCodec(sourcePath); InputStream gzipInputStream = null; try { @@ -78,15 +95,14 @@ public class CollectZenodoDumpCollectorPlugin implements CollectorPlugin { } catch (IOException e) { throw new CollectorException(e); - } finally { - log.info("Closing gzip stream"); - org.apache.hadoop.io.IOUtils.closeStream(gzipInputStream); } } catch (Exception e) { throw new CollectorException(e); } } + + private Stream iterateTar(InputStream gzipInputStream) throws Exception { Iterable iterable = () -> new ZenodoTarIterator(gzipInputStream); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoPluginCollectionTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoPluginCollectionTest.java index 9b5cf1850f..e0112914b6 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoPluginCollectionTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/zenodo/ZenodoPluginCollectionTest.java @@ -1,20 +1,15 @@ package eu.dnetlib.dhp.collection.plugin.zenodo; -import static org.junit.jupiter.api.Assertions.assertNotNull; - import java.util.zip.GZIPInputStream; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.collection.ApiDescriptor; -import eu.dnetlib.dhp.common.collection.CollectorException; public class ZenodoPluginCollectionTest { + + @Test public void testZenodoIterator() throws Exception { @@ -32,4 +27,5 @@ public class ZenodoPluginCollectionTest { } } + }