From aeaedeed0174710a873a0b892834bee477823210 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 30 Oct 2024 15:23:12 +0100 Subject: [PATCH] Draft SparkPropagateOrcidAuthors --- .../orcid/SparkPropagateOrcidAuthors.scala | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkPropagateOrcidAuthors.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkPropagateOrcidAuthors.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkPropagateOrcidAuthors.scala new file mode 100644 index 000000000..2511830a2 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkPropagateOrcidAuthors.scala @@ -0,0 +1,63 @@ +package eu.dnetlib.dhp.enrich.orcid + +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.oaf.{Relation, Result} +import eu.dnetlib.dhp.utils.OrcidAuthor +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ + +class SparkPropagateOrcidAuthors(propertyPath: String, args: Array[String], log: Logger) + extends SparkEnrichGraphWithOrcidAuthors(propertyPath, args, log: Logger) { + + override def createTemporaryData(graphPath: String, orcidPath: String, targetPath: String): Unit = { + val relEnc = Encoders.bean(classOf[Relation]) + + ModelSupport.entityTypes.asScala + .filter(e => ModelSupport.isResult(e._1)) + .foreach(e => { + val resultType = e._1.name() + val enc = Encoders.bean(e._2) + + val orcidDnet = spark.read + .load("$graphPath/$resultType") + .as[Result] + .map( + result => + ( + result.getId, + result.getAuthor.asScala.map(a => OrcidAuthor("extract ORCID", a.getSurname, a.getName, a.getFullname, null)) + ) + ) + .where("size(_2) > 0") + .selectExpr("_1 as id", "_2 as orcid_authors") + + val result = + spark.read.schema(enc.schema).json(s"$graphPath/$resultType").selectExpr("id", "author as graph_authors") + + val supplements = spark.read.schema(relEnc.schema).json(s"$graphPath/relation").where("relclass IN('isSupplementedBy', 'isSupplementOf')").selectExpr("source as id", "target") + + result + .join(supplements, Seq("id")) + .join(orcidDnet, orcidDnet("id") === col("target")) + .drop("target") + .write + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .parquet(s"$targetPath/${resultType}_unmatched") + }) + } +} + +object SparkPropagateOrcidAuthors { + + val log: Logger = LoggerFactory.getLogger(SparkPropagateOrcidAuthors.getClass) + + def main(args: Array[String]): Unit = { + new SparkPropagateOrcidAuthors("/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json", args, log) + .initialize() + .run() + } +}