diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 56aa953b4..25cd82248 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -102,21 +102,27 @@ public class SparkCountryPropagationJob { private static MapFunction, R> getCountryMergeFn() { return t -> { Optional.ofNullable(t._2()).ifPresent(r -> { - t._1().getCountry().addAll(merge(t._1().getCountry(), r.getCountrySet())); + if(Optional.ofNullable(t._1().getCountry()).isPresent()) + t._1().getCountry().addAll(merge(t._1().getCountry(), r.getCountrySet())); + else + t._1().setCountry(merge(null, t._2().getCountrySet())); }); return t._1(); }; } private static List merge(List c1, List c2) { - HashSet countries = c1 - .stream() - .map(Qualifier::getClassid) - .collect(Collectors.toCollection(HashSet::new)); + HashSet countries = new HashSet<>(); + if(Optional.ofNullable(c1).isPresent()){ + countries = c1.stream().map(Qualifier::getClassid) + .collect(Collectors.toCollection(HashSet::new)); + } + + HashSet finalCountries = countries; return c2 .stream() - .filter(c -> !countries.contains(c.getClassid())) + .filter(c -> !finalCountries.contains(c.getClassid())) .map(c -> getCountry(c.getClassid(), c.getClassname())) .collect(Collectors.toList()); }