forked from D-Net/dnet-hadoop
Moved the Crossref mapping to dhp-aggregations;
refactored the code to stop using the helper in DOIBoostMappingUtils for building part of the OAF, using the utility in OafMapperUtils instead.
This commit is contained in:
parent
cbd4e5e4bb
commit
c532831718
|
@ -1,8 +1,12 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
public class DoiCleaningRule {
|
public class DoiCleaningRule {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public static String clean(final String doi) {
|
public static String clean(final String doi) {
|
||||||
return doi
|
return doi
|
||||||
.toLowerCase()
|
.toLowerCase()
|
||||||
|
@ -11,4 +15,27 @@ public class DoiCleaningRule {
|
||||||
.replaceFirst(CleaningFunctions.DOI_PREFIX_REGEX, CleaningFunctions.DOI_PREFIX);
|
.replaceFirst(CleaningFunctions.DOI_PREFIX_REGEX, CleaningFunctions.DOI_PREFIX);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String normalizeDoi(final String input){
|
||||||
|
if (input == null)
|
||||||
|
return null;
|
||||||
|
final String replaced = input
|
||||||
|
.replaceAll("\\n|\\r|\\t|\\s", "")
|
||||||
|
.toLowerCase()
|
||||||
|
.replaceFirst(CleaningFunctions.DOI_PREFIX_REGEX, CleaningFunctions.DOI_PREFIX);
|
||||||
|
if (StringUtils.isEmpty(replaced))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
if (!replaced.contains("10."))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
|
||||||
|
final String ret = replaced.substring(replaced.indexOf("10."));
|
||||||
|
|
||||||
|
if (!ret.startsWith(CleaningFunctions.DOI_PREFIX))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,819 @@
|
||||||
|
package eu.dnetlib.dhp.collection.crossref
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||||
|
import eu.dnetlib.dhp.schema.oaf._
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.{field, qualifier, structuredProperty, subject}
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.{DoiCleaningRule, GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType}
|
||||||
|
import eu.dnetlib.dhp.utils.DHPUtils
|
||||||
|
import org.apache.commons.lang.StringUtils
|
||||||
|
import org.json4s
|
||||||
|
import org.json4s.DefaultFormats
|
||||||
|
import org.json4s.JsonAST._
|
||||||
|
import org.json4s.jackson.JsonMethods._
|
||||||
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
|
import java.time.LocalDate
|
||||||
|
import java.time.format.DateTimeFormatter
|
||||||
|
import java.util
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.collection.mutable
|
||||||
|
import scala.io.Source
|
||||||
|
import scala.util.matching.Regex
|
||||||
|
|
||||||
|
/** A record of the Crossref dump: DOI, raw JSON payload and indexing timestamp. */
case class CrossrefDT(doi: String, json: String, timestamp: Long) {}

/** Author affiliation as serialized by Crossref (name only). */
case class mappingAffiliation(name: String) {}

/** Author entry as serialized by Crossref. */
case class mappingAuthor(
  given: Option[String],
  family: Option[String],
  sequence: Option[String],
  ORCID: Option[String],
  affiliation: Option[mappingAffiliation]
) {}

/** Entry of the bundled Irish-funder registry: id, uri, name and known synonyms. */
case class funderInfo(id: String, uri: String, name: String, synonym: List[String]) {}

/** Funder entry as serialized by Crossref (name, funder DOI, award codes). */
case class mappingFunder(name: String, DOI: Option[String], award: Option[List[String]]) {}
|
||||||
|
|
||||||
|
case object Crossref2Oaf {
|
||||||
|
/** Class-scoped logger for the Crossref mapping. */
val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)

/**
 * Irish funders, loaded once from the bundled JSON resource.
 * Each entry carries the funder id, uri, name and known synonyms.
 */
val irishFunder: List[funderInfo] = {
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
  // Close the resource stream after reading: the previous version leaked it.
  val src = Source
    .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref/irish_funder.json"))
  try {
    parse(src.mkString).extract[List[funderInfo]]
  } finally {
    src.close()
  }
}
|
||||||
|
|
||||||
|
/** Builds the KeyValue identifying Crossref as the collectedfrom datasource. */
def createCrossrefCollectedFrom(): KeyValue = {
  val collectedFrom = new KeyValue
  collectedFrom.setKey(ModelConstants.CROSSREF_ID)
  collectedFrom.setValue("Crossref")
  collectedFrom
}
|
||||||
|
/** DataInfo with the default trust level of "0.9". */
def generateDataInfo(): DataInfo = generateDataInfo("0.9")
|
||||||
|
|
||||||
|
/**
 * Builds the DataInfo attached to records imported from the Crossref actionset.
 *
 * @param trust trust level to assign, as a string (e.g. "0.9")
 */
def generateDataInfo(trust: String): DataInfo = {
  val info = new DataInfo
  info.setDeletedbyinference(false)
  info.setInferred(false)
  info.setInvisible(false)
  info.setTrust(trust)
  info.setProvenanceaction(
    OafMapperUtils.qualifier(
      ModelConstants.SYSIMPORT_ACTIONSET,
      ModelConstants.SYSIMPORT_ACTIONSET,
      ModelConstants.DNET_PROVENANCE_ACTIONS,
      ModelConstants.DNET_PROVENANCE_ACTIONS
    )
  )
  info
}
|
||||||
|
/** Access right: open access. */
def getOpenAccessQualifier(): AccessRight =
  OafMapperUtils.accessRight(
    ModelConstants.ACCESS_RIGHT_OPEN,
    "Open Access",
    ModelConstants.DNET_ACCESS_MODES,
    ModelConstants.DNET_ACCESS_MODES
  )

/** Access right: restricted. */
def getRestrictedQualifier(): AccessRight =
  OafMapperUtils.accessRight(
    "RESTRICTED",
    "Restricted",
    ModelConstants.DNET_ACCESS_MODES,
    ModelConstants.DNET_ACCESS_MODES
  )

/** Access right: unknown / not available. */
def getUnknownQualifier(): AccessRight =
  OafMapperUtils.accessRight(
    ModelConstants.UNKNOWN,
    ModelConstants.NOT_AVAILABLE,
    ModelConstants.DNET_ACCESS_MODES,
    ModelConstants.DNET_ACCESS_MODES
  )

/** Access right: embargoed. */
def getEmbargoedAccessQualifier(): AccessRight =
  OafMapperUtils.accessRight(
    "EMBARGO",
    "Embargo",
    ModelConstants.DNET_ACCESS_MODES,
    ModelConstants.DNET_ACCESS_MODES
  )

/** Access right: closed access. */
def getClosedAccessQualifier(): AccessRight =
  OafMapperUtils.accessRight(
    "CLOSED",
    "Closed Access",
    ModelConstants.DNET_ACCESS_MODES,
    ModelConstants.DNET_ACCESS_MODES
  )
|
||||||
|
|
||||||
|
/**
 * Derives the AccessRight of an instance from its license and publication date.
 *
 * Rules, in order:
 *  - no license            -> UNKNOWN
 *  - CC / ACS author-choice / APA licenses -> OPEN (hybrid route)
 *  - OUP "standard publication model"      -> OPEN (hybrid) when published
 *    more than 12 months ago, EMBARGO otherwise, CLOSED when the date cannot
 *    be parsed with either supported pattern
 *  - anything else         -> CLOSED
 *
 * @param lic  license field of the instance (may be null)
 * @param date publication date, "yyyy-MM-dd" or "yyyy-MM-dd'T'HH:mm:ss'Z'"
 */
def decideAccessRight(lic: Field[String], date: String): AccessRight = {
  if (lic == null)
    return getUnknownQualifier()

  val license: String = lic.getValue

  val isOpenLicense =
    license.startsWith("cc") ||
      license.startsWith("http://creativecommons.org/licenses") ||
      license.startsWith("https://creativecommons.org/licenses") ||
      // ACS Publications author-choice licenses (considered OPEN also by Unpaywall)
      license.equals("http://pubs.acs.org/page/policy/authorchoice_ccby_termsofuse.html") ||
      license.equals("http://pubs.acs.org/page/policy/authorchoice_termsofuse.html") ||
      license.equals("http://pubs.acs.org/page/policy/authorchoice_ccbyncnd_termsofuse.html") ||
      // APA (considered OPEN also by Unpaywall)
      license.equals("http://www.apa.org/pubs/journals/resources/open-access.aspx")

  if (isOpenLicense) {
    val open: AccessRight = getOpenAccessQualifier()
    open.setOpenAccessRoute(OpenAccessRoute.hybrid)
    return open
  }

  // OUP: open only 12 months after the publication date, embargoed before that.
  if (
    license.equals(
      "https://academic.oup.com/journals/pages/open_access/funder_policies/chorus/standard_publication_model"
    )
  ) {
    val today = java.time.LocalDate.now

    // Tries one date pattern; None when the date does not parse with it.
    def decideFromPattern(pattern: String): Option[AccessRight] =
      try {
        val published = LocalDate.parse(date, DateTimeFormatter.ofPattern(pattern))
        if (((today.toEpochDay - published.toEpochDay) / 365.0) > 1) {
          val open: AccessRight = getOpenAccessQualifier()
          open.setOpenAccessRoute(OpenAccessRoute.hybrid)
          Some(open)
        } else {
          Some(getEmbargoedAccessQualifier())
        }
      } catch {
        case _: Exception => None
      }

    return decideFromPattern("yyyy-MM-dd")
      .orElse(decideFromPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"))
      .getOrElse(getClosedAccessQualifier())
  }

  getClosedAccessQualifier()
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Matches the last path segment of a funder DOI against the Irish funder
 * registry (by id or any synonym, case-insensitively) and returns the id.
 */
def getIrishId(doi: String): Option[String] = {
  val candidate = doi.split("/").last
  irishFunder.collectFirst {
    case f if candidate.equalsIgnoreCase(f.id) || f.synonym.exists(_.equalsIgnoreCase(candidate)) => f.id
  }
}
|
||||||
|
|
||||||
|
/**
 * Populates the given Result with the Crossref record metadata: pid and
 * originalIds, provenance, titles, abstract, source, dates, subjects,
 * authors and a single Instance. Returns the enriched result, or null when
 * no OpenAIRE identifier could be generated for it.
 *
 * @param result       result to enrich (mutated in place)
 * @param json         parsed Crossref message
 * @param cobjCategory resource type code and label, e.g. "0001 Article"
 * @param originalType original Crossref type, tracked in InstanceTypeMapping
 */
def mappingResult(result: Result, json: JValue, cobjCategory: String, originalType: String): Result = {
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

  // The normalized DOI becomes the single pid of the result.
  val doi: String = DoiCleaningRule.normalizeDoi((json \ "DOI").extract[String])
  result.setPid(
    List(
      structuredProperty(
        doi,
        qualifier(
          PidType.doi.toString,
          PidType.doi.toString,
          ModelConstants.DNET_PID_TYPES,
          ModelConstants.DNET_PID_TYPES
        ),
        null
      )
    ).asJava
  )

  // originalId = DOI + clinical trial numbers + alternative ids
  val clinicalTrialNumbers = for (JString(ctr) <- json \ "clinical-trial-number") yield ctr
  val alternativeIds = for (JString(ids) <- json \ "alternative-id") yield ids
  val collectedIds = clinicalTrialNumbers ::: alternativeIds ::: List(doi)
  result.setOriginalId(new util.ArrayList(collectedIds.filter(_ != null).asJava))

  // Provenance and collection metadata.
  result.setDataInfo(generateDataInfo())
  result.setLastupdatetimestamp((json \ "indexed" \ "timestamp").extract[Long])
  result.setDateofcollection((json \ "indexed" \ "date-time").extract[String])
  result.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava)

  // Publisher (name of the work's publisher).
  val publisher = (json \ "publisher").extractOrElse[String](null)
  if (publisher != null && publisher.nonEmpty)
    result.setPublisher(field(publisher, null))

  // Titles: main, original, short and subtitles.
  val mainTitles =
    for { JString(title) <- json \ "title" if title.nonEmpty } yield
      structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER, null)
  val originalTitles =
    for { JString(title) <- json \ "original-title" if title.nonEmpty } yield
      structuredProperty(title, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null)
  val shortTitles =
    for { JString(title) <- json \ "short-title" if title.nonEmpty } yield
      structuredProperty(title, ModelConstants.ALTERNATIVE_TITLE_QUALIFIER, null)
  val subtitles =
    for { JString(title) <- json \ "subtitle" if title.nonEmpty } yield
      structuredProperty(title, ModelConstants.SUBTITLE_QUALIFIER, null)
  result.setTitle((mainTitles ::: originalTitles ::: shortTitles ::: subtitles).asJava)

  // Abstract -> description.
  val descriptionList =
    for { JString(description) <- json \ "abstract" } yield field[String](description, null)
  result.setDescription(descriptionList.asJava)

  // Source.
  val sourceList = for {
    JString(source) <- json \ "source" if source != null && source.nonEmpty
  } yield field(source, null)
  result.setSource(sourceList.asJava)

  // Relevant dates. "created" is mandatory (extract throws when missing),
  // the others tolerate absence of "date-time".
  val createdDate = generateDate(
    (json \ "created" \ "date-time").extract[String],
    (json \ "created" \ "date-parts").extract[List[List[Int]]],
    "created",
    ModelConstants.DNET_DATACITE_DATE
  )
  val postedDate = generateDate(
    (json \ "posted" \ "date-time").extractOrElse[String](null),
    (json \ "posted" \ "date-parts").extract[List[List[Int]]],
    "available",
    ModelConstants.DNET_DATACITE_DATE
  )
  val acceptedDate = generateDate(
    (json \ "accepted" \ "date-time").extractOrElse[String](null),
    (json \ "accepted" \ "date-parts").extract[List[List[Int]]],
    "accepted",
    ModelConstants.DNET_DATACITE_DATE
  )
  val publishedPrintDate = generateDate(
    (json \ "published-print" \ "date-time").extractOrElse[String](null),
    (json \ "published-print" \ "date-parts").extract[List[List[Int]]],
    "published-print",
    ModelConstants.DNET_DATACITE_DATE
  )
  val publishedOnlineDate = generateDate(
    (json \ "published-online" \ "date-time").extractOrElse[String](null),
    (json \ "published-online" \ "date-parts").extract[List[List[Int]]],
    "published-online",
    ModelConstants.DNET_DATACITE_DATE
  )

  // Date of acceptance: prefer "issued", fall back to "created".
  val issuedDate = extractDate(
    (json \ "issued" \ "date-time").extractOrElse[String](null),
    (json \ "issued" \ "date-parts").extract[List[List[Int]]]
  )
  if (StringUtils.isNotBlank(issuedDate)) {
    result.setDateofacceptance(field(issuedDate, null))
  } else {
    result.setDateofacceptance(field(createdDate.getValue, null))
  }
  result.setRelevantdate(
    List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate)
      .filter(_ != null)
      .asJava
  )

  // Subjects.
  // NOTE(review): subjects are qualified with SUBTITLE_QUALIFIER — this looks
  // like a copy/paste slip (a subject/keyword qualifier would be expected).
  // Behavior kept unchanged; confirm with the schema owners.
  val subjectList: List[String] = (json \ "subject").extractOrElse[List[String]](List())
  if (subjectList.nonEmpty) {
    result.setSubject(
      subjectList.map(s => subject(s, ModelConstants.SUBTITLE_QUALIFIER, null)).asJava
    )
  }

  // Authors: only entries with a family name are kept.
  // NOTE(review): this sortWith comparator ignores its second argument, so it
  // only pushes "first"-sequence authors forward; kept as-is.
  val authorList: List[mappingAuthor] =
    (json \ "author").extract[List[mappingAuthor]].filter(a => a.family.isDefined)
  val sortedAuthors = authorList.sortWith((a: mappingAuthor, b: mappingAuthor) =>
    a.sequence.isDefined && a.sequence.get.equalsIgnoreCase("first")
  )
  result.setAuthor(sortedAuthors.zipWithIndex.map { case (a, index) =>
    generateAuhtor(a.given.orNull, a.family.get, a.ORCID.orNull, index)
  }.asJava)

  // Instance: license, pid, refereed, access right, type, url.
  val instance = new Instance()
  val licenses = for {
    JObject(license) <- json \ "license"
    JField("URL", JString(lic)) <- license
    JField("content-version", JString(contentVersion)) <- license
  } yield (field[String](lic, null), contentVersion)
  val usableLicenses = licenses.filter(d => StringUtils.isNotBlank(d._1.getValue))
  if (usableLicenses.nonEmpty) {
    val vorLicenses = usableLicenses.filter(_._2.equals("vor"))
    if (vorLicenses.nonEmpty) {
      // Version-of-record licenses win; the last one is kept (as before).
      vorLicenses.foreach(d => instance.setLicense(d._1))
    } else {
      instance.setLicense(usableLicenses.head._1)
    }
  }

  // Ticket #6281: propagate the result pid to the instance.
  instance.setPid(result.getPid)

  // A "has-review" relation marks the record as peer reviewed.
  val hasReview = json \ "relation" \ "has-review" \ "id"
  if (hasReview != JNothing) {
    instance.setRefereed(
      OafMapperUtils.qualifier(
        "0001",
        "peerReviewed",
        ModelConstants.DNET_REVIEW_LEVELS,
        ModelConstants.DNET_REVIEW_LEVELS
      )
    )
  }

  instance.setAccessright(
    decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue)
  )
  // cobjCategory is "NNNN label": code in the first 4 chars, label after the space.
  instance.setInstancetype(
    OafMapperUtils.qualifier(
      cobjCategory.substring(0, 4),
      cobjCategory.substring(5),
      ModelConstants.DNET_PUBLICATION_RESOURCE,
      ModelConstants.DNET_PUBLICATION_RESOURCE
    )
  )

  // Track the original Crossref type alongside the mapped one.
  val typeMapping = new InstanceTypeMapping
  typeMapping.setOriginalType(originalType)
  typeMapping.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
  instance.setInstanceTypeMapping(List(typeMapping).asJava)

  result.setResourcetype(
    OafMapperUtils.qualifier(
      cobjCategory.substring(0, 4),
      cobjCategory.substring(5),
      ModelConstants.DNET_PUBLICATION_RESOURCE,
      ModelConstants.DNET_PUBLICATION_RESOURCE
    )
  )

  instance.setCollectedfrom(createCrossrefCollectedFrom())
  if (StringUtils.isNotBlank(issuedDate)) {
    instance.setDateofacceptance(field(issuedDate, null))
  } else {
    instance.setDateofacceptance(field(createdDate.getValue, null))
  }

  // Only the canonical doi.org URL is set on the instance.
  val instanceUrls: List[String] = List("https://doi.org/" + doi)
  if (instanceUrls.nonEmpty) {
    instance.setUrl(instanceUrls.asJava)
  }

  result.setInstance(List(instance).asJava)

  // IMPORTANT: the identifier is produced via IdentifierFactory; the legacy
  // doiboost identifier is preserved among the originalId(s) when different.
  val oldId = generateIdentifier(result, doi)
  result.setId(oldId)

  val newId = IdentifierFactory.createDOIBoostIdentifier(result)
  if (!oldId.equalsIgnoreCase(newId)) {
    result.getOriginalId.add(oldId)
  }
  result.setId(newId)

  if (result.getId == null)
    null
  else
    result
}
|
||||||
|
/**
 * Legacy DOIBoost identifier: md5 of the lower-cased DOI under the
 * "50|doiboost____|" namespace. Kept among originalIds for provenance.
 */
def generateIdentifier(oaf: Result, doi: String): String =
  s"50|doiboost____|${DHPUtils.md5(doi.toLowerCase)}"
|
||||||
|
|
||||||
|
/**
 * Builds an Author from the Crossref given/family names, attaching an
 * orcid_pending pid when an ORCID is present. Rank is 1-based.
 *
 * NOTE(review): the method name carries a historical typo ("Auhtor");
 * kept unchanged to avoid breaking callers.
 */
def generateAuhtor(given: String, family: String, orcid: String, index: Int): Author = {
  val author = new Author
  author.setName(given)
  author.setSurname(family)
  author.setFullname(s"$given $family")
  author.setRank(index + 1)
  if (StringUtils.isNotBlank(orcid)) {
    val orcidPid = structuredProperty(
      orcid,
      qualifier(
        ModelConstants.ORCID_PENDING,
        ModelConstants.ORCID_PENDING,
        ModelConstants.DNET_PID_TYPES,
        ModelConstants.DNET_PID_TYPES
      ),
      generateDataInfo()
    )
    author.setPid(List(orcidPid).asJava)
  }
  author
}
|
||||||
|
|
||||||
|
/**
 * Entry point: converts one raw Crossref JSON record into the list of OAF
 * objects it yields — the mapped result plus funder relations and citation
 * relations. Returns an empty list when the record cannot be mapped.
 */
def convert(input: String): List[Oaf] = {
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
  lazy val json: json4s.JValue = parse(input)

  var resultList: List[Oaf] = List()

  val objectType = (json \ "type").extractOrElse[String](null)
  val objectSubType = (json \ "subtype").extractOrElse[String](null)
  if (objectType == null)
    return resultList

  val result = generateItemFromType(objectType, objectSubType)
  if (result == null)
    return List()

  // Resolve the instance-type code from type first, then subtype.
  val cOBJCategory = mappingCrossrefSubType.getOrElse(
    objectType,
    mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type")
  )
  val originalType = if (mappingCrossrefSubType.contains(objectType)) objectType else objectSubType

  mappingResult(result, json, cOBJCategory, originalType)
  if (result == null || result.getId == null)
    return List()

  // Funder entries become result<->project relations.
  val funderList: List[mappingFunder] =
    (json \ "funder").extractOrElse[List[mappingFunder]](List())
  if (funderList.nonEmpty) {
    resultList = resultList ::: mappingFunderToRelations(
      funderList,
      result.getId,
      createCrossrefCollectedFrom(),
      result.getDataInfo,
      result.getLastupdatetimestamp
    )
  }

  // Type-specific enrichment.
  // NOTE(review): this match is not exhaustive — a Result subtype other than
  // Publication/Dataset would raise a MatchError; confirm that
  // generateItemFromType can only return these two.
  result match {
    case publication: Publication => convertPublication(publication, json, cOBJCategory)
    case dataset: Dataset         => convertDataset(dataset)
  }

  // Referenced DOIs become Cites relations.
  val doisReference: List[String] = for {
    JObject(reference) <- json \ "reference"
    JField("DOI", JString(referencedDoi)) <- reference
  } yield referencedDoi

  if (doisReference.nonEmpty) {
    resultList = resultList ::: generateCitationRelations(doisReference, result)
  }

  resultList ::: List(result)
}
|
||||||
|
|
||||||
|
/**
 * Builds the "Cites" relation from the given result to the work identified
 * by the target pid/pidType, inheriting provenance from the source.
 */
private def createCiteRelation(source: Result, targetPid: String, targetPidType: String): List[Relation] = {
  val targetId = IdentifierFactory.idFromPid("50", targetPidType, targetPid, true)

  val citation = new Relation
  citation.setSource(source.getId)
  citation.setTarget(targetId)
  citation.setRelType(ModelConstants.RESULT_RESULT)
  citation.setRelClass(ModelConstants.CITES)
  citation.setSubRelType(ModelConstants.CITATION)
  citation.setCollectedfrom(source.getCollectedfrom)
  citation.setDataInfo(source.getDataInfo)
  citation.setLastupdatetimestamp(source.getLastupdatetimestamp)

  List(citation)
}
|
||||||
|
|
||||||
|
/** One Cites relation per referenced DOI. */
def generateCitationRelations(dois: List[String], result: Result): List[Relation] =
  dois.flatMap(createCiteRelation(result, _, "doi"))
|
||||||
|
|
||||||
|
/**
 * Maps Crossref funder entries onto result<->project relations, recognizing
 * funders by their registry DOI (or, failing that, by name) and extracting
 * award/grant codes where a namespace-specific rule exists. Funders without
 * resolvable awards are linked to their "unidentified project" placeholder.
 *
 * @param funders  funder entries from the Crossref record
 * @param sourceId OpenAIRE id of the result
 * @param cf       collectedfrom to stamp on each relation
 * @param di       provenance to stamp on each relation
 * @param ts       last-update timestamp to stamp on each relation
 */
def mappingFunderToRelations(
  funders: List[mappingFunder],
  sourceId: String,
  cf: KeyValue,
  di: DataInfo,
  ts: Long
): List[Relation] = {

  val queue = new mutable.Queue[Relation]

  // SNSF award codes look like "xxx_NNNNN/..." — keep only the middle part.
  def snsfRule(award: String): String = {
    val afterUnderscore = StringUtils.substringAfter(award, "_")
    val core = StringUtils.substringBefore(afterUnderscore, "/")
    logger.debug(s"From $award to $core")
    core
  }

  // EC awards are 4-9 digit codes; returns null when none is found.
  def extractECAward(award: String): String = {
    val awardECRegex: Regex = "[0-9]{4,9}".r
    if (awardECRegex.findAllIn(award).hasNext)
      return awardECRegex.findAllIn(award).max
    null
  }

  def generateRelation(sourceId: String, targetId: String, relClass: String): Relation = {
    val r = new Relation
    r.setSource(sourceId)
    r.setTarget(targetId)
    r.setRelType(ModelConstants.RESULT_PROJECT)
    r.setRelClass(relClass)
    r.setSubRelType(ModelConstants.OUTCOME)
    r.setCollectedfrom(List(cf).asJava)
    r.setDataInfo(di)
    r.setLastupdatetimestamp(ts)
    r
  }

  def getProjectId(nsPrefix: String, targetId: String): String =
    s"40|$nsPrefix::$targetId"

  // Adds the isProducedBy / produces pair between the result and a project.
  def addRelationPair(targetId: String): Unit = {
    queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
    queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
  }

  // Links the result to the funder's "unidentified project" placeholder.
  def addUnidentifiedProject(nsPrefix: String): Unit =
    addRelationPair(getProjectId(nsPrefix, "1e5e62235d094afd01cd56e65112fc63"))

  def generateSimpleRelationFromAward(
    funder: mappingFunder,
    nsPrefix: String,
    extractField: String => String
  ): Unit = {
    if (funder.award.isDefined && funder.award.get.nonEmpty)
      funder.award.get
        .map(extractField)
        .filter(a => a != null && a.nonEmpty)
        .foreach(award => addRelationPair(getProjectId(nsPrefix, DHPUtils.md5(award))))
  }

  if (funders != null)
    funders.foreach { funder =>
      if (funder.DOI.isDefined && funder.DOI.get.nonEmpty) {

        // Irish funders get an additional placeholder-project link.
        if (getIrishId(funder.DOI.get).isDefined) {
          val nsPrefix = getIrishId(funder.DOI.get).get.padTo(12, '_')
          addUnidentifiedProject(nsPrefix)
        }

        // NOTE(review): "10.13039/501100000780" also appears in the
        // corda_______ alternatives below, where it is unreachable (the
        // first matching case wins); kept as in the original.
        funder.DOI.get match {
          case "10.13039/100010663" | "10.13039/100010661" | "10.13039/501100007601" | "10.13039/501100000780" |
              "10.13039/100010665" =>
            generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
          case "10.13039/100011199" | "10.13039/100004431" | "10.13039/501100004963" | "10.13039/501100000780" =>
            generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
          case "10.13039/501100000781" =>
            generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
            generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
            generateSimpleRelationFromAward(funder, "corda_____he", extractECAward)
          case "10.13039/100000001"    => generateSimpleRelationFromAward(funder, "nsf_________", a => a)
          case "10.13039/501100001665" => generateSimpleRelationFromAward(funder, "anr_________", a => a)
          case "10.13039/501100002341" => generateSimpleRelationFromAward(funder, "aka_________", a => a)
          case "10.13039/501100001602" =>
            generateSimpleRelationFromAward(funder, "sfi_________", a => a.replace("SFI", ""))
          case "10.13039/501100000923" => generateSimpleRelationFromAward(funder, "arc_________", a => a)
          case "10.13039/501100000038" => addUnidentifiedProject("nserc_______")
          case "10.13039/501100000155" => addUnidentifiedProject("sshrc_______")
          case "10.13039/501100000024" => addUnidentifiedProject("cihr________")
          case "10.13039/100020031"    => addUnidentifiedProject("tara________")
          case "10.13039/501100005416" => generateSimpleRelationFromAward(funder, "rcn_________", a => a)
          case "10.13039/501100002848" => generateSimpleRelationFromAward(funder, "conicytf____", a => a)
          case "10.13039/501100003448" => generateSimpleRelationFromAward(funder, "gsrt________", extractECAward)
          case "10.13039/501100010198" => generateSimpleRelationFromAward(funder, "sgov________", a => a)
          case "10.13039/501100004564" => generateSimpleRelationFromAward(funder, "mestd_______", extractECAward)
          case "10.13039/501100003407" =>
            generateSimpleRelationFromAward(funder, "miur________", a => a)
            addUnidentifiedProject("miur________")
          case "10.13039/501100006588" | "10.13039/501100004488" =>
            generateSimpleRelationFromAward(
              funder,
              "irb_hr______",
              a => a.replaceAll("Project No.", "").replaceAll("HRZZ-", "")
            )
          case "10.13039/501100006769" => generateSimpleRelationFromAward(funder, "rsf_________", a => a)
          case "10.13039/501100001711" => generateSimpleRelationFromAward(funder, "snsf________", snsfRule)
          case "10.13039/501100004410" => generateSimpleRelationFromAward(funder, "tubitakf____", a => a)
          case "10.13039/100004440" =>
            generateSimpleRelationFromAward(funder, "wt__________", a => a)
            addUnidentifiedProject("wt__________")
          //ASAP
          case "10.13039/100018231" => generateSimpleRelationFromAward(funder, "asap________", a => a)
          //CHIST-ERA
          case "10.13039/501100001942" => addUnidentifiedProject("chistera____")
          //HE
          case "10.13039/100018693" | "10.13039/100018694" | "10.13039/100019188" | "10.13039/100019180" |
              "10.13039/100018695" | "10.13039/100019185" | "10.13039/100019186" | "10.13039/100019187" =>
            generateSimpleRelationFromAward(funder, "corda_____he", extractECAward)
          //FCT
          case "10.13039/501100001871" =>
            generateSimpleRelationFromAward(funder, "fct_________", a => a)
          //NHMRC
          case "10.13039/501100000925" =>
            generateSimpleRelationFromAward(funder, "nhmrc_______", a => a)
          //NIH
          case "10.13039/100000002" =>
            generateSimpleRelationFromAward(funder, "nih_________", a => a)
          //NWO
          case "10.13039/501100003246" =>
            generateSimpleRelationFromAward(funder, "nwo_________", a => a)
          //UKRI
          case "10.13039/100014013" | "10.13039/501100000267" | "10.13039/501100000268" | "10.13039/501100000269" |
              "10.13039/501100000266" | "10.13039/501100006041" | "10.13039/501100000265" | "10.13039/501100000270" |
              "10.13039/501100013589" | "10.13039/501100000271" =>
            generateSimpleRelationFromAward(funder, "ukri________", a => a)

          case _ => logger.debug("no match for " + funder.DOI.get)
        }

      } else {
        // No funder DOI: fall back to matching on the funder name.
        funder.name match {
          case "European Union’s Horizon 2020 research and innovation program" =>
            generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
          case "European Union's" =>
            generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
            generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
            generateSimpleRelationFromAward(funder, "corda_____he", extractECAward)
          case "The French National Research Agency (ANR)" | "The French National Research Agency" =>
            generateSimpleRelationFromAward(funder, "anr_________", a => a)
          case "CONICYT, Programa de Formación de Capital Humano Avanzado" =>
            generateSimpleRelationFromAward(funder, "conicytf____", a => a)
          case "Wellcome Trust Masters Fellowship" =>
            generateSimpleRelationFromAward(funder, "wt__________", a => a)
            addUnidentifiedProject("wt__________")
          case _ => logger.debug("no match for " + funder.name)
        }
      }
    }
  queue.toList
}
|
||||||
|
|
||||||
|
/** Enriches the given OAF [[Dataset]] entity with Crossref-specific information.
  *
  * Currently a no-op: no dataset-specific fields are mapped yet (common fields
  * are populated elsewhere by the shared Result mapping).
  *
  * @param dataset the OAF dataset entity to enrich (left unchanged for now)
  */
def convertDataset(dataset: Dataset): Unit = {
  // TODO check if there are other info to map into the Dataset
}
|
||||||
|
|
||||||
|
/** Enriches a [[Publication]] with container information taken from the raw
  * Crossref JSON record.
  *
  * Books (any `cobjCategory` containing "book") get a `source` entry of the
  * form `"<container-title> ISBN: <isbn>"`; everything else is treated as a
  * journal article and gets a [[Journal]] built from container-title,
  * issn-type, volume and page.
  *
  * @param publication  the publication to mutate in place
  * @param json         the parsed Crossref record
  * @param cobjCategory the resource-type category used to discriminate books
  */
def convertPublication(publication: Publication, json: JValue, cobjCategory: String): Unit = {
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
  val containerTitles = for { JString(ct) <- json \ "container-title" } yield ct

  //Mapping book
  if (cobjCategory.toLowerCase.contains("book")) {
    val ISBN = for { JString(isbn) <- json \ "ISBN" } yield isbn
    if (ISBN.nonEmpty && containerTitles.nonEmpty) {
      val source = s"${containerTitles.head} ISBN: ${ISBN.head}"
      // Append to any sources already present instead of overwriting them.
      if (publication.getSource != null) {
        val l: List[Field[String]] = publication.getSource.asScala.toList
        val ll: List[Field[String]] = l ::: List(field(source, null))
        publication.setSource(ll.asJava)
      } else
        publication.setSource(List(field(source, null)).asJava)
    }
  } else {
    // Mapping Journal

    // (type, value) pairs from the "issn-type" array, e.g. ("print", "1234-5678")
    val issnInfos = for {
      JArray(issn_types) <- json \ "issn-type"
      JObject(issn_type) <- issn_types
      JField("type", JString(tp)) <- issn_type
      JField("value", JString(vl)) <- issn_type
    } yield Tuple2(tp, vl)

    val volume = (json \ "volume").extractOrElse[String](null)
    if (containerTitles.nonEmpty) {
      val journal = new Journal
      journal.setName(containerTitles.head)
      if (issnInfos.nonEmpty) {
        issnInfos.foreach(tp => {
          tp._1 match {
            case "electronic" => journal.setIssnOnline(tp._2)
            case "print"      => journal.setIssnPrinted(tp._2)
            // FIX: the match was non-exhaustive — any other issn-type coming
            // from the external Crossref data would throw a MatchError.
            case _            => // ignore unknown ISSN types
          }
        })
      }
      journal.setVol(volume)
      // "page" is usually "<start>-<end>"; a single page yields only sp.
      val page = (json \ "page").extractOrElse[String](null)
      if (page != null) {
        val pp = page.split("-")
        if (pp.nonEmpty)
          journal.setSp(pp.head)
        if (pp.size > 1)
          journal.setEp(pp(1))
      }
      publication.setJournal(journal)
    }
  }
}
|
||||||
|
|
||||||
|
/** Resolves a publication date, preferring the explicit date string.
  *
  * Falls back to the Crossref "date-parts" structure when `dt` is blank:
  * a single [year, month, day] / [year, month] / [year] list is padded to a
  * full ISO date and cleaned. Returns null when no usable date is found.
  *
  * @param dt       the date as a free-form string (may be blank)
  * @param datePart the Crossref date-parts, expected to hold a single list
  * @return a cleaned date string, or null
  */
def extractDate(dt: String, datePart: List[List[Int]]): String = {
  if (StringUtils.isNotBlank(dt))
    GraphCleaningFunctions.cleanDate(dt)
  else if (datePart == null || datePart.size != 1)
    null
  else
    datePart.head match {
      case y :: m :: d :: Nil =>
        // zero-pad month and day; anything other than a 4-digit year makes
        // the candidate shorter/longer than 10 chars and is rejected
        val candidate = f"$y-$m%02d-$d%02d"
        if (candidate.length == 10) GraphCleaningFunctions.cleanDate(candidate)
        else null
      case y :: m :: Nil =>
        GraphCleaningFunctions.cleanDate(f"$y-$m%02d-01")
      case y :: Nil =>
        GraphCleaningFunctions.cleanDate(s"$y-01-01")
      case _ =>
        null
    }
}
|
||||||
|
|
||||||
|
/** Builds a dated [[StructuredProperty]] with the given qualifier class/scheme.
  *
  * The date is resolved via `extractDate`; when nothing usable is found the
  * method returns null.
  *
  * @param dt       free-form date string (preferred when non-blank)
  * @param datePart Crossref date-parts fallback
  * @param classId  qualifier class id (also used as class name)
  * @param schemeId qualifier scheme id (also used as scheme name)
  */
def generateDate(
  dt: String,
  datePart: List[List[Int]],
  classId: String,
  schemeId: String
): StructuredProperty = {
  val cleaned = extractDate(dt, datePart)
  if (StringUtils.isBlank(cleaned)) null
  else structuredProperty(cleaned, qualifier(classId, classId, schemeId, schemeId), null)
}
|
||||||
|
|
||||||
|
/** Instantiates the OAF result subtype corresponding to a Crossref type.
  *
  * The lookup goes through the `mappingCrossrefType` table; unmapped types
  * yield null. `objectSubType` is currently unused but kept for API stability.
  *
  * @param objectType    the Crossref "type" value
  * @param objectSubType the Crossref subtype (unused)
  * @return a fresh Publication or Dataset, or null when the type is unmapped
  */
def generateItemFromType(objectType: String, objectSubType: String): Result = {
  mappingCrossrefType.get(objectType) match {
    case Some(tp) if tp.equalsIgnoreCase("publication") => new Publication()
    case Some(tp) if tp.equalsIgnoreCase("dataset")     => new Dataset()
    case _                                              => null
  }
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
package eu.dnetlib.dhp.collection.crossref
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.utils.DoiCleaningRule
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
|
/** Spark job that merges the main Crossref dataset with an incremental update
  * dump, keeping for each DOI the record with the most recent "indexed"
  * timestamp.
  */
object CrossrefDataset {

  val logger: Logger = LoggerFactory.getLogger(CrossrefDataset.getClass)

  /** Returns whichever record has the strictly greater "indexed" timestamp
    * (ties favour `second`); null-safe on either argument so it can back both
    * `reduce` and `merge` of the aggregator below. */
  private def newest(first: CrossrefDT, second: CrossrefDT): CrossrefDT = {
    if (first == null) second
    else if (second == null) first
    else if (first.timestamp > second.timestamp) first
    else second
  }

  /** Parses a raw Crossref JSON record into a [[CrossrefDT]].
    *
    * @param input the raw Crossref JSON record
    * @return a CrossrefDT keyed by the normalized DOI, carrying the raw JSON
    *         and the record's "indexed" timestamp
    */
  def to_item(input: String): CrossrefDT = {

    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: json4s.JValue = parse(input)
    val ts: Long = (json \ "indexed" \ "timestamp").extract[Long]
    // FIX: normalize through DoiCleaningRule (OafMapperUtils-based utility),
    // consistently with GenerateCrossrefDataset.crossrefElement — the legacy
    // DoiBoostMappingUtil is being removed and is not imported in this file.
    val doi: String = DoiCleaningRule.normalizeDoi((json \ "DOI").extract[String])
    CrossrefDT(doi, input, ts)

  }

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        CrossrefDataset.getClass.getResourceAsStream(
          "/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json"
        )
      )
    )
    parser.parseArgument(args)
    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(CrossrefDataset.getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()
    import spark.implicits._

    // Reduces each DOI group to its single most recently indexed record.
    val crossrefAggregator = new Aggregator[CrossrefDT, CrossrefDT, CrossrefDT] with Serializable {

      override def zero: CrossrefDT = null

      override def reduce(b: CrossrefDT, a: CrossrefDT): CrossrefDT = newest(a, b)

      override def merge(a: CrossrefDT, b: CrossrefDT): CrossrefDT = newest(a, b)

      override def bufferEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]]

      override def outputEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]]

