From ad0e9aa80c898ff85504d502edb0cef5e169f449 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 29 Feb 2024 18:16:15 +0100
Subject: [PATCH] added the first part of the refactoring of the code that
 generates MAG, making it more readable by using Spark SQL queries

---
 .../dhp/collection/mag/MagUtility.scala      | 302 ++++++++++++++++++
 .../mag/SparkCreateMagDataset.scala          |  77 +++++
 2 files changed, 379 insertions(+)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala

diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
new file mode 100644
index 000000000..412089893
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
@@ -0,0 +1,302 @@
+package eu.dnetlib.dhp.collection.mag
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods.parse
+
+object MagUtility extends Serializable {
+
+  // Maps the type names used in the stream definitions below to Spark SQL data types
+  val datatypedict = Map(
+    "bool" -> BooleanType,
+    "int" -> IntegerType,
+    "uint" -> IntegerType,
+    "long" -> LongType,
+    "ulong" -> LongType,
+    "float" -> FloatType,
+    "string" -> StringType,
+    "DateTime" -> DateType
+  )
+
+  // For each MAG stream: the path of its TSV dump (relative to the MAG base path)
+  // and its columns, encoded as "Name:type" where a trailing "?" marks a nullable column
+  val stream = Map(
+    "Affiliations" -> Tuple2(
+      "mag/Affiliations.txt",
+      Seq(
+        "AffiliationId:long",
+        "Rank:uint",
+        "NormalizedName:string",
+        "DisplayName:string",
+        "GridId:string",
+        "OfficialPage:string",
+        "WikiPage:string",
+        "PaperCount:long",
+        "PaperFamilyCount:long",
+        "CitationCount:long",
+        "Iso3166Code:string",
+        "Latitude:float?",
+        "Longitude:float?",
+        "CreatedDate:DateTime"
+      )
+    ),
+    "AuthorExtendedAttributes" -> Tuple2(
+      "mag/AuthorExtendedAttributes.txt",
+      Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")
+    ),
+    "Authors" -> Tuple2(
+      "mag/Authors.txt",
+      Seq(
+        "AuthorId:long",
+        "Rank:uint",
+        "NormalizedName:string",
+        "DisplayName:string",
+        "LastKnownAffiliationId:long?",
+        "PaperCount:long",
+        "PaperFamilyCount:long",
+        "CitationCount:long",
+        "CreatedDate:DateTime"
+      )
+    ),
+    "ConferenceInstances" -> Tuple2(
+      "mag/ConferenceInstances.txt",
+      Seq(
+        "ConferenceInstanceId:long",
+        "NormalizedName:string",
+        "DisplayName:string",
+        "ConferenceSeriesId:long",
+        "Location:string",
+        "OfficialUrl:string",
+        "StartDate:DateTime?",
+        "EndDate:DateTime?",
+        "AbstractRegistrationDate:DateTime?",
+        "SubmissionDeadlineDate:DateTime?",
+        "NotificationDueDate:DateTime?",
+        "FinalVersionDueDate:DateTime?",
+        "PaperCount:long",
+        "PaperFamilyCount:long",
+        "CitationCount:long",
+        "Latitude:float?",
+        "Longitude:float?",
+        "CreatedDate:DateTime"
+      )
+    ),
+    "ConferenceSeries" -> Tuple2(
+      "mag/ConferenceSeries.txt",
+      Seq(
+        "ConferenceSeriesId:long",
+        "Rank:uint",
+        "NormalizedName:string",
+        "DisplayName:string",
+        "PaperCount:long",
+        "PaperFamilyCount:long",
+        "CitationCount:long",
+        "CreatedDate:DateTime"
+      )
+    ),
+    "EntityRelatedEntities" -> Tuple2(
+      "advanced/EntityRelatedEntities.txt",
+      Seq(
+        "EntityId:long",
+        "EntityType:string",
+        "RelatedEntityId:long",
+        "RelatedEntityType:string",
+        "RelatedType:int",
+        "Score:float"
+      )
+    ),
+    "FieldOfStudyChildren" -> Tuple2(
+      "advanced/FieldOfStudyChildren.txt",
+      Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")
"ChildFieldOfStudyId:long") + ), + "FieldOfStudyExtendedAttributes" -> Tuple2( + "advanced/FieldOfStudyExtendedAttributes.txt", + Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string") + ), + "FieldsOfStudy" -> Tuple2( + "advanced/FieldsOfStudy.txt", + Seq( + "FieldOfStudyId:long", + "Rank:uint", + "NormalizedName:string", + "DisplayName:string", + "MainType:string", + "Level:int", + "PaperCount:long", + "PaperFamilyCount:long", + "CitationCount:long", + "CreatedDate:DateTime" + ) + ), + "Journals" -> Tuple2( + "mag/Journals.txt", + Seq( + "JournalId:long", + "Rank:uint", + "NormalizedName:string", + "DisplayName:string", + "Issn:string", + "Publisher:string", + "Webpage:string", + "PaperCount:long", + "PaperFamilyCount:long", + "CitationCount:long", + "CreatedDate:DateTime" + ) + ), + "PaperAbstractsInvertedIndex" -> Tuple2( + "nlp/PaperAbstractsInvertedIndex.txt.*", + Seq("PaperId:long", "IndexedAbstract:string") + ), + "PaperAuthorAffiliations" -> Tuple2( + "mag/PaperAuthorAffiliations.txt", + Seq( + "PaperId:long", + "AuthorId:long", + "AffiliationId:long?", + "AuthorSequenceNumber:uint", + "OriginalAuthor:string", + "OriginalAffiliation:string" + ) + ), + "PaperCitationContexts" -> Tuple2( + "nlp/PaperCitationContexts.txt", + Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string") + ), + "PaperExtendedAttributes" -> Tuple2( + "mag/PaperExtendedAttributes.txt", + Seq("PaperId:long", "AttributeType:int", "AttributeValue:string") + ), + "PaperFieldsOfStudy" -> Tuple2( + "advanced/PaperFieldsOfStudy.txt", + Seq("PaperId:long", "FieldOfStudyId:long", "Score:float") + ), + "PaperMeSH" -> Tuple2( + "advanced/PaperMeSH.txt", + Seq( + "PaperId:long", + "DescriptorUI:string", + "DescriptorName:string", + "QualifierUI:string", + "QualifierName:string", + "IsMajorTopic:bool" + ) + ), + "PaperRecommendations" -> Tuple2( + "advanced/PaperRecommendations.txt", + Seq("PaperId:long", "RecommendedPaperId:long", "Score:float") + ), + "PaperReferences" -> Tuple2( + "mag/PaperReferences.txt", + Seq("PaperId:long", "PaperReferenceId:long") + ), + "PaperResources" -> Tuple2( + "mag/PaperResources.txt", + Seq( + "PaperId:long", + "ResourceType:int", + "ResourceUrl:string", + "SourceUrl:string", + "RelationshipType:int" + ) + ), + "PaperUrls" -> Tuple2( + "mag/PaperUrls.txt", + Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string") + ), + "Papers" -> Tuple2( + "mag/Papers.txt", + Seq( + "PaperId:long", + "Rank:uint", + "Doi:string", + "DocType:string", + "PaperTitle:string", + "OriginalTitle:string", + "BookTitle:string", + "Year:int?", + "Date:DateTime?", + "OnlineDate:DateTime?", + "Publisher:string", + "JournalId:long?", + "ConferenceSeriesId:long?", + "ConferenceInstanceId:long?", + "Volume:string", + "Issue:string", + "FirstPage:string", + "LastPage:string", + "ReferenceCount:long", + "CitationCount:long", + "EstimatedCitation:long", + "OriginalVenue:string", + "FamilyId:long?", + "FamilyRank:uint?", + "DocSubTypes:string", + "CreatedDate:DateTime" + ) + ), + "RelatedFieldOfStudy" -> Tuple2( + "advanced/RelatedFieldOfStudy.txt", + Seq( + "FieldOfStudyId1:long", + "Type1:string", + "FieldOfStudyId2:long", + "Type2:string", + "Rank:float" + ) + ) + ) + + def getSchema(streamName: String): StructType = { + var schema = new StructType() + val d: Seq[String] = stream(streamName)._2 + d.foreach { case t => + val currentType = t.split(":") + val fieldName: String = currentType.head + var fieldType: String = currentType.last + val nullable: Boolean = 
fieldType.endsWith("?") + if (nullable) + fieldType = fieldType.replace("?", "") + schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable)) + } + schema + } + + + def loadMagEntity(spark:SparkSession, entity:String, basePath:String):Dataset[Row] = { + if (stream.contains(entity)) { + val s =getSchema(entity) + val pt = stream(entity)._1 + spark.read + .option("header", "false") + .option("charset", "UTF8") + .option("delimiter", "\t") + .schema(s) + .csv(s"$basePath/$pt") + } else + null + + } + def convertInvertedIndexString(json_input: String): String = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(json_input) + val idl = (json \ "IndexLength").extract[Int] + if (idl > 0) { + val res = Array.ofDim[String](idl) + + val iid = (json \ "InvertedIndex").extract[Map[String, List[Int]]] + + for { (k: String, v: List[Int]) <- iid } { + v.foreach(item => res(item) = k) + } + (0 until idl).foreach(i => { + if (res(i) == null) + res(i) = "" + }) + return res.mkString(" ") + } + "" + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala new file mode 100644 index 000000000..34e4e982a --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala @@ -0,0 +1,77 @@ +package eu.dnetlib.dhp.collection.mag + +import eu.dnetlib.dhp.application.AbstractScalaApplication +import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.slf4j.Logger + +class SparkCreateMagDataset (propertyPath: String, args: Array[String], log: Logger) + extends AbstractScalaApplication(propertyPath, args, log: Logger) { + + + + /** Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + override def run(): Unit = { + + } + + + private def loadAndFilterPapers(spark:SparkSession, crossrefPath:String, magBasePath:String, workingPath): Unit = { + + import spark.implicits._ + val schema:StructType= StructType(StructField("DOI", StringType)::Nil) + log.info("Phase 1 intersect MAG Paper containing doi also present in crossref") + + //Filter all the MAG Papers that intersect with a Crossref DOI + val crId= spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId") + val magPapers = MagUtility.loadMagEntity(spark, "Papers", magBasePath).withColumn("Doi", lower(col("Doi"))).where(col("Doi").isNotNull) + + + val intersectedPapers:Dataset[Row] =magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi") + intersectedPapers.cache() + intersectedPapers.count() + log.info("Create current abstract") + + //Abstract is an inverted list, we define a function that convert in string the abstract and recreate + // a table(PaperId, Abstract) + val paperAbstract = MagUtility.loadMagEntity(spark, "PaperAbstractsInvertedIndex", magBasePath) + .map(s => (s.getLong(0),MagUtility.convertInvertedIndexString(s.getString(1)))) + .withColumnRenamed("_1","PaperId") + .withColumnRenamed("_2","Abstract") + + //We define Step0 as the result of the Join between PaperIntersection and the PaperAbstract + + val step0 =intersectedPapers + .join(paperAbstract, intersectedPapers("PaperId") === 
paperAbstract("PaperId"), "left") + .select(intersectedPapers("*"),paperAbstract("Abstract")).cache() + + step0.count() + + intersectedPapers.unpersist() + + // We have three table Author, Affiliation, and PaperAuthorAffiliation, in the + //next step we create a table containing + val authors = MagUtility.loadMagEntity(spark, "Authors", magBasePath) + val affiliations= MagUtility.loadMagEntity(spark, "Affiliations", magBasePath) + val paaf= MagUtility.loadMagEntity(spark, "PaperAuthorAffiliations", magBasePath) + + val paperAuthorAffiliations =paaf.join(step0,paaf("PaperId") === step0("PaperId"),"leftsemi") + + val j1 = paperAuthorAffiliations.join(authors,paperAuthorAffiliations("AuthorId") === authors("AuthorId"), "inner") + .select(col("PaperId"), col("AffiliationId"),col("AuthorSequenceNumber"), authors("DisplayName").alias("AuthorName"), authors("AuthorId")) + + val paperAuthorAffiliationNormalized = j1.join(affiliations, j1("AffiliationId")=== affiliations("AffiliationId"), "left") + .select(j1("*"), affiliations("DisplayName").alias("AffiliationName"), affiliations("GridId")) + .groupBy("PaperId") + .agg(collect_list(struct("AffiliationId","AuthorSequenceNumber","AffiliationName","AuthorName","AuthorId","GridId")).alias("authors")) + val step1 =step0.join(paperAuthorAffiliationNormalized, step0("PaperId")=== paperAuthorAffiliationNormalized("PaperId"), "left").cache() + step1.count() + + step0.unpersist() + + + } +}