This commit is contained in:
Miriam Baglioni 2021-06-21 09:16:31 +02:00
parent 2740b95f99
commit c07f820c21
1 changed file with 27 additions and 50 deletions

@@ -1,11 +1,8 @@
 package eu.dnetlib.doiboost.crossref;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.BufferedOutputStream;
 import java.net.URI;
-import java.util.Optional;
 import java.util.zip.GZIPOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -17,13 +14,11 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkConf;
-import org.apache.spark.util.LongAccumulator;
 import org.mortbay.log.Log;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-public class SparkExtractCrossrefRecords {
+public class ExtractCrossrefRecords {
 	public static void main(String[] args) throws Exception {
 		String hdfsServerUri;
 		String workingPath;
@@ -31,58 +26,40 @@ public class SparkExtractCrossrefRecords {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
-					SparkExtractCrossrefRecords.class
+					ExtractCrossrefRecords.class
 						.getResourceAsStream(
 							"/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json")));
 		parser.parseArgument(args);
 		hdfsServerUri = parser.get("hdfsServerUri");
 		workingPath = parser.get("workingPath");
 		crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
-		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		SparkConf sparkConf = new SparkConf();
-		runWithSparkSession(
-			sparkConf,
-			isSparkSessionManaged,
-			spark -> {
-				LongAccumulator filesCounter = spark
-					.sparkContext()
-					.longAccumulator("filesCounter");
-				Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
-				Configuration conf = new Configuration();
-				conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
-				conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-				conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
-				FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
-				FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
-				try (TarArchiveInputStream tais = new TarArchiveInputStream(
-					new GzipCompressorInputStream(crossrefFileStream))) {
-					TarArchiveEntry entry = null;
-					while ((entry = tais.getNextTarEntry()) != null) {
-						if (entry.isDirectory()) {
-						} else {
-							FSDataOutputStream out = fs
-								.create(new Path(workingPath.concat("filess/").concat(entry.getName())));
-							GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out));
-							try {
-								byte[] b = new byte[1024];
-								int numBytes = 0;
-								while ((numBytes = tais.read(b)) != -1) {
-									gzipOs.write(b, 0, numBytes);
-								}
-								filesCounter.add(1);
-							} finally {
-								IOUtils.closeQuietly(out);
-								IOUtils.closeQuietly(gzipOs);
-							}
-						}
-					}
-				}
-				Log.info("Crossref dump reading completed");
-				Log.info("Files counter: " + filesCounter.value());
-			});
+		Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
+		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+		FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
+		FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
+		try (TarArchiveInputStream tais = new TarArchiveInputStream(
+			new GzipCompressorInputStream(crossrefFileStream))) {
+			TarArchiveEntry entry = null;
+			while ((entry = tais.getNextTarEntry()) != null) {
+				if (entry.isDirectory()) {
+				} else {
+					try (
+						FSDataOutputStream out = fs
+							.create(new Path(workingPath.concat("filess/").concat(entry.getName()).concat(".gz")));
+						GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
+						IOUtils.copy(tais, gzipOs);
+					}
+				}
+			}
+		}
+		Log.info("Crossref dump reading completed");
 	}
 }
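
In short, the commit removes the Spark session wrapper (the job is plain sequential I/O, so the Spark context and the filesCounter accumulator added nothing) and replaces the hand-rolled byte[1024] copy loop plus finally { IOUtils.closeQuietly(...) } with try-with-resources and IOUtils.copy, also appending a ".gz" suffix to each extracted file. A minimal local-filesystem sketch of the same extraction pattern, with hypothetical paths and java.nio standing in for the Hadoop FileSystem API:

	import java.io.BufferedOutputStream;
	import java.io.InputStream;
	import java.io.OutputStream;
	import java.nio.file.Files;
	import java.nio.file.Path;
	import java.nio.file.Paths;
	import java.util.zip.GZIPOutputStream;

	import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
	import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
	import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
	import org.apache.commons.io.IOUtils;

	public class TarGzSplitSketch {

		public static void main(String[] args) throws Exception {
			// Hypothetical local paths standing in for the HDFS working path of the job.
			Path dump = Paths.get("crossref.tar.gz");
			Path outDir = Paths.get("files");
			Files.createDirectories(outDir);

			try (InputStream in = Files.newInputStream(dump);
				TarArchiveInputStream tais = new TarArchiveInputStream(
					new GzipCompressorInputStream(in))) {
				TarArchiveEntry entry;
				while ((entry = tais.getNextTarEntry()) != null) {
					if (entry.isDirectory()) {
						continue;
					}
					// Re-compress each tar entry into its own .gz file, as the refactored job does.
					Path target = outDir.resolve(entry.getName() + ".gz");
					Files.createDirectories(target.getParent());
					try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(target));
						GZIPOutputStream gzipOs = new GZIPOutputStream(out)) {
						// try-with-resources closes both streams even if the copy fails,
						// replacing the old manual finally { IOUtils.closeQuietly(...) } block;
						// IOUtils.copy replaces the hand-rolled byte[] read/write loop.
						IOUtils.copy(tais, gzipOs);
					}
				}
			}
		}
	}

Note that IOUtils.copy still consumes one entry at a time from the single TarArchiveInputStream, so extraction remains strictly sequential; that is precisely why the Spark session around it was dead weight.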