// dnet-hadoop/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala

package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser
2020-04-29 13:13:02 +02:00
import eu.dnetlib.dhp.schema.oaf
2020-05-28 09:57:46 +02:00
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
2020-05-28 09:57:46 +02:00
import org.apache.spark.rdd.RDD
2020-04-29 13:13:02 +02:00
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
2020-04-23 09:33:48 +02:00
/**
 * Immutable pair of strings describing a reference.
 *
 * @param author    value of the reference's author field
 * @param firstPage value of the reference's first-page field
 */
case class Reference(author: String, firstPage: String)
/**
 * Spark entry point that converts a compressed Crossref dump into OAF entities.
 *
 * The job reads a sequence file of `(IntWritable, Text)` records from `sourcePath`,
 * decompresses each value with `CrossrefImporter.decompressBlob`, converts it with
 * `Crossref2Oaf.convert` and stores the mixed result under `targetPath/mixObject`.
 * That intermediate file is then read back and split by type into three
 * de-duplicated outputs: `targetPath/publication`, `targetPath/dataset` and
 * `targetPath/relations`.
 */
object SparkMapDumpIntoOAF {

  /** Classpath location of the JSON file describing the accepted command-line arguments. */
  private val ParamsResource = "/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"

  /**
   * Chooses which of two records sharing the same key survives de-duplication.
   *
   * Decision table:
   *  - if one record is `null`, the other one is returned;
   *  - if the first record has no timestamp, the second one is returned;
   *  - if only the second record has no timestamp, the first one is returned;
   *  - otherwise the second record wins only when its timestamp is strictly greater.
   *
   * @param first       left operand of the reduction
   * @param second      right operand of the reduction
   * @param timestampOf extracts the (nullable) last-update timestamp from a record
   * @tparam T record type
   * @return the record to keep
   */
  private def latest[T <: AnyRef](first: T, second: T)(timestampOf: T => java.lang.Long): T =
    if (first == null) second
    else if (second == null) first
    else {
      val firstTs = timestampOf(first)
      val secondTs = timestampOf(second)
      if (firstTs == null) second
      else if (secondTs == null) first
      else if (firstTs.longValue < secondTs.longValue) second
      else first
    }

  /**
   * Runs the conversion job.
   *
   * @param args command-line arguments, parsed according to the bundled JSON
   *             definition; the values `master`, `sourcePath` and `targetPath`
   *             are read from them
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()

    // Read the argument definition with an explicit charset and release the stream afterwards.
    val paramsStream = SparkMapDumpIntoOAF.getClass.getResourceAsStream(ParamsResource)
    require(paramsStream != null, s"Missing classpath resource $ParamsResource")
    val paramsJson =
      try IOUtils.toString(paramsStream, "UTF-8")
      finally paramsStream.close()

    val parser = new ArgumentApplicationParser(paramsJson)
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    // Kryo encoders are needed because the OAF classes are plain Java beans, not case classes.
    implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo[Publication]
    implicit val relationEncoder: Encoder[Relation] = Encoders.kryo[Relation]
    implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo[OafDataset]

    val sc = spark.sparkContext
    val targetPath = parser.get("targetPath")

    // Step 1: decode the dump and persist every converted object in a single mixed file.
    sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
      .map(k => k._2.toString)
      .map(CrossrefImporter.decompressBlob)
      .flatMap(k => Crossref2Oaf.convert(k))
      .saveAsObjectFile(s"$targetPath/mixObject")

    // Step 2: read the mixed file back. No explicit null filter is required below:
    // a type pattern such as `case p: Publication` never matches null.
    val inputRDD: RDD[Oaf] = sc.objectFile[Oaf](s"$targetPath/mixObject")

    // Publications: one record per id, keeping the most recently updated one.
    val distinctPubs: RDD[Publication] = inputRDD
      .collect { case p: Publication => p }
      .map(p => (p.getId, p))
      .reduceByKey((a, b) => latest(a, b)(_.getLastupdatetimestamp))
      .map(_._2)
    spark.createDataset(distinctPubs).write.mode(SaveMode.Overwrite).save(s"$targetPath/publication")

    // Datasets: same de-duplication rule as publications.
    val distinctDatasets: RDD[OafDataset] = inputRDD
      .collect { case d: OafDataset => d }
      .map(d => (d.getId, d))
      .reduceByKey((a, b) => latest(a, b)(_.getLastupdatetimestamp))
      .map(_._2)
    spark.createDataset(distinctDatasets).write.mode(SaveMode.Overwrite).save(s"$targetPath/dataset")

    // Relations: one record per "source::target" pair; an arbitrary duplicate is kept.
    // NOTE(review): the key ignores any other relation attribute, so relations that share
    // source and target but differ otherwise are collapsed - confirm this is intended.
    val distinctRels: RDD[Relation] = inputRDD
      .collect { case r: Relation => r }
      .map(r => (s"${r.getSource}::${r.getTarget}", r))
      .reduceByKey((a, b) => if (a == null) b else a)
      .map(_._2)
    spark.createDataset(distinctRels).write.mode(SaveMode.Overwrite).save(s"$targetPath/relations")
  }
}