From cbd4e5e4bbc407ff756b07ae0a4a88d5059fff94 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 8 Mar 2024 16:31:40 +0100
Subject: [PATCH] update mag mapping

---
 .../mag/convert_MAG_to_OAF_properties.json    |  21 ++++
 ...ate_MAG_denormalized_table_properites.json |   0
 ...ate_MAG_denormalized_table_properties.json |  21 ++++
 .../dhp/collection/mag/oozie_app/workflow.xml |  25 ++--
 .../dhp/collection/mag/MagUtility.scala       | 113 ++++++++++++++++--
 .../mag/SparkCreateMagDenormalizedTable.scala |  27 ++---
 .../dhp/collection/mag/SparkMAGtoOAF.scala    |  38 ++++++
 .../dhp/collection/mag/MAGMappingTest.scala   |  31 +++++
 pom.xml                                       |   2 +-
 9 files changed, 236 insertions(+), 42 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properties.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala

diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
new file mode 100644
index 000000000..ff2322a76
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
@@ -0,0 +1,21 @@
+[
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "the master name",
+    "paramRequired": true
+  },
+  {
+    "paramName": "md",
+    "paramLongName": "mdstorePath",
+    "paramDescription": "The path of the MDStore where the OAF records are written",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "The working path",
+    "paramRequired": false
+  }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json
deleted file mode 100644
index e69de29bb..000000000
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properties.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properties.json
new file mode 100644
index 000000000..95a4a8517
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properties.json
@@ -0,0 +1,21 @@
+[
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "the master name",
+    "paramRequired": true
+  },
+  {
+    "paramName": "mp",
+    "paramLongName": "magBasePath",
+    "paramDescription": "The base path of MAG DUMP CSV Tables",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "The working path",
+    "paramRequired": false
+  }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
index bcb049a06..91a429d35 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
@@ -1,11 +1,11 @@
     <parameters>
         <property>
-            <name>crossrefPath</name>
-            <description>the path of the native Crossref DUMP</description>
+            <name>magBasePath</name>
+            <description>The base path of MAG DUMP CSV Tables</description>
         </property>
         <property>
-            <name>magBasePath</name>
-            <description>The base path of MAG DUMP CSV Tables</description>
+            <name>mdstorePath</name>
+            <description>The path of the MDStore where the OAF records are written</description>
         </property>
@@ -14,12 +14,9 @@
             <name>resume_from</name>
+            <value>generateOAF</value>
             <description>start Node</description>
         </property>
     </parameters>
-
-
-
-
@@ -31,17 +28,17 @@
-            <case to="generateTables">${wf:conf('resumeFrom') eq 'generateTable'}</case>
+            <case to="generateTables">${wf:conf('resume_from') eq 'generateTable'}</case>
-
+
     <action name="generateTables">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
             <mode>cluster</mode>
-            <name>Generate ORCID Tables</name>
+            <name>Generate MAG Table</name>
             <class>eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable</class>
             <jar>dhp-aggregation-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory=${sparkExecutorMemory}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
@@ -55,7 +52,6 @@
             </spark-opts>
-            <arg>--crossrefPath</arg><arg>${crossrefPath}</arg>
             <arg>--magBasePath</arg><arg>${magBasePath}</arg>
             <arg>--workingPath</arg><arg>${workingPath}</arg>
             <arg>--master</arg><arg>yarn</arg>
@@ -68,8 +64,8 @@
             <master>yarn</master>
             <mode>cluster</mode>
-            <name>Generate ORCID Tables</name>
-            <class>eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable</class>
+            <name>MAG TO OAF</name>
+            <class>eu.dnetlib.dhp.collection.mag.SparkMAGtoOAF</class>
             <jar>dhp-aggregation-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory=${sparkExecutorMemory}
@@ -82,8 +78,7 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>--crossrefPath</arg><arg>${crossrefPath}</arg>
-            <arg>--magBasePath</arg><arg>${magBasePath}</arg>
+            <arg>--mdstorePath</arg><arg>${mdstorePath}</arg>
             <arg>--workingPath</arg><arg>${workingPath}</arg>
             <arg>--master</arg><arg>yarn</arg>
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
index 42cee7354..e65e8e020 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
@@ -1,11 +1,17 @@
 package eu.dnetlib.dhp.collection.mag
 
+import eu.dnetlib.dhp.schema.common.ModelConstants
+import eu.dnetlib.dhp.schema.oaf.{Author, Journal, Publication}
+import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, PidType}
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
 
+import scala.collection.JavaConverters._
+
 case class MAGPaper(
   paperId: Option[Long],
   rank: Option[Int],
@@ -308,15 +314,14 @@ object MagUtility extends Serializable {
   def getSchema(streamName: String): StructType = {
     var schema = new StructType()
     val d: Seq[String] = stream(streamName)._2
-    d.foreach {
-      case t =>
-        val currentType = t.split(":")
-        val fieldName: String = currentType.head
-        var fieldType: String = currentType.last
-        val nullable: Boolean = fieldType.endsWith("?")
-        if (nullable)
-          fieldType = fieldType.replace("?", "")
-        schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
+    d.foreach { case t =>
+      val currentType = t.split(":")
+      val fieldName: String = currentType.head
+      var fieldType: String = currentType.last
+      val nullable: Boolean = fieldType.endsWith("?")
+      if (nullable)
+        fieldType = fieldType.replace("?", "")
+      schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
     }
     schema
   }
@@ -336,6 +341,94 @@ object MagUtility extends Serializable {
 
   }
 
+  def convertMAGtoOAF(paper: MAGPaper): Publication = {
+
+    if (paper.doi.isDefined) {
+      val pub = new Publication
+      pub.setPid(
+        List(
+          structuredProperty(
+            paper.doi.get,
+            qualifier(
+              PidType.doi.toString,
+              PidType.doi.toString,
+              ModelConstants.DNET_PID_TYPES,
+              ModelConstants.DNET_PID_TYPES
+            ),
+            null
+          )
+        ).asJava
+      )
+
+      pub.setOriginalId(List(paper.paperId.get.toString, paper.doi.get).asJava)
+
+      //IMPORTANT
+      //The old method result.setId(generateIdentifier(result, doi))
+      //will be replaced using IdentifierFactory
+      pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub))
+
+      val mainTitles = structuredProperty(paper.originalTitle.get, ModelConstants.MAIN_TITLE_QUALIFIER, null)
+
+      val originalTitles = structuredProperty(paper.paperTitle.get, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null)
+
+      pub.setTitle(List(mainTitles, originalTitles).asJava)
+
+      if (paper.bookTitle.isDefined)
+        pub.setSource(List(field[String](paper.bookTitle.get, null)).asJava)
+      if (paper.abstractText.isDefined)
+        pub.setDescription(List(field(paper.abstractText.get, null)).asJava)
+      if (paper.authors.isDefined && paper.authors.get.nonEmpty) {
+        pub.setAuthor(
+          paper.authors.get
+            .filter(a => a.AuthorName.isDefined)
+            .map(a => {
+              val author = new Author
+              author.setFullname(a.AuthorName.get)
+              if (a.AffiliationName.isDefined)
+                author.setAffiliation(List(field(a.AffiliationName.get, null)).asJava)
+              author.setPid(
+                List(
+                  structuredProperty(
+                    s"https://academic.microsoft.com/#/detail/${a.AuthorId.get}",
+                    qualifier("url", "url", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES),
+                    null
+                  )
+                ).asJava
+              )
+              author
+            })
+            .asJava
+        )
+
+      }
+
+      if (paper.date.isDefined)
+        pub.setDateofacceptance(field(paper.date.get, null))
+
+      if (paper.publisher.isDefined)
+        pub.setPublisher(field(paper.publisher.get, null))
+
+      if (paper.journalId.isDefined && paper.journalName.isDefined) {
+        val j = new Journal
+
+        j.setName(paper.journalName.get)
+        j.setSp(paper.firstPage.orNull)
+        j.setEp(paper.lastPage.orNull)
+        j.setIssnPrinted(paper.journalIssn.orNull)
+        j.setVol(paper.volume.orNull)
+        j.setIss(paper.issue.orNull)
+        j.setConferenceplace(paper.conferenceLocation.orNull)
+        j.setEdition(paper.conferenceName.orNull)
+        pub.setJournal(j)
+      }
+
+      pub
+    } else {
+      null
+    }
+
+  }
+
   def convertInvertedIndexString(json_input: String): String = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
     lazy val json: json4s.JValue = parse(json_input)
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala
index 7843d796a..73fdbe1f7 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala
@@ -13,18 +13,15 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
    * where the whole logic of the spark node is defined
    */
   override def run(): Unit = {
-    val crossrefPath: String = parser.get("crossrefPath")
-    log.info("found parameters crossrefPath: {}", crossrefPath)
     val magBasePath: String = parser.get("magBasePath")
     log.info("found parameters magBasePath: {}", magBasePath)
     val workingPath: String = parser.get("workingPath")
     log.info("found parameters workingPath: {}", workingPath)
-    generatedDenormalizedMAGTable(spark, crossrefPath, magBasePath, workingPath)
+    generatedDenormalizedMAGTable(spark, magBasePath, workingPath)
   }
 
   private def generatedDenormalizedMAGTable(
     spark: SparkSession,
-    crossrefPath: String,
     magBasePath: String,
     workingPath: String
   ): Unit = {
@@ -33,17 +30,13 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
 
     val schema: StructType = StructType(StructField("DOI", StringType) :: Nil)
 
-    //Filter all the MAG Papers that intersect with a Crossref DOI
-    val crId =
-      spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId")
+    //Load all the MAG Papers and normalize the DOIs to lower case
+
     val magPapers = MagUtility
       .loadMagEntity(spark, "Papers", magBasePath)
       .withColumn("Doi", lower(col("Doi")))
-      .where(col("Doi").isNotNull)
-    val intersectedPapers: Dataset[Row] =
-      magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi")
-    intersectedPapers.cache()
-    intersectedPapers.count()
+    magPapers.cache()
+    magPapers.count()
 
     //log.info("Create current abstract")
     //Abstract is an inverted list, we define a function that convert in string the abstract and recreate
@@ -56,14 +49,14 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
 
     //We define Step0 as the result of the Join between PaperIntersection and the PaperAbstract
 
-    val step0 = intersectedPapers
-      .join(paperAbstract, intersectedPapers("PaperId") === paperAbstract("PaperId"), "left")
-      .select(intersectedPapers("*"), paperAbstract("Abstract"))
+    val step0 = magPapers
+      .join(paperAbstract, magPapers("PaperId") === paperAbstract("PaperId"), "left")
+      .select(magPapers("*"), paperAbstract("Abstract"))
       .cache()
     step0.count()
 
-    intersectedPapers.unpersist()
+    magPapers.unpersist()
 
     // We have three tables Author, Affiliation, and PaperAuthorAffiliation, in the
     //next step we create a table containing
@@ -214,9 +207,9 @@ object SparkCreateMagDenormalizedTable {
 
   def main(args: Array[String]): Unit = {
     new SparkCreateMagDenormalizedTable(
-      "/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json",
+      "/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properties.json",
       args,
       log
-    )
+    ).initialize().run()
   }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
new file mode 100644
index 000000000..60367263a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
@@ -0,0 +1,38 @@
+package eu.dnetlib.dhp.collection.mag
+
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import org.apache.spark.sql.SparkSession
+import org.slf4j.{Logger, LoggerFactory}
+
+class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
+    extends AbstractScalaApplication(propertyPath, args, log: Logger) {
+
+  /** Here all the Spark applications run this method
+    * where the whole logic of the spark node is defined
+    */
+  override def run(): Unit = {
+    val mdstorePath: String = parser.get("mdstorePath")
+    log.info("found parameters mdstorePath: {}", mdstorePath)
+    val workingPath: String = parser.get("workingPath")
+    log.info("found parameters workingPath: {}", workingPath)
+    convertMAG(spark, workingPath, mdstorePath)
+  }
+
+  def convertMAG(spark: SparkSession, workingPath: String, mdStorePath: String): Unit = {
+    import spark.implicits._
+    val papers = spark.read.load(s"$workingPath/mag").as[MAGPaper]
+    val total = papers.count()
+    log.info(s"TOTAL PAPERS: $total")
+  }
+}
+
+object SparkMAGtoOAF {
+
+  val log: Logger = LoggerFactory.getLogger(SparkMAGtoOAF.getClass)
+
+  def main(args: Array[String]): Unit = {
+    new SparkMAGtoOAF("/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json", args, log)
+      .initialize()
+      .run()
+  }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala
new file mode 100644
index 000000000..34b1538ff
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala
@@ -0,0 +1,31 @@
+package eu.dnetlib.dhp.collection.mag
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.col
+import org.junit.jupiter.api.Test
+
+class MAGMappingTest {
+
+  val mapper = new ObjectMapper()
+
+  @Test
+  def mappingTest(): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName("Test")
+      .master("local[*]")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    val magDS = spark.read.load("/home/sandro/Downloads/mag").as[MAGPaper].where(col("journalId").isNotNull)
+
+    val paper = magDS.first()
+
+    print(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(MagUtility.convertMAGtoOAF(paper)))
+
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 6ef320253..8c9cdafca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -888,7 +888,7 @@
 		<mockito-core.version>3.3.3</mockito-core.version>
 		<mongodb.driver.version>3.4.2</mongodb.driver.version>
 		<vtd.version>[2.12,3.0)</vtd.version>
-		<dhp-schemas.version>[4.17.2]</dhp-schemas.version>
+		<dhp-schemas.version>[5.17.4-SNAPSHOT]</dhp-schemas.version>
 		<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
 		<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
 		<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
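
Note (not part of the diff above): convertMAG currently only loads the denormalized
MAGPaper table and logs its size; nothing is written to mdStorePath yet. A minimal
sketch of how the missing step could look, assuming the mapped publications are
serialized as compressed JSON text under mdStorePath — the encoder choice, the
output format, and the per-partition mapper below are illustrative assumptions,
not code from this patch:

    import com.fasterxml.jackson.databind.ObjectMapper
    import eu.dnetlib.dhp.schema.oaf.Publication
    import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

    def convertMAG(spark: SparkSession, workingPath: String, mdStorePath: String): Unit = {
      import spark.implicits._
      // Publication is a Java bean from dhp-schemas, not a case class, so a kryo encoder is used here
      implicit val pubEncoder: Encoder[Publication] = Encoders.kryo[Publication]

      spark.read
        .load(s"$workingPath/mag")
        .as[MAGPaper]
        .map(p => MagUtility.convertMAGtoOAF(p)) // returns null when the paper carries no DOI
        .filter(p => p != null)
        .mapPartitions { it =>
          // one ObjectMapper per partition: it is not java.io.Serializable,
          // so it cannot be captured in the closure from the driver
          val mapper = new ObjectMapper()
          it.map(p => mapper.writeValueAsString(p))
        }
        .write
        .mode(SaveMode.Overwrite)
        .option("compression", "gzip")
        .text(mdStorePath)
    }

With a step of this shape, the mdstorePath argument already wired through the
workflow and the convert_MAG_to_OAF_properties.json parameters would actually
receive the converted records.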