From f417515e43c949fcdf4b4a3da821d5ca9cf36159 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 4 Mar 2024 17:15:13 +0100 Subject: [PATCH] Implemented class that generates a normalized table of MAG, which is the starting point for the creation of the mag source --- .../mag/SparkCreateMagDataset.scala | 63 +++++++++++++++++-- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala index 34e4e982a..70073cbe6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.collection.mag import eu.dnetlib.dhp.application.AbstractScalaApplication -import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.slf4j.Logger @@ -19,21 +19,22 @@ class SparkCreateMagDataset (propertyPath: String, args: Array[String], log: Log } - private def loadAndFilterPapers(spark:SparkSession, crossrefPath:String, magBasePath:String, workingPath): Unit = { + private def loadAndFilterPapers(spark:SparkSession, crossrefPath:String, magBasePath:String, workingPath:String): Unit = { import spark.implicits._ val schema:StructType= StructType(StructField("DOI", StringType)::Nil) - log.info("Phase 1 intersect MAG Paper containing doi also present in crossref") //Filter all the MAG Papers that intersect with a Crossref DOI val crId= spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId") - val magPapers = MagUtility.loadMagEntity(spark, "Papers", magBasePath).withColumn("Doi", lower(col("Doi"))).where(col("Doi").isNotNull) + val magPapers = MagUtility.loadMagEntity(spark, "Papers", magBasePath) + .withColumn("Doi", lower(col("Doi"))) + .where(col("Doi").isNotNull) val intersectedPapers:Dataset[Row] =magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi") intersectedPapers.cache() intersectedPapers.count() - log.info("Create current abstract") + //log.info("Create current abstract") //Abstract is an inverted list, we define a function that convert in string the abstract and recreate // a table(PaperId, Abstract) @@ -67,11 +68,61 @@ class SparkCreateMagDataset (propertyPath: String, args: Array[String], log: Log .select(j1("*"), affiliations("DisplayName").alias("AffiliationName"), affiliations("GridId")) .groupBy("PaperId") .agg(collect_list(struct("AffiliationId","AuthorSequenceNumber","AffiliationName","AuthorName","AuthorId","GridId")).alias("authors")) - val step1 =step0.join(paperAuthorAffiliationNormalized, step0("PaperId")=== paperAuthorAffiliationNormalized("PaperId"), "left").cache() + val step1 =step0.join(paperAuthorAffiliationNormalized, step0("PaperId")=== paperAuthorAffiliationNormalized("PaperId"), "left") + .select(step0("*"),paperAuthorAffiliationNormalized("authors")) + .cache() step1.count() step0.unpersist() + val conference = MagUtility.loadMagEntity(spark, "ConferenceInstances", magBasePath).select( + $"ConferenceInstanceId", + $"DisplayName".as("conferenceName"), + $"Location".as("conferenceLocation"), + $"StartDate".as("conferenceStartDate"), + $"EndDate".as("conferenceEndDate") + ) + + val step2 =step1.join(conference, step1("ConferenceInstanceId")=== conference("ConferenceInstanceId"),"left").select( + step1("*"), conference("conferenceName"), + conference("conferenceLocation"), + conference("conferenceStartDate"), + conference("conferenceEndDate")).cache() + step2.count() + step1.unpersist() + + val fos = MagUtility.loadMagEntity(spark, "FieldsOfStudy", magBasePath) + .select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType") + + val paperFieldsOfStudy = MagUtility.loadMagEntity(spark, "PaperFieldsOfStudy", magBasePath) + .select($"FieldOfStudyId", $"Score", $"PaperId") + + val paperFoS = paperFieldsOfStudy.join(broadcast(fos),fos("fos")===paperFieldsOfStudy("FieldOfStudyId")).groupBy("PaperId") + .agg(collect_set(struct("FieldOfStudyId","DisplayName","MainType","Score")).as("FoS")) + + val step3=step2.join(paperFoS, step2("PaperId")===paperFoS("PaperId"), "left") + .select(step2("*"), paperFoS("FoS")).cache() + step3.count() + + step2.unpersist() + + val journals= MagUtility.loadMagEntity(spark, "Journals", magBasePath) + .select( + $"JournalId", + $"DisplayName".as("journalName"), + $"Issn".as("journalIssn"), + $"Publisher".as("journalPublisher"), + $"Webpage".as("journalWebpage") + ) + step3.join(journals, step3("JournalId")===journals("JournalId"), "left"). + select(step3("*"), + journals("journalName"), + journals("journalIssn"), + journals("journalPublisher"), + journals("journalWebpage") + ).write.mode("OverWrite") + .save(s"$workingPath/generatedMAG") + step3.unpersist() } }