diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
new file mode 100644
index 000000000..f3846f5f7
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java
@@ -0,0 +1,65 @@
+
+package eu.dnetlib.doiboost.crossref;
+
+import java.io.BufferedOutputStream;
+import java.net.URI;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mortbay.log.Log;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class ExtractCrossrefRecords {
+    public static void main(String[] args) throws Exception {
+        String hdfsServerUri;
+        String workingPath;
+        String crossrefFileNameTarGz;
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    ExtractCrossrefRecords.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json")));
+        parser.parseArgument(args);
+        hdfsServerUri = parser.get("hdfsServerUri");
+        workingPath = parser.get("workingPath");
+        crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
+
+        Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
+        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+        FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
+        FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
+        try (TarArchiveInputStream tais = new TarArchiveInputStream(
+            new GzipCompressorInputStream(crossrefFileStream))) {
+            TarArchiveEntry entry = null;
+            while ((entry = tais.getNextTarEntry()) != null) {
+                if (entry.isDirectory()) {
+                } else {
+                    try (
+                        FSDataOutputStream out = fs
+                            .create(new Path(workingPath.concat("filess/").concat(entry.getName()).concat(".gz")));
+                        GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
+
+                        IOUtils.copy(tais, gzipOs);
+
+                    }
+
+                }
+            }
+        }
+        Log.info("Crossref dump reading completed");
+
+    }
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala
new file mode 100644
index 000000000..e186c9b8b
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala
@@ -0,0 +1,61 @@
+package eu.dnetlib.doiboost.crossref
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
+
+import scala.io.Source
+
+object GenerateCrossrefDatasetSpark {
+
+
+  val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDatasetSpark.getClass)
+
+  implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
+
+  def extractDump(input: String): List[String] = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: json4s.JValue = parse(input)
+
+    val a = (json \ "items").extract[JArray]
+    a.arr.map(s => compact(render(s)))
+  }
+
+  def crossrefElement(meta: String): CrossrefDT = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: json4s.JValue = parse(meta)
+    val doi: String = (json \ "DOI").extract[String]
+    val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long]
+    new CrossrefDT(doi, meta, timestamp)
+
+  }
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf
+    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString)
+    parser.parseArgument(args)
+    val master = parser.get("master")
+    val sourcePath = parser.get("sourcePath")
+    val targetPath = parser.get("targetPath")
+
+    val spark: SparkSession = SparkSession.builder().config(conf)
+      .appName(GenerateCrossrefDatasetSpark.getClass.getSimpleName)
+      .master(master)
+      .getOrCreate()
+
+    import spark.implicits._
+    val sc: SparkContext = spark.sparkContext
+
+
+    sc.wholeTextFiles(sourcePath, 2000).flatMap(d => extractDump(d._2))
+      .map(meta => crossrefElement(meta))
+      .toDS()
+      .write.mode(SaveMode.Overwrite).save(targetPath)
+
+  }
+}
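Note: CrossrefDT is referenced in GenerateCrossrefDatasetSpark but is not introduced by this diff; it is an existing type elsewhere in the dhp-doiboost module. A minimal sketch of a container that would be consistent with the new CrossrefDT(doi, meta, timestamp) call above (field names here are assumptions, not the actual definition):

package eu.dnetlib.doiboost.crossref

// Hypothetical sketch only: one Crossref record keyed by DOI, carrying the raw
// JSON payload and the "indexed" timestamp extracted in crossrefElement.
// The real CrossrefDT class is defined elsewhere in dhp-doiboost.
case class CrossrefDT(doi: String, json: String, timestamp: Long)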