From 464c2ddde363a2f4f0a3ebdbf58cef27486281fd Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 18 Jun 2021 09:42:31 +0200
Subject: [PATCH] split the generation of the crossref dataset into two steps

---
 .../crossref/GenerateCrossrefDataset.scala    | 33 +++++-------
 .../crossref/UnpackCrtossrefEntries.scala     | 54 +++++++++++++++++++
 2 files changed, 67 insertions(+), 20 deletions(-)
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
index e48f68a7f7..b11e2d8de0 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
@@ -1,6 +1,11 @@
 package eu.dnetlib.doiboost.crossref
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.doiboost.crossref.CrossrefDataset.to_item
+import eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries.getClass
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
 import org.json4s
@@ -17,13 +22,6 @@ object GenerateCrossrefDataset {
 
   implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
 
-  def extractDump(input:String):List[String] = {
-    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
-    lazy val json: json4s.JValue = parse(input)
-
-    val a = (json \ "items").extract[JArray]
-    a.arr.map(s => compact(render(s)))
-  }
 
 
   def crossrefElement(meta: String): CrossrefDT = {
@@ -44,7 +42,7 @@
     val targetPath = parser.get("targetPath")
 
     val spark: SparkSession = SparkSession.builder().config(conf)
-      .appName(GenerateCrossrefDataset.getClass.getSimpleName)
+      .appName(UnpackCrtossrefEntries.getClass.getSimpleName)
       .master(master)
       .getOrCreate()
     val sc: SparkContext = spark.sparkContext
@@ -52,19 +50,14 @@
 
     import spark.implicits._
 
 
-    def extractDump(input:String):List[String] = {
-      implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
-      lazy val json: json4s.JValue = parse(input)
+    val tmp : RDD[String] = sc.textFile(sourcePath,6000)
 
-      val a = (json \ "items").extract[JArray]
-      a.arr.map(s => compact(render(s)))
-    }
-
-
-    sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
-      .map(meta => crossrefElement(meta))
-      .toDS()//.as[CrossrefDT]
-      .write.mode(SaveMode.Overwrite).save(targetPath)
+    spark.createDataset(tmp)
+      .map(entry => crossrefElement(entry))
+      .write.mode(SaveMode.Overwrite).save(targetPath)
+//      .map(meta => crossrefElement(meta))
+//      .toDS.as[CrossrefDT]
+//      .write.mode(SaveMode.Overwrite).save(targetPath)
 
   }
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala
new file mode 100644
index 0000000000..95ecb568bd
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala
@@ -0,0 +1,54 @@
+package eu.dnetlib.doiboost.crossref
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+
+object UnpackCrtossrefEntries {
+
+  val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass)
+
+
+
+
+  def extractDump(input:String):List[String] = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: json4s.JValue = parse(input)
+
+    val a = (json \ "items").extract[JArray]
+    a.arr.map(s => compact(render(s)))
+
+
+  }
+
+
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf
+    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString)
+    parser.parseArgument(args)
+    val master = parser.get("master")
+    val sourcePath = parser.get("sourcePath")
+    val targetPath = parser.get("targetPath")
+
+    val spark: SparkSession = SparkSession.builder().config(conf)
+      .appName(UnpackCrtossrefEntries.getClass.getSimpleName)
+      .master(master)
+      .getOrCreate()
+    val sc: SparkContext = spark.sparkContext
+
+    sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
+      .saveAsTextFile(targetPath, classOf[GzipCodec])
+
+
+  }
+
+}
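
The two jobs above are meant to run in sequence: UnpackCrtossrefEntries explodes each dump file's "items" array into one JSON record per line and writes them as gzipped text, and GenerateCrossrefDataset then parses those lines into the CrossrefDT Dataset. The sketch below is a minimal, self-contained illustration of that two-step flow, not the workflow code itself: the local paths and the local[*] master are hypothetical, and it keeps Spark's default partitioning instead of the 6000 partitions passed in the jobs above.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession
import org.json4s.{DefaultFormats, JArray}
import org.json4s.jackson.JsonMethods.{compact, parse, render}

object CrossrefTwoStepSketch {

  // Same shape as UnpackCrtossrefEntries.extractDump: each dump file is one
  // JSON object whose "items" array holds the individual Crossref records.
  def extractDump(input: String): List[String] = {
    implicit val formats: DefaultFormats.type = DefaultFormats
    val json = parse(input)
    (json \ "items").extract[JArray].arr.map(s => compact(render(s)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CrossrefTwoStepSketch")
      .master("local[*]") // hypothetical; the real jobs take the master from the parser
      .getOrCreate()
    val sc = spark.sparkContext

    val dumpPath = "/tmp/crossref/dump"         // hypothetical input path
    val unpackedPath = "/tmp/crossref/unpacked" // hypothetical intermediate path

    // Step 1 (UnpackCrtossrefEntries): one JSON record per line, gzip-compressed.
    sc.wholeTextFiles(dumpPath)
      .flatMap(d => extractDump(d._2))
      .saveAsTextFile(unpackedPath, classOf[GzipCodec])

    // Step 2 (GenerateCrossrefDataset) then reads the lines back with
    // sc.textFile(unpackedPath) and maps each line to a CrossrefDT before
    // writing the Dataset with SaveMode.Overwrite.

    spark.stop()
  }
}

One apparent benefit of the split is that the per-record text is materialized between the two jobs, so the expensive wholeTextFiles pass over the dump does not have to be repeated if the Dataset-building step fails or changes.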