From 34229970e6df31c3d5520e6151b53e31943ad35c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 8 Jan 2021 16:29:17 +0100 Subject: [PATCH] [BIP! Scores integration] Create updates as Result rather than subclasses; Result considers also metrics in the mergeFrom operation --- .../actionmanager/bipfinder/SparkAtomicActionScoreJob.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index dfe48542c..2cd37d9ea 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -125,16 +125,17 @@ public class SparkAtomicActionScoreJob implements Serializable { return ret; }, Encoders.bean(BipScore.class)) .groupByKey((MapFunction) value -> value.getId(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> { + .mapGroups((MapGroupsFunction) (k, it) -> { Result ret = new Result(); + ret.setDataInfo(getDataInfo()); BipScore first = it.next(); ret.setId(first.getId()); ret.setMeasures(getMeasure(first)); it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value))); - return (I) ret; - }, Encoders.bean(inputClazz)) + return ret; + }, Encoders.bean(Result.class)) .toJavaRDD() .map(p -> new AtomicAction(inputClazz, p)) .mapToPair(