// dnet-hadoop/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala
// 143 lines · 4.9 KiB · Scala

package eu.dnetlib.dhp.oa.graph.hostedbymap
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, TypedColumn}
/** Item carried as the value of the (key, item) pairs folded by `Aggregators.hostedByAggregator`.
  *
  * @param id           identifier; the aggregator prefers the item whose id starts with `"10|"`
  * @param officialname official name of the item
  * @param issn         NOTE(review): presumably the print ISSN — confirm
  * @param eissn        NOTE(review): presumably the electronic ISSN — confirm
  * @param lissn        NOTE(review): presumably the linking ISSN — confirm
  * @param openAccess   open-access flag; OR-ed together when two items are merged
  */
case class HostedByItemType(
  id: String,
  officialname: String,
  issn: String,
  eissn: String,
  lissn: String,
  openAccess: Boolean
)
/** Plain record of five string attributes describing a hosted-by association.
  *
  * NOTE(review): this type is not used by the code visible in this file; the
  * meaning of each field below is inferred from its name only — confirm with callers.
  *
  * @param id           identifier
  * @param officialname official name
  * @param journal_id   journal identifier
  * @param provenance   provenance label
  * @param id_type      kind of identifier stored in `journal_id` (presumably) — confirm
  */
case class HostedByInfo(
  id: String,
  officialname: String,
  journal_id: String,
  provenance: String,
  id_type: String
)
object Aggregators {
/** Chooses which of two candidate identifiers to keep.
  *
  * `s1` wins when it starts with the `"10|"` prefix; in every other case,
  * including a `null` `s1`, `s2` is returned unchanged.
  * NOTE(review): `"10|"` presumably marks a datasource identifier — confirm.
  *
  * @param s1 preferred candidate; may be null
  * @param s2 fallback candidate, returned as-is (may itself be null)
  * @return `s1` if it carries the `"10|"` prefix, otherwise `s2`
  */
def getId(s1: String, s2: String): String =
  // Null guard: a missing first candidate falls back to s2 instead of throwing an NPE.
  if (s1 != null && s1.startsWith("10|")) s1 else s2
/** Returns the first value when it is usable, otherwise the second.
  *
  * A value is usable when it is neither `null` nor the empty string; no trimming
  * is applied, so a whitespace-only `s1` is still returned.
  *
  * @param s1 preferred value; may be null or empty
  * @param s2 fallback value, returned as-is (may itself be null or empty)
  * @return `s1` if non-null and non-empty, otherwise `s2`
  */
def getValue(s1: String, s2: String): String =
  // Null guard: a missing first value falls back to s2 instead of throwing an NPE.
  if (s1 != null && !s1.isEmpty) s1 else s2
/** Reduces a dataset of (key, item) pairs to one aggregated pair per key.
  *
  * Rows are grouped on the first tuple element and every group is folded with
  * `Aggregators.hostedByAggregator`. The grouping key that `agg` prepends is
  * discarded; only the aggregated (key, item) pair is returned.
  *
  * @param df (key, item) pairs, possibly several per key
  * @return the aggregated pair of each group
  */
def explodeHostedByItemType(df: Dataset[(String, HostedByItemType)]): Dataset[(String, HostedByItemType)] = {
  val pairEncoder: Encoder[(String, HostedByItemType)] =
    Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])

  df.groupByKey(row => row._1)(Encoders.STRING)
    .agg(Aggregators.hostedByAggregator)
    .map {
      // Typed patterns are kept on purpose: they do not match null components.
      case (_: String, merged: (String, HostedByItemType)) => merged
    }(pairEncoder)
}
/** Typed aggregation column that folds (key, [[HostedByItemType]]) pairs into a single pair.
  *
  * Merge rule for two non-null pairs: the left pair is kept when its item id starts
  * with `"10|"`, otherwise the right pair is kept. In both cases the `openAccess`
  * flag of the result is the logical OR of the two inputs. A null side yields the
  * other side untouched.
  */
val hostedByAggregator: TypedColumn[(String, HostedByItemType), (String, HostedByItemType)] =
  new Aggregator[(String, HostedByItemType), (String, HostedByItemType), (String, HostedByItemType)] {

    /** Neutral starting buffer: empty key, empty item, not open access. */
    override def zero: (String, HostedByItemType) =
      ("", HostedByItemType("", "", "", "", "", false))

    /** Folding in one input row is the same operation as merging two buffers. */
    override def reduce(b: (String, HostedByItemType), a: (String, HostedByItemType)): (String, HostedByItemType) =
      merge(b, a)

    override def merge(b1: (String, HostedByItemType), b2: (String, HostedByItemType)): (String, HostedByItemType) =
      if (b1 == null) b2
      else if (b2 == null) b1
      else {
        // The left side wins only when it carries a "10|"-prefixed id.
        val kept = if (b1._2.id.startsWith("10|")) b1 else b2
        (kept._1, kept._2.copy(openAccess = b1._2.openAccess || b2._2.openAccess))
      }

    /** The buffer already has the output shape. */
    override def finish(reduction: (String, HostedByItemType)): (String, HostedByItemType) = reduction

    override def bufferEncoder: Encoder[(String, HostedByItemType)] =
      Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])

    override def outputEncoder: Encoder[(String, HostedByItemType)] =
      Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])
  }.toColumn