      override def finish(reduction: CrossrefDT): CrossrefDT = reduction
    }

    val workingPath: String = parser.get("workingPath")

    val main_ds: Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT]

    // Incremental records arrive as compressed blobs in a Hadoop sequence file.
    val update =
      spark.createDataset(
        spark.sparkContext
          .sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text])
          .map(i => CrossrefImporter.decompressBlob(i._2.toString))
          .map(i => to_item(i))
      )

    main_ds
      .union(update)
      .groupByKey(_.doi)
      .agg(crossrefAggregator.toColumn)
      .map(s => s._2)
      .write
      .mode(SaveMode.Overwrite)
      .save(s"$workingPath/crossref_ds_updated")

  }

}
|
|
@ -0,0 +1,66 @@
|
||||||
|
package eu.dnetlib.dhp.collection.crossref
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.DoiCleaningRule
|
||||||
|
import org.apache.spark.rdd.RDD
|
||||||
|
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
import org.apache.spark.{SparkConf, SparkContext}
|
||||||
|
import org.json4s
|
||||||
|
import org.json4s.DefaultFormats
|
||||||
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
|
import scala.io.Source
|
||||||
|
|
||||||
|
/** Spark job that reads the plain-text Crossref dump and materializes it as a
  * dataset of [[CrossrefDT]] records keyed by normalized DOI.
  */
