dnet-hadoop/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala

package eu.dnetlib.doiboost.crossref

import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}

class SparkGenerateCrossrefDataset(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log) {
  /** Converts a single Crossref JSON record into a CrossrefDT instance.
    *
    * @param metadata the JSON metadata of one Crossref record
    * @return the corresponding CrossrefDT
    */
  def crossrefElement(metadata: String): CrossrefDT = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: json4s.JValue = parse(metadata)
    // Normalize the DOI so it can be used as a stable identifier across the graph
    val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String])
    // The "indexed" timestamp records when Crossref last indexed the item
    val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long]
    CrossrefDT(doi, metadata, timestamp)
  }
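
  // Example (hypothetical, abridged Crossref record):
  //   crossrefElement("""{"DOI":"10.1000/XYZ123","indexed":{"timestamp":1620000000000}}""")
  // yields a CrossrefDT holding the normalized DOI, the original JSON string,
  // and the timestamp 1620000000000L.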
  /** Reads the plain-text Crossref dump line by line, maps each line to a CrossrefDT
    * and writes the resulting dataset to the target path.
    */
  def convertDataset(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
    import spark.implicits._
    spark.read
      .text(sourcePath)
      .as[String]
      .map(entry => crossrefElement(entry))
      .write
      .mode(SaveMode.Overwrite)
      .save(targetPath)
  }
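
  // Reading the result back (sketch): save() uses Spark's default Parquet format here, so,
  // assuming CrossrefDT is a case class and spark.implicits._ is in scope,
  // spark.read.load(targetPath).as[CrossrefDT] recovers the dataset.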
  /** Entry point invoked by the Spark application framework:
    * the whole logic of this Spark node is defined here.
    */
  override def run(): Unit = {
    val sourcePath = parser.get("sourcePath")
    log.info(s"sourcePath is $sourcePath")
    val targetPath = parser.get("targetPath")
    log.info(s"targetPath is $targetPath")
    convertDataset(spark, sourcePath, targetPath)
  }
}

object SparkGenerateCrossrefDataset {

  val log: Logger = LoggerFactory.getLogger(SparkGenerateCrossrefDataset.getClass)

  def main(args: Array[String]): Unit = {
    // Argument parsing and SparkSession creation are delegated to AbstractScalaApplication:
    // initialize() is assumed to parse the arguments against the property file and build
    // the SparkSession before run() executes, following the other dnet-hadoop Scala applications.
    new SparkGenerateCrossrefDataset(
      "/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json",
      args,
      log
    ).initialize().run()
  }
}
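
// Example invocation (hypothetical jar name, master and paths; the sourcePath and
// targetPath argument names come from the run() method above):
//   spark-submit --class eu.dnetlib.doiboost.crossref.SparkGenerateCrossrefDataset \
//     dhp-doiboost.jar --master yarn \
//     --sourcePath /data/crossref/dump --targetPath /data/crossref/dataset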