[HostedByMap - DOIBoost] changed to use the code moved to common, since it is now also used by hostedbymap

Miriam Baglioni 2022-03-04 11:05:23 +01:00
parent 8a41f63348
commit b7c2340952
2 changed files with 45 additions and 26 deletions

eu/dnetlib/dhp/common/collection/DecompressTarGz.java (new file)

@@ -0,0 +1,40 @@
+package eu.dnetlib.dhp.common.collection;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class DecompressTarGz {
+
+	public static void doExtract(FileSystem fs, String outputPath, String tarGzPath) throws IOException {
+		FSDataInputStream inputFileStream = fs.open(new Path(tarGzPath));
+		try (TarArchiveInputStream tais = new TarArchiveInputStream(
+			new GzipCompressorInputStream(inputFileStream))) {
+			TarArchiveEntry entry = null;
+			while ((entry = tais.getNextTarEntry()) != null) {
+				if (!entry.isDirectory()) {
+					try (
+						FSDataOutputStream out = fs
+							.create(new Path(outputPath.concat(entry.getName()).concat(".gz")));
+						GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
+						IOUtils.copy(tais, gzipOs);
+					}
+				}
+			}
+		}
+	}
+}
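A minimal usage sketch of the new shared helper (the filesystem URI, paths, and archive name below are illustrative, not taken from the commit; the Configuration setup mirrors the DOIBoost caller further down):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import eu.dnetlib.dhp.common.collection.DecompressTarGz;

public class DecompressTarGzUsage {

	public static void main(String[] args) throws Exception {
		// Illustrative locations; any URI the Hadoop FileSystem can resolve works.
		String workingPath = "hdfs://nameservice1/data/dump";
		String outputPath = "hdfs://nameservice1/data/unpacked/";

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", workingPath);
		FileSystem fs = FileSystem.get(URI.create(workingPath), conf);

		// Re-compresses each regular archive entry with gzip and writes it to
		// outputPath + entryName + ".gz", so outputPath should end with "/".
		DecompressTarGz.doExtract(fs, outputPath, workingPath.concat("/dump.tar.gz"));
	}
}

Note that doExtract streams each tar entry straight into a GZIPOutputStream, so the archive contents are never materialized uncompressed.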

eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java

@@ -1,19 +1,13 @@
 package eu.dnetlib.doiboost.crossref;
 
-import java.io.BufferedOutputStream;
-import java.net.URI;
-import java.util.zip.GZIPOutputStream;
+import static eu.dnetlib.dhp.common.collection.DecompressTarGz.doExtract;
+
+import java.net.URI;
 
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.mortbay.log.Log;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@@ -33,31 +27,16 @@ public class ExtractCrossrefRecords
 		final String outputPath = parser.get("outputPath");
 		final String crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
 
 		Path hdfsreadpath = new Path(workingPath.concat("/").concat(crossrefFileNameTarGz));
 		Configuration conf = new Configuration();
 		conf.set("fs.defaultFS", workingPath);
 		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
 		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
 		FileSystem fs = FileSystem.get(URI.create(workingPath), conf);
-		FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
-		try (TarArchiveInputStream tais = new TarArchiveInputStream(
-			new GzipCompressorInputStream(crossrefFileStream))) {
-			TarArchiveEntry entry = null;
-			while ((entry = tais.getNextTarEntry()) != null) {
-				if (!entry.isDirectory()) {
-					try (
-						FSDataOutputStream out = fs
-							.create(new Path(outputPath.concat(entry.getName()).concat(".gz")));
-						GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
-						IOUtils.copy(tais, gzipOs);
-					}
-				}
-			}
-		}
+
+		doExtract(fs, outputPath, workingPath.concat("/").concat(crossrefFileNameTarGz));
+
 		Log.info("Crossref dump reading completed");
 	}
 }
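Put back together, the extraction flow after this change is roughly the following (a sketch assembled from the hunks above; the class name and the plain-args handling are stand-ins for the real ArgumentApplicationParser setup, which the diff does not show):

package eu.dnetlib.doiboost.crossref;

import static eu.dnetlib.dhp.common.collection.DecompressTarGz.doExtract;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.mortbay.log.Log;

public class ExtractCrossrefRecordsSketch {

	public static void main(String[] args) throws Exception {
		// Stand-in for the ArgumentApplicationParser setup (not shown in the hunks).
		final String workingPath = args[0];
		final String outputPath = args[1];
		final String crossrefFileNameTarGz = args[2];

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", workingPath);
		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
		FileSystem fs = FileSystem.get(URI.create(workingPath), conf);

		// The inlined tar.gz loop is gone; the shared helper does the unpacking.
		doExtract(fs, outputPath, workingPath.concat("/").concat(crossrefFileNameTarGz));
		Log.info("Crossref dump reading completed");
	}
}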