diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 9323c994cf..2cbd530979 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -59,52 +59,6 @@ object SparkGenerateDoiBoost { val workingDirPath = parser.get("workingPath") val openaireOrganizationPath = parser.get("openaireOrganizationPath") - val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { - override def zero: Publication = new Publication - - override def reduce(b: Publication, a: (String, Publication)): Publication = { - - if (b == null) { - if (a != null && a._2 != null) { - a._2.setId(a._1) - return a._2 - } - } else { - if (a != null && a._2 != null) { - b.mergeFrom(a._2) - b.setId(a._1) - val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) - b.setAuthor(authors) - return b - } - } - new Publication - } - - override def merge(b1: Publication, b2: Publication): Publication = { - if (b1 == null) { - if (b2 != null) - return b2 - } else { - if (b2 != null) { - b1.mergeFrom(b2) - val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) - b1.setAuthor(authors) - if (b2.getId != null && b2.getId.nonEmpty) - b1.setId(b2.getId) - return b1 - } - } - new Publication - } - - override def finish(reduction: Publication): Publication = reduction - - override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication] - - override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication] - } - implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] @@ -175,8 +129,33 @@ object SparkGenerateDoiBoost { .map(DoiBoostMappingUtil.fixPublication) .map(p => (p.getId, p)) .groupByKey(_._1) - .agg(crossrefAggregator.toColumn) - .map(p => p._2) + .reduceGroups((left, right) => { + //Check left is not null + if (left != null && left._1 != null) { + //If right is null then return left + if (right == null || right._2 == null) + left + else { + // Here Left and Right are not null + // So we have to merge + val b1 = left._2 + val b2 = right._2 + b1.mergeFrom(b2) + b1.mergeOAFDataInfo(b2) + val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) + b1.setAuthor(authors) + if (b2.getId != null && b2.getId.nonEmpty) + b1.setId(b2.getId) + //Return publication Merged + (b1.getId, b1) + } + } else { + // Left is Null so we return right + right + } + }) + .filter(s => s != null && s._2 != null) + .map(s => s._2._2) .write .mode(SaveMode.Overwrite) .save(s"$workingDirPath/doiBoostPublicationFiltered")