changed to split in two steps the generation of the crossref dataset

This commit is contained in:
Miriam Baglioni 2021-06-18 09:42:31 +02:00
parent 6aca0d8ebb
commit 464c2ddde3
2 changed files with 67 additions and 20 deletions

View File

@ -1,6 +1,11 @@
package eu.dnetlib.doiboost.crossref package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.doiboost.crossref.CrossrefDataset.to_item
import eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries.getClass
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.json4s import org.json4s
@ -17,13 +22,6 @@ object GenerateCrossrefDataset {
implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
def extractDump(input:String):List[String] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val a = (json \ "items").extract[JArray]
a.arr.map(s => compact(render(s)))
}
def crossrefElement(meta: String): CrossrefDT = { def crossrefElement(meta: String): CrossrefDT = {
@ -44,7 +42,7 @@ object GenerateCrossrefDataset {
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
val spark: SparkSession = SparkSession.builder().config(conf) val spark: SparkSession = SparkSession.builder().config(conf)
.appName(GenerateCrossrefDataset.getClass.getSimpleName) .appName(UnpackCrtossrefEntries.getClass.getSimpleName)
.master(master) .master(master)
.getOrCreate() .getOrCreate()
val sc: SparkContext = spark.sparkContext val sc: SparkContext = spark.sparkContext
@ -52,19 +50,14 @@ object GenerateCrossrefDataset {
import spark.implicits._ import spark.implicits._
def extractDump(input:String):List[String] = { val tmp : RDD[String] = sc.textFile(sourcePath,6000)
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val a = (json \ "items").extract[JArray] spark.createDataset(tmp)
a.arr.map(s => compact(render(s))) .map(entry => crossrefElement(entry))
}
sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
.map(meta => crossrefElement(meta))
.toDS()//.as[CrossrefDT]
.write.mode(SaveMode.Overwrite).save(targetPath) .write.mode(SaveMode.Overwrite).save(targetPath)
// .map(meta => crossrefElement(meta))
// .toDS.as[CrossrefDT]
// .write.mode(SaveMode.Overwrite).save(targetPath)
} }

View File

@ -0,0 +1,54 @@
package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object UnpackCrtossrefEntries {
val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass)
def extractDump(input:String):List[String] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val a = (json \ "items").extract[JArray]
a.arr.map(s => compact(render(s)))
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(UnpackCrtossrefEntries.getClass.getSimpleName)
.master(master)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
.saveAsTextFile(targetPath, classOf[GzipCodec])
}
}