From 3b837d38cecf33cc50a93e094cd9ccd53ebf99f3 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 5 Mar 2024 11:44:59 +0100 Subject: [PATCH] added oozie workflow --- ...ate_MAG_denormalized_table_properites.json | 0 .../mag/oozie_app/config-default.xml | 23 ++ .../dhp/collection/mag/oozie_app/workflow.xml | 95 ++++++++ .../dhp/collection/mag/MagUtility.scala | 82 ++++++- .../mag/SparkCreateMagDataset.scala | 128 ---------- .../mag/SparkCreateMagDenormalizedTable.scala | 222 ++++++++++++++++++ 6 files changed, 410 insertions(+), 140 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/config-default.xml new file mode 100644 index 000000000..dd3c32c62 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml new file mode 100644 index 000000000..bcb049a06 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml @@ -0,0 +1,95 @@ + + + + crossrefPath + the path of the native Crossref DUMP + + + magBasePath + The base path of MAG DUMP CSV Tables + + + workingPath + The working path + + + resume_from + start Node + + + + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + ${wf:conf('resumeFrom') eq 'generateTable'} + + + + + + + + yarn + cluster + Generate ORCID Tables + eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=2g + --conf spark.sql.shuffle.partitions=3000 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf 
spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --crossrefPath${crossrefPath} + --magBasePath${magBasePath} + --workingPath${workingPath} + --masteryarn + + + + + + + + yarn + cluster + Generate ORCID Tables + eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=2g + --conf spark.sql.shuffle.partitions=3000 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --crossrefPath${crossrefPath} + --magBasePath${magBasePath} + --workingPath${workingPath} + --masteryarn + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala index 412089893..42cee7354 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala @@ -6,10 +6,67 @@ import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse +case class MAGPaper( + paperId: Option[Long], + rank: Option[Int], + doi: Option[String], + docType: Option[String], + paperTitle: Option[String], + originalTitle: Option[String], + bookTitle: Option[String], + year: Option[Int], + date: Option[String], + onlineDate: Option[String], + publisher: Option[String], + // Journal or Conference information (one will be populated) + journalId: Option[Long], + journalName: Option[String], + journalIssn: Option[String], + journalPublisher: Option[String], + journalWebpage: Option[String], + conferenceSeriesId: Option[Long], + conferenceInstanceId: Option[Long], + conferenceName: Option[String], + conferenceLocation: Option[String], + conferenceStartDate: Option[String], + conferenceEndDate: Option[String], + volume: Option[String], + issue: Option[String], + firstPage: Option[String], + lastPage: Option[String], + referenceCount: Option[Long], + citationCount: Option[Long], + estimatedCitation: Option[Long], + originalVenue: Option[String], + familyId: Option[Long], + familyRank: Option[Int], + docSubTypes: Option[String], + createdDate: Option[String], + abstractText: Option[String], + // List of authors + authors: Option[List[MAGAuthor]], + // List of Fields of Study + fos: Option[List[MAGFieldOfStudy]] +) + +case class MAGAuthor( + AffiliationId: Option[Long], + AuthorSequenceNumber: Option[Int], + AffiliationName: Option[String], + AuthorName: Option[String], + AuthorId: Option[Long], + GridId: Option[String] +) + +case class MAGFieldOfStudy( + FieldOfStudyId: Option[Long], + DisplayName: Option[String], + MainType: Option[String], + Score: Option[Double] +) object MagUtility extends Serializable { - val datatypedict = Map( "bool" -> BooleanType, "int" -> IntegerType, @@ -251,22 +308,22 @@ object MagUtility extends Serializable { def getSchema(streamName: String): StructType = { var schema = new StructType() val d: Seq[String] = stream(streamName)._2 - d.foreach { case t => - val currentType = t.split(":") - val fieldName: String = currentType.head - var fieldType: String = 
currentType.last - val nullable: Boolean = fieldType.endsWith("?") - if (nullable) - fieldType = fieldType.replace("?", "") - schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable)) + d.foreach { + case t => + val currentType = t.split(":") + val fieldName: String = currentType.head + var fieldType: String = currentType.last + val nullable: Boolean = fieldType.endsWith("?") + if (nullable) + fieldType = fieldType.replace("?", "") + schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable)) } schema } - - def loadMagEntity(spark:SparkSession, entity:String, basePath:String):Dataset[Row] = { + def loadMagEntity(spark: SparkSession, entity: String, basePath: String): Dataset[Row] = { if (stream.contains(entity)) { - val s =getSchema(entity) + val s = getSchema(entity) val pt = stream(entity)._1 spark.read .option("header", "false") @@ -278,6 +335,7 @@ object MagUtility extends Serializable { null } + def convertInvertedIndexString(json_input: String): String = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(json_input) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala deleted file mode 100644 index 70073cbe6..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala +++ /dev/null @@ -1,128 +0,0 @@ -package eu.dnetlib.dhp.collection.mag - -import eu.dnetlib.dhp.application.AbstractScalaApplication -import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.slf4j.Logger - -class SparkCreateMagDataset (propertyPath: String, args: Array[String], log: Logger) - extends AbstractScalaApplication(propertyPath, args, log: Logger) { - - - - /** Here all the spark applications runs this method - * where the whole logic of the spark node is defined - */ - override def run(): Unit = { - - } - - - private def loadAndFilterPapers(spark:SparkSession, crossrefPath:String, magBasePath:String, workingPath:String): Unit = { - - import spark.implicits._ - val schema:StructType= StructType(StructField("DOI", StringType)::Nil) - - //Filter all the MAG Papers that intersect with a Crossref DOI - val crId= spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId") - val magPapers = MagUtility.loadMagEntity(spark, "Papers", magBasePath) - .withColumn("Doi", lower(col("Doi"))) - .where(col("Doi").isNotNull) - - - val intersectedPapers:Dataset[Row] =magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi") - intersectedPapers.cache() - intersectedPapers.count() - //log.info("Create current abstract") - - //Abstract is an inverted list, we define a function that convert in string the abstract and recreate - // a table(PaperId, Abstract) - val paperAbstract = MagUtility.loadMagEntity(spark, "PaperAbstractsInvertedIndex", magBasePath) - .map(s => (s.getLong(0),MagUtility.convertInvertedIndexString(s.getString(1)))) - .withColumnRenamed("_1","PaperId") - .withColumnRenamed("_2","Abstract") - - //We define Step0 as the result of the Join between PaperIntersection and the PaperAbstract - - val step0 =intersectedPapers - .join(paperAbstract, intersectedPapers("PaperId") === 
paperAbstract("PaperId"), "left") - .select(intersectedPapers("*"),paperAbstract("Abstract")).cache() - - step0.count() - - intersectedPapers.unpersist() - - // We have three table Author, Affiliation, and PaperAuthorAffiliation, in the - //next step we create a table containing - val authors = MagUtility.loadMagEntity(spark, "Authors", magBasePath) - val affiliations= MagUtility.loadMagEntity(spark, "Affiliations", magBasePath) - val paaf= MagUtility.loadMagEntity(spark, "PaperAuthorAffiliations", magBasePath) - - val paperAuthorAffiliations =paaf.join(step0,paaf("PaperId") === step0("PaperId"),"leftsemi") - - val j1 = paperAuthorAffiliations.join(authors,paperAuthorAffiliations("AuthorId") === authors("AuthorId"), "inner") - .select(col("PaperId"), col("AffiliationId"),col("AuthorSequenceNumber"), authors("DisplayName").alias("AuthorName"), authors("AuthorId")) - - val paperAuthorAffiliationNormalized = j1.join(affiliations, j1("AffiliationId")=== affiliations("AffiliationId"), "left") - .select(j1("*"), affiliations("DisplayName").alias("AffiliationName"), affiliations("GridId")) - .groupBy("PaperId") - .agg(collect_list(struct("AffiliationId","AuthorSequenceNumber","AffiliationName","AuthorName","AuthorId","GridId")).alias("authors")) - val step1 =step0.join(paperAuthorAffiliationNormalized, step0("PaperId")=== paperAuthorAffiliationNormalized("PaperId"), "left") - .select(step0("*"),paperAuthorAffiliationNormalized("authors")) - .cache() - step1.count() - - step0.unpersist() - - - val conference = MagUtility.loadMagEntity(spark, "ConferenceInstances", magBasePath).select( - $"ConferenceInstanceId", - $"DisplayName".as("conferenceName"), - $"Location".as("conferenceLocation"), - $"StartDate".as("conferenceStartDate"), - $"EndDate".as("conferenceEndDate") - ) - - val step2 =step1.join(conference, step1("ConferenceInstanceId")=== conference("ConferenceInstanceId"),"left").select( - step1("*"), conference("conferenceName"), - conference("conferenceLocation"), - conference("conferenceStartDate"), - conference("conferenceEndDate")).cache() - step2.count() - step1.unpersist() - - val fos = MagUtility.loadMagEntity(spark, "FieldsOfStudy", magBasePath) - .select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType") - - val paperFieldsOfStudy = MagUtility.loadMagEntity(spark, "PaperFieldsOfStudy", magBasePath) - .select($"FieldOfStudyId", $"Score", $"PaperId") - - val paperFoS = paperFieldsOfStudy.join(broadcast(fos),fos("fos")===paperFieldsOfStudy("FieldOfStudyId")).groupBy("PaperId") - .agg(collect_set(struct("FieldOfStudyId","DisplayName","MainType","Score")).as("FoS")) - - val step3=step2.join(paperFoS, step2("PaperId")===paperFoS("PaperId"), "left") - .select(step2("*"), paperFoS("FoS")).cache() - step3.count() - - step2.unpersist() - - val journals= MagUtility.loadMagEntity(spark, "Journals", magBasePath) - .select( - $"JournalId", - $"DisplayName".as("journalName"), - $"Issn".as("journalIssn"), - $"Publisher".as("journalPublisher"), - $"Webpage".as("journalWebpage") - ) - step3.join(journals, step3("JournalId")===journals("JournalId"), "left"). 
-      select(step3("*"),
-        journals("journalName"),
-        journals("journalIssn"),
-        journals("journalPublisher"),
-        journals("journalWebpage")
-      ).write.mode("OverWrite")
-      .save(s"$workingPath/generatedMAG")
-    step3.unpersist()
-  }
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala
new file mode 100644
index 000000000..7843d796a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala
@@ -0,0 +1,222 @@
+package eu.dnetlib.dhp.collection.mag
+
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String], log: Logger)
+    extends AbstractScalaApplication(propertyPath, args, log: Logger) {
+
+  /** All Spark applications run this method,
+    * where the whole logic of the Spark node is defined.
+    */
+  override def run(): Unit = {
+    val crossrefPath: String = parser.get("crossrefPath")
+    log.info("found parameter crossrefPath: {}", crossrefPath)
+    val magBasePath: String = parser.get("magBasePath")
+    log.info("found parameter magBasePath: {}", magBasePath)
+    val workingPath: String = parser.get("workingPath")
+    log.info("found parameter workingPath: {}", workingPath)
+    generatedDenormalizedMAGTable(spark, crossrefPath, magBasePath, workingPath)
+  }
+
+  private def generatedDenormalizedMAGTable(
+    spark: SparkSession,
+    crossrefPath: String,
+    magBasePath: String,
+    workingPath: String
+  ): Unit = {
+
+    import spark.implicits._
+    val schema: StructType = StructType(StructField("DOI", StringType) :: Nil)
+
+    //Filter all the MAG Papers that intersect with a Crossref DOI
+    val crId =
+      spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId")
+    val magPapers = MagUtility
+      .loadMagEntity(spark, "Papers", magBasePath)
+      .withColumn("Doi", lower(col("Doi")))
+      .where(col("Doi").isNotNull)
+
+    val intersectedPapers: Dataset[Row] =
+      magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi")
+    intersectedPapers.cache()
+    intersectedPapers.count()
+    //log.info("Create current abstract")
+
+    //The abstract is stored as an inverted index: we apply a function that converts it back to a plain string
+    //and recreate a table (PaperId, Abstract)
+    val paperAbstract = MagUtility
+      .loadMagEntity(spark, "PaperAbstractsInvertedIndex", magBasePath)
+      .map(s => (s.getLong(0), MagUtility.convertInvertedIndexString(s.getString(1))))
+      .withColumnRenamed("_1", "PaperId")
+      .withColumnRenamed("_2", "Abstract")
+
+    //We define Step0 as the result of the left join between intersectedPapers and paperAbstract
+
+    val step0 = intersectedPapers
+      .join(paperAbstract, intersectedPapers("PaperId") === paperAbstract("PaperId"), "left")
+      .select(intersectedPapers("*"), paperAbstract("Abstract"))
+      .cache()
+
+    step0.count()
+
+    intersectedPapers.unpersist()
+
+    //We have three tables, Authors, Affiliations and PaperAuthorAffiliations: in the
+    //next step we create a table containing, for each paper, the list of its authors with their affiliations
+    val authors = MagUtility.loadMagEntity(spark, "Authors", magBasePath)
+    val affiliations = MagUtility.loadMagEntity(spark, "Affiliations", magBasePath)
+    val paaf = MagUtility.loadMagEntity(spark, "PaperAuthorAffiliations", magBasePath)
+
+    val paperAuthorAffiliations = paaf.join(step0, paaf("PaperId") === step0("PaperId"), "leftsemi")
+
+    val j1 = paperAuthorAffiliations
+      .join(authors, paperAuthorAffiliations("AuthorId") === authors("AuthorId"), "inner")
+      .select(
+        col("PaperId"),
+        col("AffiliationId"),
+        col("AuthorSequenceNumber"),
+        authors("DisplayName").alias("AuthorName"),
+        authors("AuthorId")
+      )
+
+    val paperAuthorAffiliationNormalized = j1
+      .join(affiliations, j1("AffiliationId") === affiliations("AffiliationId"), "left")
+      .select(j1("*"), affiliations("DisplayName").alias("AffiliationName"), affiliations("GridId"))
+      .groupBy("PaperId")
+      .agg(
+        collect_list(
+          struct("AffiliationId", "AuthorSequenceNumber", "AffiliationName", "AuthorName", "AuthorId", "GridId")
+        ).alias("authors")
+      )
+    val step1 = step0
+      .join(paperAuthorAffiliationNormalized, step0("PaperId") === paperAuthorAffiliationNormalized("PaperId"), "left")
+      .select(step0("*"), paperAuthorAffiliationNormalized("authors"))
+      .cache()
+    step1.count()
+
+    step0.unpersist()
+
+    //Attach conference instance metadata, when present
+    val conference = MagUtility
+      .loadMagEntity(spark, "ConferenceInstances", magBasePath)
+      .select(
+        $"ConferenceInstanceId",
+        $"DisplayName".as("conferenceName"),
+        $"Location".as("conferenceLocation"),
+        $"StartDate".as("conferenceStartDate"),
+        $"EndDate".as("conferenceEndDate")
+      )
+
+    val step2 = step1
+      .join(conference, step1("ConferenceInstanceId") === conference("ConferenceInstanceId"), "left")
+      .select(
+        step1("*"),
+        conference("conferenceName"),
+        conference("conferenceLocation"),
+        conference("conferenceStartDate"),
+        conference("conferenceEndDate")
+      )
+      .cache()
+    step2.count()
+    step1.unpersist()
+
+    //Attach the fields of study of each paper
+    val fos = MagUtility
+      .loadMagEntity(spark, "FieldsOfStudy", magBasePath)
+      .select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType")
+
+    val paperFieldsOfStudy = MagUtility
+      .loadMagEntity(spark, "PaperFieldsOfStudy", magBasePath)
+      .select($"FieldOfStudyId", $"Score", $"PaperId")
+
+    val paperFoS = paperFieldsOfStudy
+      .join(broadcast(fos), fos("fos") === paperFieldsOfStudy("FieldOfStudyId"))
+      .groupBy("PaperId")
+      .agg(collect_set(struct("FieldOfStudyId", "DisplayName", "MainType", "Score")).as("FoS"))
+
+    val step3 = step2
+      .join(paperFoS, step2("PaperId") === paperFoS("PaperId"), "left")
+      .select(step2("*"), paperFoS("FoS"))
+      .cache()
+    step3.count()
+
+    step2.unpersist()
+
+    //Attach journal metadata and map everything to the final denormalized schema
+    val journals = MagUtility
+      .loadMagEntity(spark, "Journals", magBasePath)
+      .select(
+        $"JournalId",
+        $"DisplayName".as("journalName"),
+        $"Issn".as("journalIssn"),
+        $"Publisher".as("journalPublisher"),
+        $"Webpage".as("journalWebpage")
+      )
+    step3
+      .join(journals, step3("JournalId") === journals("JournalId"), "left")
+      .select(
+        step3("*"),
+        journals("journalName"),
+        journals("journalIssn"),
+        journals("journalPublisher"),
+        journals("journalWebpage")
+      )
+      .select(
+        $"PaperId".as("paperId"),
+        $"Rank".as("rank"),
+        $"Doi".as("doi"),
+        $"DocType".as("docType"),
+        $"PaperTitle".as("paperTitle"),
+        $"OriginalTitle".as("originalTitle"),
+        $"BookTitle".as("bookTitle"),
+        $"Year".as("year"),
+        $"Date".as("date"),
+        $"OnlineDate".as("onlineDate"),
+        $"Publisher".as("publisher"),
+        $"JournalId".as("journalId"),
+        $"ConferenceSeriesId".as("conferenceSeriesId"),
+        $"ConferenceInstanceId".as("conferenceInstanceId"),
+        $"Volume".as("volume"),
+        $"Issue".as("issue"),
+        $"FirstPage".as("firstPage"),
+        $"LastPage".as("lastPage"),
+        $"ReferenceCount".as("referenceCount"),
+        $"CitationCount".as("citationCount"),
+        $"EstimatedCitation".as("estimatedCitation"),
+        $"OriginalVenue".as("originalVenue"),
+        $"FamilyId".as("familyId"),
+        $"FamilyRank".as("familyRank"),
+        $"DocSubTypes".as("docSubTypes"),
+        $"CreatedDate".as("createdDate"),
+        $"Abstract".as("abstractText"),
+        $"authors".as("authors"),
+        $"conferenceName".as("conferenceName"),
+        $"conferenceLocation".as("conferenceLocation"),
+        $"conferenceStartDate".as("conferenceStartDate"),
+        $"conferenceEndDate".as("conferenceEndDate"),
+        $"FoS".as("fos"),
+        $"journalName".as("journalName"),
+        $"journalIssn".as("journalIssn"),
+        $"journalPublisher".as("journalPublisher"),
+        $"journalWebpage".as("journalWebpage")
+      )
+      .write
+      .mode("OverWrite")
+      .save(s"$workingPath/mag")
+    step3.unpersist()
+  }
+}
+
+object SparkCreateMagDenormalizedTable {
+
+  val log: Logger = LoggerFactory.getLogger(SparkCreateMagDenormalizedTable.getClass)
+
+  def main(args: Array[String]): Unit = {
+    new SparkCreateMagDenormalizedTable(
+      "/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json",
+      args,
+      log
+    ).initialize().run()
+  }
+}