diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index dfe48542c..2cd37d9ea 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -125,16 +125,17 @@ public class SparkAtomicActionScoreJob implements Serializable { return ret; }, Encoders.bean(BipScore.class)) .groupByKey((MapFunction) value -> value.getId(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> { + .mapGroups((MapGroupsFunction) (k, it) -> { Result ret = new Result(); + ret.setDataInfo(getDataInfo()); BipScore first = it.next(); ret.setId(first.getId()); ret.setMeasures(getMeasure(first)); it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value))); - return (I) ret; - }, Encoders.bean(inputClazz)) + return ret; + }, Encoders.bean(Result.class)) .toJavaRDD() .map(p -> new AtomicAction(inputClazz, p)) .mapToPair(