forked from antonis.lempesis/dnet-hadoop
[CountryPropagation] fix NPE issue
This commit is contained in:
parent
eaf9385ae5
commit
c298c148cb
|
@ -102,21 +102,27 @@ public class SparkCountryPropagationJob {
|
||||||
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
|
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
|
||||||
return t -> {
|
return t -> {
|
||||||
Optional.ofNullable(t._2()).ifPresent(r -> {
|
Optional.ofNullable(t._2()).ifPresent(r -> {
|
||||||
|
if(Optional.ofNullable(t._1().getCountry()).isPresent())
|
||||||
t._1().getCountry().addAll(merge(t._1().getCountry(), r.getCountrySet()));
|
t._1().getCountry().addAll(merge(t._1().getCountry(), r.getCountrySet()));
|
||||||
|
else
|
||||||
|
t._1().setCountry(merge(null, t._2().getCountrySet()));
|
||||||
});
|
});
|
||||||
return t._1();
|
return t._1();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Country> merge(List<Country> c1, List<CountrySbs> c2) {
|
private static List<Country> merge(List<Country> c1, List<CountrySbs> c2) {
|
||||||
HashSet<String> countries = c1
|
HashSet<String> countries = new HashSet<>();
|
||||||
.stream()
|
if(Optional.ofNullable(c1).isPresent()){
|
||||||
.map(Qualifier::getClassid)
|
countries = c1.stream().map(Qualifier::getClassid)
|
||||||
.collect(Collectors.toCollection(HashSet::new));
|
.collect(Collectors.toCollection(HashSet::new));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
HashSet<String> finalCountries = countries;
|
||||||
return c2
|
return c2
|
||||||
.stream()
|
.stream()
|
||||||
.filter(c -> !countries.contains(c.getClassid()))
|
.filter(c -> !finalCountries.contains(c.getClassid()))
|
||||||
.map(c -> getCountry(c.getClassid(), c.getClassname()))
|
.map(c -> getCountry(c.getClassid(), c.getClassname()))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue