From c5328317183fc5a24aa84a4363b7d772b2fb089f Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Wed, 13 Mar 2024 06:56:10 +0100 Subject: [PATCH] Moved Crossref Mapping on dhp-aggregations, refactored code, avoid to use utility for create part of the oaf defined in DOIBoostMappingUtils, used instead utility in OafMappingUtils --- .../dhp/schema/oaf/utils/DoiCleaningRule.java | 27 + .../collection/crossref/Crossref2Oaf.scala | 819 ++++++++++++++++++ .../collection/crossref/CrossrefDataset.scala | 106 +++ .../crossref/GenerateCrossrefDataset.scala | 66 ++ 4 files changed, 1018 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/CrossrefDataset.scala create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/GenerateCrossrefDataset.scala diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/DoiCleaningRule.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/DoiCleaningRule.java index 1a7482685..396919462 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/DoiCleaningRule.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/DoiCleaningRule.java @@ -1,8 +1,12 @@ package eu.dnetlib.dhp.schema.oaf.utils; +import org.apache.commons.lang3.StringUtils; + public class DoiCleaningRule { + + public static String clean(final String doi) { return doi .toLowerCase() @@ -11,4 +15,27 @@ public class DoiCleaningRule { .replaceFirst(CleaningFunctions.DOI_PREFIX_REGEX, CleaningFunctions.DOI_PREFIX); } + public static String normalizeDoi(final String input){ + if (input == null) + return null; + final String replaced = input + .replaceAll("\\n|\\r|\\t|\\s", "") + .toLowerCase() + .replaceFirst(CleaningFunctions.DOI_PREFIX_REGEX, CleaningFunctions.DOI_PREFIX); + if (StringUtils.isEmpty(replaced)) + return null; + + if (!replaced.contains("10.")) + return null; + + + final String ret = replaced.substring(replaced.indexOf("10.")); + + if (!ret.startsWith(CleaningFunctions.DOI_PREFIX)) + return null; + + return ret; + + } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala new file mode 100644 index 000000000..6ad28e857 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala @@ -0,0 +1,819 @@ +package eu.dnetlib.dhp.collection.crossref + +import eu.dnetlib.dhp.schema.common.ModelConstants +import eu.dnetlib.dhp.schema.oaf._ +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.{field, qualifier, structuredProperty, subject} +import eu.dnetlib.dhp.schema.oaf.utils.{DoiCleaningRule, GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType} +import eu.dnetlib.dhp.utils.DHPUtils +import org.apache.commons.lang.StringUtils +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST._ +import org.json4s.jackson.JsonMethods._ +import org.slf4j.{Logger, LoggerFactory} + +import java.time.LocalDate +import java.time.format.DateTimeFormatter +import java.util +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.io.Source +import scala.util.matching.Regex + +case class CrossrefDT(doi: String, json: String, timestamp: Long) {} + +case class mappingAffiliation(name: String) {} + +case class mappingAuthor( + given: Option[String], + family: Option[String], + sequence: Option[String], + ORCID: Option[String], + affiliation: Option[mappingAffiliation] +) {} + +case class funderInfo(id: String, uri: String, name: String, synonym: List[String]) {} + +case class mappingFunder(name: String, DOI: Option[String], award: Option[List[String]]) {} + +case object Crossref2Oaf { + val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass) + + val irishFunder: List[funderInfo] = { + val s = Source + .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref/irish_funder.json")) + .mkString + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: org.json4s.JValue = parse(s) + json.extract[List[funderInfo]] + } + + def createCrossrefCollectedFrom(): KeyValue = { + + val cf = new KeyValue + cf.setValue("Crossref"); + cf.setKey(ModelConstants.CROSSREF_ID) + cf + + } + def generateDataInfo(): DataInfo = { + generateDataInfo("0.9") + } + + def generateDataInfo(trust: String): DataInfo = { + val di = new DataInfo + di.setDeletedbyinference(false) + di.setInferred(false) + di.setInvisible(false) + di.setTrust(trust) + di.setProvenanceaction( + OafMapperUtils.qualifier( + ModelConstants.SYSIMPORT_ACTIONSET, + ModelConstants.SYSIMPORT_ACTIONSET, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS + ) + ) + di + } + def getOpenAccessQualifier(): AccessRight = { + + OafMapperUtils.accessRight( + ModelConstants.ACCESS_RIGHT_OPEN, + "Open Access", + ModelConstants.DNET_ACCESS_MODES, + ModelConstants.DNET_ACCESS_MODES + ) + } + + def getRestrictedQualifier(): AccessRight = { + OafMapperUtils.accessRight( + "RESTRICTED", + "Restricted", + ModelConstants.DNET_ACCESS_MODES, + ModelConstants.DNET_ACCESS_MODES + ) + } + def getUnknownQualifier(): AccessRight = { + OafMapperUtils.accessRight( + ModelConstants.UNKNOWN, + ModelConstants.NOT_AVAILABLE, + ModelConstants.DNET_ACCESS_MODES, + ModelConstants.DNET_ACCESS_MODES + ) + } + + def getEmbargoedAccessQualifier(): AccessRight = { + OafMapperUtils.accessRight( + "EMBARGO", + "Embargo", + ModelConstants.DNET_ACCESS_MODES, + ModelConstants.DNET_ACCESS_MODES + ) + } + + def getClosedAccessQualifier(): AccessRight = { + OafMapperUtils.accessRight( + "CLOSED", + "Closed Access", + ModelConstants.DNET_ACCESS_MODES, + ModelConstants.DNET_ACCESS_MODES + ) + } + + def decideAccessRight(lic: Field[String], date: String): AccessRight = { + if (lic == null) { + //Default value Unknown + return getUnknownQualifier() + } + val license: String = lic.getValue + //CC licenses + if ( + license.startsWith("cc") || + license.startsWith("http://creativecommons.org/licenses") || + license.startsWith("https://creativecommons.org/licenses") || + + //ACS Publications Author choice licenses (considered OPEN also by Unpaywall) + license.equals("http://pubs.acs.org/page/policy/authorchoice_ccby_termsofuse.html") || + license.equals("http://pubs.acs.org/page/policy/authorchoice_termsofuse.html") || + license.equals("http://pubs.acs.org/page/policy/authorchoice_ccbyncnd_termsofuse.html") || + + //APA (considered OPEN also by Unpaywall) + license.equals("http://www.apa.org/pubs/journals/resources/open-access.aspx") + ) { + + val oaq: AccessRight = getOpenAccessQualifier() + oaq.setOpenAccessRoute(OpenAccessRoute.hybrid) + return oaq + } + + //OUP (BUT ONLY AFTER 12 MONTHS FROM THE PUBLICATION DATE, OTHERWISE THEY ARE EMBARGOED) + if ( + license.equals( + "https://academic.oup.com/journals/pages/open_access/funder_policies/chorus/standard_publication_model" + ) + ) { + val now = java.time.LocalDate.now + + try { + val pub_date = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd")) + if (((now.toEpochDay - pub_date.toEpochDay) / 365.0) > 1) { + val oaq: AccessRight = getOpenAccessQualifier() + oaq.setOpenAccessRoute(OpenAccessRoute.hybrid) + return oaq + } else { + return getEmbargoedAccessQualifier() + } + } catch { + case e: Exception => { + try { + val pub_date = + LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")) + if (((now.toEpochDay - pub_date.toEpochDay) / 365.0) > 1) { + val oaq: AccessRight = getOpenAccessQualifier() + oaq.setOpenAccessRoute(OpenAccessRoute.hybrid) + return oaq + } else { + return getEmbargoedAccessQualifier() + } + } catch { + case ex: Exception => return getClosedAccessQualifier() + } + } + + } + + } + + getClosedAccessQualifier() + + } + + + def getIrishId(doi: String): Option[String] = { + val id = doi.split("/").last + irishFunder + .find(f => id.equalsIgnoreCase(f.id) || (f.synonym.nonEmpty && f.synonym.exists(s => s.equalsIgnoreCase(id)))) + .map(f => f.id) + } + + def mappingResult(result: Result, json: JValue, cobjCategory: String, originalType: String): Result = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + + //MAPPING Crossref DOI into PID + val doi: String = DoiCleaningRule.normalizeDoi((json \ "DOI").extract[String]) + result.setPid(List(structuredProperty( + doi, + qualifier( + PidType.doi.toString, + PidType.doi.toString, + ModelConstants.DNET_PID_TYPES, + ModelConstants.DNET_PID_TYPES + ), + null + )).asJava) + + //MAPPING Crossref DOI into OriginalId + //and Other Original Identifier of dataset like clinical-trial-number + val clinicalTrialNumbers = for (JString(ctr) <- json \ "clinical-trial-number") yield ctr + val alternativeIds = for (JString(ids) <- json \ "alternative-id") yield ids + val tmp = clinicalTrialNumbers ::: alternativeIds ::: List(doi) + + val originalIds = new util.ArrayList(tmp.filter(id => id != null).asJava) + result.setOriginalId(originalIds) + + // Add DataInfo + result.setDataInfo(generateDataInfo()) + + result.setLastupdatetimestamp((json \ "indexed" \ "timestamp").extract[Long]) + result.setDateofcollection((json \ "indexed" \ "date-time").extract[String]) + + result.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava) + + // Publisher ( Name of work's publisher mapped into Result/Publisher) + val publisher = (json \ "publisher").extractOrElse[String](null) + if (publisher != null && publisher.nonEmpty) + result.setPublisher(field(publisher, null)) + + // TITLE + val mainTitles = + for { JString(title) <- json \ "title" if title.nonEmpty } yield { + structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER, null) + } + val originalTitles = for { + JString(title) <- json \ "original-title" if title.nonEmpty + } yield structuredProperty(title, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null) + val shortTitles = for { + JString(title) <- json \ "short-title" if title.nonEmpty + } yield structuredProperty(title, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null) + val subtitles = + for { JString(title) <- json \ "subtitle" if title.nonEmpty } yield structuredProperty( + title, + ModelConstants.SUBTITLE_QUALIFIER, null) + result.setTitle((mainTitles ::: originalTitles ::: shortTitles ::: subtitles).asJava) + + // DESCRIPTION + val descriptionList = + for { JString(description) <- json \ "abstract" } yield field[String](description, null) + result.setDescription(descriptionList.asJava) + + // Source + val sourceList = for { + JString(source) <- json \ "source" if source != null && source.nonEmpty + } yield field(source, null) + result.setSource(sourceList.asJava) + + //RELEVANT DATE Mapping + val createdDate = generateDate( + (json \ "created" \ "date-time").extract[String], + (json \ "created" \ "date-parts").extract[List[List[Int]]], + "created", + ModelConstants.DNET_DATACITE_DATE + ) + val postedDate = generateDate( + (json \ "posted" \ "date-time").extractOrElse[String](null), + (json \ "posted" \ "date-parts").extract[List[List[Int]]], + "available", + ModelConstants.DNET_DATACITE_DATE + ) + val acceptedDate = generateDate( + (json \ "accepted" \ "date-time").extractOrElse[String](null), + (json \ "accepted" \ "date-parts").extract[List[List[Int]]], + "accepted", + ModelConstants.DNET_DATACITE_DATE + ) + val publishedPrintDate = generateDate( + (json \ "published-print" \ "date-time").extractOrElse[String](null), + (json \ "published-print" \ "date-parts").extract[List[List[Int]]], + "published-print", + ModelConstants.DNET_DATACITE_DATE + ) + val publishedOnlineDate = generateDate( + (json \ "published-online" \ "date-time").extractOrElse[String](null), + (json \ "published-online" \ "date-parts").extract[List[List[Int]]], + "published-online", + ModelConstants.DNET_DATACITE_DATE + ) + + val issuedDate = extractDate( + (json \ "issued" \ "date-time").extractOrElse[String](null), + (json \ "issued" \ "date-parts").extract[List[List[Int]]] + ) + if (StringUtils.isNotBlank(issuedDate)) { + result.setDateofacceptance(field(issuedDate,null)) + } else { + result.setDateofacceptance(field(createdDate.getValue,null)) + } + result.setRelevantdate( + List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate) + .filter(p => p != null) + .asJava + ) + + //Mapping Subject + val subjectList: List[String] = (json \ "subject").extractOrElse[List[String]](List()) + + if (subjectList.nonEmpty) { + result.setSubject( + subjectList.map(s => subject(s,ModelConstants.SUBTITLE_QUALIFIER,null)).asJava + ) + } + + //Mapping Author + val authorList: List[mappingAuthor] = + (json \ "author").extract[List[mappingAuthor]].filter(a => a.family.isDefined) + + val sorted_list = authorList.sortWith((a: mappingAuthor, b: mappingAuthor) => + a.sequence.isDefined && a.sequence.get.equalsIgnoreCase("first") + ) + + result.setAuthor(sorted_list.zipWithIndex.map { case (a, index) => + generateAuhtor(a.given.orNull, a.family.get, a.ORCID.orNull, index) + }.asJava) + + // Mapping instance + val instance = new Instance() + val license = for { + JObject(license) <- json \ "license" + JField("URL", JString(lic)) <- license + JField("content-version", JString(content_version)) <- license + } yield (field[String](lic,null), content_version) + val l = license.filter(d => StringUtils.isNotBlank(d._1.getValue)) + if (l.nonEmpty) { + if (l exists (d => d._2.equals("vor"))) { + for (d <- l) { + if (d._2.equals("vor")) { + instance.setLicense(d._1) + } + } + } else { + instance.setLicense(l.head._1) + } + } + + // Ticket #6281 added pid to Instance + instance.setPid(result.getPid) + + val has_review = json \ "relation" \ "has-review" \ "id" + + if (has_review != JNothing) { + instance.setRefereed( + OafMapperUtils.qualifier( + "0001", + "peerReviewed", + ModelConstants.DNET_REVIEW_LEVELS, + ModelConstants.DNET_REVIEW_LEVELS + ) + ) + } + + instance.setAccessright( + decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue) + ) + instance.setInstancetype( + OafMapperUtils.qualifier( + cobjCategory.substring(0, 4), + cobjCategory.substring(5), + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + ) + //ADD ORIGINAL TYPE to the mapping + val itm = new InstanceTypeMapping + itm.setOriginalType(originalType) + itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1) + instance.setInstanceTypeMapping(List(itm).asJava) + result.setResourcetype( + OafMapperUtils.qualifier( + cobjCategory.substring(0, 4), + cobjCategory.substring(5), + ModelConstants.DNET_PUBLICATION_RESOURCE, + ModelConstants.DNET_PUBLICATION_RESOURCE + ) + ) + + instance.setCollectedfrom(createCrossrefCollectedFrom()) + if (StringUtils.isNotBlank(issuedDate)) { + instance.setDateofacceptance(field(issuedDate,null)) + } else { + instance.setDateofacceptance(field(createdDate.getValue,null)) + } + val s: List[String] = List("https://doi.org/" + doi) + // val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null && p.toLowerCase().contains(doi.toLowerCase())).distinct + // if (links.nonEmpty) { + // instance.setUrl(links.asJava) + // } + if (s.nonEmpty) { + instance.setUrl(s.asJava) + } + + result.setInstance(List(instance).asJava) + + //IMPORTANT + //The old method result.setId(generateIdentifier(result, doi)) + //is replaced using IdentifierFactory, but the old identifier + //is preserved among the originalId(s) + val oldId = generateIdentifier(result, doi) + result.setId(oldId) + + val newId = IdentifierFactory.createDOIBoostIdentifier(result) + if (!oldId.equalsIgnoreCase(newId)) { + result.getOriginalId.add(oldId) + } + result.setId(newId) + + if (result.getId == null) + null + else + result + } + def generateIdentifier(oaf: Result, doi: String): String = { + val id = DHPUtils.md5(doi.toLowerCase) + s"50|doiboost____|$id" + } + + def generateAuhtor(given: String, family: String, orcid: String, index: Int): Author = { + val a = new Author + a.setName(given) + a.setSurname(family) + a.setFullname(s"$given $family") + a.setRank(index + 1) + if (StringUtils.isNotBlank(orcid)) + a.setPid( + List( + structuredProperty( + orcid, + qualifier( ModelConstants.ORCID_PENDING, ModelConstants.ORCID_PENDING, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES), + generateDataInfo() + ) + ).asJava + ) + + a + } + + def convert(input: String): List[Oaf] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + var resultList: List[Oaf] = List() + + val objectType = (json \ "type").extractOrElse[String](null) + val objectSubType = (json \ "subtype").extractOrElse[String](null) + if (objectType == null) + return resultList + + val result = generateItemFromType(objectType, objectSubType) + if (result == null) + return List() + val cOBJCategory = mappingCrossrefSubType.getOrElse( + objectType, + mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type") + ) + + val originalType = if (mappingCrossrefSubType.contains(objectType)) objectType else objectSubType + mappingResult(result, json, cOBJCategory, originalType) + if (result == null || result.getId == null) + return List() + + val funderList: List[mappingFunder] = + (json \ "funder").extractOrElse[List[mappingFunder]](List()) + + if (funderList.nonEmpty) { + resultList = resultList ::: mappingFunderToRelations( + funderList, + result.getId, + createCrossrefCollectedFrom(), + result.getDataInfo, + result.getLastupdatetimestamp + ) + } + + result match { + case publication: Publication => convertPublication(publication, json, cOBJCategory) + case dataset: Dataset => convertDataset(dataset) + } + + val doisReference: List[String] = for { + JObject(reference_json) <- json \ "reference" + JField("DOI", JString(doi_json)) <- reference_json + } yield doi_json + + if (doisReference != null && doisReference.nonEmpty) { + val citation_relations: List[Relation] = generateCitationRelations(doisReference, result) + resultList = resultList ::: citation_relations + } + resultList = resultList ::: List(result) + resultList + } + + private def createCiteRelation(source: Result, targetPid: String, targetPidType: String): List[Relation] = { + + val targetId = IdentifierFactory.idFromPid("50", targetPidType, targetPid, true) + + val from = new Relation + from.setSource(source.getId) + from.setTarget(targetId) + from.setRelType(ModelConstants.RESULT_RESULT) + from.setRelClass(ModelConstants.CITES) + from.setSubRelType(ModelConstants.CITATION) + from.setCollectedfrom(source.getCollectedfrom) + from.setDataInfo(source.getDataInfo) + from.setLastupdatetimestamp(source.getLastupdatetimestamp) + + List(from) + } + + def generateCitationRelations(dois: List[String], result: Result): List[Relation] = { + dois.flatMap(d => createCiteRelation(result, d, "doi")) + } + + def mappingFunderToRelations( + funders: List[mappingFunder], + sourceId: String, + cf: KeyValue, + di: DataInfo, + ts: Long + ): List[Relation] = { + + val queue = new mutable.Queue[Relation] + + def snsfRule(award: String): String = { + val tmp1 = StringUtils.substringAfter(award, "_") + val tmp2 = StringUtils.substringBefore(tmp1, "/") + logger.debug(s"From $award to $tmp2") + tmp2 + + } + + def extractECAward(award: String): String = { + val awardECRegex: Regex = "[0-9]{4,9}".r + if (awardECRegex.findAllIn(award).hasNext) + return awardECRegex.findAllIn(award).max + null + } + + def generateRelation(sourceId: String, targetId: String, relClass: String): Relation = { + + val r = new Relation + r.setSource(sourceId) + r.setTarget(targetId) + r.setRelType(ModelConstants.RESULT_PROJECT) + r.setRelClass(relClass) + r.setSubRelType(ModelConstants.OUTCOME) + r.setCollectedfrom(List(cf).asJava) + r.setDataInfo(di) + r.setLastupdatetimestamp(ts) + r + + } + + def generateSimpleRelationFromAward( + funder: mappingFunder, + nsPrefix: String, + extractField: String => String + ): Unit = { + if (funder.award.isDefined && funder.award.get.nonEmpty) + funder.award.get + .map(extractField) + .filter(a => a != null && a.nonEmpty) + .foreach(award => { + val targetId = getProjectId(nsPrefix, DHPUtils.md5(award)) + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + }) + } + + def getProjectId(nsPrefix: String, targetId: String): String = { + s"40|$nsPrefix::$targetId" + } + + if (funders != null) + funders.foreach(funder => { + if (funder.DOI.isDefined && funder.DOI.get.nonEmpty) { + + if (getIrishId(funder.DOI.get).isDefined) { + val nsPrefix = getIrishId(funder.DOI.get).get.padTo(12, '_') + val targetId = getProjectId(nsPrefix, "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + } + + funder.DOI.get match { + case "10.13039/100010663" | "10.13039/100010661" | "10.13039/501100007601" | "10.13039/501100000780" | + "10.13039/100010665" => + generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + case "10.13039/100011199" | "10.13039/100004431" | "10.13039/501100004963" | "10.13039/501100000780" => + generateSimpleRelationFromAward(funder, "corda_______", extractECAward) + case "10.13039/501100000781" => + generateSimpleRelationFromAward(funder, "corda_______", extractECAward) + generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + generateSimpleRelationFromAward(funder, "corda_____he", extractECAward) + case "10.13039/100000001" => generateSimpleRelationFromAward(funder, "nsf_________", a => a) + case "10.13039/501100001665" => generateSimpleRelationFromAward(funder, "anr_________", a => a) + case "10.13039/501100002341" => generateSimpleRelationFromAward(funder, "aka_________", a => a) + case "10.13039/501100001602" => + generateSimpleRelationFromAward(funder, "sfi_________", a => a.replace("SFI", "")) + case "10.13039/501100000923" => generateSimpleRelationFromAward(funder, "arc_________", a => a) + case "10.13039/501100000038" => + val targetId = getProjectId("nserc_______", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + case "10.13039/501100000155" => + val targetId = getProjectId("sshrc_______", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + case "10.13039/501100000024" => + val targetId = getProjectId("cihr________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + + case "10.13039/100020031" => + val targetId = getProjectId("tara________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + + case "10.13039/501100005416" => generateSimpleRelationFromAward(funder, "rcn_________", a => a) + case "10.13039/501100002848" => generateSimpleRelationFromAward(funder, "conicytf____", a => a) + case "10.13039/501100003448" => generateSimpleRelationFromAward(funder, "gsrt________", extractECAward) + case "10.13039/501100010198" => generateSimpleRelationFromAward(funder, "sgov________", a => a) + case "10.13039/501100004564" => generateSimpleRelationFromAward(funder, "mestd_______", extractECAward) + case "10.13039/501100003407" => + generateSimpleRelationFromAward(funder, "miur________", a => a) + val targetId = getProjectId("miur________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + case "10.13039/501100006588" | "10.13039/501100004488" => + generateSimpleRelationFromAward( + funder, + "irb_hr______", + a => a.replaceAll("Project No.", "").replaceAll("HRZZ-", "") + ) + case "10.13039/501100006769" => generateSimpleRelationFromAward(funder, "rsf_________", a => a) + case "10.13039/501100001711" => generateSimpleRelationFromAward(funder, "snsf________", snsfRule) + case "10.13039/501100004410" => generateSimpleRelationFromAward(funder, "tubitakf____", a => a) + case "10.13039/100004440" => + generateSimpleRelationFromAward(funder, "wt__________", a => a) + val targetId = getProjectId("wt__________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + //ASAP + case "10.13039/100018231" => generateSimpleRelationFromAward(funder, "asap________", a => a) + //CHIST-ERA + case "10.13039/501100001942" => + val targetId = getProjectId("chistera____", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + //HE + case "10.13039/100018693" | "10.13039/100018694" | "10.13039/100019188" | "10.13039/100019180" | + "10.13039/100018695" | "10.13039/100019185" | "10.13039/100019186" | "10.13039/100019187" => + generateSimpleRelationFromAward(funder, "corda_____he", extractECAward) + //FCT + case "10.13039/501100001871" => + generateSimpleRelationFromAward(funder, "fct_________", a => a) + //NHMRC + case "10.13039/501100000925" => + generateSimpleRelationFromAward(funder, "nhmrc_______", a => a) + //NIH + case "10.13039/100000002" => + generateSimpleRelationFromAward(funder, "nih_________", a => a) + //NWO + case "10.13039/501100003246" => + generateSimpleRelationFromAward(funder, "nwo_________", a => a) + //UKRI + case "10.13039/100014013" | "10.13039/501100000267" | "10.13039/501100000268" | "10.13039/501100000269" | + "10.13039/501100000266" | "10.13039/501100006041" | "10.13039/501100000265" | "10.13039/501100000270" | + "10.13039/501100013589" | "10.13039/501100000271" => + generateSimpleRelationFromAward(funder, "ukri________", a => a) + + case _ => logger.debug("no match for " + funder.DOI.get) + + } + + } else { + funder.name match { + case "European Union’s Horizon 2020 research and innovation program" => + generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + case "European Union's" => + generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + generateSimpleRelationFromAward(funder, "corda_______", extractECAward) + generateSimpleRelationFromAward(funder, "corda_____he", extractECAward) + case "The French National Research Agency (ANR)" | "The French National Research Agency" => + generateSimpleRelationFromAward(funder, "anr_________", a => a) + case "CONICYT, Programa de Formación de Capital Humano Avanzado" => + generateSimpleRelationFromAward(funder, "conicytf____", a => a) + case "Wellcome Trust Masters Fellowship" => + generateSimpleRelationFromAward(funder, "wt__________", a => a) + val targetId = getProjectId("wt__________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + case _ => logger.debug("no match for " + funder.name) + + } + } + + }) + queue.toList + } + + def convertDataset(dataset: Dataset): Unit = { + // TODO check if there are other info to map into the Dataset + } + + def convertPublication(publication: Publication, json: JValue, cobjCategory: String): Unit = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + val containerTitles = for { JString(ct) <- json \ "container-title" } yield ct + + //Mapping book + if (cobjCategory.toLowerCase.contains("book")) { + val ISBN = for { JString(isbn) <- json \ "ISBN" } yield isbn + if (ISBN.nonEmpty && containerTitles.nonEmpty) { + val source = s"${containerTitles.head} ISBN: ${ISBN.head}" + if (publication.getSource != null) { + val l: List[Field[String]] = publication.getSource.asScala.toList + val ll: List[Field[String]] = l ::: List(field(source,null)) + publication.setSource(ll.asJava) + } else + publication.setSource(List(field(source,null)).asJava) + } + } else { + // Mapping Journal + + val issnInfos = for { + JArray(issn_types) <- json \ "issn-type" + JObject(issn_type) <- issn_types + JField("type", JString(tp)) <- issn_type + JField("value", JString(vl)) <- issn_type + } yield Tuple2(tp, vl) + + val volume = (json \ "volume").extractOrElse[String](null) + if (containerTitles.nonEmpty) { + val journal = new Journal + journal.setName(containerTitles.head) + if (issnInfos.nonEmpty) { + + issnInfos.foreach(tp => { + tp._1 match { + case "electronic" => journal.setIssnOnline(tp._2) + case "print" => journal.setIssnPrinted(tp._2) + } + }) + } + journal.setVol(volume) + val page = (json \ "page").extractOrElse[String](null) + if (page != null) { + val pp = page.split("-") + if (pp.nonEmpty) + journal.setSp(pp.head) + if (pp.size > 1) + journal.setEp(pp(1)) + } + publication.setJournal(journal) + } + } + } + + def extractDate(dt: String, datePart: List[List[Int]]): String = { + if (StringUtils.isNotBlank(dt)) + return GraphCleaningFunctions.cleanDate(dt) + if (datePart != null && datePart.size == 1) { + val res = datePart.head + if (res.size == 3) { + val dp = f"${res.head}-${res(1)}%02d-${res(2)}%02d" + if (dp.length == 10) { + return GraphCleaningFunctions.cleanDate(dp) + } + } else if (res.size == 2) { + val dp = f"${res.head}-${res(1)}%02d-01" + return GraphCleaningFunctions.cleanDate(dp) + } else if (res.size == 1) { + return GraphCleaningFunctions.cleanDate(s"${res.head}-01-01") + } + } + null + + } + + def generateDate( + dt: String, + datePart: List[List[Int]], + classId: String, + schemeId: String + ): StructuredProperty = { + val dp = extractDate(dt, datePart) + if (StringUtils.isNotBlank(dp)) + return structuredProperty(dp, qualifier(classId,classId, schemeId, schemeId),null) + null + } + + def generateItemFromType(objectType: String, objectSubType: String): Result = { + if (mappingCrossrefType.contains(objectType)) { + if (mappingCrossrefType(objectType).equalsIgnoreCase("publication")) + return new Publication() + if (mappingCrossrefType(objectType).equalsIgnoreCase("dataset")) + return new Dataset() + } + null + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/CrossrefDataset.scala new file mode 100644 index 000000000..f93acc599 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/CrossrefDataset.scala @@ -0,0 +1,106 @@ +package eu.dnetlib.dhp.collection.crossref + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.{IntWritable, Text} +import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.{Logger, LoggerFactory} + +object CrossrefDataset { + + val logger: Logger = LoggerFactory.getLogger(CrossrefDataset.getClass) + + def to_item(input: String): CrossrefDT = { + + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val ts: Long = (json \ "indexed" \ "timestamp").extract[Long] + val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) + CrossrefDT(doi, input, ts) + + } + + def main(args: Array[String]): Unit = { + + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser( + IOUtils.toString( + CrossrefDataset.getClass.getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json" + ) + ) + ) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(CrossrefDataset.getClass.getSimpleName) + .master(parser.get("master")) + .getOrCreate() + import spark.implicits._ + + val crossrefAggregator = new Aggregator[CrossrefDT, CrossrefDT, CrossrefDT] with Serializable { + + override def zero: CrossrefDT = null + + override def reduce(b: CrossrefDT, a: CrossrefDT): CrossrefDT = { + if (b == null) + return a + if (a == null) + return b + + if (a.timestamp > b.timestamp) { + return a + } + b + } + + override def merge(a: CrossrefDT, b: CrossrefDT): CrossrefDT = { + if (b == null) + return a + if (a == null) + return b + + if (a.timestamp > b.timestamp) { + return a + } + b + } + + override def bufferEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]] + + override def outputEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]] + + override def finish(reduction: CrossrefDT): CrossrefDT = reduction + } + + val workingPath: String = parser.get("workingPath") + + val main_ds: Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT] + + val update = + spark.createDataset( + spark.sparkContext + .sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text]) + .map(i => CrossrefImporter.decompressBlob(i._2.toString)) + .map(i => to_item(i)) + ) + + main_ds + .union(update) + .groupByKey(_.doi) + .agg(crossrefAggregator.toColumn) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/crossref_ds_updated") + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/GenerateCrossrefDataset.scala new file mode 100644 index 000000000..c1deddb02 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/GenerateCrossrefDataset.scala @@ -0,0 +1,66 @@ +package eu.dnetlib.dhp.collection.crossref + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.utils.DoiCleaningRule +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.{SparkConf, SparkContext} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.{Logger, LoggerFactory} + +import scala.io.Source + +object GenerateCrossrefDataset { + + val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass) + + implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] + + def crossrefElement(meta: String): CrossrefDT = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(meta) + val doi: String = DoiCleaningRule.normalizeDoi((json \ "DOI").extract[String]) + val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] + CrossrefDT(doi, meta, timestamp) + + } + + def main(args: Array[String]): Unit = { + val conf = new SparkConf + val parser = new ArgumentApplicationParser( + Source + .fromInputStream( + getClass.getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json" + ) + ) + .mkString + ) + parser.parseArgument(args) + val master = parser.get("master") + val sourcePath = parser.get("sourcePath") + val targetPath = parser.get("targetPath") + + val spark: SparkSession = SparkSession + .builder() + .config(conf) + .appName(GenerateCrossrefDataset.getClass.getSimpleName) + .master(master) + .getOrCreate() + val sc: SparkContext = spark.sparkContext + + import spark.implicits._ + + val tmp: RDD[String] = sc.textFile(sourcePath, 6000) + + spark + .createDataset(tmp) + .map(entry => crossrefElement(entry)) + .write + .mode(SaveMode.Overwrite) + .save(targetPath) + } + +}