applied unique function on the final dataset

pull/106/head
Sandro La Bruzzo 3 years ago
parent 67085da305
commit a16e5299f9

@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset =
import eu.dnetlib.doiboost.mag.ConversionUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
@ -15,6 +16,9 @@ import scala.collection.JavaConverters._
object SparkGenerateDoiBoost {
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)
@ -33,6 +37,54 @@ object SparkGenerateDoiBoost {
val hostedByMapPath = parser.get("hostedByMapPath")
val workingDirPath = parser.get("workingPath")
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
override def zero: Publication = new Publication
override def reduce(b: Publication, a: (String, Publication)): Publication = {
if (b == null) {
if (a != null && a._2 != null) {
a._2.setId(a._1)
return a._2
}
}
else {
if (a != null && a._2 != null) {
b.mergeFrom(a._2)
b.setId(a._1)
val authors =AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor)
b.setAuthor(authors)
return b
}
}
new Publication
}
override def merge(b1: Publication, b2: Publication): Publication = {
if (b1 == null) {
if (b2 != null)
return b2
}
else {
if (b2 != null ) {
b1.mergeFrom(b2)
val authors =AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
b1.setAuthor(authors)
if (b2.getId!= null && b2.getId.nonEmpty)
b1.setId(b2.getId)
return b1
}
}
new Publication
}
override def finish(reduction: Publication): Publication = reduction
override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication]
override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication]
}
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
@ -77,6 +129,10 @@ object SparkGenerateDoiBoost {
doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left")
.map(DoiBoostMappingUtil.fixPublication)
.map(p => (p.getId, p))
.groupByKey(_._1)
.agg(crossrefAggregator.toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
val affiliationPath = parser.get("affiliationPath")

Loading…
Cancel
Save