// dnet-hadoop/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
//
// 169 lines
// 5.6 KiB
// Scala

package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.sql.functions.max
import org.slf4j.{Logger, LoggerFactory}
import java.time.format.DateTimeFormatter._
import java.time.{LocalDateTime, ZoneOffset}
import scala.io.Source
object ImportDatacite {
// SLF4J logger used by this object for its progress messages.
val log: Logger = LoggerFactory.getLogger(ImportDatacite.getClass)
/**
  * Converts one JSON record into a [[DataciteType]].
  *
  * The fields `attributes.doi`, `attributes.isActive` and `attributes.updated` are read from
  * the JSON; extraction fails with the json4s exception if one of them is missing or has an
  * unexpected type.
  *
  * @param input the JSON string of a single record
  * @return a [[DataciteType]] holding the lower-cased DOI, the `updated` time expressed in
  *         seconds since the epoch, the `isActive` flag and the untouched input JSON
  */
def convertAPIStringToDataciteItem(input:String): DataciteType = {
  implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
  val attributes: org.json4s.JValue = parse(input) \ "attributes"

  // Locale.ROOT makes the lower-casing independent from the JVM default locale: the DOI is
  // used as grouping key, so it must be normalised the same way on every machine.
  val doi = (attributes \ "doi").extract[String].toLowerCase(java.util.Locale.ROOT)
  val isActive = (attributes \ "isActive").extract[Boolean]
  val updated = (attributes \ "updated").extract[String]

  // A LocalDateTime carries no zone information: the parsed value is interpreted as UTC.
  val dt = LocalDateTime.parse(updated, ISO_DATE_TIME)
  val epochSeconds = dt.toInstant(ZoneOffset.UTC).toEpochMilli / 1000

  DataciteType(doi = doi, timestamp = epochSeconds, isActive = isActive, json = input)
}
/**
  * Incrementally updates the Datacite dump stored at `dataciteDumpPath`.
  *
  * Steps, in order:
  *  1. parse the arguments `master`, `namenode`, `targetPath` and `dataciteDumpPath`;
  *  2. load the current dump and take the highest `timestamp` it contains;
  *  3. call `writeSequenceFile` to store the records newer than that timestamp in a
  *     sequence file at `targetPath`;
  *  4. when at least one record was imported: convert the records, union them with the dump,
  *     keep for every DOI the record with the greatest timestamp, write the result to
  *     `<dataciteDumpPath>_updated` and finally replace the old dump with it.
  *
  * @param args command line arguments, described by `import_from_api.json`
  * @throws java.io.IOException if the old dump cannot be deleted or the updated dump cannot
  *                             be renamed to `dataciteDumpPath`
  */
def main(args: Array[String]): Unit = {
  // Read the argument specification and release the underlying stream afterwards.
  val specSource = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json"))
  val argumentSpec = try specSource.mkString finally specSource.close()
  val parser = new ArgumentApplicationParser(argumentSpec)
  parser.parseArgument(args)

  val master = parser.get("master")
  val hdfsuri = parser.get("namenode")
  log.info(s"namenode is $hdfsuri")
  val targetPath = parser.get("targetPath")
  log.info(s"targetPath is $targetPath")
  val dataciteDump = parser.get("dataciteDumpPath")
  log.info(s"dataciteDump is $dataciteDump")
  val hdfsTargetPath = new Path(targetPath)
  log.info(s"hdfsTargetPath is $hdfsTargetPath")

  val spark: SparkSession = SparkSession.builder()
    .appName(ImportDatacite.getClass.getSimpleName)
    .master(master)
    .getOrCreate()

  // ====== Init HDFS File System Object
  val conf = new Configuration
  // Set FileSystem URI
  conf.set("fs.defaultFS", hdfsuri)
  // File system implementations are set explicitly ("because of Maven")
  conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
  conf.set("fs.file.impl", classOf[LocalFileSystem].getName)

  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("ERROR")
  import spark.implicits._

  // Keeps, among the records sharing the same key, the one with the greatest timestamp.
  // `null` is the empty buffer; on equal timestamps the second argument wins.
  val dataciteAggregator: Aggregator[DataciteType, DataciteType, DataciteType] = new Aggregator[DataciteType, DataciteType, DataciteType] with Serializable {

    override def zero: DataciteType = null

    override def reduce(a: DataciteType, b: DataciteType): DataciteType =
      if (b == null) a
      else if (a == null) b
      else if (a.timestamp > b.timestamp) a
      else b

    override def merge(a: DataciteType, b: DataciteType): DataciteType = reduce(a, b)

    override def bufferEncoder: Encoder[DataciteType] = implicitly[Encoder[DataciteType]]

    override def outputEncoder: Encoder[DataciteType] = implicitly[Encoder[DataciteType]]

    override def finish(reduction: DataciteType): DataciteType = reduction
  }

  val dump: Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
  val ts = dump.select(max("timestamp")).first().getLong(0)
  log.info(s"last Timestamp is $ts")

  val cnt = writeSequenceFile(hdfsTargetPath, ts, conf)
  log.info(s"Imported from Datacite API $cnt documents")

  if (cnt > 0) {
    // The sequence file is written with IntWritable keys, so it is read back with the
    // same key class; only the Text value is used.
    val inputRdd: RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[IntWritable], classOf[Text])
      .map(s => s._2.toString)
      .map(s => convertAPIStringToDataciteItem(s))

    // Materialise the new records before merging them with the dump.
    spark.createDataset(inputRdd).write.mode(SaveMode.Overwrite).save(s"${targetPath}_dataset")
    val ds: Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]

