[cleaning] avoid NPE

This commit is contained in:
Claudio Atzori 2022-12-08 18:40:48 +01:00
parent 730228d73d
commit 389dd25430
1 changed files with 11 additions and 3 deletions

View File

@ -13,6 +13,7 @@ import java.util.stream.Stream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.MapGroupsFunction;
@ -105,6 +106,7 @@ public class CleanCfHbSparkJob {
resolved resolved
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(resolvedPath); .json(resolvedPath);
@ -134,9 +136,15 @@ public class CleanCfHbSparkJob {
private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() { private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
return t -> { return t -> {
t._1().setMasterId(t._2().getMasterId()); final IdCfHbMapping mapping = t._1();
t._1().setMasterName(t._2().getMasterName()); Optional
return t._1(); .ofNullable(t._2())
.ifPresent(t2 -> {
mapping.setMasterId(t2.getMasterId());
mapping.setMasterName(t2.getMasterName());
});
return mapping;
}; };
} }