diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala index de9cb4c4f..3b0ee8cca 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.collection.crossref import com.fasterxml.jackson.databind.ObjectMapper + import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf._ @@ -28,8 +29,6 @@ import scala.collection.mutable import scala.io.Source import scala.util.matching.Regex -case class CrossrefDT(doi: String, json: String, timestamp: Long) {} - case class mappingAffiliation(name: String) {} case class mappingAuthor( @@ -44,8 +43,6 @@ case class funderInfo(id: String, uri: String, name: String, synonym: List[Strin case class mappingFunder(name: String, DOI: Option[String], award: Option[List[String]]) {} -case class CrossrefResult(oafType: String, body: String) {} - case class UnpayWall(doi: String, is_oa: Boolean, best_oa_location: UnpayWallOALocation, oa_status: String) {} case class UnpayWallOALocation(license: Option[String], url: String, host_type: Option[String]) {} @@ -616,17 +613,48 @@ case object Crossref2Oaf { null } - def extract_doi(input: String): CrossrefDT = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input) - CrossrefDT(doi = (json \ "DOI").extract[String].toLowerCase, json = input, 0) + object TransformationType extends Enumeration { + type TransformationType = Value + val OnlyRelation, OnlyResult, All = Value + } + import TransformationType._ + + def mergeUnpayWall(r: Result, uw: UnpayWall): Result = { + if (uw != null) { + + r.setCollectedfrom(List(r.getCollectedfrom.get(0), createUnpayWallCollectedFrom()).asJava) + val i: Instance = new Instance() + i.setCollectedfrom(createUnpayWallCollectedFrom()) + if (uw.best_oa_location != null) { + i.setUrl(List(uw.best_oa_location.url).asJava) + if (uw.best_oa_location.license.isDefined) { + i.setLicense(field[String](uw.best_oa_location.license.get, null)) + } + val colour = get_unpaywall_color(uw.oa_status) + if (colour.isDefined) { + val a = new AccessRight + a.setClassid(ModelConstants.ACCESS_RIGHT_OPEN) + a.setClassname(ModelConstants.ACCESS_RIGHT_OPEN) + a.setSchemeid(ModelConstants.DNET_ACCESS_MODES) + a.setSchemename(ModelConstants.DNET_ACCESS_MODES) + a.setOpenAccessRoute(colour.get) + i.setAccessright(a) + } + i.setInstancetype(r.getInstance().get(0).getInstancetype) + i.setInstanceTypeMapping(r.getInstance().get(0).getInstanceTypeMapping) + i.setPid(r.getPid) + r.setInstance(List(r.getInstance().get(0), i).asJava) + } + + } + r } - def convert(input: CrossrefDT, uw: UnpayWall, vocabularies: VocabularyGroup): List[CrossrefResult] = { + def convert(input: String, vocabularies: VocabularyGroup, mode: TransformationType): List[Oaf] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input.json) + lazy val json: json4s.JValue = parse(input) - var resultList: List[CrossrefResult] = List() + var resultList: List[Oaf] = List() val objectType = (json \ "type").extractOrElse[String](null) if (objectType == null) @@ -645,65 +673,70 @@ case object Crossref2Oaf { if (result == null || result.getId == null) return List() - val funderList: List[mappingFunder] = - (json \ "funder").extractOrElse[List[mappingFunder]](List()) - - if (funderList.nonEmpty) { - resultList = resultList ::: mappingFunderToRelations( - funderList, - result.getId, - createCrossrefCollectedFrom(), - result.getDataInfo, - result.getLastupdatetimestamp - ).map(s => CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s))) - } - result match { case publication: Publication => convertPublication(publication, json, typology._1) case dataset: Dataset => convertDataset(dataset) } - val doisReference: List[String] = for { - JObject(reference_json) <- json \ "reference" - JField("DOI", JString(doi_json)) <- reference_json - } yield doi_json + //RELATION SECTION + if (mode == OnlyRelation || mode == All) { + val funderList: List[mappingFunder] = + (json \ "funder").extractOrElse[List[mappingFunder]](List()) - if (doisReference != null && doisReference.nonEmpty) { - val citation_relations: List[Relation] = generateCitationRelations(doisReference, result) - resultList = resultList ::: citation_relations.map(s => - CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s)) - ) - } + if (funderList.nonEmpty) { + resultList = resultList ::: mappingFunderToRelations( + funderList, + result.getId, + createCrossrefCollectedFrom(), + result.getDataInfo, + result.getLastupdatetimestamp + ) + // .map(s => CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s))) + } + val doisReference: List[String] = for { + JObject(reference_json) <- json \ "reference" + JField("DOI", JString(doi_json)) <- reference_json + } yield doi_json - if (uw != null) { - result.getCollectedfrom.add(createUnpayWallCollectedFrom()) - val i: Instance = new Instance() - i.setCollectedfrom(createUnpayWallCollectedFrom()) - if (uw.best_oa_location != null) { - - i.setUrl(List(uw.best_oa_location.url).asJava) - if (uw.best_oa_location.license.isDefined) { - i.setLicense(field[String](uw.best_oa_location.license.get, null)) - } - - val colour = get_unpaywall_color(uw.oa_status) - if (colour.isDefined) { - val a = new AccessRight - a.setClassid(ModelConstants.ACCESS_RIGHT_OPEN) - a.setClassname(ModelConstants.ACCESS_RIGHT_OPEN) - a.setSchemeid(ModelConstants.DNET_ACCESS_MODES) - a.setSchemename(ModelConstants.DNET_ACCESS_MODES) - a.setOpenAccessRoute(colour.get) - i.setAccessright(a) - } - i.setPid(result.getPid) - result.getInstance().add(i) + if (doisReference != null && doisReference.nonEmpty) { + val citation_relations: List[Relation] = generateCitationRelations(doisReference, result) + resultList = resultList ::: citation_relations } } if (!filterResult(result)) List() - else - resultList ::: List(result).map(s => CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s))) + else { + if (mode == OnlyResult || mode == All) + resultList ::: List(result) + else + resultList + } + + // if (uw != null) { +// result.getCollectedfrom.add(createUnpayWallCollectedFrom()) +// val i: Instance = new Instance() +// i.setCollectedfrom(createUnpayWallCollectedFrom()) +// if (uw.best_oa_location != null) { +// +// i.setUrl(List(uw.best_oa_location.url).asJava) +// if (uw.best_oa_location.license.isDefined) { +// i.setLicense(field[String](uw.best_oa_location.license.get, null)) +// } +// +// val colour = get_unpaywall_color(uw.oa_status) +// if (colour.isDefined) { +// val a = new AccessRight +// a.setClassid(ModelConstants.ACCESS_RIGHT_OPEN) +// a.setClassname(ModelConstants.ACCESS_RIGHT_OPEN) +// a.setSchemeid(ModelConstants.DNET_ACCESS_MODES) +// a.setSchemename(ModelConstants.DNET_ACCESS_MODES) +// a.setOpenAccessRoute(colour.get) +// i.setAccessright(a) +// } +// i.setPid(result.getPid) +// result.getInstance().add(i) +// } +// } } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/GenerateCrossrefDataset.scala deleted file mode 100644 index 45beb5b55..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/GenerateCrossrefDataset.scala +++ /dev/null @@ -1,66 +0,0 @@ -package eu.dnetlib.dhp.collection.crossref - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.utils.DoiCleaningRule -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} -import org.apache.spark.{SparkConf, SparkContext} -import org.json4s -import org.json4s.DefaultFormats -import org.json4s.jackson.JsonMethods.parse -import org.slf4j.{Logger, LoggerFactory} - -import scala.io.Source - -object GenerateCrossrefDataset { - - val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass) - - implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] - - def crossrefElement(meta: String): CrossrefDT = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(meta) - val doi: String = DoiCleaningRule.normalizeDoi((json \ "DOI").extract[String]) - val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] - CrossrefDT(doi, meta, timestamp) - - } - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser( - Source - .fromInputStream( - getClass.getResourceAsStream( - "/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json" - ) - ) - .mkString - ) - parser.parseArgument(args) - val master = parser.get("master") - val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") - - val spark: SparkSession = SparkSession - .builder() - .config(conf) - .appName(GenerateCrossrefDataset.getClass.getSimpleName) - .master(master) - .getOrCreate() - val sc: SparkContext = spark.sparkContext - - import spark.implicits._ - - val tmp: RDD[String] = sc.textFile(sourcePath, 6000) - - spark - .createDataset(tmp) - .map(entry => crossrefElement(entry)) - .write - .mode(SaveMode.Overwrite) - .save(targetPath) - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala index 7da68d461..85f1b86c6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala @@ -1,8 +1,10 @@ package eu.dnetlib.dhp.collection.crossref +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf.{TransformationType, mergeUnpayWall} import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup -import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result, Dataset => OafDataset} import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, lower} @@ -70,21 +72,38 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger vocabularies: VocabularyGroup ): Unit = { import spark.implicits._ + + val mapper = new ObjectMapper() + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + val dump: Dataset[String] = spark.read.text(sourcePath).as[String] - - val uw = transformUnpayWall(spark, unpaywallPath, sourcePath) - - val crId = dump.map(s => Crossref2Oaf.extract_doi(s)) - - crId - .joinWith(uw, crId("doi") === uw("doi"), "left") - .flatMap(s => Crossref2Oaf.convert(s._1, s._2, vocabularies)) + dump + .flatMap(s => Crossref2Oaf.convert(s, vocabularies, TransformationType.OnlyRelation)) + .as[Oaf] + .map(r => mapper.writeValueAsString(r)) .write .mode(SaveMode.Overwrite) - .partitionBy("oafType") .option("compression", "gzip") .text(targetPath) - + val uw = transformUnpayWall(spark, unpaywallPath, sourcePath) + val resultCrossref: Dataset[(String, Result)] = dump + .flatMap(s => Crossref2Oaf.convert(s, vocabularies, TransformationType.OnlyResult)) + .as[Oaf] + .map(r => r.asInstanceOf[Result]) + .map(r => (r.getPid.get(0).getValue, r))(Encoders.tuple(Encoders.STRING, resultEncoder)) + resultCrossref + .joinWith(uw, resultCrossref("_1").equalTo(uw("doi")), "left") + .map(k => { + mergeUnpayWall(k._1._2, k._2) + }) + .map(r => mapper.writeValueAsString(r)) + .as[Result] + .write + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(s"$targetPath") } }