    dump
      .union(ds)
      .groupByKey(_.doi)
      .agg(dataciteAggregator.toColumn)
      .map(s => s._2)
      .repartition(4000)
      .write.mode(SaveMode.Overwrite).save(s"${dataciteDump}_updated")

    // Swap the old dump with the updated one. Both calls report a failure through their
    // boolean result, which must not be ignored: renaming onto a directory that is still
    // there would leave the dump in an inconsistent state.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val dumpPath = new Path(s"$dataciteDump")
    val updatedPath = new Path(s"${dataciteDump}_updated")
    if (fs.exists(dumpPath) && !fs.delete(dumpPath, true))
      throw new java.io.IOException(s"Unable to delete the previous dump $dumpPath")
    if (!fs.rename(updatedPath, dumpPath))
      throw new java.io.IOException(s"Unable to rename $updatedPath to $dumpPath")
  }
}
/**
  * Drains a `DataciteAPIImporter` into a Hadoop sequence file.
  *
  * Every item returned by the importer is appended as a `Text` value, keyed by a
  * zero-based `IntWritable` counter, and flushed right after being appended. A progress
  * line is printed on standard output every 1000 records.
  *
  * @param hdfsTargetPath path of the sequence file to create
  * @param timestamp      value multiplied by 1000 and handed to the importer
  *                       (presumably seconds turned into milliseconds - confirm against
  *                       `DataciteAPIImporter`)
  * @param conf           Hadoop configuration used to create the writer
  * @return the number of records written
  */
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration):Long = {
  val importer = new DataciteAPIImporter(timestamp*1000, 1000)
  var written = 0

  val seqWriter = SequenceFile.createWriter(
    conf,
    SequenceFile.Writer.file(hdfsTargetPath),
    SequenceFile.Writer.keyClass(classOf[IntWritable]),
    SequenceFile.Writer.valueClass(classOf[Text])
  )

  try {
    var batchStart: Long = System.currentTimeMillis
    // Key and value holders are reused for every record.
    val recordKey: IntWritable = new IntWritable(written)
    val recordValue: Text = new Text

    while (importer.hasNext) {
      // The key is the index of the record, i.e. the counter before the increment.
      recordKey.set(written)
      written += 1
      recordValue.set(importer.next())
      seqWriter.append(recordKey, recordValue)
      seqWriter.hflush()

      if (written % 1000 == 0) {
        val batchEnd: Long = System.currentTimeMillis
        val time = (batchEnd - batchStart) / 1000.0F
        println(s"Imported $written in $time seconds")
        batchStart = System.currentTimeMillis
      }
    }
  } finally {
    seqWriter.close()
  }

  written
}
}