From 93efe4de823fa03d27d9ec363b8df8489e3d9a72 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Jun 2021 13:39:40 +0200 Subject: [PATCH] split the construction of crossref dataset in two parts. This one just unpacks the tar entries --- ....scala => UnpackCrossrefDumpEntries.scala} | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) rename dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/{GenerateCrossrefDatasetSpark.scala => UnpackCrossrefDumpEntries.scala} (60%) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala similarity index 60% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala rename to dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala index e186c9b8b5..4f8189cf36 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala @@ -1,6 +1,7 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -11,12 +12,12 @@ import org.json4s.jackson.JsonMethods._ import scala.io.Source -object GenerateCrossrefDatasetSpark { +object UnpackCrossrefDumpEntries { - val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDatasetSpark.getClass) + val log: Logger = LoggerFactory.getLogger(UnpackCrossrefDumpEntries.getClass) + - implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] def extractDump(input:String):List[String] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats @@ -26,14 +27,7 @@ object GenerateCrossrefDatasetSpark { a.arr.map(s => compact(render(s))) } - def crossrefElement(meta: String): CrossrefDT = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(meta) - val doi:String = (json \ "DOI").extract[String] - val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] - new CrossrefDT(doi, meta, timestamp) - } def main(args: Array[String]): Unit = { val conf = new SparkConf @@ -44,18 +38,17 @@ object GenerateCrossrefDatasetSpark { val targetPath = parser.get("targetPath") val spark: SparkSession = SparkSession.builder().config(conf) - .appName(GenerateCrossrefDatasetSpark.getClass.getSimpleName) + .appName(UnpackCrossrefDumpEntries.getClass.getSimpleName) .master(master) .getOrCreate() - import spark.implicits._ + val sc: SparkContext = spark.sparkContext - sc.wholeTextFiles(sourcePath,2000).flatMap(d =>extractDump(d._2)) - .map(meta => crossrefElement(meta)) - .toDS() - .write.mode(SaveMode.Overwrite).save(targetPath) + sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) + .saveAsTextFile(targetPath, classOf[GzipCodec]); + } }