From b771d67e9d479811025be0346c71fcef6a18eec8 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 20 May 2020 08:14:03 +0200 Subject: [PATCH 1/2] next step of MAG conversion implemented --- .../doiboost/DoiBoostMappingUtil.scala | 6 +- .../doiboost/crossref/Crossref2Oaf.scala | 9 + .../dnetlib/doiboost/mag/MagDataModel.scala | 5 + .../doiboost/mag/SparkPreProcessMAG.scala | 240 ++++++++++-------- .../dhp/doiboost/mag/oozie_app/workflow.xml | 2 +- .../dnetlib/doiboost/mag/MAGMappingTest.scala | 42 +-- 6 files changed, 161 insertions(+), 143 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala index 68a3231e0..3980217d8 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -21,11 +21,15 @@ object DoiBoostMappingUtil { def generateDataInfo(): DataInfo = { + generateDataInfo("0.9") + } + + def generateDataInfo(trust:String): DataInfo = { val di = new DataInfo di.setDeletedbyinference(false) di.setInferred(false) di.setInvisible(false) - di.setTrust("0.9") + di.setTrust(trust) di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions")) di } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index eda3bf17a..ece5d123f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -140,6 +140,15 @@ case object Crossref2Oaf { result.setRelevantdate(List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate).filter(p => p != null).asJava) + //Mapping Subject + val subjectList:List[String] = (json \ "subject").extractOrElse[List[String]](List()) + + if (subjectList.nonEmpty) { + result.setSubject(subjectList.map(s=> createSP(s, "keywords", "dnet:subject_classification_typologies")).asJava) + } + + + //Mapping AUthor val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List()) result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 17f0395ca..e83236d53 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -40,6 +40,9 @@ case class MagPaperUrl(PaperId: Long, SourceType: Option[Int], SourceUrl: Option case class MagUrl(PaperId: Long, instances: List[String]) +case class MagSubject(FieldOfStudyId:Long, DisplayName:String, MainType:Option[String], Score:Float){} + +case class MagFieldOfStudy(PaperId:Long, subjects:List[MagSubject]) {} case class MagJournal(JournalId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], Issn: Option[String], Publisher: Option[String], Webpage: Option[String], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {} @@ -135,6 +138,8 @@ case object ConversionUtil { j.setIssnPrinted(journal.Issn.get) pub.setJournal(j) } + pub.setCollectedfrom(List(createMAGCollectedFrom()).asJava) + pub.setDataInfo(generateDataInfo()) pub } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala index a0e20be1a..56ab91d62 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -1,20 +1,18 @@ package eu.dnetlib.doiboost.mag import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.Publication -import eu.dnetlib.doiboost.DoiBoostMappingUtil.asField +import eu.dnetlib.dhp.schema.oaf.{Publication, StructuredProperty} +import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.DoiBoostMappingUtil.{asField, createSP} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} import org.apache.spark.sql.functions._ - import scala.collection.JavaConverters._ object SparkPreProcessMAG { - - def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass) @@ -31,110 +29,138 @@ object SparkPreProcessMAG { val sourcePath = parser.get("sourcePath") import spark.implicits._ implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] - implicit val tupleForJoinEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs) + implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) + +// logger.info("Phase 1) make uninque DOI in Papers:") +// val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] +// +// +// // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one +// val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) => +// var r = if (p1 == null) p2 else p1 +// if (p1 != null && p2 != null) { +// if (p1.CreatedDate != null && p2.CreatedDate != null) { +// if (p1.CreatedDate.before(p2.CreatedDate)) +// r = p1 +// else +// r = p2 +// } else { +// r = if (p1.CreatedDate == null) p2 else p1 +// } +// } +// r +// }.map(_._2) +// +// val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) +// distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") +// logger.info(s"Total number of element: ${result.count()}") +// +// logger.info("Phase 3) Group Author by PaperId") +// val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] +// +// val affiliation = spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation] +// +// val paperAuthorAffiliation = spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation] +// +// +// paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId"))) +// .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null)) } +// .joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left") +// .map(s => { +// val mpa = s._1._2 +// val af = s._2 +// if (af != null) { +// MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName) +// } else +// mpa +// }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) +// .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors") +// +// logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") +// +// val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal] +// +// val papers = spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers] +// +// val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList] +// +// val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left") +// firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") +// .map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") +// +// +// var magPubs: Dataset[(String, Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] +// +// val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] +// +// +// logger.info("Phase 5) enrich publication with URL and Instances") +// +// magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") +// .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) } +// .write.mode(SaveMode.Overwrite) +// .save(s"${parser.get("targetPath")}/merge_step_3") +// +// +// logger.info("Phase 6) Enrich Publication with description") +// val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] +// pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") +// +// val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] +// +// +// magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] +// +// magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left").map(p => { +// val pub = p._1._2 +// val abst = p._2 +// if (abst != null) { +// pub.setDescription(List(asField(abst.IndexedAbstract)).asJava) +// } +// pub +// } +// ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") +// + + logger.info("Phase 7) Enrich Publication with FieldOfStudy") + + val magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + + val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType") + + val pfos = spark.read.load(s"$sourcePath/PaperFieldsOfStudy") + + val paperField = pfos.joinWith(fos, fos("fos").equalTo(pfos("FieldOfStudyId"))) + .select($"_1.FieldOfStudyId", $"_2.DisplayName", $"_2.MainType", $"_1.PaperId", $"_1.Score") + .groupBy($"PaperId").agg(collect_list(struct($"FieldOfStudyId", $"DisplayName", $"MainType", $"Score")).as("subjects")) + .as[MagFieldOfStudy] - - logger.info("Phase 1) make uninque DOI in Papers:") - - val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] - - - // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one - val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) => - var r = if (p1 == null) p2 else p1 - if (p1 != null && p2 != null) { - if (p1.CreatedDate != null && p2.CreatedDate != null) { - if (p1.CreatedDate.before(p2.CreatedDate)) - r = p1 - else - r = p2 - } else { - r = if (p1.CreatedDate == null) p2 else p1 + magPubs.joinWith(paperField, col("_1").equalTo(paperField("PaperId")), "left"). + map(item => { + val publication = item._1._2 + val fieldOfStudy = item._2 + if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { + val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => { + val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies") + val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) + var resList: List[StructuredProperty] = List(s1) + if (s.MainType.isDefined) { + val maintp = s.MainType.get + val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies") + s2.setDataInfo(di) + resList = resList ::: List(s2) + if (maintp.contains(".")) { + val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies") + s3.setDataInfo(di) + resList = resList ::: List(s3) + } + } + resList + }) + publication.setSubject(p.asJava) } - } - r - }.map(_._2) - - val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) - distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") - logger.info(s"Total number of element: ${result.count()}") - - logger.info("Phase 3) Group Author by PaperId") - val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] - - val affiliation =spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation] - - val paperAuthorAffiliation =spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation] - - - paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId"))) - .map{case (a:MagPaperAuthorAffiliation,b:MagAuthor )=> (a.AffiliationId,MagPaperAuthorDenormalized(a.PaperId, b, null)) } - .joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left") - .map(s => { - val mpa = s._1._2 - val af = s._2 - if (af!= null) { - MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName) - } else - mpa - }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) - .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors") - - - - logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") - - - val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal] - - val papers =spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers] - - val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList] - - - - val firstJoin =papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")),"left") - firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") - .map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") - - - - var magPubs:Dataset[(String,Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)] - - val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] - - - logger.info("Phase 5) enrich publication with URL and Instances") - - magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") - .map{a:((String,Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2))} - .write.mode(SaveMode.Overwrite) - .save(s"${parser.get("targetPath")}/merge_step_3") - - - - logger.info("Phase 6) Enrich Publication with description") - val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] - pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") - - val paperAbstract =spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] - - - magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)] - - magPubs.joinWith(paperAbstract,col("_1").equalTo(paperAbstract("PaperId")), "left").map(p=> - { - val pub = p._1._2 - val abst = p._2 - if (abst!= null) { - pub.setDescription(List(asField(abst.IndexedAbstract)).asJava) - } - pub - } - ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") - + publication + }).map{s:Publication => s}(Encoders.bean(classOf[Publication])).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication") } - - } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml index 2277b79b0..6b4cad2af 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml @@ -22,7 +22,7 @@ - + diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala index 4d26969dd..1567e0008 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -24,47 +24,21 @@ class MAGMappingTest { val mapper = new ObjectMapper() + + @Test - def testMAGCSV(): Unit = { - // SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" ")) - - val sparkConf: SparkConf = new SparkConf - - val spark: SparkSession = SparkSession.builder() - .config(sparkConf) - .appName(getClass.getSimpleName) - .master("local[*]") - .getOrCreate() - - import spark.implicits._ + def testSplitter():Unit = { + val s = "sports.team" - implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] - implicit val longBarEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs) - - val sourcePath = "/data/doiboost/mag/input" - - mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) - - - val magOAF = spark.read.load("$sourcePath/merge_step_4").as[Publication] - - println(magOAF.first().getOriginalId) - - - magOAF.map(k => (ConversionUtil.extractMagIdentifier(k.getOriginalId.asScala),k)).as[(String,Publication)].show() - - - println((ConversionUtil.extractMagIdentifier(magOAF.first().getOriginalId.asScala))) - - val magIDRegex: Regex = "^[0-9]+$".r - - - println(magIDRegex.findFirstMatchIn("suca").isDefined) + if (s.contains(".")) { + println(s.split("\\.")head) + } } + @Test def buildInvertedIndexTest(): Unit = { val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString From 5818abaab4402ac508c4ab1a3a9e44249eeb9594 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 20 May 2020 17:05:46 +0200 Subject: [PATCH 2/2] fixed Crossref Mapping --- .../doiboost/crossref/Crossref2Oaf.scala | 15 +- .../doiboost/mag/SparkPreProcessMAG.scala | 182 +++++++++--------- .../doiboost/CrossrefMappingTest.scala | 4 +- .../eu/dnetlib/doiboost/awardTest.json | 9 +- 4 files changed, 111 insertions(+), 99 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index ece5d123f..b515a7330 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -137,6 +137,9 @@ case object Crossref2Oaf { if (StringUtils.isNotBlank(issuedDate)) { result.setDateofacceptance(asField(issuedDate)) } + else { + result.setDateofacceptance(asField(createdDate.getValue)) + } result.setRelevantdate(List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate).filter(p => p != null).asJava) @@ -233,6 +236,16 @@ case object Crossref2Oaf { val queue = new mutable.Queue[Relation] + def snfRule(award:String): String = { + var tmp1 = StringUtils.substringAfter(award,"_") + val tmp2 = StringUtils.substringBefore(tmp1,"/") + logger.debug(s"From $award to $tmp2") + tmp2 + + + } + + def extractECAward(award: String): String = { val awardECRegex: Regex = "[0-9]{4,9}".r if (awardECRegex.findAllIn(award).hasNext) @@ -298,7 +311,7 @@ case object Crossref2Oaf { case "10.13039/501100006588" | "10.13039/501100004488" => generateSimpleRelationFromAward(funder, "irb_hr______", a=>a.replaceAll("Project No.", "").replaceAll("HRZZ-","") ) case "10.13039/501100006769"=> generateSimpleRelationFromAward(funder, "rsf_________", a=>a) - case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", extractECAward) + case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", snfRule) case "10.13039/501100004410"=> generateSimpleRelationFromAward(funder, "tubitakf____", a =>a) case "10.10.13039/100004440"=> generateSimpleRelationFromAward(funder, "wt__________", a =>a) case "10.13039/100004440"=> queue += generateRelation(sourceId,"1e5e62235d094afd01cd56e65112fc63", "wt__________" ) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala index 56ab91d62..0001c83c6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -31,100 +31,99 @@ object SparkPreProcessMAG { implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) -// logger.info("Phase 1) make uninque DOI in Papers:") -// val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] -// -// -// // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one -// val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) => -// var r = if (p1 == null) p2 else p1 -// if (p1 != null && p2 != null) { -// if (p1.CreatedDate != null && p2.CreatedDate != null) { -// if (p1.CreatedDate.before(p2.CreatedDate)) -// r = p1 -// else -// r = p2 -// } else { -// r = if (p1.CreatedDate == null) p2 else p1 -// } -// } -// r -// }.map(_._2) -// -// val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) -// distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") -// logger.info(s"Total number of element: ${result.count()}") -// -// logger.info("Phase 3) Group Author by PaperId") -// val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] -// -// val affiliation = spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation] -// -// val paperAuthorAffiliation = spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation] -// -// -// paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId"))) -// .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null)) } -// .joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left") -// .map(s => { -// val mpa = s._1._2 -// val af = s._2 -// if (af != null) { -// MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName) -// } else -// mpa -// }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) -// .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors") -// -// logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") -// -// val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal] -// -// val papers = spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers] -// -// val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList] -// -// val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left") -// firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") -// .map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") -// -// -// var magPubs: Dataset[(String, Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] -// -// val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] -// -// -// logger.info("Phase 5) enrich publication with URL and Instances") -// -// magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") -// .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) } -// .write.mode(SaveMode.Overwrite) -// .save(s"${parser.get("targetPath")}/merge_step_3") -// -// -// logger.info("Phase 6) Enrich Publication with description") -// val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] -// pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") -// -// val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] -// -// -// magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] -// -// magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left").map(p => { -// val pub = p._1._2 -// val abst = p._2 -// if (abst != null) { -// pub.setDescription(List(asField(abst.IndexedAbstract)).asJava) -// } -// pub -// } -// ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") -// + logger.info("Phase 1) make uninque DOI in Papers:") + val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] + + // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one + val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) => + var r = if (p1 == null) p2 else p1 + if (p1 != null && p2 != null) { + if (p1.CreatedDate != null && p2.CreatedDate != null) { + if (p1.CreatedDate.before(p2.CreatedDate)) + r = p1 + else + r = p2 + } else { + r = if (p1.CreatedDate == null) p2 else p1 + } + } + r + }.map(_._2) + + + + val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) + distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") + logger.info(s"Total number of element: ${result.count()}") + + logger.info("Phase 3) Group Author by PaperId") + val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] + + val affiliation = spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation] + val paperAuthorAffiliation = spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation] + + paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId"))) + .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null)) } + .joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left") + .map(s => { + val mpa = s._1._2 + val af = s._2 + if (af != null) { + MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName) + } else + mpa + }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) + .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors") + + logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") + + val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal] + + val papers = spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers] + + val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList] + + val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left") + firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") + .map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") + + + var magPubs: Dataset[(String, Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + + val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] + + + logger.info("Phase 5) enrich publication with URL and Instances") + + magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") + .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) } + .write.mode(SaveMode.Overwrite) + .save(s"${parser.get("targetPath")}/merge_step_3") + + + logger.info("Phase 6) Enrich Publication with description") + val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] + pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") + + val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] + + + magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + + magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left").map(p => { + val pub = p._1._2 + val abst = p._2 + if (abst != null) { + pub.setDescription(List(asField(abst.IndexedAbstract)).asJava) + } + pub + } + ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") + logger.info("Phase 7) Enrich Publication with FieldOfStudy") - val magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType") @@ -135,7 +134,6 @@ object SparkPreProcessMAG { .groupBy($"PaperId").agg(collect_list(struct($"FieldOfStudyId", $"DisplayName", $"MainType", $"Score")).as("subjects")) .as[MagFieldOfStudy] - magPubs.joinWith(paperField, col("_1").equalTo(paperField("PaperId")), "left"). map(item => { val publication = item._1._2 diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala index 2d7cf4216..0f9a3f8fb 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala @@ -90,9 +90,9 @@ class CrossrefMappingTest { val rels:List[Relation] = resultList.filter(p => p.isInstanceOf[Relation]).map(r=> r.asInstanceOf[Relation]) - assertEquals(rels.size, 4) - rels.foreach(s => logger.info(s.getTarget)) + rels.foreach(s => logger.info(s.getTarget)) + assertEquals(rels.size, 3 ) } diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json index f570b3bdb..7c6e7beb9 100644 --- a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json @@ -179,12 +179,13 @@ ], "funder": [ { + "DOI": "10.13039/501100001711", + "name": "Swiss National Science Foundation (Schweizerische Nationalfonds)", "doi-asserted-by": "publisher", - "DOI": "10.13039/501100000781", - "name": "European Research Council", "award": [ - "284236-REPCOLLAB", - "FP7/2007-2013" + "CR32I3_156724", + "31003A_173281/1", + "200021_165850" ] } ],