From d6e21bb6ea91cf6912bd41c2442d06a69307963d Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 14 Jun 2021 17:27:19 +0200
Subject: [PATCH] creates the crossref dataset used for doiboost together with unpacking part from tar

---
 .../crossref/GenerateCrossrefDataset.scala | 29 +++++++++++++++----
 1 file changed, 23 insertions(+), 6 deletions(-)

// NOTE(review): the line breaks, indentation and blank context lines below
// were re-flowed from a whitespace-collapsed copy of this patch so that every
// hunk matches the line counts in its "@@" header. The exact position of the
// blank context lines is an assumption - confirm against the target file
// before applying.
//
// Summary of the hunks as written:
//  * Hunk 1 removes the UnpackCrossrefDumpEntries.getClass and GzipCodec
//    imports and replaces the single JsonMethods.parse import with JArray
//    plus JsonMethods.{compact, parse, render}.
//  * Hunk 2 adds an object-level extractDump(input): it parses `input` as
//    JSON, extracts the "items" field as a JArray and returns every element
//    re-serialised through compact(render(_)).
//  * Hunk 3 builds the result with CrossrefDT(doi, meta, timestamp) instead of
//    `new CrossrefDT(...)`. CrossrefDT is declared outside this patch;
//    presumably it provides a companion apply (e.g. a case class) - confirm.
//  * Hunk 4 swaps sc.textFile(sourcePath,6000) for
//    sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)), so the
//    second element of each pair (d._2) is handed to extractDump as one JSON
//    document before the existing crossrefElement mapping runs.
//
// NOTE(review): hunk 4 adds a second extractDump whose tokens are identical to
// the one added in hunk 2, this time as a local def right before the call
// site (presumably inside main; the enclosing definition is not part of the
// hunk). The local def is the one the flatMap call would resolve to, which
// leaves the object-level copy unused by the code shown here. Keeping only
// one definition looks intended - confirm.
// NOTE(review): (json \ "items").extract[JArray] has no fallback in the code
// shown; check what should happen for an input without an "items" array.
// NOTE(review): hunk 4 leaves the previous call and ".as[CrossrefDT]" behind
// as commented-out code.

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
index 3b60a9095..9d17b5162 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
@@ -1,13 +1,12 @@
 package eu.dnetlib.doiboost.crossref
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.doiboost.crossref.UnpackCrossrefDumpEntries.getClass
-import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
 import org.json4s
 import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods.parse
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.io.Source
@@ -18,6 +17,13 @@ object GenerateCrossrefDataset {
 
   implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
 
+  def extractDump(input:String):List[String] = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: json4s.JValue = parse(input)
+
+    val a = (json \ "items").extract[JArray]
+    a.arr.map(s => compact(render(s)))
+  }
 
   def crossrefElement(meta: String): CrossrefDT = {
 
@@ -25,7 +31,7 @@ object GenerateCrossrefDataset {
     lazy val json: json4s.JValue = parse(meta)
     val doi:String = (json \ "DOI").extract[String]
     val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long]
-    new CrossrefDT(doi, meta, timestamp)
+    CrossrefDT(doi, meta, timestamp)
 
   }
 
@@ -45,9 +51,20 @@ object GenerateCrossrefDataset {
 
     import spark.implicits._
 
-    sc.textFile(sourcePath,6000)
+
+    def extractDump(input:String):List[String] = {
+      implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+      lazy val json: json4s.JValue = parse(input)
+
+      val a = (json \ "items").extract[JArray]
+      a.arr.map(s => compact(render(s)))
+    }
+
+
+    // sc.textFile(sourcePath,6000)
+    sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
       .map(meta => crossrefElement(meta))
-      .toDS()
+      .toDS()//.as[CrossrefDT]
       .write.mode(SaveMode.Overwrite).save(targetPath)
 
   }