/** Builds a typed aggregation column that collapses several `EntityInfo` beans into one.
  *
  * Merge rule for two non-null beans: the left bean is kept unless its `hostedById`
  * equals the empty string, in which case the right bean is kept. The kept bean is
  * updated in place so that its open-access flag is the logical OR of both inputs.
  * A null side yields the other side untouched.
  */
def resultToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] =
  new Aggregator[EntityInfo, EntityInfo, EntityInfo] {

    /** Starting buffer: an instance created from three empty strings. */
    override def zero: EntityInfo = EntityInfo.newInstance("", "", "")

    /** Folding in one input row is the same operation as merging two buffers. */
    override def reduce(b: EntityInfo, a: EntityInfo): EntityInfo = merge(b, a)

    override def merge(b1: EntityInfo, b2: EntityInfo): EntityInfo =
      if (b1 == null) b2
      else if (b2 == null) b1
      else {
        // Select the survivor first, then write the combined flag onto it.
        val kept = if (b1.getHostedById.equals("")) b2 else b1
        kept.setOpenAccess(b1.getOpenAccess || b2.getOpenAccess)
        kept
      }

    /** The buffer already has the output shape. */
    override def finish(reduction: EntityInfo): EntityInfo = reduction

    override def bufferEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

    override def outputEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
  }.toColumn
/** Reduces a dataset of `EntityInfo` beans to one aggregated bean per `getId` value.
  *
  * Rows are grouped on `getId` and every group is folded with
  * `Aggregators.resultToSingleIdAggregator`. The grouping key that `agg` prepends
  * is discarded; only the aggregated bean is returned.
  *
  * @param df beans to collapse, possibly several per id
  * @return the aggregated bean of each group
  */
def resultToSingleId(df: Dataset[EntityInfo]): Dataset[EntityInfo] = {
  val beanEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

  df.groupByKey(info => info.getId)(Encoders.STRING)
    .agg(Aggregators.resultToSingleIdAggregator)
    .map {
      // Typed patterns are kept on purpose: they do not match null components.
      case (_: String, merged: EntityInfo) => merged
    }(beanEncoder)
}
/** Builds a typed aggregation column that picks one `EntityInfo` bean out of several.
  *
  * Merge rule for two non-null beans: the left bean is returned unless its
  * `hostedById` equals the empty string, in which case the right bean is returned.
  * Neither bean is modified. A null side yields the other side.
  */
def datasourceToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] =
  new Aggregator[EntityInfo, EntityInfo, EntityInfo] {

    /** Starting buffer: an instance created from three empty strings. */
    override def zero: EntityInfo = EntityInfo.newInstance("", "", "")

    /** Folding in one input row is the same operation as merging two buffers. */
    override def reduce(b: EntityInfo, a: EntityInfo): EntityInfo = merge(b, a)

    override def merge(b1: EntityInfo, b2: EntityInfo): EntityInfo =
      if (b1 == null) b2
      else if (b2 == null) b1
      else if (b1.getHostedById.equals("")) b2
      else b1

    /** The buffer already has the output shape. */
    override def finish(reduction: EntityInfo): EntityInfo = reduction

    override def bufferEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

    override def outputEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
  }.toColumn
/** Reduces a dataset of `EntityInfo` beans to one bean per `getHostedById` value.
  *
  * Rows are grouped on `getHostedById` and every group is folded with
  * `Aggregators.datasourceToSingleIdAggregator`. The grouping key that `agg`
  * prepends is discarded; only the aggregated bean is returned.
  *
  * @param df beans to collapse, possibly several per hosted-by id
  * @return the aggregated bean of each group
  */
def datasourceToSingleId(df: Dataset[EntityInfo]): Dataset[EntityInfo] = {
  val beanEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

  df.groupByKey(info => info.getHostedById)(Encoders.STRING)
    .agg(Aggregators.datasourceToSingleIdAggregator)
    .map {
      // Typed patterns are kept on purpose: they do not match null components.
      case (_: String, picked: EntityInfo) => picked
    }(beanEncoder)
}
}