hostedbymap #136
|
@ -103,4 +103,40 @@ object Aggregators {
|
|||
transformedData
|
||||
}
|
||||
|
||||
def datasourceToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] = new Aggregator[EntityInfo, EntityInfo, EntityInfo]{
|
||||
override def zero: EntityInfo = EntityInfo.newInstance("","","")
|
||||
|
||||
override def reduce(b: EntityInfo, a:EntityInfo): EntityInfo = {
|
||||
return merge(b, a)
|
||||
}
|
||||
override def merge(b1: EntityInfo, b2: EntityInfo): EntityInfo = {
|
||||
if (b1 == null){
|
||||
return b2
|
||||
}
|
||||
if(b2 == null){
|
||||
return b1
|
||||
}
|
||||
if(!b1.getHb_id.equals("")){
|
||||
return b1
|
||||
}
|
||||
b2
|
||||
|
||||
}
|
||||
override def finish(reduction: EntityInfo): EntityInfo = reduction
|
||||
override def bufferEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
||||
|
||||
override def outputEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
||||
}.toColumn
|
||||
|
||||
|
||||
def datasourceToSingleId(df:Dataset[EntityInfo]): Dataset[EntityInfo] = {
|
||||
val transformedData : Dataset[EntityInfo] = df
|
||||
.groupByKey(_.getHb_id)(Encoders.STRING)
|
||||
.agg(Aggregators.datasourceToSingleIdAggregator)
|
||||
.map{
|
||||
case (id:String , res: EntityInfo) => res
|
||||
}(Encoders.bean(classOf[EntityInfo]))
|
||||
|
||||
transformedData
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue