forked from D-Net/dnet-hadoop
Improved Crossref mapping to include also unpaywall tested
This commit is contained in:
parent
ece56f0178
commit
73a67c0e4a
|
@ -1,6 +1,7 @@
|
||||||
package eu.dnetlib.dhp.collection.crossref
|
package eu.dnetlib.dhp.collection.crossref
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||||
import eu.dnetlib.dhp.schema.oaf._
|
import eu.dnetlib.dhp.schema.oaf._
|
||||||
|
@ -28,8 +29,6 @@ import scala.collection.mutable
|
||||||
import scala.io.Source
|
import scala.io.Source
|
||||||
import scala.util.matching.Regex
|
import scala.util.matching.Regex
|
||||||
|
|
||||||
case class CrossrefDT(doi: String, json: String, timestamp: Long) {}
|
|
||||||
|
|
||||||
case class mappingAffiliation(name: String) {}
|
case class mappingAffiliation(name: String) {}
|
||||||
|
|
||||||
case class mappingAuthor(
|
case class mappingAuthor(
|
||||||
|
@ -44,8 +43,6 @@ case class funderInfo(id: String, uri: String, name: String, synonym: List[Strin
|
||||||
|
|
||||||
case class mappingFunder(name: String, DOI: Option[String], award: Option[List[String]]) {}
|
case class mappingFunder(name: String, DOI: Option[String], award: Option[List[String]]) {}
|
||||||
|
|
||||||
case class CrossrefResult(oafType: String, body: String) {}
|
|
||||||
|
|
||||||
case class UnpayWall(doi: String, is_oa: Boolean, best_oa_location: UnpayWallOALocation, oa_status: String) {}
|
case class UnpayWall(doi: String, is_oa: Boolean, best_oa_location: UnpayWallOALocation, oa_status: String) {}
|
||||||
|
|
||||||
case class UnpayWallOALocation(license: Option[String], url: String, host_type: Option[String]) {}
|
case class UnpayWallOALocation(license: Option[String], url: String, host_type: Option[String]) {}
|
||||||
|
@ -616,17 +613,48 @@ case object Crossref2Oaf {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
|
|
||||||
def extract_doi(input: String): CrossrefDT = {
|
object TransformationType extends Enumeration {
|
||||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
type TransformationType = Value
|
||||||
lazy val json: json4s.JValue = parse(input)
|
val OnlyRelation, OnlyResult, All = Value
|
||||||
CrossrefDT(doi = (json \ "DOI").extract[String].toLowerCase, json = input, 0)
|
}
|
||||||
|
import TransformationType._
|
||||||
|
|
||||||
|
def mergeUnpayWall(r: Result, uw: UnpayWall): Result = {
|
||||||
|
if (uw != null) {
|
||||||
|
|
||||||
|
r.setCollectedfrom(List(r.getCollectedfrom.get(0), createUnpayWallCollectedFrom()).asJava)
|
||||||
|
val i: Instance = new Instance()
|
||||||
|
i.setCollectedfrom(createUnpayWallCollectedFrom())
|
||||||
|
if (uw.best_oa_location != null) {
|
||||||
|
i.setUrl(List(uw.best_oa_location.url).asJava)
|
||||||
|
if (uw.best_oa_location.license.isDefined) {
|
||||||
|
i.setLicense(field[String](uw.best_oa_location.license.get, null))
|
||||||
|
}
|
||||||
|
val colour = get_unpaywall_color(uw.oa_status)
|
||||||
|
if (colour.isDefined) {
|
||||||
|
val a = new AccessRight
|
||||||
|
a.setClassid(ModelConstants.ACCESS_RIGHT_OPEN)
|
||||||
|
a.setClassname(ModelConstants.ACCESS_RIGHT_OPEN)
|
||||||
|
a.setSchemeid(ModelConstants.DNET_ACCESS_MODES)
|
||||||
|
a.setSchemename(ModelConstants.DNET_ACCESS_MODES)
|
||||||
|
a.setOpenAccessRoute(colour.get)
|
||||||
|
i.setAccessright(a)
|
||||||
|
}
|
||||||
|
i.setInstancetype(r.getInstance().get(0).getInstancetype)
|
||||||
|
i.setInstanceTypeMapping(r.getInstance().get(0).getInstanceTypeMapping)
|
||||||
|
i.setPid(r.getPid)
|
||||||
|
r.setInstance(List(r.getInstance().get(0), i).asJava)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
r
|
||||||
}
|
}
|
||||||
|
|
||||||
def convert(input: CrossrefDT, uw: UnpayWall, vocabularies: VocabularyGroup): List[CrossrefResult] = {
|
def convert(input: String, vocabularies: VocabularyGroup, mode: TransformationType): List[Oaf] = {
|
||||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||||
lazy val json: json4s.JValue = parse(input.json)
|
lazy val json: json4s.JValue = parse(input)
|
||||||
|
|
||||||
var resultList: List[CrossrefResult] = List()
|
var resultList: List[Oaf] = List()
|
||||||
|
|
||||||
val objectType = (json \ "type").extractOrElse[String](null)
|
val objectType = (json \ "type").extractOrElse[String](null)
|
||||||
if (objectType == null)
|
if (objectType == null)
|
||||||
|
@ -645,65 +673,70 @@ case object Crossref2Oaf {
|
||||||
if (result == null || result.getId == null)
|
if (result == null || result.getId == null)
|
||||||
return List()
|
return List()
|
||||||
|
|
||||||
val funderList: List[mappingFunder] =
|
|
||||||
(json \ "funder").extractOrElse[List[mappingFunder]](List())
|
|
||||||
|
|
||||||
if (funderList.nonEmpty) {
|
|
||||||
resultList = resultList ::: mappingFunderToRelations(
|
|
||||||
funderList,
|
|
||||||
result.getId,
|
|
||||||
createCrossrefCollectedFrom(),
|
|
||||||
result.getDataInfo,
|
|
||||||
result.getLastupdatetimestamp
|
|
||||||
).map(s => CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s)))
|
|
||||||
}
|
|
||||||
|
|
||||||
result match {
|
result match {
|
||||||
case publication: Publication => convertPublication(publication, json, typology._1)
|
case publication: Publication => convertPublication(publication, json, typology._1)
|
||||||
case dataset: Dataset => convertDataset(dataset)
|
case dataset: Dataset => convertDataset(dataset)
|
||||||
}
|
}
|
||||||
|
|
||||||
val doisReference: List[String] = for {
|
//RELATION SECTION
|
||||||
JObject(reference_json) <- json \ "reference"
|
if (mode == OnlyRelation || mode == All) {
|
||||||
JField("DOI", JString(doi_json)) <- reference_json
|
val funderList: List[mappingFunder] =
|
||||||
} yield doi_json
|
(json \ "funder").extractOrElse[List[mappingFunder]](List())
|
||||||
|
|
||||||
if (doisReference != null && doisReference.nonEmpty) {
|
if (funderList.nonEmpty) {
|
||||||
val citation_relations: List[Relation] = generateCitationRelations(doisReference, result)
|
resultList = resultList ::: mappingFunderToRelations(
|
||||||
resultList = resultList ::: citation_relations.map(s =>
|
funderList,
|
||||||
CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s))
|
result.getId,
|
||||||
)
|
createCrossrefCollectedFrom(),
|
||||||
}
|
result.getDataInfo,
|
||||||
|
result.getLastupdatetimestamp
|
||||||
|
)
|
||||||
|
// .map(s => CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s)))
|
||||||
|
}
|
||||||
|
val doisReference: List[String] = for {
|
||||||
|
JObject(reference_json) <- json \ "reference"
|
||||||
|
JField("DOI", JString(doi_json)) <- reference_json
|
||||||
|
} yield doi_json
|
||||||
|
|
||||||
if (uw != null) {
|
if (doisReference != null && doisReference.nonEmpty) {
|
||||||
result.getCollectedfrom.add(createUnpayWallCollectedFrom())
|
val citation_relations: List[Relation] = generateCitationRelations(doisReference, result)
|
||||||
val i: Instance = new Instance()
|
resultList = resultList ::: citation_relations
|
||||||
i.setCollectedfrom(createUnpayWallCollectedFrom())
|
|
||||||
if (uw.best_oa_location != null) {
|
|
||||||
|
|
||||||
i.setUrl(List(uw.best_oa_location.url).asJava)
|
|
||||||
if (uw.best_oa_location.license.isDefined) {
|
|
||||||
i.setLicense(field[String](uw.best_oa_location.license.get, null))
|
|
||||||
}
|
|
||||||
|
|
||||||
val colour = get_unpaywall_color(uw.oa_status)
|
|
||||||
if (colour.isDefined) {
|
|
||||||
val a = new AccessRight
|
|
||||||
a.setClassid(ModelConstants.ACCESS_RIGHT_OPEN)
|
|
||||||
a.setClassname(ModelConstants.ACCESS_RIGHT_OPEN)
|
|
||||||
a.setSchemeid(ModelConstants.DNET_ACCESS_MODES)
|
|
||||||
a.setSchemename(ModelConstants.DNET_ACCESS_MODES)
|
|
||||||
a.setOpenAccessRoute(colour.get)
|
|
||||||
i.setAccessright(a)
|
|
||||||
}
|
|
||||||
i.setPid(result.getPid)
|
|
||||||
result.getInstance().add(i)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!filterResult(result))
|
if (!filterResult(result))
|
||||||
List()
|
List()
|
||||||
else
|
else {
|
||||||
resultList ::: List(result).map(s => CrossrefResult(s.getClass.getSimpleName, mapper.writeValueAsString(s)))
|
if (mode == OnlyResult || mode == All)
|
||||||
|
resultList ::: List(result)
|
||||||
|
else
|
||||||
|
resultList
|
||||||
|
}
|
||||||
|
|
||||||
|
// if (uw != null) {
|
||||||
|
// result.getCollectedfrom.add(createUnpayWallCollectedFrom())
|
||||||
|
// val i: Instance = new Instance()
|
||||||
|
// i.setCollectedfrom(createUnpayWallCollectedFrom())
|
||||||
|
// if (uw.best_oa_location != null) {
|
||||||
|
//
|
||||||
|
// i.setUrl(List(uw.best_oa_location.url).asJava)
|
||||||
|
// if (uw.best_oa_location.license.isDefined) {
|
||||||
|
// i.setLicense(field[String](uw.best_oa_location.license.get, null))
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// val colour = get_unpaywall_color(uw.oa_status)
|
||||||
|
// if (colour.isDefined) {
|
||||||
|
// val a = new AccessRight
|
||||||
|
// a.setClassid(ModelConstants.ACCESS_RIGHT_OPEN)
|
||||||
|
// a.setClassname(ModelConstants.ACCESS_RIGHT_OPEN)
|
||||||
|
// a.setSchemeid(ModelConstants.DNET_ACCESS_MODES)
|
||||||
|
// a.setSchemename(ModelConstants.DNET_ACCESS_MODES)
|
||||||
|
// a.setOpenAccessRoute(colour.get)
|
||||||
|
// i.setAccessright(a)
|
||||||
|
// }
|
||||||
|
// i.setPid(result.getPid)
|
||||||
|
// result.getInstance().add(i)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,66 +0,0 @@
|
||||||
package eu.dnetlib.dhp.collection.crossref
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.DoiCleaningRule
|
|
||||||
import org.apache.spark.rdd.RDD
|
|
||||||
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
|
||||||
import org.apache.spark.{SparkConf, SparkContext}
|
|
||||||
import org.json4s
|
|
||||||
import org.json4s.DefaultFormats
|
|
||||||
import org.json4s.jackson.JsonMethods.parse
|
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
|
||||||
|
|
||||||
import scala.io.Source
|
|
||||||
|
|
||||||
object GenerateCrossrefDataset {
|
|
||||||
|
|
||||||
val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass)
|
|
||||||
|
|
||||||
implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
|
|
||||||
|
|
||||||
def crossrefElement(meta: String): CrossrefDT = {
|
|
||||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
|
||||||
lazy val json: json4s.JValue = parse(meta)
|
|
||||||
val doi: String = DoiCleaningRule.normalizeDoi((json \ "DOI").extract[String])
|
|
||||||
val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long]
|
|
||||||
CrossrefDT(doi, meta, timestamp)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
|
||||||
val conf = new SparkConf
|
|
||||||
val parser = new ArgumentApplicationParser(
|
|
||||||
Source
|
|
||||||
.fromInputStream(
|
|
||||||
getClass.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.mkString
|
|
||||||
)
|
|
||||||
parser.parseArgument(args)
|
|
||||||
val master = parser.get("master")
|
|
||||||
val sourcePath = parser.get("sourcePath")
|
|
||||||
val targetPath = parser.get("targetPath")
|
|
||||||
|
|
||||||
val spark: SparkSession = SparkSession
|
|
||||||
.builder()
|
|
||||||
.config(conf)
|
|
||||||
.appName(GenerateCrossrefDataset.getClass.getSimpleName)
|
|
||||||
.master(master)
|
|
||||||
.getOrCreate()
|
|
||||||
val sc: SparkContext = spark.sparkContext
|
|
||||||
|
|
||||||
import spark.implicits._
|
|
||||||
|
|
||||||
val tmp: RDD[String] = sc.textFile(sourcePath, 6000)
|
|
||||||
|
|
||||||
spark
|
|
||||||
.createDataset(tmp)
|
|
||||||
.map(entry => crossrefElement(entry))
|
|
||||||
.write
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.save(targetPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,8 +1,10 @@
|
||||||
package eu.dnetlib.dhp.collection.crossref
|
package eu.dnetlib.dhp.collection.crossref
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||||
|
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf.{TransformationType, mergeUnpayWall}
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Dataset => OafDataset}
|
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result, Dataset => OafDataset}
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.functions.{col, lower}
|
import org.apache.spark.sql.functions.{col, lower}
|
||||||
|
@ -70,21 +72,38 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger
|
||||||
vocabularies: VocabularyGroup
|
vocabularies: VocabularyGroup
|
||||||
): Unit = {
|
): Unit = {
|
||||||
import spark.implicits._
|
import spark.implicits._
|
||||||
|
|
||||||
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
|
||||||
|
implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
|
||||||
|
|
||||||
val dump: Dataset[String] = spark.read.text(sourcePath).as[String]
|
val dump: Dataset[String] = spark.read.text(sourcePath).as[String]
|
||||||
|
dump
|
||||||
val uw = transformUnpayWall(spark, unpaywallPath, sourcePath)
|
.flatMap(s => Crossref2Oaf.convert(s, vocabularies, TransformationType.OnlyRelation))
|
||||||
|
.as[Oaf]
|
||||||
val crId = dump.map(s => Crossref2Oaf.extract_doi(s))
|
.map(r => mapper.writeValueAsString(r))
|
||||||
|
|
||||||
crId
|
|
||||||
.joinWith(uw, crId("doi") === uw("doi"), "left")
|
|
||||||
.flatMap(s => Crossref2Oaf.convert(s._1, s._2, vocabularies))
|
|
||||||
.write
|
.write
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.partitionBy("oafType")
|
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.text(targetPath)
|
.text(targetPath)
|
||||||
|
val uw = transformUnpayWall(spark, unpaywallPath, sourcePath)
|
||||||
|
val resultCrossref: Dataset[(String, Result)] = dump
|
||||||
|
.flatMap(s => Crossref2Oaf.convert(s, vocabularies, TransformationType.OnlyResult))
|
||||||
|
.as[Oaf]
|
||||||
|
.map(r => r.asInstanceOf[Result])
|
||||||
|
.map(r => (r.getPid.get(0).getValue, r))(Encoders.tuple(Encoders.STRING, resultEncoder))
|
||||||
|
resultCrossref
|
||||||
|
.joinWith(uw, resultCrossref("_1").equalTo(uw("doi")), "left")
|
||||||
|
.map(k => {
|
||||||
|
mergeUnpayWall(k._1._2, k._2)
|
||||||
|
})
|
||||||
|
.map(r => mapper.writeValueAsString(r))
|
||||||
|
.as[Result]
|
||||||
|
.write
|
||||||
|
.mode(SaveMode.Append)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.text(s"$targetPath")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue