added fix to avoid duplication of results

This commit is contained in:
Miriam Baglioni 2020-05-22 18:42:25 +02:00
parent 29066a6b46
commit 0d1ec1913f
1 changed file with 14 additions and 14 deletions

View File

@@ -95,20 +95,20 @@ public class PrepareResultCommunitySet {
result_organizationset result_organizationset
.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class)) .map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.toJavaRDD() .toJavaRDD()
.mapToPair(value -> new Tuple2<>(value.getResultId(), value)) .mapToPair(value -> new Tuple2<>(value.getResultId(), value))
.reduceByKey((a, b) -> { .reduceByKey((a, b) -> {
ArrayList<String> cl = a.getCommunityList(); ArrayList<String> cl = a.getCommunityList();
b.getCommunityList().stream().forEach(s -> { b.getCommunityList().stream().forEach(s -> {
if (!cl.contains(s)) { if (!cl.contains(s)) {
cl.add(s); cl.add(s);
} }
}); });
a.setCommunityList(cl); a.setCommunityList(cl);
return a; return a;
}) })
.map(value -> OBJECT_MAPPER.writeValueAsString(value._2())) .map(value -> OBJECT_MAPPER.writeValueAsString(value._2()))
.saveAsTextFile(outputPath, GzipCodec.class); .saveAsTextFile(outputPath, GzipCodec.class);
// .write() // .write()
// .mode(SaveMode.Overwrite) // .mode(SaveMode.Overwrite)
// .option("compression", "gzip") // .option("compression", "gzip")