package eu.dnetlib.doiboost.crossref

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}

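/*
 * For context: the record type used throughout this job, CrossrefDT, is
 * defined elsewhere in this package. Judging from the constructor call in
 * to_item and the fields accessed below, it is assumed to look roughly like
 * the sketch here (the name of the raw-record field is a guess):
 *
 *   case class CrossrefDT(doi: String, json: String, timestamp: Long)
 *
 * i.e. the record's DOI, the raw Crossref JSON payload, and the Crossref
 * "indexed" timestamp used to pick the most recent version of a record.
 */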
object CrossrefDataset {

  // The original code borrowed SparkMapDumpIntoOAF's class here, almost
  // certainly a copy-paste slip; use this object's own class so log lines
  // are attributed correctly.
  val logger: Logger = LoggerFactory.getLogger(CrossrefDataset.getClass)

  /** Parses a raw Crossref JSON record into a CrossrefDT, extracting the
    * DOI and the "indexed" timestamp used later to resolve duplicates. */
  def to_item(input: String): CrossrefDT = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: json4s.JValue = parse(input)
    val ts: Long = (json \ "indexed" \ "timestamp").extract[Long]
    val doi: String = (json \ "DOI").extract[String]
    CrossrefDT(doi, input, ts)
  }

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json")))
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        // Again, use this object's name rather than SparkMapDumpIntoOAF's
        // (another copy-paste slip in the original).
        .appName(CrossrefDataset.getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()
    import spark.implicits._

    // Aggregator that keeps, for each DOI, the record with the most recent
    // "indexed" timestamp; null marks an empty buffer.
    val crossrefAggregator = new Aggregator[CrossrefDT, CrossrefDT, CrossrefDT] with Serializable {

      override def zero: CrossrefDT = null

      override def reduce(b: CrossrefDT, a: CrossrefDT): CrossrefDT = {
        if (b == null) a
        else if (a == null) b
        else if (a.timestamp > b.timestamp) a
        else b
      }

      // Merging two partial results applies the same "latest timestamp wins" rule.
      override def merge(a: CrossrefDT, b: CrossrefDT): CrossrefDT = {
        if (b == null) a
        else if (a == null) b
        else if (a.timestamp > b.timestamp) a
        else b
      }

      override def bufferEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]]

      override def outputEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]]

      override def finish(reduction: CrossrefDT): CrossrefDT = reduction
    }
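
    // A quick illustration of the intended semantics (hypothetical values,
    // assuming the CrossrefDT sketch above): given two versions of the same
    // DOI, the one with the larger "indexed" timestamp survives.
    //
    //   val older = CrossrefDT("10.5555/example", "{...}", 1000L)
    //   val newer = CrossrefDT("10.5555/example", "{...}", 2000L)
    //   crossrefAggregator.reduce(older, newer)  // == newer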

    val workingPath: String = parser.get("workingPath")

    // The existing consolidated Crossref dataset.
    val main_ds: Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT]

    // The incremental dump: a Hadoop sequence file of compressed blobs, each
    // decompressed and parsed into a CrossrefDT.
    val update =
      spark.createDataset(
        spark.sparkContext
          .sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text])
          .map(i => CrossrefImporter.decompressBlob(i._2.toString))
          .map(i => to_item(i)))

    // Union the existing dataset with the update, keep the freshest record
    // per DOI, and overwrite the consolidated output.
    main_ds.union(update)
      .groupByKey(_.doi)
      .agg(crossrefAggregator.toColumn)
      .map(s => s._2)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated")
  }
}
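
/*
 * Usage sketch: the argument parser above reads its definition from
 * crossref_to_dataset_params.json which, given the parser.get(...) calls,
 * presumably declares at least "master" and "workingPath" parameters. A
 * hypothetical invocation could therefore look like:
 *
 *   spark-submit --class eu.dnetlib.doiboost.crossref.CrossrefDataset \
 *     dhp-doiboost.jar \
 *     --master yarn \
 *     --workingPath /data/doiboost/crossref
 *
 * The jar name and argument style here are assumptions for illustration;
 * the authoritative parameter names live in the JSON resource file.
 */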