From a74de1cda251c83519166a824815f11cd3b4b197 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 29 Jun 2021 18:51:11 +0200 Subject: [PATCH] added normalization step to the doi --- .../doiboost/mag/SparkProcessMAG.scala | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala index 173e33360..ecb389af8 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala @@ -2,6 +2,7 @@ package eu.dnetlib.doiboost.mag import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.Publication +import eu.dnetlib.doiboost.DoiBoostMappingUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD @@ -12,6 +13,23 @@ import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ object SparkProcessMAG { + + def getDistinctResults (d:Dataset[MagPapers]):Dataset[MagPapers]={ + d.where(col("Doi").isNotNull) + .groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING) + .reduceGroups((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2)) + .map(_._2)(Encoders.product[MagPapers]) + .map(mp => { + new MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi), + mp.DocType, mp.PaperTitle, mp.OriginalTitle, + mp.BookTitle, mp.Year, mp.Date, mp.Publisher: String, + mp.JournalId, mp.ConferenceSeriesId, mp.ConferenceInstanceId, + mp.Volume, mp.Issue, mp.FirstPage, mp.LastPage, + mp.ReferenceCount, mp.CitationCount, mp.EstimatedCitation, + mp.OriginalVenue, mp.FamilyId, mp.CreatedDate) + })(Encoders.product[MagPapers]) + } + def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass) @@ -33,17 +51,11 @@ object SparkProcessMAG { implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) - logger.info("Phase 1) make uninque DOI in Papers:") + logger.info("Phase 1) make uninue DOI in Papers:") val d: Dataset[MagPapers] = spark.read.load(s"$sourcePath/Papers").as[MagPapers] // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one - val result: RDD[MagPapers] = d.where(col("Doi").isNotNull) - .rdd - .map{ p: MagPapers => Tuple2(p.Doi, p) } - .reduceByKey((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2)) - .map(_._2) - - val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) + val distinctPaper: Dataset[MagPapers] = getDistinctResults(d) distinctPaper.write.mode(SaveMode.Overwrite).save(s"$workingPath/Papers_distinct")