forked from D-Net/dnet-hadoop
Add into ORCID workflow a method that extracts orcid directly to the dump generated by Enrico
This commit is contained in:
parent
1542196a33
commit
479abd10cb
|
@ -4,16 +4,23 @@ import com.fasterxml.jackson.databind.ObjectMapper
|
|||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
|
||||
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
||||
import eu.dnetlib.dhp.schema.orcid.{AuthorData, OrcidDOI}
|
||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{createSP, generateDataInfo}
|
||||
import org.apache.commons.lang.StringUtils
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import org.json4s
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.JsonAST._
|
||||
import org.json4s.jackson.JsonMethods._
|
||||
|
||||
|
||||
case class ORCIDItem(oid:String,name:String,surname:String,creditName:String,errorCode:String){}
|
||||
case class ORCIDItem(doi:String, authors:List[OrcidAuthor]){}
|
||||
case class OrcidAuthor(oid:String, name:Option[String], surname:Option[String], creditName:Option[String], otherNames:Option[List[String]], errorCode:Option[String]){}
|
||||
case class OrcidWork(oid:String, doi:String)
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -46,8 +53,52 @@ object ORCIDToOAF {
|
|||
}
|
||||
|
||||
|
||||
def convertTOOAF(input:OrcidDOI) :Publication = {
|
||||
val doi = input.getDoi
|
||||
def strValid(s:Option[String]) : Boolean = {
|
||||
s.isDefined && s.get.nonEmpty
|
||||
}
|
||||
|
||||
def authorValid(author:OrcidAuthor): Boolean ={
|
||||
if (strValid(author.name) && strValid(author.surname)) {
|
||||
return true
|
||||
}
|
||||
if (strValid(author.surname)) {
|
||||
return true
|
||||
}
|
||||
if (strValid(author.creditName)) {
|
||||
return true
|
||||
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
|
||||
def extractDOIWorks(input:String): List[OrcidWork] = {
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: json4s.JValue = parse(input)
|
||||
|
||||
val oid = (json \ "workDetail" \"oid").extract[String]
|
||||
val doi:List[(String, String)] = for {
|
||||
JObject(extIds) <- json \ "workDetail" \"extIds"
|
||||
JField("type", JString(typeValue)) <- extIds
|
||||
JField("value", JString(value)) <- extIds
|
||||
if "doi".equalsIgnoreCase(typeValue)
|
||||
} yield (typeValue, value)
|
||||
if (doi.nonEmpty) {
|
||||
return doi.map(l =>OrcidWork(oid, l._2))
|
||||
}
|
||||
List()
|
||||
}
|
||||
|
||||
def convertORCIDAuthor(input:String): OrcidAuthor = {
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: json4s.JValue = parse(input)
|
||||
|
||||
(json \"authorData" ).extractOrElse[OrcidAuthor](null)
|
||||
}
|
||||
|
||||
|
||||
def convertTOOAF(input:ORCIDItem) :Publication = {
|
||||
val doi = input.doi
|
||||
val pub:Publication = new Publication
|
||||
pub.setPid(List(createSP(doi.toLowerCase, "doi", ModelConstants.DNET_PID_TYPES)).asJava)
|
||||
pub.setDataInfo(generateDataInfo())
|
||||
|
@ -58,8 +109,8 @@ object ORCIDToOAF {
|
|||
|
||||
try{
|
||||
|
||||
val l:List[Author]= input.getAuthors.asScala.map(a=> {
|
||||
generateAuthor(a.getName, a.getSurname, a.getCreditName, a.getOid)
|
||||
val l:List[Author]= input.authors.map(a=> {
|
||||
generateAuthor(a)
|
||||
})(collection.breakOut)
|
||||
|
||||
pub.setAuthor(l.asJava)
|
||||
|
@ -80,16 +131,20 @@ object ORCIDToOAF {
|
|||
di
|
||||
}
|
||||
|
||||
def generateAuthor(given: String, family: String, fullName:String, orcid: String): Author = {
|
||||
def generateAuthor(o : OrcidAuthor): Author = {
|
||||
val a = new Author
|
||||
a.setName(given)
|
||||
a.setSurname(family)
|
||||
if (fullName!= null && fullName.nonEmpty)
|
||||
a.setFullname(fullName)
|
||||
else
|
||||
a.setFullname(s"$given $family")
|
||||
if (StringUtils.isNotBlank(orcid))
|
||||
a.setPid(List(createSP(orcid, ModelConstants.ORCID, ModelConstants.DNET_PID_TYPES, generateOricPIDDatainfo())).asJava)
|
||||
if (strValid(o.name)) {
|
||||
a.setName(o.name.get.capitalize)
|
||||
}
|
||||
if (strValid(o.surname)) {
|
||||
a.setSurname(o.surname.get.capitalize)
|
||||
}
|
||||
if(strValid(o.name) && strValid(o.surname))
|
||||
a.setFullname(s"${o.name.get.capitalize} ${o.surname.get.capitalize}")
|
||||
else if (strValid(o.creditName))
|
||||
a.setFullname(o.creditName.get)
|
||||
if (StringUtils.isNotBlank(o.oid))
|
||||
a.setPid(List(createSP(o.oid, ModelConstants.ORCID, ModelConstants.DNET_PID_TYPES, generateOricPIDDatainfo())).asJava)
|
||||
|
||||
a
|
||||
}
|
||||
|
|
|
@ -5,68 +5,48 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
|||
import eu.dnetlib.dhp.oa.merge.AuthorMerger
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication
|
||||
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
||||
import eu.dnetlib.doiboost.mag.ConversionUtil
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.expressions.Aggregator
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
object SparkConvertORCIDToOAF {
|
||||
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
|
||||
|
||||
def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
|
||||
|
||||
override def zero: Publication = new Publication()
|
||||
|
||||
override def reduce(b: Publication, a: (String, Publication)): Publication = {
|
||||
b.mergeFrom(a._2)
|
||||
b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor))
|
||||
if (b.getId == null)
|
||||
b.setId(a._2.getId)
|
||||
b
|
||||
}
|
||||
|
||||
|
||||
override def merge(wx: Publication, wy: Publication): Publication = {
|
||||
wx.mergeFrom(wy)
|
||||
wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor))
|
||||
if(wx.getId == null && wy.getId.nonEmpty)
|
||||
wx.setId(wy.getId)
|
||||
wx
|
||||
}
|
||||
override def finish(reduction: Publication): Publication = reduction
|
||||
|
||||
override def bufferEncoder: Encoder[Publication] =
|
||||
Encoders.kryo(classOf[Publication])
|
||||
|
||||
override def outputEncoder: Encoder[Publication] =
|
||||
Encoders.kryo(classOf[Publication])
|
||||
}
|
||||
|
||||
def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
|
||||
def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = {
|
||||
import spark.implicits._
|
||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI]
|
||||
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
|
||||
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
|
||||
|
||||
val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI])))
|
||||
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
|
||||
|
||||
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
|
||||
|
||||
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
|
||||
|
||||
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
|
||||
|
||||
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
|
||||
|
||||
works.joinWith(authors, authors("oid").equalTo(works("oid")))
|
||||
.map(i =>{
|
||||
val doi = i._1.doi
|
||||
val author = i._2
|
||||
(doi, author)
|
||||
}).groupBy(col("_1").alias("doi"))
|
||||
.agg(collect_list(col("_2")).alias("authors"))
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
|
||||
|
||||
val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
|
||||
|
||||
logger.info("Converting ORCID to OAF")
|
||||
dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null)
|
||||
.map(d => (d.getId, d))
|
||||
.groupByKey(_._1)(Encoders.STRING)
|
||||
.agg(getPublicationAggregator().toColumn)
|
||||
.map(p => p._2)
|
||||
.write.mode(SaveMode.Overwrite).save(targetPath)
|
||||
dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath)
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
|
||||
val conf: SparkConf = new SparkConf()
|
||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
|
||||
parser.parseArgument(args)
|
||||
|
@ -78,10 +58,10 @@ object SparkConvertORCIDToOAF {
|
|||
.master(parser.get("master")).getOrCreate()
|
||||
|
||||
|
||||
|
||||
val sourcePath = parser.get("sourcePath")
|
||||
val workingPath = parser.get("workingPath")
|
||||
val targetPath = parser.get("targetPath")
|
||||
run(spark, sourcePath, targetPath)
|
||||
run(spark, sourcePath, workingPath, targetPath)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
[
|
||||
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
|
||||
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the Orcid Input file", "paramRequired": true},
|
||||
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": true},
|
||||
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
|
||||
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
||||
|
||||
|
|
|
@ -74,6 +74,11 @@
|
|||
<!-- ORCID Parameters -->
|
||||
<property>
|
||||
<name>inputPathOrcid</name>
|
||||
<description>the ORCID input path</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>workingPathOrcid</name>
|
||||
<description>the ORCID working path</description>
|
||||
</property>
|
||||
|
||||
|
@ -295,6 +300,7 @@
|
|||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
|
||||
<arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
|
|
|
@ -2,12 +2,13 @@ package eu.dnetlib.doiboost.orcid
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication
|
||||
import eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF.getClass
|
||||
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.io.TempDir
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.nio.file.Path
|
||||
import scala.io.Source
|
||||
|
||||
class MappingORCIDToOAFTest {
|
||||
|
@ -24,27 +25,37 @@ class MappingORCIDToOAFTest {
|
|||
})
|
||||
}
|
||||
|
||||
// @Test
|
||||
// def testOAFConvert():Unit ={
|
||||
//
|
||||
// val spark: SparkSession =
|
||||
// SparkSession
|
||||
// .builder()
|
||||
// .appName(getClass.getSimpleName)
|
||||
// .master("local[*]").getOrCreate()
|
||||
//
|
||||
//
|
||||
// SparkConvertORCIDToOAF.run( spark,"/Users/sandro/Downloads/orcid", "/Users/sandro/Downloads/orcid_oaf")
|
||||
// implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
//
|
||||
// val df = spark.read.load("/Users/sandro/Downloads/orcid_oaf").as[Publication]
|
||||
// println(df.first.getId)
|
||||
// println(mapper.writeValueAsString(df.first()))
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
// }
|
||||
@Test
|
||||
def testOAFConvert(@TempDir testDir: Path):Unit ={
|
||||
val sourcePath:String = getClass.getResource("/eu/dnetlib/doiboost/orcid/datasets").getPath
|
||||
val targetPath: String =s"${testDir.toString}/output/orcidPublication"
|
||||
val workingPath =s"${testDir.toString}/wp/"
|
||||
|
||||
val spark: SparkSession =
|
||||
SparkSession
|
||||
.builder()
|
||||
.appName(getClass.getSimpleName)
|
||||
.master("local[*]").getOrCreate()
|
||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
import spark.implicits._
|
||||
|
||||
SparkConvertORCIDToOAF.run( spark,sourcePath, workingPath, targetPath)
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
|
||||
|
||||
|
||||
val oA = spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem].count()
|
||||
|
||||
|
||||
|
||||
val p: Dataset[Publication] = spark.read.load(targetPath).as[Publication]
|
||||
|
||||
assertTrue(oA == p.count())
|
||||
println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(p.first()))
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
Binary file not shown.
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue