diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 79b9e8183a..bc26e44d6b 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -60,7 +60,7 @@ object SparkGenerateDoiBoost { val openaireOrganizationPath = parser.get("openaireOrganizationPath") val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { - override def zero: Publication = new Publication + override def zero: Publication = null override def reduce(b: Publication, a: (String, Publication)): Publication = { @@ -177,12 +177,43 @@ object SparkGenerateDoiBoost { .map(DoiBoostMappingUtil.fixPublication) .map(p => (p.getId, p)) .groupByKey(_._1) - .agg(crossrefAggregator.toColumn) - .map(p => p._2) + .reduceGroups((left, right) => + { + //Check left is not null + if (left != null && left._1 != null) + { + //If right is null then return left + if (right == null || right._2 == null) + left + else { + // Here Left and Right are not null + // So we have to merge + val b1 = left._2 + val b2 = right._2 + b1.mergeFrom(b2) + b1.mergeOAFDataInfo(b2) + val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) + b1.setAuthor(authors) + if (b2.getId != null && b2.getId.nonEmpty) + b1.setId(b2.getId) + //Return publication Merged + (b1.getId, b1) + } + } + else { + // Left is Null so we return right + right + } + } + + ) + .filter(s => s!= null && s._2!=null) + .map(s => s._2) .write .mode(SaveMode.Overwrite) .save(s"$workingDirPath/doiBoostPublicationFiltered") + val affiliationPath = parser.get("affiliationPath") val paperAffiliationPath = parser.get("paperAffiliationPath")