From 72df8f92320c7d0ddcecece72a8e3502f5b03022 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 2 Aug 2021 19:34:44 +0200 Subject: [PATCH] Hosted By Map - removed the aggregator for the datasource (it is no more needed) and added a new aggregator for the results. Changed also the hostedBYMap aggregator --- .../oa/graph/hostedbymap/Aggregators.scala | 36 +------------------ 1 file changed, 1 insertion(+), 35 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala index 8077efe30..af4ac4507 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala @@ -62,42 +62,7 @@ object Aggregators { override def outputEncoder: Encoder[(String,HostedByItemType)] = Encoders.tuple(Encoders.STRING,Encoders.product[HostedByItemType]) }.toColumn - def hostedByToSingleDSId(df: Dataset[ HostedByItemType]): Dataset[ HostedByItemType] = { - val transformedData : Dataset[HostedByItemType] = df - .groupByKey(_.id)(Encoders.STRING) - .agg(Aggregators.hostedByToDSAggregator) - .map{ - case (id:String , res: HostedByItemType) => res - }(Encoders.product[HostedByItemType]) - transformedData - } - - def hostedByToDSAggregator: TypedColumn[HostedByItemType, HostedByItemType] = new Aggregator[HostedByItemType, HostedByItemType, HostedByItemType] { - override def zero: HostedByItemType = HostedByItemType("","","","","",false) - - override def reduce(b: HostedByItemType, a:HostedByItemType): HostedByItemType = { - return merge(b, a) - } - override def merge(b1: HostedByItemType, b2: HostedByItemType): HostedByItemType = { - if (b1 == null){ - return b2 - } - if(b2 == null){ - return b1 - } - if(!b1.id.equals("")){ - return HostedByItemType(b1.id, b1.officialname, b1.issn, b1.eissn, b1.lissn, b1.openAccess || b2.openAccess) - - } - return HostedByItemType(b2.id, b2.officialname, b2.issn, b2.eissn, b2.lissn, b1.openAccess || b2.openAccess) - - } - override def finish(reduction: HostedByItemType): HostedByItemType = reduction - override def bufferEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType] - - override def outputEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType] - }.toColumn def resultToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] = new Aggregator[EntityInfo, EntityInfo, EntityInfo]{ @@ -115,6 +80,7 @@ object Aggregators { } if(!b1.getHb_id.equals("")){ b1.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess) + return b1 } b2.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess) b2