added normalization step to the doi

This commit is contained in:
Miriam Baglioni 2021-06-29 18:51:11 +02:00
parent 06074ea7d3
commit a74de1cda2
1 changed files with 20 additions and 8 deletions

View File

@ -2,6 +2,7 @@ package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
@ -12,6 +13,23 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
object SparkProcessMAG { object SparkProcessMAG {
def getDistinctResults (d:Dataset[MagPapers]):Dataset[MagPapers]={
d.where(col("Doi").isNotNull)
.groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING)
.reduceGroups((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2))
.map(_._2)(Encoders.product[MagPapers])
.map(mp => {
new MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi),
mp.DocType, mp.PaperTitle, mp.OriginalTitle,
mp.BookTitle, mp.Year, mp.Date, mp.Publisher: String,
mp.JournalId, mp.ConferenceSeriesId, mp.ConferenceInstanceId,
mp.Volume, mp.Issue, mp.FirstPage, mp.LastPage,
mp.ReferenceCount, mp.CitationCount, mp.EstimatedCitation,
mp.OriginalVenue, mp.FamilyId, mp.CreatedDate)
})(Encoders.product[MagPapers])
}
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass) val logger: Logger = LoggerFactory.getLogger(getClass)
@ -33,17 +51,11 @@ object SparkProcessMAG {
implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
logger.info("Phase 1) make uninque DOI in Papers:") logger.info("Phase 1) make uninue DOI in Papers:")
val d: Dataset[MagPapers] = spark.read.load(s"$sourcePath/Papers").as[MagPapers] val d: Dataset[MagPapers] = spark.read.load(s"$sourcePath/Papers").as[MagPapers]
// Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
val result: RDD[MagPapers] = d.where(col("Doi").isNotNull) val distinctPaper: Dataset[MagPapers] = getDistinctResults(d)
.rdd
.map{ p: MagPapers => Tuple2(p.Doi, p) }
.reduceByKey((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2))
.map(_._2)
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"$workingPath/Papers_distinct") distinctPaper.write.mode(SaveMode.Overwrite).save(s"$workingPath/Papers_distinct")