Draft SparkPropagateOrcidAuthors

This commit is contained in:
Giambattista Bloisi 2024-10-30 15:23:12 +01:00
parent d67f125614
commit aeaedeed01
1 changed files with 63 additions and 0 deletions

View File

@ -0,0 +1,63 @@
package eu.dnetlib.dhp.enrich.orcid
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import eu.dnetlib.dhp.utils.OrcidAuthor
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
/** Draft Spark job that pairs the authors of a result with the authors of the results it is
  * linked to through supplement relations (`isSupplementedBy` / `isSupplementOf`).
  *
  * Extends [[SparkEnrichGraphWithOrcidAuthors]] and only overrides `createTemporaryData`; the
  * rest of the workflow (including the `spark` session used below) comes from the parent class.
  */
class SparkPropagateOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
    extends SparkEnrichGraphWithOrcidAuthors(propertyPath, args, log: Logger) {

  /** Writes, for every result type, a parquet dataset with the columns `id`, `graph_authors`
    * and `orcid_authors`: the authors of a result next to the authors of each result it is
    * related to by a supplement relation.
    *
    * @param graphPath  base path of the graph; `graphPath/<resultType>` and `graphPath/relation`
    *                   are read as JSON
    * @param orcidPath  not used by this implementation
    * @param targetPath base output path; data is written to `targetPath/<resultType>_unmatched`
    *                   (gzip-compressed parquet, overwriting existing data)
    */
  override def createTemporaryData(graphPath: String, orcidPath: String, targetPath: String): Unit = {
    val relEnc = Encoders.bean(classOf[Relation])
    val resultEnc = Encoders.bean(classOf[Result])

    // The supplement links do not depend on the result type: build the plan once, outside the loop.
    val supplements = spark.read
      .schema(relEnc.schema)
      .json(s"$graphPath/relation")
      .where("relclass IN('isSupplementedBy', 'isSupplementOf')")
      .selectExpr("source as id", "target")

    ModelSupport.entityTypes.asScala
      .filter(e => ModelSupport.isResult(e._1))
      .foreach(e => {
        val resultType = e._1.name()
        val enc = Encoders.bean(e._2)
        // The `s` interpolator is required here: without it the literal text
        // "$graphPath/$resultType" would be used as the path.
        val resultPath = s"$graphPath/$resultType"

        // Authors of the candidate *target* results. The same path is read as JSON below, so it
        // is read as JSON here too instead of relying on the session's default data source.
        // NOTE(review): the typed `map` needs an implicit Encoder for (String, Seq[OrcidAuthor]);
        // presumably provided by the enclosing scope - confirm.
        val orcidDnet = spark.read
          .schema(resultEnc.schema)
          .json(resultPath)
          // Skip records without authors: `getAuthor` would be null in the typed map below.
          .where("author IS NOT NULL")
          .as(resultEnc)
          .map(
            record =>
              (
                record.getId,
                // NOTE(review): "extract ORCID" is a draft placeholder, not a real ORCID value;
                // the actual extraction from the author still has to be implemented.
                record.getAuthor.asScala
                  .map(a => OrcidAuthor("extract ORCID", a.getSurname, a.getName, a.getFullname, null))
              )
          )
          .where("size(_2) > 0")
          // Expose the key as `target` so the join below can use a shared column name and does
          // not produce a second `id` column (duplicate column names break the parquet write).
          .selectExpr("_1 as target", "_2 as orcid_authors")

        val graphAuthors =
          spark.read.schema(enc.schema).json(resultPath).selectExpr("id", "author as graph_authors")

        graphAuthors
          .join(supplements, Seq("id"))
          .join(orcidDnet, Seq("target"))
          .drop("target")
          .write
          .mode(SaveMode.Overwrite)
          .option("compression", "gzip")
          .parquet(s"$targetPath/${resultType}_unmatched")
      })
  }
}
/** Command-line launcher for the [[SparkPropagateOrcidAuthors]] job. */
object SparkPropagateOrcidAuthors {

  /** Logger handed to the job instance created in `main`. */
  val log: Logger = LoggerFactory.getLogger(SparkPropagateOrcidAuthors.getClass)

  /** Builds the job with the ORCID enrichment parameter descriptor, initializes it and runs it.
    *
    * @param args command-line arguments, passed to the job unchanged
    */
  def main(args: Array[String]): Unit = {
    val parametersPath = "/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json"
    val application = new SparkPropagateOrcidAuthors(parametersPath, args, log)
    application.initialize().run()
  }
}