master #59

Closed
claudio.atzori wants to merge 3221 commits from master into stable_ids
1 changed files with 4 additions and 14 deletions
Showing only changes of commit 17680296b9 - Show all commits

View File

@ -8,6 +8,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.io.Text;
@ -115,7 +116,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
"where dataInfo.deletedbyinference = false and pIde.qualifier.classid = '" + DOI + "'")
.as(Encoders.bean(PreparedResult.class));
Dataset<BipScore> tmp = bipScores
bipScores
.joinWith(
preparedResult, bipScores.col("id").equalTo(preparedResult.col("value")),
"inner")
@ -123,9 +124,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
BipScore ret = value._1();
ret.setId(value._2().getId());
return ret;
}, Encoders.bean(BipScore.class));
tmp
}, Encoders.bean(BipScore.class))
.groupByKey((MapFunction<BipScore, String>) value -> value.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, BipScore, I>) (k, it) -> {
Result ret = inputClazz.newInstance();
@ -143,18 +142,9 @@ public class SparkAtomicActionScoreJob implements Serializable {
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
;
}
public static Dataset<BipScore> getBipScoreDataset(Dataset<BipDeserialize> bipdeserialized) {
return bipdeserialized
.flatMap((FlatMapFunction<BipDeserialize, BipScore>) bip -> bip.keySet().stream().map(key -> {
BipScore bs = new BipScore();
bs.setId(key);
bs.setScoreList(bip.get(key));
return bs;
}).collect(Collectors.toList()).iterator(), Encoders.bean(BipScore.class));
}
private static List<Measure> getMeasure(BipScore value) {
return value