dnet-hadoop/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostebymap/Aggregators.scala

package eu.dnetlib.dhp.oa.graph.hostebymap

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, TypedColumn}

case class HostedByItemType(id: String, officialname: String, issn: String, eissn: String, lissn: String, openAccess: Boolean)

case class HostedByInfo(id: String, officialname: String, journal_id: String, provenance: String, id_type: String)

object Aggregators {
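  // Returns the first of its two arguments that is non-empty; the merge step
  // below uses it to fill each field from whichever record carries a value.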
  def getId(s1: String, s2: String): String = {
    if (s1.nonEmpty) s1 else s2
  }
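
  // Collapses a Dataset of HostedByItemType records to one record per id,
  // merging field values with the typed aggregator defined below.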
  def createHostedByItemTypes(df: Dataset[HostedByItemType]): Dataset[HostedByItemType] = {
    df.groupByKey(_.id)(Encoders.STRING)
      .agg(Aggregators.hostedByAggregator)
      .map { case (_, res) => res }(Encoders.product[HostedByItemType])
  }
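
  // Typed aggregator that merges two HostedByItemType records field by field:
  // each string field keeps the first non-empty value, openAccess is OR-ed.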
  val hostedByAggregator: TypedColumn[HostedByItemType, HostedByItemType] =
    new Aggregator[HostedByItemType, HostedByItemType, HostedByItemType] {

      override def zero: HostedByItemType = HostedByItemType("", "", "", "", "", false)

      override def reduce(b: HostedByItemType, a: HostedByItemType): HostedByItemType = merge(b, a)

      override def merge(b1: HostedByItemType, b2: HostedByItemType): HostedByItemType = {
        if (b1 == null) return b2
        if (b2 == null) return b1
        HostedByItemType(
          getId(b1.id, b2.id),
          getId(b1.officialname, b2.officialname),
          getId(b1.issn, b2.issn),
          getId(b1.eissn, b2.eissn),
          getId(b1.lissn, b2.lissn),
          b1.openAccess || b2.openAccess
        )
      }

      override def finish(reduction: HostedByItemType): HostedByItemType = reduction

      override def bufferEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]

      override def outputEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]
    }.toColumn
}
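
// --- Usage sketch (not part of the original file) ---
// A minimal example of how the aggregator above might be exercised: it builds
// a tiny in-memory Dataset of partial HostedByItemType records for one journal
// id and collapses them with createHostedByItemTypes. The SparkSession setup,
// the AggregatorsExample name, and the sample values are all illustrative
// assumptions, not taken from the surrounding project.
object AggregatorsExample {

  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("hostedby-aggregator-example")
      .master("local[*]") // local master for this sketch only
      .getOrCreate()
    import spark.implicits._

    // Two partial records for the same journal: one carries the print ISSN,
    // the other the electronic ISSN and the open-access flag.
    val input = Seq(
      HostedByItemType("j1", "Journal One", "1234-5678", "", "", openAccess = false),
      HostedByItemType("j1", "Journal One", "", "8765-4321", "", openAccess = true)
    ).toDS()

    // Expected single merged row:
    // HostedByItemType(j1, Journal One, 1234-5678, 8765-4321, , true)
    Aggregators.createHostedByItemTypes(input).show(false)

    spark.stop()
  }
}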