diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index 64be5e79a5..af1d4ac370 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -15,6 +15,12 @@ import org.slf4j.{Logger, LoggerFactory} object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) + def fixORCIDItem(item :ORCIDItem):ORCIDItem = { + item.authors = item.authors.groupBy(_.oid).map(_._2.head) + item + } + + def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = { import spark.implicits._ implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] @@ -37,7 +43,8 @@ object SparkConvertORCIDToOAF { val author = i._2 (doi, author) }).groupBy(col("_1").alias("doi")) - .agg(collect_list(col("_2")).alias("authors")) + .agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem] + .map(s => fixORCIDItem(s)) .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]