diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
index c011cbd20..fa3be973d 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
@@ -164,12 +164,21 @@ object SparkProcessMAG {
       .write.mode(SaveMode.Overwrite)
       .save(s"$workingPath/mag_publication")
 
+    // Collapse publications that share the same id into a single record using
+    // ConversionUtil.mergePublication, then write the result to magPublication.
+    // Records with a null id are dropped before grouping.
+    spark.read.load(s"$workingPath/mag_publication").as[Publication]
+      .filter(p => p.getId != null)
+      .groupByKey(p => p.getId)
+      .reduceGroups((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
+      .map(_._2)
+      .write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
 
-    val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication]
-      .map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
-      .map(_._2)
-
-    spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
+//    val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication]
+//      .map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
+//      .map(_._2)
+//
+//    spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
 
   }
 }