diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java index eb7325af5..f48226d71 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -13,6 +13,7 @@ import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; @@ -105,6 +106,7 @@ public class CleanCfHbSparkJob { resolved .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) + .filter((FilterFunction) m -> Objects.nonNull(m.getMasterId())) .write() .mode(SaveMode.Overwrite) .json(resolvedPath); @@ -134,9 +136,15 @@ public class CleanCfHbSparkJob { private static MapFunction, IdCfHbMapping> asIdCfHbMapping() { return t -> { - t._1().setMasterId(t._2().getMasterId()); - t._1().setMasterName(t._2().getMasterName()); - return t._1(); + final IdCfHbMapping mapping = t._1(); + Optional + .ofNullable(t._2()) + .ifPresent(t2 -> { + mapping.setMasterId(t2.getMasterId()); + mapping.setMasterName(t2.getMasterName()); + + }); + return mapping; }; }