[orcidenrichment] Fix lambda to avoid requiring serialization of the enclosing class

This commit is contained in:
Giambattista Bloisi 2024-11-21 16:24:17 +01:00
parent 2639fb5da2
commit 12f781169d
1 changed file with 8 additions and 4 deletions

View File

@ -25,14 +25,16 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
log.info(s"targetPath is '$targetPath'") log.info(s"targetPath is '$targetPath'")
val workingDir = parser.get("workingDir") val workingDir = parser.get("workingDir")
log.info(s"targetPath is '$workingDir'") log.info(s"targetPath is '$workingDir'")
val classid = Option(parser.get("matchingSource")).map(_=>ModelConstants.ORCID_PENDING).getOrElse(ModelConstants.ORCID) val classid =
Option(parser.get("matchingSource")).map(_ => ModelConstants.ORCID_PENDING).getOrElse(ModelConstants.ORCID)
log.info(s"classid is '$classid'") log.info(s"classid is '$classid'")
val provenance = Option(parser.get("matchingSource")).map(_=>PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT") val provenance =
Option(parser.get("matchingSource")).map(_ => PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT")
log.info(s"targetPath is '$workingDir'") log.info(s"targetPath is '$workingDir'")
createTemporaryData(spark, graphPath, orcidPath, workingDir) createTemporaryData(spark, graphPath, orcidPath, workingDir)
analisys(workingDir,classid,provenance) analisys(workingDir, classid, provenance)
generateGraph(spark, graphPath, workingDir, targetPath) generateGraph(spark, graphPath, workingDir, targetPath)
} }
@ -75,13 +77,15 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
.filter(e => ModelSupport.isResult(e._1)) .filter(e => ModelSupport.isResult(e._1))
.foreach(e => { .foreach(e => {
val resultType = e._1.name() val resultType = e._1.name()
val c = classid
val p = provenance
spark.read spark.read
.parquet(s"$targetPath/${resultType}_unmatched") .parquet(s"$targetPath/${resultType}_unmatched")
.where("size(graph_authors) > 0") .where("size(graph_authors) > 0")
.as[MatchData](Encoders.bean(classOf[MatchData])) .as[MatchData](Encoders.bean(classOf[MatchData]))
.map(md => { .map(md => {
ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors, classid, provenance) ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors, c, p)
})(Encoders.bean(classOf[ORCIDAuthorEnricherResult])) })(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
.write .write
.option("compression", "gzip") .option("compression", "gzip")