diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala
index 285f28b90..717409ba6 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala
@@ -7,6 +7,9 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.slf4j.{Logger, LoggerFactory}
 import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
+import eu.dnetlib.dhp.schema.oaf.Result
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
+import org.apache.spark.sql.expressions.Aggregator
 
 import scala.collection.JavaConverters._
 
@@ -39,7 +42,7 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
   }
 
   private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
-
+    implicit val oafEntityEncoder: Encoder[Result] = Encoders.kryo[Result]
     ModelSupport.entityTypes.asScala
       .filter(e => ModelSupport.isResult(e._1))
       .foreach(e => {
@@ -62,7 +65,13 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
           when(size(col("enriched_author")).gt(0), col("enriched_author"))
             .otherwise(col("author"))
         )
-        .drop("enriched_author")
+        .drop("enriched_author").as[Result]
+        .groupByKey(r => r.getId)(Encoders.STRING)
+        .mapGroups((key: String, group: Iterator[Result]) => {
+          var r = group.next()
+          group.foreach(r1 => r = MergeUtils.mergeResult(r, r1))
+          r
+        }).as[Result]
         .write
         .mode(SaveMode.Overwrite)
         .option("compression", "gzip")
@@ -72,6 +81,9 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
 
   }
 
+
+
+
   def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit
 
   private def analisys(targetPath: String, classid: String, provenance: String): Unit = {