diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/DecompressTarGz.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/DecompressTarGz.java
new file mode 100644
index 000000000..8bcf14ba4
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/DecompressTarGz.java
@@ -0,0 +1,77 @@
+
+package eu.dnetlib.dhp.common.collection;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Utility for unpacking a {@code .tar.gz} archive stored on a Hadoop {@link FileSystem}.
+ */
+public class DecompressTarGz {
+
+	private DecompressTarGz() {
+		// static utility, not meant to be instantiated
+	}
+
+	/**
+	 * Reads the tar.gz archive at {@code tarGzPath} and writes every non-directory entry as its own
+	 * gzip-compressed file, named by appending the entry name and {@code .gz} to {@code outputPath}.
+	 * <p>
+	 * {@code outputPath} is concatenated verbatim with the entry name, so it has to end with a path
+	 * separator when it denotes a directory.
+	 *
+	 * @param fs file system used both to read the archive and to create the output files
+	 * @param outputPath prefix of every output file
+	 * @param tarGzPath location of the tar.gz archive on {@code fs}
+	 * @throws IOException if the archive cannot be read, an output file cannot be written, or an entry
+	 *         name contains a {@code ..} segment
+	 */
+	public static void doExtract(FileSystem fs, String outputPath, String tarGzPath) throws IOException {
+
+		// Every stream is declared as its own resource: the raw input stream is closed even when
+		// building the gzip/tar wrappers fails (e.g. the file is not a valid gzip stream).
+		try (FSDataInputStream inputFileStream = fs.open(new Path(tarGzPath));
+			GzipCompressorInputStream gzipIs = new GzipCompressorInputStream(inputFileStream);
+			TarArchiveInputStream tais = new TarArchiveInputStream(gzipIs)) {
+			TarArchiveEntry entry;
+			while ((entry = tais.getNextTarEntry()) != null) {
+				if (entry.isDirectory()) {
+					continue;
+				}
+				final String entryName = entry.getName();
+				if (hasParentSegment(entryName)) {
+					// refuse names that would resolve outside of outputPath
+					throw new IOException("Tar entry escapes the output path: " + entryName);
+				}
+				try (
+					FSDataOutputStream out = fs.create(new Path(outputPath.concat(entryName).concat(".gz")));
+					GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
+					// tais only exposes the bytes of the current entry, so this copies exactly one file
+					IOUtils.copy(tais, gzipOs);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Tells whether one of the path segments of {@code entryName} is {@code ..}.
+	 */
+	private static boolean hasParentSegment(String entryName) {
+		for (String segment : entryName.split("[/\\\\]")) {
+			if ("..".equals(segment)) {
+				return true;
+			}
+		}
+		return false;
+	}
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
index d1861ff0a..6060b619e 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
@@ -1,19 +1,13 @@
 
 package eu.dnetlib.doiboost.crossref;
 
-import java.io.BufferedOutputStream;
-import java.net.URI;
-import java.util.zip.GZIPOutputStream;
+import static eu.dnetlib.dhp.common.collection.DecompressTarGz.doExtract;
+
+import java.net.URI;
 
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.mortbay.log.Log;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@@ -33,31 +27,16 @@ public class ExtractCrossrefRecords {
 		final String outputPath = parser.get("outputPath");
 		final String crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
 
-		Path hdfsreadpath = new Path(workingPath.concat("/").concat(crossrefFileNameTarGz));
 		Configuration conf = new Configuration();
 		conf.set("fs.defaultFS", workingPath);
 		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
 		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
 		FileSystem fs = FileSystem.get(URI.create(workingPath), conf);
-		FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
-		try (TarArchiveInputStream tais = new TarArchiveInputStream(
-			new GzipCompressorInputStream(crossrefFileStream))) {
-			TarArchiveEntry entry = null;
-			while ((entry = tais.getNextTarEntry()) != null) {
-				if (!entry.isDirectory()) {
-					try (
-						FSDataOutputStream out = fs
-							.create(new Path(outputPath.concat(entry.getName()).concat(".gz")));
-						GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
 
-						IOUtils.copy(tais, gzipOs);
+		doExtract(fs, outputPath, workingPath.concat("/").concat(crossrefFileNameTarGz));
 
-					}
-
-				}
-			}
-		}
 		Log.info("Crossref dump reading completed");
 
 	}
+
 }