From a7bf314fd2a8c0fb1075eb2738023e7d802d0364 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 4 Aug 2021 10:13:30 +0200 Subject: [PATCH] Hosted By Map - added new aggregator to get just one result per datasource id --- .../oa/graph/hostedbymap/Aggregators.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala index af4ac4507..8198b197a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala @@ -103,4 +103,40 @@ object Aggregators { transformedData } + def datasourceToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] = new Aggregator[EntityInfo, EntityInfo, EntityInfo]{ + override def zero: EntityInfo = EntityInfo.newInstance("","","") + + override def reduce(b: EntityInfo, a:EntityInfo): EntityInfo = { + return merge(b, a) + } + override def merge(b1: EntityInfo, b2: EntityInfo): EntityInfo = { + if (b1 == null){ + return b2 + } + if(b2 == null){ + return b1 + } + if(!b1.getHb_id.equals("")){ + return b1 + } + b2 + + } + override def finish(reduction: EntityInfo): EntityInfo = reduction + override def bufferEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + + override def outputEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + }.toColumn + + + def datasourceToSingleId(df:Dataset[EntityInfo]): Dataset[EntityInfo] = { + val transformedData : Dataset[EntityInfo] = df + .groupByKey(_.getHb_id)(Encoders.STRING) + .agg(Aggregators.datasourceToSingleIdAggregator) + .map{ + case (id:String , res: EntityInfo) => res + }(Encoders.bean(classOf[EntityInfo])) + + transformedData + } }