diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 11f9828db4..f5672b7f46 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset = import eu.dnetlib.doiboost.mag.ConversionUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.functions.col import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -15,6 +16,9 @@ import scala.collection.JavaConverters._ object SparkGenerateDoiBoost { + + + def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass) @@ -33,6 +37,54 @@ object SparkGenerateDoiBoost { val hostedByMapPath = parser.get("hostedByMapPath") val workingDirPath = parser.get("workingPath") + val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { + override def zero: Publication = new Publication + + override def reduce(b: Publication, a: (String, Publication)): Publication = { + + if (b == null) { + if (a != null && a._2 != null) { + a._2.setId(a._1) + return a._2 + } + } + else { + if (a != null && a._2 != null) { + b.mergeFrom(a._2) + b.setId(a._1) + val authors =AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) + b.setAuthor(authors) + return b + } + } + new Publication + } + + override def merge(b1: Publication, b2: Publication): Publication = { + if (b1 == null) { + if (b2 != null) + return b2 + } + else { + if (b2 != null ) { + b1.mergeFrom(b2) + val authors =AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) + b1.setAuthor(authors) + if (b2.getId!= null && b2.getId.nonEmpty) + b1.setId(b2.getId) + return b1 + } + } + new Publication + } + + override def finish(reduction: Publication): Publication = reduction + + override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication] + + override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication] + } + implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] @@ -77,6 +129,10 @@ object SparkGenerateDoiBoost { doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left") .map(DoiBoostMappingUtil.fixPublication) + .map(p => (p.getId, p)) + .groupByKey(_._1) + .agg(crossrefAggregator.toColumn) + .map(p => p._2) .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered") val affiliationPath = parser.get("affiliationPath")