update mag mapping

This commit is contained in:
Sandro La Bruzzo 2024-03-08 16:31:40 +01:00
parent d34cef3f8d
commit cbd4e5e4bb
9 changed files with 236 additions and 42 deletions

View File

@ -0,0 +1,21 @@
[
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the master name",
"paramRequired": true
},
{
"paramName": "md",
"paramLongName": "mdstorePath",
"paramDescription": "The base path of MAG DUMP CSV Tables",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "The working path",
"paramRequired": false
}
]

View File

@ -0,0 +1,21 @@
[
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the master name",
"paramRequired": true
},
{
"paramName": "mp",
"paramLongName": "magBasePath",
"paramDescription": "The base path of MAG DUMP CSV Tables",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "The working path",
"paramRequired": false
}
]

View File

@ -1,11 +1,11 @@
<workflow-app name="generate_MAG_Datasource" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="generate_MAG_Datasource" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>crossrefPath</name> <name>magBasePath</name>
<description>the path of the native Crossref DUMP</description> <description>The base path of MAG DUMP CSV Tables</description>
</property> </property>
<property> <property>
<name>magBasePath</name> <name>mdstorePath</name>
<description>The base path of MAG DUMP CSV Tables</description> <description>The base path of MAG DUMP CSV Tables</description>
</property> </property>
<property> <property>
@ -14,12 +14,9 @@
</property> </property>
<property> <property>
<name>resume_from</name> <name>resume_from</name>
<value>generateOAF</value>
<description>start Node</description> <description>start Node</description>
</property> </property>
</parameters> </parameters>
<start to="resume_from"/> <start to="resume_from"/>
@ -31,17 +28,17 @@
<decision name="resume_from"> <decision name="resume_from">
<switch> <switch>
<case to="generateTable">${wf:conf('resumeFrom') eq 'generateTable'}</case> <case to="generateTable">${wf:conf('resume_from') eq 'generateTable'}</case>
<default to="generateOAF"/> <!-- first action to be done when downloadDump is to be performed --> <default to="generateOAF"/> <!-- first action to be done when downloadDump is to be performed -->
</switch> </switch>
</decision> </decision>
<action name="generateTables"> <action name="generateTable">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Generate ORCID Tables</name> <name>Generate MAG Table</name>
<class>eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable</class> <class>eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -55,7 +52,6 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--crossrefPath</arg><arg>${crossrefPath}</arg>
<arg>--magBasePath</arg><arg>${magBasePath}</arg> <arg>--magBasePath</arg><arg>${magBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
@ -68,8 +64,8 @@
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Generate ORCID Tables</name> <name>MAG TO OAF</name>
<class>eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable</class> <class>eu.dnetlib.dhp.collection.mag.SparkMAGtoOAF</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
@ -82,8 +78,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--crossrefPath</arg><arg>${crossrefPath}</arg> <arg>--mdstorePath</arg><arg>${mdstorePath}</arg>
<arg>--magBasePath</arg><arg>${magBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
</spark> </spark>

View File

@ -1,11 +1,17 @@
package eu.dnetlib.dhp.collection.mag package eu.dnetlib.dhp.collection.mag
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Author, Journal, Publication}
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, PidType}
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
import scala.collection.JavaConverters._
case class MAGPaper( case class MAGPaper(
paperId: Option[Long], paperId: Option[Long],
rank: Option[Int], rank: Option[Int],
@ -308,15 +314,14 @@ object MagUtility extends Serializable {
def getSchema(streamName: String): StructType = { def getSchema(streamName: String): StructType = {
var schema = new StructType() var schema = new StructType()
val d: Seq[String] = stream(streamName)._2 val d: Seq[String] = stream(streamName)._2
d.foreach { d.foreach { case t =>
case t => val currentType = t.split(":")
val currentType = t.split(":") val fieldName: String = currentType.head
val fieldName: String = currentType.head var fieldType: String = currentType.last
var fieldType: String = currentType.last val nullable: Boolean = fieldType.endsWith("?")
val nullable: Boolean = fieldType.endsWith("?") if (nullable)
if (nullable) fieldType = fieldType.replace("?", "")
fieldType = fieldType.replace("?", "") schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
} }
schema schema
} }
@ -336,6 +341,96 @@ object MagUtility extends Serializable {
} }
def convertMAGtoOAF(paper: MAGPaper): Publication = {
if (paper.doi.isDefined) {
val pub = new Publication
pub.setPid(
List(
structuredProperty(
paper.doi.get,
qualifier(
PidType.doi.toString,
PidType.doi.toString,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES
),
null
)
).asJava
)
pub.setOriginalId(List(paper.paperId.get.toString, paper.doi.get).asJava)
//IMPORTANT
//The old method result.setId(generateIdentifier(result, doi))
//will be replaced using IdentifierFactory
pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub))
val mainTitles = structuredProperty(paper.originalTitle.get, ModelConstants.MAIN_TITLE_QUALIFIER, null)
val originalTitles = structuredProperty(paper.paperTitle.get, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null)
pub.setTitle(List(mainTitles, originalTitles).asJava)
if (paper.bookTitle.isDefined)
pub.setSource(List(field[String](paper.bookTitle.get, null)).asJava)
if (paper.abstractText.isDefined)
pub.setDescription(List(field(paper.abstractText.get, null)).asJava)
if (paper.authors.isDefined && paper.authors.get.nonEmpty) {
pub.setAuthor(
paper.authors.get
.filter(a => a.AuthorName.isDefined)
.map(a => {
val author = new Author
author.setFullname(a.AuthorName.get)
if (a.AffiliationName.isDefined)
author.setAffiliation(List(field(a.AffiliationName.get, null)).asJava)
author.setPid(
List(
structuredProperty(
s"https://academic.microsoft.com/#/detail/${a.AuthorId.get}",
qualifier("url", "url", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES),
null
)
).asJava
)
author
})
.asJava
)
}
if (paper.date.isDefined)
pub.setDateofacceptance(field(paper.date.get, null))
if (paper.publisher.isDefined)
pub.setPublisher(field(paper.publisher.get, null))
if (paper.journalId.isDefined && paper.journalName.isDefined) {
val j = new Journal
j.setName(paper.journalName.get)
j.setSp(paper.firstPage.orNull)
j.setEp(paper.lastPage.orNull)
if (paper.publisher.isDefined)
pub.setPublisher(field(paper.publisher.get, null))
j.setIssnPrinted(paper.journalIssn.orNull)
j.setVol(paper.volume.orNull)
j.setIss(paper.issue.orNull)
j.setConferenceplace(paper.conferenceLocation.orNull)
j.setEdition(paper.conferenceName.orNull)
pub.setJournal(j)
}
pub
} else {
null
}
}
def convertInvertedIndexString(json_input: String): String = { def convertInvertedIndexString(json_input: String): String = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(json_input) lazy val json: json4s.JValue = parse(json_input)

View File

@ -13,18 +13,15 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
* where the whole logic of the spark node is defined * where the whole logic of the spark node is defined
*/ */
override def run(): Unit = { override def run(): Unit = {
val crossrefPath: String = parser.get("crossrefPath")
log.info("found parameters crossrefPath: {}", crossrefPath)
val magBasePath: String = parser.get("magBasePath") val magBasePath: String = parser.get("magBasePath")
log.info("found parameters magBasePath: {}", magBasePath) log.info("found parameters magBasePath: {}", magBasePath)
val workingPath: String = parser.get("workingPath") val workingPath: String = parser.get("workingPath")
log.info("found parameters workingPath: {}", workingPath) log.info("found parameters workingPath: {}", workingPath)
generatedDenormalizedMAGTable(spark, crossrefPath, magBasePath, workingPath) generatedDenormalizedMAGTable(spark, magBasePath, workingPath)
} }
private def generatedDenormalizedMAGTable( private def generatedDenormalizedMAGTable(
spark: SparkSession, spark: SparkSession,
crossrefPath: String,
magBasePath: String, magBasePath: String,
workingPath: String workingPath: String
): Unit = { ): Unit = {
@ -33,17 +30,13 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
val schema: StructType = StructType(StructField("DOI", StringType) :: Nil) val schema: StructType = StructType(StructField("DOI", StringType) :: Nil)
//Filter all the MAG Papers that intersect with a Crossref DOI //Filter all the MAG Papers that intersect with a Crossref DOI
val crId =
spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId")
val magPapers = MagUtility val magPapers = MagUtility
.loadMagEntity(spark, "Papers", magBasePath) .loadMagEntity(spark, "Papers", magBasePath)
.withColumn("Doi", lower(col("Doi"))) .withColumn("Doi", lower(col("Doi")))
.where(col("Doi").isNotNull)
val intersectedPapers: Dataset[Row] = magPapers.cache()
magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi") magPapers.count()
intersectedPapers.cache()
intersectedPapers.count()
//log.info("Create current abstract") //log.info("Create current abstract")
//Abstract is an inverted list, we define a function that convert in string the abstract and recreate //Abstract is an inverted list, we define a function that convert in string the abstract and recreate
@ -56,14 +49,14 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
//We define Step0 as the result of the Join between PaperIntersection and the PaperAbstract //We define Step0 as the result of the Join between PaperIntersection and the PaperAbstract
val step0 = intersectedPapers val step0 = magPapers
.join(paperAbstract, intersectedPapers("PaperId") === paperAbstract("PaperId"), "left") .join(paperAbstract, magPapers("PaperId") === paperAbstract("PaperId"), "left")
.select(intersectedPapers("*"), paperAbstract("Abstract")) .select(magPapers("*"), paperAbstract("Abstract"))
.cache() .cache()
step0.count() step0.count()
intersectedPapers.unpersist() magPapers.unpersist()
// We have three table Author, Affiliation, and PaperAuthorAffiliation, in the // We have three table Author, Affiliation, and PaperAuthorAffiliation, in the
//next step we create a table containing //next step we create a table containing
@ -214,9 +207,9 @@ object SparkCreateMagDenormalizedTable {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
new SparkCreateMagDenormalizedTable( new SparkCreateMagDenormalizedTable(
"/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json", "/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properties.json",
args, args,
log log
) ).initialize().run()
} }
} }

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.collection.mag
import eu.dnetlib.dhp.application.AbstractScalaApplication
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
/** Here all the spark applications runs this method
* where the whole logic of the spark node is defined
*/
override def run(): Unit = {
val mdstorePath: String = parser.get("mdstorePath")
log.info("found parameters mdstorePath: {}", mdstorePath)
val workingPath: String = parser.get("workingPath")
log.info("found parameters workingPath: {}", workingPath)
convertMAG(spark, workingPath, mdstorePath)
}
def convertMAG(spark: SparkSession, workingPath: String, mdStorePath: String): Unit = {
import spark.implicits._
val papers = spark.read.load(s"$workingPath/mag").as[MAGPaper]
val total = papers.count()
log.info(s"TOTAL PAPERS: $total")
}
}
object SparkMAGtoOAF {
val log: Logger = LoggerFactory.getLogger(SparkMAGtoOAF.getClass)
def main(args: Array[String]): Unit = {
new SparkMAGtoOAF("/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json", args, log)
.initialize()
.run()
}
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.collection.mag
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.junit.jupiter.api.Test
class MAGMappingTest {
val mapper = new ObjectMapper()
@Test
def mappingTest(): Unit = {
val spark = SparkSession
.builder()
.appName("Test")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val magDS = spark.read.load("/home/sandro/Downloads/mag").as[MAGPaper].where(col("journalId").isNotNull)
val paper = magDS.first()
print(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(MagUtility.convertMAGtoOAF(paper)))
}
}

View File

@ -888,7 +888,7 @@
<mockito-core.version>3.3.3</mockito-core.version> <mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version> <mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version> <vtd.version>[2.12,3.0)</vtd.version>
<dhp-schemas.version>[4.17.2]</dhp-schemas.version> <dhp-schemas.version>[5.17.4-SNAPSHOT]</dhp-schemas.version>
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version> <dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version> <dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version> <dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>