From ef582948a735b2484143672eebde159155a8b85a Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 5 Apr 2024 11:10:44 +0200 Subject: [PATCH] Updated mapping --- .../dhp/collection/mag/MagUtility.scala | 37 +++++++++---------- .../dhp/collection/mag/SparkMAGtoOAF.scala | 12 +++--- .../dhp/collection/mag/MAGMappingTest.scala | 25 ++++++------- 3 files changed, 34 insertions(+), 40 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala index 81c1c581a..3551bb215 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala @@ -1,20 +1,10 @@ package eu.dnetlib.dhp.collection.mag +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf -import eu.dnetlib.dhp.schema.oaf.{ - Dataset => OafDataset, - Author, - DataInfo, - Instance, - Journal, - Publication, - Qualifier, - Result -} -import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils, PidType} import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._ -import eu.dnetlib.dhp.utils +import eu.dnetlib.dhp.schema.oaf.utils.{OafMapperUtils, PidType} +import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, Journal, Publication, Result, Dataset => OafDataset} import eu.dnetlib.dhp.utils.DHPUtils import org.apache.spark.sql.types._ import org.apache.spark.sql.{Dataset, Row, SparkSession} @@ -73,6 +63,10 @@ case class MAGAuthor( object MagUtility extends Serializable { + val mapper = new ObjectMapper() + + val MAGCollectedFrom =keyValue(ModelConstants.MAG_ID,ModelConstants.MAG_NAME) + val datatypedict = Map( "bool" -> BooleanType, "int" -> IntegerType, @@ -357,7 +351,7 @@ object MagUtility extends Serializable { ModelConstants.DNET_PROVENANCE_ACTIONS ) ) - if (magType == null) { + if (magType == null || magType.orNull ==null) { result = new Publication result.setDataInfo(di) val i = new Instance @@ -392,7 +386,7 @@ object MagUtility extends Serializable { result = new Publication qualifier("0043", "Journal", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE) case "patent" => - if (source != null) { + if (source!= null && source.orNull != null) { val s = source.get.toLowerCase if (s.contains("patent") || s.contains("brevet")) { result = new Publication @@ -469,20 +463,21 @@ object MagUtility extends Serializable { } - def convertMAGtoOAF(paper: MAGPaper): Result = { + def convertMAGtoOAF(paper: MAGPaper): String = { // FILTER all the MAG paper with no URL - if (paper.urls == null || paper.urls.get != null || paper.urls.get.isEmpty) + if (paper.urls.orNull == null ) return null val result = createResultFromType(paper.docType, paper.originalVenue) - if (result == null) return null + + result.setCollectedfrom(List(MAGCollectedFrom).asJava) val pidList = List( structuredProperty( - paper.doi.get, + paper.paperId.get.toString, qualifier( PidType.mag_id.toString, PidType.mag_id.toString, @@ -587,6 +582,7 @@ object MagUtility extends Serializable { val instance = result.getInstance().get(0) instance.setPid(pidList.asJava) + if(paper.doi.orNull != null) instance.setAlternateIdentifier( List( structuredProperty( @@ -603,6 +599,7 @@ object MagUtility extends Serializable { ) instance.setUrl(paper.urls.get.asJava) instance.setHostedby(ModelConstants.UNKNOWN_REPOSITORY) + instance.setCollectedfrom(MAGCollectedFrom) instance.setAccessright(accessRight( ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE, @@ -648,7 +645,7 @@ object MagUtility extends Serializable { } .asJava ) - result + mapper.writeValueAsString(result) } def convertInvertedIndexString(json_input: String): String = { diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala index efe26b7d1..fc2aefaf1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala @@ -1,8 +1,8 @@ package eu.dnetlib.dhp.collection.mag import eu.dnetlib.dhp.application.AbstractScalaApplication -import eu.dnetlib.dhp.schema.oaf.Result -import org.apache.spark.sql.{Encoders, SaveMode, SparkSession} +import eu.dnetlib.dhp.schema.oaf.{Publication, Result} +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger) @@ -21,19 +21,17 @@ class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger) def convertMAG(spark: SparkSession, workingPath: String, mdStorePath: String): Unit = { import spark.implicits._ - implicit val resultEncoder = Encoders.bean(classOf[Result]) - spark.read - .load(s"$workingPath/mag") - .as[MAGPaper].show() + spark.read .load(s"$workingPath/mag") .as[MAGPaper] .map(s => MagUtility.convertMAGtoOAF(s)) .write + .option("compression", "gzip") .mode(SaveMode.Overwrite) - .json(mdStorePath) + .text(mdStorePath) } } diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala index d0f14cf64..0790f1400 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.collection.mag import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.schema.oaf.{Dataset, Publication, Result} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.col import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ @@ -14,18 +13,18 @@ class MAGMappingTest { val mapper = new ObjectMapper() - @Test - def mappingTest(): Unit = { - - val spark = SparkSession - .builder() - .appName("Test") - .master("local[*]") - .getOrCreate() - - new SparkMAGtoOAF(null,null,null).convertMAG(spark,"/Users/sandro/Downloads", "/Users/sandro/Downloads/mag_oaf") - - } +// @Test +// def mappingTest(): Unit = { +// +// val spark = SparkSession +// .builder() +// .appName("Test") +// .master("local[*]") +// .getOrCreate() +// +// new SparkMAGtoOAF(null,null,null).convertMAG(spark,"/home/sandro/Downloads", "/home/sandro/Downloads/mag_oaf") +// +// }