diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
index 5993867005..f3846f5f76 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
@@ -1,11 +1,8 @@
 package eu.dnetlib.doiboost.crossref;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
 import java.io.BufferedOutputStream;
 import java.net.URI;
-import java.util.Optional;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -17,13 +14,11 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkConf;
-import org.apache.spark.util.LongAccumulator;
 import org.mortbay.log.Log;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 
-public class SparkExtractCrossrefRecords {
+public class ExtractCrossrefRecords {
 	public static void main(String[] args) throws Exception {
 		String hdfsServerUri;
 		String workingPath;
@@ -31,58 +26,40 @@ public class SparkExtractCrossrefRecords {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
-					SparkExtractCrossrefRecords.class
+					ExtractCrossrefRecords.class
 						.getResourceAsStream(
 							"/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json")));
 		parser.parseArgument(args);
 		hdfsServerUri = parser.get("hdfsServerUri");
 		workingPath = parser.get("workingPath");
 		crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
-		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		SparkConf sparkConf = new SparkConf();
-		runWithSparkSession(
-			sparkConf,
-			isSparkSessionManaged,
-			spark -> {
-				LongAccumulator filesCounter = spark
-					.sparkContext()
-					.longAccumulator("filesCounter");
-				Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
-				Configuration conf = new Configuration();
-				conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
-				conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-				conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
-				FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
-				FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
-				try (TarArchiveInputStream tais = new TarArchiveInputStream(
-					new GzipCompressorInputStream(crossrefFileStream))) {
-					TarArchiveEntry entry = null;
-					while ((entry = tais.getNextTarEntry()) != null) {
-						if (entry.isDirectory()) {
-						} else {
-							FSDataOutputStream out = fs
-								.create(new Path(workingPath.concat("filess/").concat(entry.getName())));
-							GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out));
-							try {
-								byte[] b = new byte[1024];
-								int numBytes = 0;
-								while ((numBytes = tais.read(b)) != -1) {
-									gzipOs.write(b, 0, numBytes);
-								}
-								filesCounter.add(1);
-							} finally {
-								IOUtils.closeQuietly(out);
-								IOUtils.closeQuietly(gzipOs);
-							}
-						}
+		Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
+		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+		FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
+		FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
+		try (TarArchiveInputStream tais = new TarArchiveInputStream(
+			new GzipCompressorInputStream(crossrefFileStream))) {
+			TarArchiveEntry entry = null;
+			while ((entry = tais.getNextTarEntry()) != null) {
+				if (entry.isDirectory()) {
+				} else {
+					try (
+						FSDataOutputStream out = fs
+							.create(new Path(workingPath.concat("filess/").concat(entry.getName()).concat(".gz")));
+						GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
+
+						IOUtils.copy(tais, gzipOs);
+					}
+				}
-					}
-				}
-				Log.info("Crossref dump reading completed");
-				Log.info("Files counter: " + filesCounter.value());
-			});
+			}
+		}
+		Log.info("Crossref dump reading completed");
 	}
 }
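Note for reviewers: the patch removes the Spark session plumbing (and with it the filesCounter accumulator) and streams the tar entries directly, re-compressing each entry into its own .gz file via try-with-resources and IOUtils.copy. Below is a minimal local-filesystem sketch of that pattern for reference; the class name TarGzSplitter and the paths crossref_dump.tar.gz and files/ are hypothetical stand-ins, whereas the real job resolves everything against hdfsServerUri and workingPath on HDFS.

import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;

/** Hypothetical local-filesystem analogue of the HDFS logic in the patch. */
public class TarGzSplitter {

	public static void main(String[] args) throws Exception {
		// Hypothetical paths; the real job builds them from hdfsServerUri/workingPath.
		Path dump = Paths.get("crossref_dump.tar.gz");
		Path outDir = Paths.get("files");
		Files.createDirectories(outDir);

		try (TarArchiveInputStream tais = new TarArchiveInputStream(
			new GzipCompressorInputStream(Files.newInputStream(dump)))) {
			TarArchiveEntry entry;
			while ((entry = tais.getNextTarEntry()) != null) {
				if (entry.isDirectory()) {
					continue;
				}
				// One .gz file per tar entry, mirroring the .concat(".gz") in the patch.
				Path target = outDir.resolve(entry.getName() + ".gz");
				Files.createDirectories(target.getParent());
				try (OutputStream out = Files.newOutputStream(target);
					GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
					// The tar stream is positioned at the current entry's payload,
					// so copying from tais reads exactly this entry's bytes.
					IOUtils.copy(tais, gzipOs);
				}
			}
		}
	}
}

The try-with-resources block guarantees both streams are flushed and closed even if the copy throws, replacing the manual finally/closeQuietly bookkeeping of the Spark version, and the appended .gz suffix lets downstream readers detect the compression from the file name.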