[stats wf] indicators across stats dbs & updates in the org ids #248

Closed
dimitris.pierrakos wants to merge 1742 commits from beta into beta2master_sept_2022
1 changed files with 24 additions and 3 deletions
Showing only changes of commit c18b8048c3 - Show all commits

View File

@ -151,11 +151,32 @@ public class CleanCfHbSparkJob {
private static <T extends Result> FlatMapFunction<T, IdCfHbMapping> flattenCfHbFn() {
return r -> Stream
.concat(
r.getCollectedfrom().stream().map(KeyValue::getKey),
Optional
.ofNullable(r.getCollectedfrom())
.map(cf -> cf.stream().map(KeyValue::getKey))
.orElse(Stream.empty()),
Stream
.concat(
r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
Optional
.ofNullable(r.getInstance())
.map(
instances -> instances
.stream()
.map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse("")))
.orElse(Stream.empty())
.filter(StringUtils::isNotBlank),
Optional
.ofNullable(r.getInstance())
.map(
instances -> instances
.stream()
.map(
i -> Optional
.ofNullable(i.getCollectedfrom())
.map(KeyValue::getKey)
.orElse("")))
.orElse(Stream.empty())
.filter(StringUtils::isNotBlank)))
.distinct()
.filter(StringUtils::isNotBlank)
.map(cfHb -> asIdCfHbMapping(r.getId(), cfHb))