diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 1628fca39..ddf1b3049 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.common.ModelSupport; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; import org.apache.hadoop.io.Text; @@ -115,7 +116,7 @@ public class SparkAtomicActionScoreJob implements Serializable { "where dataInfo.deletedbyinference = false and pIde.qualifier.classid = '" + DOI + "'") .as(Encoders.bean(PreparedResult.class)); - Dataset tmp = bipScores + bipScores .joinWith( preparedResult, bipScores.col("id").equalTo(preparedResult.col("value")), "inner") @@ -123,9 +124,7 @@ public class SparkAtomicActionScoreJob implements Serializable { BipScore ret = value._1(); ret.setId(value._2().getId()); return ret; - }, Encoders.bean(BipScore.class)); - - tmp + }, Encoders.bean(BipScore.class)) .groupByKey((MapFunction) value -> value.getId(), Encoders.STRING()) .mapGroups((MapGroupsFunction) (k, it) -> { Result ret = inputClazz.newInstance(); @@ -143,18 +142,9 @@ public class SparkAtomicActionScoreJob implements Serializable { aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); - ; + } - public static Dataset getBipScoreDataset(Dataset bipdeserialized) { - return bipdeserialized - .flatMap((FlatMapFunction) bip -> bip.keySet().stream().map(key -> { - BipScore bs = new BipScore(); - bs.setId(key); - bs.setScoreList(bip.get(key)); - return bs; - }).collect(Collectors.toList()).iterator(), Encoders.bean(BipScore.class)); - } private static List getMeasure(BipScore value) { return value