1
0
Fork 0

Hosted By Map - removed the aggregator for the datasource (it is no more needed) and added a new aggregator for the results. Changed also the hostedBYMap aggregator

This commit is contained in:
Miriam Baglioni 2021-08-02 19:34:44 +02:00
parent ff1ce75e33
commit 72df8f9232
1 changed files with 1 additions and 35 deletions

View File

@ -62,42 +62,7 @@ object Aggregators {
override def outputEncoder: Encoder[(String,HostedByItemType)] = Encoders.tuple(Encoders.STRING,Encoders.product[HostedByItemType]) override def outputEncoder: Encoder[(String,HostedByItemType)] = Encoders.tuple(Encoders.STRING,Encoders.product[HostedByItemType])
}.toColumn }.toColumn
def hostedByToSingleDSId(df: Dataset[ HostedByItemType]): Dataset[ HostedByItemType] = {
val transformedData : Dataset[HostedByItemType] = df
.groupByKey(_.id)(Encoders.STRING)
.agg(Aggregators.hostedByToDSAggregator)
.map{
case (id:String , res: HostedByItemType) => res
}(Encoders.product[HostedByItemType])
transformedData
}
def hostedByToDSAggregator: TypedColumn[HostedByItemType, HostedByItemType] = new Aggregator[HostedByItemType, HostedByItemType, HostedByItemType] {
override def zero: HostedByItemType = HostedByItemType("","","","","",false)
override def reduce(b: HostedByItemType, a:HostedByItemType): HostedByItemType = {
return merge(b, a)
}
override def merge(b1: HostedByItemType, b2: HostedByItemType): HostedByItemType = {
if (b1 == null){
return b2
}
if(b2 == null){
return b1
}
if(!b1.id.equals("")){
return HostedByItemType(b1.id, b1.officialname, b1.issn, b1.eissn, b1.lissn, b1.openAccess || b2.openAccess)
}
return HostedByItemType(b2.id, b2.officialname, b2.issn, b2.eissn, b2.lissn, b1.openAccess || b2.openAccess)
}
override def finish(reduction: HostedByItemType): HostedByItemType = reduction
override def bufferEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]
override def outputEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]
}.toColumn
def resultToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] = new Aggregator[EntityInfo, EntityInfo, EntityInfo]{ def resultToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] = new Aggregator[EntityInfo, EntityInfo, EntityInfo]{
@ -115,6 +80,7 @@ object Aggregators {
} }
if(!b1.getHb_id.equals("")){ if(!b1.getHb_id.equals("")){
b1.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess) b1.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess)
return b1
} }
b2.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess) b2.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess)
b2 b2