forked from D-Net/dnet-hadoop
[CountryPropagation] refactoring
This commit is contained in:
parent
c298c148cb
commit
5e0b8f9b5f
|
@ -102,7 +102,7 @@ public class SparkCountryPropagationJob {
|
|||
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
|
||||
return t -> {
|
||||
Optional.ofNullable(t._2()).ifPresent(r -> {
|
||||
if(Optional.ofNullable(t._1().getCountry()).isPresent())
|
||||
if (Optional.ofNullable(t._1().getCountry()).isPresent())
|
||||
t._1().getCountry().addAll(merge(t._1().getCountry(), r.getCountrySet()));
|
||||
else
|
||||
t._1().setCountry(merge(null, t._2().getCountrySet()));
|
||||
|
@ -113,12 +113,13 @@ public class SparkCountryPropagationJob {
|
|||
|
||||
private static List<Country> merge(List<Country> c1, List<CountrySbs> c2) {
|
||||
HashSet<String> countries = new HashSet<>();
|
||||
if(Optional.ofNullable(c1).isPresent()){
|
||||
countries = c1.stream().map(Qualifier::getClassid)
|
||||
if (Optional.ofNullable(c1).isPresent()) {
|
||||
countries = c1
|
||||
.stream()
|
||||
.map(Qualifier::getClassid)
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
}
|
||||
|
||||
|
||||
HashSet<String> finalCountries = countries;
|
||||
return c2
|
||||
.stream()
|
||||
|
|
Loading…
Reference in New Issue