object GenerateCrossrefDataset {

  val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass)

  implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]

  /** Turns one raw Crossref JSON record into a [[CrossrefDT]] carrying the
    * normalized DOI, the raw JSON itself and the "indexed" timestamp. */
  def crossrefElement(meta: String): CrossrefDT = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val record: json4s.JValue = parse(meta)
    val normalizedDoi: String = DoiCleaningRule.normalizeDoi((record \ "DOI").extract[String])
    val indexedTs: Long = (record \ "indexed" \ "timestamp").extract[Long]
    CrossrefDT(normalizedDoi, meta, indexedTs)
  }

  def main(args: Array[String]): Unit = {
    val argumentParser = new ArgumentApplicationParser(
      Source
        .fromInputStream(
          getClass.getResourceAsStream(
            "/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json"
          )
        )
        .mkString
    )
    argumentParser.parseArgument(args)

    val sparkMaster = argumentParser.get("master")
    val inputPath = argumentParser.get("sourcePath")
    val outputPath = argumentParser.get("targetPath")

    val session: SparkSession = SparkSession
      .builder()
      .config(new SparkConf)
      .appName(GenerateCrossrefDataset.getClass.getSimpleName)
      .master(sparkMaster)
      .getOrCreate()

    import session.implicits._

    // 6000 is the minimum-partitions hint for reading the input text file
    val rawRecords: RDD[String] = session.sparkContext.textFile(inputPath, 6000)

    session
      .createDataset(rawRecords)
      .map(crossrefElement)
      .write
      .mode(SaveMode.Overwrite)
      .save(outputPath)
  }

}
|
Loading…
Reference in New Issue