From 6f3e925cae3ddf9d4f68037d2dccf622f3ad4eb5 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 3 Apr 2024 17:07:14 +0200 Subject: [PATCH] Implemented first part of the new MAG mapping --- .../collection/crossref/Crossref2Oaf.scala | 30 +- .../crossref/SparkMapDumpIntoOAF.scala | 18 +- .../dhp/collection/mag/MagUtility.scala | 269 +++++++++++++----- .../crossref/CrossrefMappingTest.scala | 19 +- .../dhp/collection/mag/MAGMappingTest.scala | 62 +++- 5 files changed, 309 insertions(+), 89 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala index 3b0ee8ccad..c48d011053 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.collection.crossref import com.fasterxml.jackson.databind.ObjectMapper - +import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf._ @@ -15,6 +15,7 @@ import eu.dnetlib.dhp.schema.oaf.utils.{ } import eu.dnetlib.dhp.utils.DHPUtils import org.apache.commons.lang.StringUtils +import org.apache.spark.sql.Row import org.json4s import org.json4s.DefaultFormats import org.json4s.JsonAST._ @@ -650,6 +651,33 @@ case object Crossref2Oaf { r } + def generateAffliation(input: Row): List[String] = { + val doi = input.getString(0) + val rorId = input.getString(1) + + val pubId = s"50|${PidType.doi.toString.padTo(12, "_")}::${DoiCleaningRule.normalizeDoi(doi)}" + val affId = GenerateRorActionSetJob.calculateOpenaireId(rorId) + + val r: Relation = new Relation + DoiCleaningRule.clean(doi) + r.setSource(pubId) + r.setTarget(affId) + r.setRelType(ModelConstants.RESULT_ORGANIZATION) + r.setRelClass(ModelConstants.HAS_AUTHOR_INSTITUTION) + r.setSubRelType(ModelConstants.AFFILIATION) + r.setDataInfo(generateDataInfo()) + r.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava) + val r1: Relation = new Relation + r1.setTarget(pubId) + r1.setSource(affId) + r1.setRelType(ModelConstants.RESULT_ORGANIZATION) + r1.setRelClass(ModelConstants.IS_AUTHOR_INSTITUTION_OF) + r1.setSubRelType(ModelConstants.AFFILIATION) + r1.setDataInfo(generateDataInfo()) + r1.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava) + List(mapper.writeValueAsString(r), mapper.writeValueAsString(r1)) + } + def convert(input: String, vocabularies: VocabularyGroup, mode: TransformationType): List[Oaf] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala index 85f1b86c61..8377fa4384 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala @@ -7,7 +7,7 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result, Dataset => OafDataset} import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.spark.sql._ -import org.apache.spark.sql.functions.{col, lower} +import org.apache.spark.sql.functions.{col, explode, lower} import org.apache.spark.sql.types._ import org.slf4j.{Logger, LoggerFactory} @@ -104,6 +104,22 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger .mode(SaveMode.Append) .option("compression", "gzip") .text(s"$targetPath") + + // Generate affiliation relations: + spark.read + .json(sourcePath) + .select(col("DOI"), explode(col("author.affiliation")).alias("affiliations")) + .select(col("DOI"), explode(col("affiliations.id")).alias("aids")) + .where("aids is not null") + .select(col("DOI"), explode(col("aids")).alias("aff")) + .select(col("DOI"), col("aff.id").alias("id"), col("aff.id-type").alias("idType")) + .where(col("idType").like("ROR")) + .flatMap(r => Crossref2Oaf.generateAffliation(r)) + .write + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(s"$targetPath") + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala index cc2d7b62f3..b9a64dedaa 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala @@ -1,9 +1,12 @@ package eu.dnetlib.dhp.collection.mag import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf.{Author, Journal, Publication} -import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, PidType} +import eu.dnetlib.dhp.schema.oaf +import eu.dnetlib.dhp.schema.oaf.{ Dataset => OafDataset, Author, DataInfo, Instance, Journal, Publication, Qualifier, Result} +import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils, PidType} import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._ +import eu.dnetlib.dhp.utils +import eu.dnetlib.dhp.utils.DHPUtils import org.apache.spark.sql.types._ import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.json4s @@ -304,14 +307,15 @@ object MagUtility extends Serializable { def getSchema(streamName: String): StructType = { var schema = new StructType() val d: Seq[String] = stream(streamName)._2 - d.foreach { case t => - val currentType = t.split(":") - val fieldName: String = currentType.head - var fieldType: String = currentType.last - val nullable: Boolean = fieldType.endsWith("?") - if (nullable) - fieldType = fieldType.replace("?", "") - schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable)) + d.foreach { + case t => + val currentType = t.split(":") + val fieldName: String = currentType.head + var fieldType: String = currentType.last + val nullable: Boolean = fieldType.endsWith("?") + if (nullable) + fieldType = fieldType.replace("?", "") + schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable)) } schema } @@ -331,12 +335,152 @@ object MagUtility extends Serializable { } + def getInstanceType(magType: Option[String], source: Option[String]): Result = { + + + + var result:Result = null + val di = new DataInfo + di.setDeletedbyinference(false) + di.setInferred(false) + di.setInvisible(false) + di.setTrust("0.9") + di.setProvenanceaction( + OafMapperUtils.qualifier( + ModelConstants.SYSIMPORT_ACTIONSET, + ModelConstants.SYSIMPORT_ACTIONSET, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS + ) + ) + if (magType== null) { + result = new Publication + result.setDataInfo(di) + val i =new Instance + i.setInstancetype(qualifier( + "0038", + "Other literature type", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + )) + + result.setInstance(List(i).asJava) + return result + } + + val currentType: String = magType.get + + val tp = currentType.toLowerCase match { + case "book" => + result = new Publication + qualifier("0002", "Book", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE) + case "bookchapter" => + result = new Publication + qualifier( + "00013", + "Part of book or chapter of book", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + case "journal" => + result = new Publication + qualifier("0043", "Journal", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE) + case "patent" => + if (source.nonEmpty) { + val s = source.get.toLowerCase + if (s.contains("patent") || s.contains("brevet")) { + result = new Publication + qualifier( + "0019", + "Patent", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + } else if (s.contains("journal of")) { + result = new Publication + qualifier( + "0043", + "Journal", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + } else if (s.contains("proceedings") || s.contains("conference") || s.contains("workshop") || s.contains( + "symposium" + )) { + result = new Publication + qualifier( + "0001", + "Article", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + } else null + } else null + + case "repository" => + result = new Publication() + di.setInvisible(true) + qualifier( + "0038", + "Other literature type", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + + case "thesis" => + result = new Publication + qualifier( + "0044", + "Thesis", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + case "dataset" => + result = new OafDataset + qualifier( + "0021", + "Dataset", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + case "conference" => + result = new Publication + qualifier( + "0001", + "Article", + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + } + + if (result != null) { + result.setDataInfo(di) + val i =new Instance + i.setInstancetype(tp) + result.setInstance(List(i).asJava) + } + result + + } + def convertMAGtoOAF(paper: MAGPaper): Publication = { + val pub = new Publication + + val magPid = structuredProperty( + paper.doi.get, + qualifier( + PidType.mag_id.toString, + PidType.mag_id.toString, + ModelConstants.DNET_PID_TYPES, + ModelConstants.DNET_PID_TYPES + ), + null + ) if (paper.doi.isDefined) { - val pub = new Publication pub.setPid( List( + magPid, structuredProperty( paper.doi.get, qualifier( @@ -349,76 +493,65 @@ object MagUtility extends Serializable { ) ).asJava ) - pub.setOriginalId(List(paper.paperId.get.toString, paper.doi.get).asJava) + } else { + pub.setPid( + List( + magPid + ).asJava + ) + pub.setOriginalId(List(paper.paperId.get.toString).asJava) + } - //IMPORTANT - //The old method result.setId(generateIdentifier(result, doi)) - //will be replaced using IdentifierFactory - pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub)) + pub.setId(s"50|mag_________::${DHPUtils.md5(paper.paperId.get.toString)}") - val mainTitles = structuredProperty(paper.originalTitle.get, ModelConstants.MAIN_TITLE_QUALIFIER, null) + val mainTitles = structuredProperty(paper.originalTitle.get, ModelConstants.MAIN_TITLE_QUALIFIER, null) - val originalTitles = structuredProperty(paper.paperTitle.get, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null) + val originalTitles = structuredProperty(paper.paperTitle.get, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null) - pub.setTitle(List(mainTitles, originalTitles).asJava) + pub.setTitle(List(mainTitles, originalTitles).asJava) - if (paper.bookTitle.isDefined) - pub.setSource(List(field[String](paper.bookTitle.get, null)).asJava) - if (paper.abstractText.isDefined) - pub.setDescription(List(field(paper.abstractText.get, null)).asJava) - if (paper.authors.isDefined && paper.authors.get.nonEmpty) { - pub.setAuthor( - paper.authors.get - .filter(a => a.AuthorName.isDefined) - .map(a => { - val author = new Author - author.setFullname(a.AuthorName.get) - if (a.AffiliationName.isDefined) - author.setAffiliation(List(field(a.AffiliationName.get, null)).asJava) - author.setPid( - List( - structuredProperty( - s"https://academic.microsoft.com/#/detail/${a.AuthorId.get}", - qualifier("url", "url", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES), - null - ) - ).asJava - ) - author - }) - .asJava - ) + if (paper.bookTitle.isDefined) + pub.setSource(List(field[String](paper.bookTitle.get, null)).asJava) + if (paper.abstractText.isDefined) + pub.setDescription(List(field(paper.abstractText.get, null)).asJava) + if (paper.authors.isDefined && paper.authors.get.nonEmpty) { + pub.setAuthor( + paper.authors.get + .filter(a => a.AuthorName.isDefined) + .map(a => { + val author = new Author + author.setFullname(a.AuthorName.get) + author + }) + .asJava + ) + } - } + if (paper.date.isDefined) + pub.setDateofacceptance(field(paper.date.get, null)) - if (paper.date.isDefined) - pub.setDateofacceptance(field(paper.date.get, null)) + if (paper.publisher.isDefined) + pub.setPublisher(field(paper.publisher.get, null)) + if (paper.journalId.isDefined && paper.journalName.isDefined) { + val j = new Journal + + j.setName(paper.journalName.get) + j.setSp(paper.firstPage.orNull) + j.setEp(paper.lastPage.orNull) if (paper.publisher.isDefined) pub.setPublisher(field(paper.publisher.get, null)) - - if (paper.journalId.isDefined && paper.journalName.isDefined) { - val j = new Journal - - j.setName(paper.journalName.get) - j.setSp(paper.firstPage.orNull) - j.setEp(paper.lastPage.orNull) - if (paper.publisher.isDefined) - pub.setPublisher(field(paper.publisher.get, null)) - j.setIssnPrinted(paper.journalIssn.orNull) - j.setVol(paper.volume.orNull) - j.setIss(paper.issue.orNull) - j.setConferenceplace(paper.conferenceLocation.orNull) - j.setEdition(paper.conferenceName.orNull) - pub.setJournal(j) - } - - pub - } else { - null + j.setIssnPrinted(paper.journalIssn.orNull) + j.setVol(paper.volume.orNull) + j.setIss(paper.issue.orNull) + j.setConferenceplace(paper.conferenceLocation.orNull) + j.setEdition(paper.conferenceName.orNull) + pub.setJournal(j) } + pub + } def convertInvertedIndexString(json_input: String): String = { diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala index 6254a7d8e4..296c303950 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala @@ -2,13 +2,11 @@ package eu.dnetlib.dhp.collection.crossref import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest -import org.apache.spark.sql.SparkSession -import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith import org.mockito.junit.jupiter.MockitoExtension import org.slf4j.{Logger, LoggerFactory} -import scala.io.Source @ExtendWith(Array(classOf[MockitoExtension])) class CrossrefMappingTest extends AbstractVocabularyTest { @@ -21,21 +19,6 @@ class CrossrefMappingTest extends AbstractVocabularyTest { super.setUpVocabulary() } - def testMapping(): Unit = { - val spark = SparkSession.builder().master("local[*]").appName("TransformCrossref").getOrCreate() - val s = new SparkMapDumpIntoOAF(null, null, null) - import spark.implicits._ - - s.transformCrossref( - spark, - sourcePath = "/home/sandro/Downloads/crossref", - targetPath = "/home/sandro/Downloads/crossref_transformed", - unpaywallPath = null, - vocabularies = vocabularies - ) - - print(spark.read.text("/home/sandro/Downloads/crossref_transformed").count) - } } diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala index 34b1538ff8..08556ea674 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala @@ -1,15 +1,19 @@ package eu.dnetlib.dhp.collection.mag import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.schema.oaf.{Dataset, Publication, Result} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.col import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions._ + + class MAGMappingTest { val mapper = new ObjectMapper() - @Test + def mappingTest(): Unit = { val spark = SparkSession @@ -28,4 +32,60 @@ class MAGMappingTest { } + + + @Test + def mappingMagType(): Unit = { + /* + +-----------+--------+ + | docType| count| + +-----------+--------+ + | null|79939635| + |BookChapter| 2431452| + | Dataset| 123923| + | Repository| 5044165| + | Thesis| 5525681| + | Conference| 5196866| + | Journal|89452763| + | Book| 4460017| + | Patent|64631955| + +-----------+--------+ + + "instancetype":{ + "classid":"0001", + "classname":"Article", + "schemeid":"dnet:publication_resource", + "schemename":"dnet:publication_resource"},"instanceTypeMapping":[{"originalType":"journal-article","typeCode":null,"typeLabel":null,"vocabularyName":"openaire::coar_resource_types_3_1"} + + */ + + checkResult[Publication](MagUtility.getInstanceType(null, null), invisible = false,"Other literature type") + checkResult[Publication](MagUtility.getInstanceType(Some("BookChapter"), null), invisible = false,"Part of book or chapter of book") + checkResult[Publication](MagUtility.getInstanceType(Some("Book"), null), invisible = false,"Book") + checkResult[Publication](MagUtility.getInstanceType(Some("Repository"), null), invisible = true,"Other literature type") + checkResult[Publication](MagUtility.getInstanceType(Some("Thesis"), null), invisible = false,"Thesis") + checkResult[Publication](MagUtility.getInstanceType(Some("Conference"), null), invisible = false,"Article") + checkResult[Publication](MagUtility.getInstanceType(Some("Journal"), null), invisible = false,"Journal") + checkResult[Dataset](MagUtility.getInstanceType(Some("Dataset"), null), invisible = false,"Dataset") + checkResult[Publication](MagUtility.getInstanceType(Some("Patent"), Some("Patent Department of the Navy")), invisible = false,"Patent") + checkResult[Dataset](MagUtility.getInstanceType(Some("Dataset"), null), invisible = false,"Dataset") + + + + } + + + def checkResult[T](r:Result, invisible:Boolean, typeName:String): Unit = { + + assertNotNull(r) + assertTrue(r.isInstanceOf[T]) + assertNotNull(r.getDataInfo) + assertEquals( invisible ,r.getDataInfo.getInvisible) + assertNotNull(r.getInstance()) + assertTrue(r.getInstance().size()>0) + assertNotNull(r.getInstance().get(0).getInstancetype) + assertEquals(typeName, r.getInstance().get(0).getInstancetype.getClassname) + + } + }