[orcidPropagatio] -

This commit is contained in:
Miriam Baglioni 2024-12-19 15:14:09 +01:00
parent ec4a90f669
commit 3021dfda77
1 changed files with 14 additions and 2 deletions

View File

@ -7,6 +7,9 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
import eu.dnetlib.dhp.schema.oaf.Result
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -39,7 +42,7 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
} }
private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = { private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
implicit val oafEntityEncoder: Encoder[Result] = Encoders.kryo[Result]
ModelSupport.entityTypes.asScala ModelSupport.entityTypes.asScala
.filter(e => ModelSupport.isResult(e._1)) .filter(e => ModelSupport.isResult(e._1))
.foreach(e => { .foreach(e => {
@ -62,7 +65,13 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
when(size(col("enriched_author")).gt(0), col("enriched_author")) when(size(col("enriched_author")).gt(0), col("enriched_author"))
.otherwise(col("author")) .otherwise(col("author"))
) )
.drop("enriched_author") .drop("enriched_author").as[Result]
.groupByKey(r => r.getId)(Encoders[String])
.mapGroups((key: String, group: Iterator[Result]) => {
var r = group.next()
group.foreach(r1 => r = MergeUtils.mergeResult(r,r1))
r
}).as[Result]
.write .write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
@ -72,6 +81,9 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
} }
def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit
private def analisys(targetPath: String, classid: String, provenance: String): Unit = { private def analisys(targetPath: String, classid: String, provenance: String): Unit = {