forked from D-Net/dnet-hadoop
added the normalization step to the doi
This commit is contained in:
parent
779015e4a9
commit
22ae8a81c2
|
@ -2,6 +2,7 @@ package eu.dnetlib.doiboost.mag
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication
|
||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.rdd.RDD
|
||||
|
@ -12,6 +13,23 @@ import org.slf4j.{Logger, LoggerFactory}
|
|||
import scala.collection.JavaConverters._
|
||||
|
||||
object SparkProcessMAG {
|
||||
|
||||
|
||||
def getDistinctResults (d:Dataset[MagPapers]):Dataset[MagPapers]={
|
||||
d.where(col("Doi").isNotNull)
|
||||
.groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING)
|
||||
.reduceGroups((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2))
|
||||
.map(_._2)(Encoders.product[MagPapers])
|
||||
.map(mp => {
|
||||
new MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi),
|
||||
mp.DocType, mp.PaperTitle, mp.OriginalTitle,
|
||||
mp.BookTitle, mp.Year, mp.Date, mp.Publisher: String,
|
||||
mp.JournalId, mp.ConferenceSeriesId, mp.ConferenceInstanceId,
|
||||
mp.Volume, mp.Issue, mp.FirstPage, mp.LastPage,
|
||||
mp.ReferenceCount, mp.CitationCount, mp.EstimatedCitation,
|
||||
mp.OriginalVenue, mp.FamilyId, mp.CreatedDate)
|
||||
})(Encoders.product[MagPapers])
|
||||
}
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
|
@ -33,17 +51,11 @@ object SparkProcessMAG {
|
|||
implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
|
||||
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
|
||||
|
||||
logger.info("Phase 1) make uninque DOI in Papers:")
|
||||
logger.info("Phase 1) make unique DOI in Papers:")
|
||||
val d: Dataset[MagPapers] = spark.read.load(s"$sourcePath/Papers").as[MagPapers]
|
||||
|
||||
// Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
|
||||
val result: RDD[MagPapers] = d.where(col("Doi").isNotNull)
|
||||
.rdd
|
||||
.map{ p: MagPapers => Tuple2(p.Doi, p) }
|
||||
.reduceByKey((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2))
|
||||
.map(_._2)
|
||||
|
||||
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
|
||||
val distinctPaper: Dataset[MagPapers] = getDistinctResults(d)
|
||||
|
||||
distinctPaper.write.mode(SaveMode.Overwrite).save(s"$workingPath/Papers_distinct")
|
||||
|
||||
|
|
Loading…
Reference in New Issue