From ece56f0178473ebb59b8a0a10d9bd96251620dd5 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 25 Mar 2024 18:18:10 +0100 Subject: [PATCH] update crossref mapping to be transformed together with UnpayWall --- .../convert_crossref_dump_to_oaf_params.json | 6 ++ .../crossref/oozie_app/workflow.xml | 9 ++- .../collection/crossref/Crossref2Oaf.scala | 75 ++++++++++++++++++- .../crossref/SparkMapDumpIntoOAF.scala | 50 ++++++++++++- .../crossref/CrossrefMappingTest.scala | 2 +- .../dnetlib/doiboost/uw/UnpayWallToOAF.scala | 2 + 6 files changed, 136 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json index 4c2f22a75..cc7333e48 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json @@ -11,6 +11,12 @@ "paramDescription": "The base path of Crossref DUMP", "paramRequired": true }, + { + "paramName": "uw", + "paramLongName": "unpaywallPath", + "paramDescription": "The base path of unpaywall DUMP", + "paramRequired": true + }, { "paramName": "t", "paramLongName": "targetPath", diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml index 8ec98810f..c9c09d20f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml @@ -2,11 +2,15 @@ sourcePath - The base path of MAG DUMP CSV Tables + The base path of Crossref DUMP targetPath - The base path of MAG DUMP CSV Tables + The targetPath + + + unpaywallPath + The base path of unpaywall DUMP isLookupUrl @@ -42,6 +46,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${sourcePath} + --unpaywallPath${unpaywallPath} --targetPath${targetPath} --isLookupUrl${isLookupUrl} --masteryarn diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala index f8e1fdc54..de9cb4c4f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala @@ -46,6 +46,10 @@ case class mappingFunder(name: String, DOI: Option[String], award: Option[List[S case class CrossrefResult(oafType: String, body: String) {} +case class UnpayWall(doi: String, is_oa: Boolean, best_oa_location: UnpayWallOALocation, oa_status: String) {} + +case class UnpayWallOALocation(license: Option[String], url: String, host_type: Option[String]) {} + case object Crossref2Oaf { val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass) val mapper = new ObjectMapper @@ -87,6 +91,15 @@ case object Crossref2Oaf { } + def createUnpayWallCollectedFrom(): KeyValue = { + + val cf = new KeyValue + cf.setValue("UnpayWall") + cf.setKey(s"10|openaire____:${DHPUtils.md5("UnpayWall".toLowerCase)}") + cf + + } + def generateDataInfo(): DataInfo = { generateDataInfo("0.91") } @@ -289,6 +302,34 @@ case object Crossref2Oaf { true } + def get_unpaywall_color(input: String): Option[OpenAccessRoute] = { + if (input == null || input.equalsIgnoreCase("close")) + return None + if (input.equalsIgnoreCase("green")) + return Some(OpenAccessRoute.green) + if (input.equalsIgnoreCase("bronze")) + return Some(OpenAccessRoute.bronze) + if (input.equalsIgnoreCase("hybrid")) + return Some(OpenAccessRoute.hybrid) + else + return Some(OpenAccessRoute.gold) + + } + + def get_color(input: String): Option[OpenAccessRoute] = { + if (input == null || input.equalsIgnoreCase("closed")) + return None + if (input.equalsIgnoreCase("green")) + return Some(OpenAccessRoute.green) + if (input.equalsIgnoreCase("bronze")) + return Some(OpenAccessRoute.bronze) + if (input.equalsIgnoreCase("hybrid")) + return Some(OpenAccessRoute.hybrid) + else + return Some(OpenAccessRoute.gold) + + } + def mappingResult(result: Result, json: JValue, instanceType: Qualifier, originalType: String): Result = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats @@ -575,9 +616,15 @@ case object Crossref2Oaf { null } - def convert(input: String, vocabularies: VocabularyGroup): List[CrossrefResult] = { + def extract_doi(input: String): CrossrefDT = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) + CrossrefDT(doi = (json \ "DOI").extract[String].toLowerCase, json = input, 0) + } + + def convert(input: CrossrefDT, uw: UnpayWall, vocabularies: VocabularyGroup): List[CrossrefResult] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input.json) var resultList: List[CrossrefResult] = List() @@ -627,6 +674,32 @@ case object Crossref2Oaf { CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s)) ) } + + if (uw != null) { + result.getCollectedfrom.add(createUnpayWallCollectedFrom()) + val i: Instance = new Instance() + i.setCollectedfrom(createUnpayWallCollectedFrom()) + if (uw.best_oa_location != null) { + + i.setUrl(List(uw.best_oa_location.url).asJava) + if (uw.best_oa_location.license.isDefined) { + i.setLicense(field[String](uw.best_oa_location.license.get, null)) + } + + val colour = get_unpaywall_color(uw.oa_status) + if (colour.isDefined) { + val a = new AccessRight + a.setClassid(ModelConstants.ACCESS_RIGHT_OPEN) + a.setClassname(ModelConstants.ACCESS_RIGHT_OPEN) + a.setSchemeid(ModelConstants.DNET_ACCESS_MODES) + a.setSchemename(ModelConstants.DNET_ACCESS_MODES) + a.setOpenAccessRoute(colour.get) + i.setAccessright(a) + } + i.setPid(result.getPid) + result.getInstance().add(i) + } + } if (!filterResult(result)) List() else diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala index 52e73fb9c..7da68d461 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala @@ -5,6 +5,8 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Dataset => OafDataset} import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.spark.sql._ +import org.apache.spark.sql.functions.{col, lower} +import org.apache.spark.sql.types._ import org.slf4j.{Logger, LoggerFactory} class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger) @@ -18,12 +20,45 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger log.info("sourcePath: {}", sourcePath) val targetPath = parser.get("targetPath") log.info("targetPath: {}", targetPath) + val unpaywallPath = parser.get("unpaywallPath") + log.info("unpaywallPath: {}", unpaywallPath) val isLookupUrl: String = parser.get("isLookupUrl") log.info("isLookupUrl: {}", isLookupUrl) val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl) val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService) require(vocabularies != null) - transformCrossref(spark, sourcePath, targetPath, vocabularies) + transformCrossref(spark, sourcePath, targetPath, unpaywallPath, vocabularies) + + } + + def transformUnpayWall(spark: SparkSession, unpaywallPath: String, crossrefPath: String): Dataset[UnpayWall] = { + val schema = new StructType() + .add(StructField("doi", StringType)) + .add(StructField("is_oa", BooleanType)) + .add( + StructField( + "best_oa_location", + new StructType() + .add("host_type", StringType) + .add("license", StringType) + .add("url", StringType) + ) + ) + .add("oa_status", StringType) + + import spark.implicits._ + val cId = spark.read + .schema(new StructType().add("DOI", StringType)) + .json(crossrefPath) + .withColumn("doi", lower(col("DOI"))) + + val uw = spark.read + .schema(schema) + .json(unpaywallPath) + .withColumn("doi", lower(col("doi"))) + .where("is_oa = true and best_oa_location.url is not null") + + uw.join(cId, uw("doi") === cId("doi"), "leftsemi").as[UnpayWall].cache() } @@ -31,12 +66,19 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger spark: SparkSession, sourcePath: String, targetPath: String, + unpaywallPath: String, vocabularies: VocabularyGroup ): Unit = { import spark.implicits._ - val dump = spark.read.text(sourcePath).as[String] - dump - .flatMap(s => Crossref2Oaf.convert(s, vocabularies)) + val dump: Dataset[String] = spark.read.text(sourcePath).as[String] + + val uw = transformUnpayWall(spark, unpaywallPath, sourcePath) + + val crId = dump.map(s => Crossref2Oaf.extract_doi(s)) + + crId + .joinWith(uw, crId("doi") === uw("doi"), "left") + .flatMap(s => Crossref2Oaf.convert(s._1, s._2, vocabularies)) .write .mode(SaveMode.Overwrite) .partitionBy("oafType") diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala index a12c2a659..6254a7d8e 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala @@ -21,7 +21,6 @@ class CrossrefMappingTest extends AbstractVocabularyTest { super.setUpVocabulary() } - @Test def testMapping(): Unit = { val spark = SparkSession.builder().master("local[*]").appName("TransformCrossref").getOrCreate() @@ -32,6 +31,7 @@ class CrossrefMappingTest extends AbstractVocabularyTest { spark, sourcePath = "/home/sandro/Downloads/crossref", targetPath = "/home/sandro/Downloads/crossref_transformed", + unpaywallPath = null, vocabularies = vocabularies ) diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala index bbdc80b1d..912b6211e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala @@ -71,6 +71,8 @@ object UnpayWallToOAF { } def convertToOAF(input: String): Publication = { + + val pub = new Publication implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats