diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 1a0afb9811..6664d36ec8 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; +import java.util.Objects; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -102,6 +103,7 @@ public class SparkBulkTagJob { ResultTagger resultTagger = new ResultTagger(); readPath(spark, inputPath, resultClazz) .map(patchResult(), Encoders.bean(resultClazz)) + .filter(Objects::nonNull) .map( (MapFunction) value -> resultTagger .enrichContextCriteria( @@ -124,6 +126,9 @@ public class SparkBulkTagJob { // TODO remove this hack as soon as the values fixed by this method will be provided as NON null private static MapFunction patchResult() { return r -> { + if(r.getDataInfo() == null){ + return null; + } if (r.getDataInfo().getDeletedbyinference() == null) { r.getDataInfo().setDeletedbyinference(false); }