[UsageCount] add check in case the datasource is not matched against those present in the graph #414
|
@ -111,8 +111,11 @@ public class SparkAtomicActionUsageJob implements Serializable {
|
||||||
resultModel
|
resultModel
|
||||||
.joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
|
.joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
|
||||||
.map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
|
.map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
|
||||||
UsageStatsResultModel usrm = t2._1();
|
if(Optional.ofNullable(t2._2()).isPresent())
|
||||||
usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
|
usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
|
||||||
|
else
|
||||||
|
usrm.setDatasourceId(usrm.getDatasourceId() + "||NO_MATCH_FOUND");
|
||||||
|
return usrm;
|
||||||
return usrm;
|
return usrm;
|
||||||
}, Encoders.bean(UsageStatsResultModel.class))
|
}, Encoders.bean(UsageStatsResultModel.class))
|
||||||
.write()
|
.write()
|
||||||
|
|
Loading…
Reference in New Issue