diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java index 7cc26745b..531b415ed 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -151,11 +151,32 @@ public class CleanCfHbSparkJob { private static FlatMapFunction flattenCfHbFn() { return r -> Stream .concat( - r.getCollectedfrom().stream().map(KeyValue::getKey), + Optional + .ofNullable(r.getCollectedfrom()) + .map(cf -> cf.stream().map(KeyValue::getKey)) + .orElse(Stream.empty()), Stream .concat( - r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey), - r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey))) + Optional + .ofNullable(r.getInstance()) + .map( + instances -> instances + .stream() + .map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse(""))) + .orElse(Stream.empty()) + .filter(StringUtils::isNotBlank), + Optional + .ofNullable(r.getInstance()) + .map( + instances -> instances + .stream() + .map( + i -> Optional + .ofNullable(i.getCollectedfrom()) + .map(KeyValue::getKey) + .orElse(""))) + .orElse(Stream.empty()) + .filter(StringUtils::isNotBlank))) .distinct() .filter(StringUtils::isNotBlank) .map(cfHb -> asIdCfHbMapping(r.getId(), cfHb))