
Implemented first part of the new MAG mapping

This commit is contained in:
Sandro La Bruzzo 2024-04-03 17:07:14 +02:00
parent 73a67c0e4a
commit 6f3e925cae
5 changed files with 309 additions and 89 deletions

View File

@@ -1,7 +1,7 @@
 package eu.dnetlib.dhp.collection.crossref
 import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
 import eu.dnetlib.dhp.schema.common.ModelConstants
 import eu.dnetlib.dhp.schema.oaf._
@@ -15,6 +15,7 @@ import eu.dnetlib.dhp.schema.oaf.utils.{
 }
 import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.commons.lang.StringUtils
+import org.apache.spark.sql.Row
 import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST._
@@ -650,6 +651,33 @@ case object Crossref2Oaf {
     r
   }
+
+  def generateAffliation(input: Row): List[String] = {
+    val doi = input.getString(0)
+    val rorId = input.getString(1)
+    val pubId = s"50|${PidType.doi.toString.padTo(12, '_')}::${DoiCleaningRule.normalizeDoi(doi)}"
+    val affId = GenerateRorActionSetJob.calculateOpenaireId(rorId)
+    val r: Relation = new Relation
+    r.setSource(pubId)
+    r.setTarget(affId)
+    r.setRelType(ModelConstants.RESULT_ORGANIZATION)
+    r.setRelClass(ModelConstants.HAS_AUTHOR_INSTITUTION)
+    r.setSubRelType(ModelConstants.AFFILIATION)
+    r.setDataInfo(generateDataInfo())
+    r.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava)
+    val r1: Relation = new Relation
+    r1.setTarget(pubId)
+    r1.setSource(affId)
+    r1.setRelType(ModelConstants.RESULT_ORGANIZATION)
+    r1.setRelClass(ModelConstants.IS_AUTHOR_INSTITUTION_OF)
+    r1.setSubRelType(ModelConstants.AFFILIATION)
+    r1.setDataInfo(generateDataInfo())
+    r1.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava)
+    List(mapper.writeValueAsString(r), mapper.writeValueAsString(r1))
+  }
+
   def convert(input: String, vocabularies: VocabularyGroup, mode: TransformationType): List[Oaf] = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
     lazy val json: json4s.JValue = parse(input)
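
Editor's note, a minimal sketch (not part of the commit): the new helper takes a two-column Row of (DOI, ROR id) and returns both serialized relations. The DOI and ROR id below are fabricated:

```scala
import org.apache.spark.sql.Row
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf

// Fabricated (DOI, ROR id) pair, in the column order produced by the
// Spark select in SparkMapDumpIntoOAF below.
val relations: List[String] =
  Crossref2Oaf.generateAffliation(Row("10.1234/example", "https://ror.org/05gq02987"))

// Two JSON-serialized Relations: hasAuthorInstitution (publication -> organization)
// and the inverse isAuthorInstitutionOf (organization -> publication).
relations.foreach(println)
```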

View File

@@ -7,7 +7,7 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result, Dataset => OafDataset}
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.spark.sql._
-import org.apache.spark.sql.functions.{col, lower}
+import org.apache.spark.sql.functions.{col, explode, lower}
 import org.apache.spark.sql.types._
 import org.slf4j.{Logger, LoggerFactory}
@@ -104,6 +104,22 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger
       .mode(SaveMode.Append)
       .option("compression", "gzip")
       .text(s"$targetPath")
+
+    // Generate affiliation relations:
+    spark.read
+      .json(sourcePath)
+      .select(col("DOI"), explode(col("author.affiliation")).alias("affiliations"))
+      .select(col("DOI"), explode(col("affiliations.id")).alias("aids"))
+      .where("aids is not null")
+      .select(col("DOI"), explode(col("aids")).alias("aff"))
+      .select(col("DOI"), col("aff.id").alias("id"), col("aff.id-type").alias("idType"))
+      .where(col("idType").like("ROR"))
+      .flatMap(r => Crossref2Oaf.generateAffliation(r))
+      .write
+      .mode(SaveMode.Append)
+      .option("compression", "gzip")
+      .text(s"$targetPath")
   }
 }
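
For reference, a self-contained sketch of the new extraction chain (not part of the commit), run on a single fabricated Crossref record instead of the real dump at sourcePath:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

object AffiliationExtractionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // One fabricated Crossref record: a DOI whose single author has one
    // affiliation carrying a ROR identifier.
    val sample =
      """{"DOI":"10.1234/example","author":[{"affiliation":[{"name":"Example University","id":[{"id":"https://ror.org/05gq02987","id-type":"ROR"}]}]}]}"""

    spark.read
      .json(Seq(sample).toDS)
      .select(col("DOI"), explode(col("author.affiliation")).alias("affiliations"))
      .select(col("DOI"), explode(col("affiliations.id")).alias("aids"))
      .where("aids is not null")
      .select(col("DOI"), explode(col("aids")).alias("aff"))
      .select(col("DOI"), col("aff.id").alias("id"), col("aff.id-type").alias("idType"))
      .where(col("idType").like("ROR"))
      .show(false) // expect one row: 10.1234/example | https://ror.org/05gq02987 | ROR
  }
}
```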

View File

@@ -1,9 +1,12 @@
 package eu.dnetlib.dhp.collection.mag

 import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf.{Author, Journal, Publication}
-import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, PidType}
+import eu.dnetlib.dhp.schema.oaf
+import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, Author, DataInfo, Instance, Journal, Publication, Qualifier, Result}
+import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils, PidType}
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._
+import eu.dnetlib.dhp.utils
+import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.json4s
@@ -304,14 +307,15 @@ object MagUtility extends Serializable {
   def getSchema(streamName: String): StructType = {
     var schema = new StructType()
     val d: Seq[String] = stream(streamName)._2
-    d.foreach { case t =>
-      val currentType = t.split(":")
-      val fieldName: String = currentType.head
-      var fieldType: String = currentType.last
-      val nullable: Boolean = fieldType.endsWith("?")
-      if (nullable)
-        fieldType = fieldType.replace("?", "")
-      schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
+    d.foreach {
+      case t =>
+        val currentType = t.split(":")
+        val fieldName: String = currentType.head
+        var fieldType: String = currentType.last
+        val nullable: Boolean = fieldType.endsWith("?")
+        if (nullable)
+          fieldType = fieldType.replace("?", "")
+        schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
     }
     schema
   }
@@ -331,12 +335,152 @@ object MagUtility extends Serializable {
   }

+  def getInstanceType(magType: Option[String], source: Option[String]): Result = {
+    var result: Result = null
+    val di = new DataInfo
+    di.setDeletedbyinference(false)
+    di.setInferred(false)
+    di.setInvisible(false)
+    di.setTrust("0.9")
+    di.setProvenanceaction(
+      OafMapperUtils.qualifier(
+        ModelConstants.SYSIMPORT_ACTIONSET,
+        ModelConstants.SYSIMPORT_ACTIONSET,
+        ModelConstants.DNET_PROVENANCE_ACTIONS,
+        ModelConstants.DNET_PROVENANCE_ACTIONS
+      )
+    )
+    if (magType == null) {
+      result = new Publication
+      result.setDataInfo(di)
+      val i = new Instance
+      i.setInstancetype(
+        qualifier(
+          "0038",
+          "Other literature type",
+          ModelConstants.DNET_PUBLICATION_RESOURCE,
+          ModelConstants.DNET_PUBLICATION_RESOURCE
+        )
+      )
+      result.setInstance(List(i).asJava)
+      return result
+    }
+    val currentType: String = magType.get
+    val tp = currentType.toLowerCase match {
+      case "book" =>
+        result = new Publication
+        qualifier("0002", "Book", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE)
+      case "bookchapter" =>
+        result = new Publication
+        qualifier(
+          "0013",
+          "Part of book or chapter of book",
+          ModelConstants.DNET_PUBLICATION_RESOURCE,
+          ModelConstants.DNET_PUBLICATION_RESOURCE
+        )
+      case "journal" =>
+        result = new Publication
+        qualifier("0043", "Journal", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE)
+      case "patent" =>
+        if (source.nonEmpty) {
+          val s = source.get.toLowerCase
+          if (s.contains("patent") || s.contains("brevet")) {
+            result = new Publication
+            qualifier(
+              "0019",
+              "Patent",
+              ModelConstants.DNET_PUBLICATION_RESOURCE,
+              ModelConstants.DNET_PUBLICATION_RESOURCE
+            )
+          } else if (s.contains("journal of")) {
+            result = new Publication
+            qualifier(
+              "0043",
+              "Journal",
+              ModelConstants.DNET_PUBLICATION_RESOURCE,
+              ModelConstants.DNET_PUBLICATION_RESOURCE
+            )
+          } else if (
+            s.contains("proceedings") || s.contains("conference") || s.contains("workshop") || s.contains("symposium")
+          ) {
+            result = new Publication
+            qualifier(
+              "0001",
+              "Article",
+              ModelConstants.DNET_PUBLICATION_RESOURCE,
+              ModelConstants.DNET_PUBLICATION_RESOURCE
+            )
+          } else null
+        } else null
+      case "repository" =>
+        result = new Publication()
+        di.setInvisible(true)
+        qualifier(
+          "0038",
+          "Other literature type",
+          ModelConstants.DNET_PUBLICATION_RESOURCE,
+          ModelConstants.DNET_PUBLICATION_RESOURCE
+        )
+      case "thesis" =>
+        result = new Publication
+        qualifier(
+          "0044",
+          "Thesis",
+          ModelConstants.DNET_PUBLICATION_RESOURCE,
+          ModelConstants.DNET_PUBLICATION_RESOURCE
+        )
+      case "dataset" =>
+        result = new OafDataset
+        qualifier(
+          "0021",
+          "Dataset",
+          ModelConstants.DNET_PUBLICATION_RESOURCE,
+          ModelConstants.DNET_PUBLICATION_RESOURCE
+        )
+      case "conference" =>
+        result = new Publication
+        qualifier(
+          "0001",
+          "Article",
+          ModelConstants.DNET_PUBLICATION_RESOURCE,
+          ModelConstants.DNET_PUBLICATION_RESOURCE
+        )
+    }
+    if (result != null) {
+      result.setDataInfo(di)
+      val i = new Instance
+      i.setInstancetype(tp)
+      result.setInstance(List(i).asJava)
+    }
+    result
+  }
+
   def convertMAGtoOAF(paper: MAGPaper): Publication = {
+    val pub = new Publication
+    val magPid = structuredProperty(
+      paper.paperId.get.toString,
+      qualifier(
+        PidType.mag_id.toString,
+        PidType.mag_id.toString,
+        ModelConstants.DNET_PID_TYPES,
+        ModelConstants.DNET_PID_TYPES
+      ),
+      null
+    )
     if (paper.doi.isDefined) {
-      val pub = new Publication
       pub.setPid(
         List(
+          magPid,
           structuredProperty(
             paper.doi.get,
             qualifier(
@@ -349,76 +493,65 @@ object MagUtility extends Serializable {
           )
         ).asJava
       )
       pub.setOriginalId(List(paper.paperId.get.toString, paper.doi.get).asJava)
-      //IMPORTANT
-      //The old method result.setId(generateIdentifier(result, doi))
-      //will be replaced using IdentifierFactory
-      pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub))
+    } else {
+      pub.setPid(
+        List(
+          magPid
+        ).asJava
+      )
+      pub.setOriginalId(List(paper.paperId.get.toString).asJava)
+    }
+    pub.setId(s"50|mag_________::${DHPUtils.md5(paper.paperId.get.toString)}")

     val mainTitles = structuredProperty(paper.originalTitle.get, ModelConstants.MAIN_TITLE_QUALIFIER, null)
     val originalTitles = structuredProperty(paper.paperTitle.get, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null)
     pub.setTitle(List(mainTitles, originalTitles).asJava)

     if (paper.bookTitle.isDefined)
       pub.setSource(List(field[String](paper.bookTitle.get, null)).asJava)
     if (paper.abstractText.isDefined)
       pub.setDescription(List(field(paper.abstractText.get, null)).asJava)
     if (paper.authors.isDefined && paper.authors.get.nonEmpty) {
       pub.setAuthor(
         paper.authors.get
           .filter(a => a.AuthorName.isDefined)
           .map(a => {
             val author = new Author
             author.setFullname(a.AuthorName.get)
-            if (a.AffiliationName.isDefined)
-              author.setAffiliation(List(field(a.AffiliationName.get, null)).asJava)
-            author.setPid(
-              List(
-                structuredProperty(
-                  s"https://academic.microsoft.com/#/detail/${a.AuthorId.get}",
-                  qualifier("url", "url", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES),
-                  null
-                )
-              ).asJava
-            )
             author
           })
           .asJava
       )
     }

     if (paper.date.isDefined)
       pub.setDateofacceptance(field(paper.date.get, null))

     if (paper.publisher.isDefined)
       pub.setPublisher(field(paper.publisher.get, null))

     if (paper.journalId.isDefined && paper.journalName.isDefined) {
       val j = new Journal
       j.setName(paper.journalName.get)
       j.setSp(paper.firstPage.orNull)
       j.setEp(paper.lastPage.orNull)
       if (paper.publisher.isDefined)
         pub.setPublisher(field(paper.publisher.get, null))
       j.setIssnPrinted(paper.journalIssn.orNull)
       j.setVol(paper.volume.orNull)
       j.setIss(paper.issue.orNull)
       j.setConferenceplace(paper.conferenceLocation.orNull)
       j.setEdition(paper.conferenceName.orNull)
       pub.setJournal(j)
     }

     pub
-    } else {
-      null
-    }
   }
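
Worth noting (editor's sketch, fabricated paperId): with this change the OpenAIRE identifier of a MAG record no longer goes through IdentifierFactory and no longer depends on the DOI; it is always derived from the MAG paperId:

```scala
import eu.dnetlib.dhp.utils.DHPUtils

val paperId = 2741809807L // fabricated MAG paper id
// Same identifier whether or not the paper also carries a DOI pid.
val openaireId = s"50|mag_________::${DHPUtils.md5(paperId.toString)}"
```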
def convertInvertedIndexString(json_input: String): String = { def convertInvertedIndexString(json_input: String): String = {

View File

@@ -2,13 +2,11 @@ package eu.dnetlib.dhp.collection.crossref
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
-import org.apache.spark.sql.SparkSession
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 import org.mockito.junit.jupiter.MockitoExtension
 import org.slf4j.{Logger, LoggerFactory}
-import scala.io.Source

 @ExtendWith(Array(classOf[MockitoExtension]))
 class CrossrefMappingTest extends AbstractVocabularyTest {
@@ -21,21 +19,6 @@ class CrossrefMappingTest extends AbstractVocabularyTest {
     super.setUpVocabulary()
   }

-  def testMapping(): Unit = {
-    val spark = SparkSession.builder().master("local[*]").appName("TransformCrossref").getOrCreate()
-    val s = new SparkMapDumpIntoOAF(null, null, null)
-    import spark.implicits._
-    s.transformCrossref(
-      spark,
-      sourcePath = "/home/sandro/Downloads/crossref",
-      targetPath = "/home/sandro/Downloads/crossref_transformed",
-      unpaywallPath = null,
-      vocabularies = vocabularies
-    )
-    print(spark.read.text("/home/sandro/Downloads/crossref_transformed").count)
-  }
 }

View File

@@ -1,15 +1,19 @@
 package eu.dnetlib.dhp.collection.mag

 import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.schema.oaf.{Dataset, Publication, Result}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Assertions._

 class MAGMappingTest {

   val mapper = new ObjectMapper()

+  @Test
   def mappingTest(): Unit = {

     val spark = SparkSession
@@ -28,4 +32,60 @@ class MAGMappingTest {
   }

+  @Test
+  def mappingMagType(): Unit = {
+
+    /* docType distribution in the MAG dump:
+     +-----------+--------+
+     |    docType|   count|
+     +-----------+--------+
+     |       null|79939635|
+     |BookChapter| 2431452|
+     |    Dataset|  123923|
+     | Repository| 5044165|
+     |     Thesis| 5525681|
+     | Conference| 5196866|
+     |    Journal|89452763|
+     |       Book| 4460017|
+     |     Patent|64631955|
+     +-----------+--------+
+
+     Expected instancetype serialization, e.g. for a journal article:
+     "instancetype": {
+       "classid": "0001",
+       "classname": "Article",
+       "schemeid": "dnet:publication_resource",
+       "schemename": "dnet:publication_resource"
+     },
+     "instanceTypeMapping": [{
+       "originalType": "journal-article",
+       "typeCode": null,
+       "typeLabel": null,
+       "vocabularyName": "openaire::coar_resource_types_3_1"
+     }]
+     */
+    checkResult[Publication](MagUtility.getInstanceType(null, null), invisible = false, "Other literature type")
+    checkResult[Publication](MagUtility.getInstanceType(Some("BookChapter"), null), invisible = false, "Part of book or chapter of book")
+    checkResult[Publication](MagUtility.getInstanceType(Some("Book"), null), invisible = false, "Book")
+    checkResult[Publication](MagUtility.getInstanceType(Some("Repository"), null), invisible = true, "Other literature type")
+    checkResult[Publication](MagUtility.getInstanceType(Some("Thesis"), null), invisible = false, "Thesis")
+    checkResult[Publication](MagUtility.getInstanceType(Some("Conference"), null), invisible = false, "Article")
+    checkResult[Publication](MagUtility.getInstanceType(Some("Journal"), null), invisible = false, "Journal")
+    checkResult[Dataset](MagUtility.getInstanceType(Some("Dataset"), null), invisible = false, "Dataset")
+    checkResult[Publication](MagUtility.getInstanceType(Some("Patent"), Some("Patent Department of the Navy")), invisible = false, "Patent")
+  }
+
+  def checkResult[T](r: Result, invisible: Boolean, typeName: String): Unit = {
+    assertNotNull(r)
+    assertTrue(r.isInstanceOf[T])
+    assertNotNull(r.getDataInfo)
+    assertEquals(invisible, r.getDataInfo.getInvisible)
+    assertNotNull(r.getInstance())
+    assertTrue(r.getInstance().size() > 0)
+    assertNotNull(r.getInstance().get(0).getInstancetype)
+    assertEquals(typeName, r.getInstance().get(0).getInstancetype.getClassname)
+  }
 }