diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java index a583b7bfa2..fdc12c6629 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java @@ -120,7 +120,6 @@ public class SparkAtomicActionJob { .map((MapFunction, Project>) c -> { CSVProject csvProject = c._1(); - Optional ocsvProgramme = Optional.ofNullable(c._2()); return Optional .ofNullable(c._2()) @@ -135,9 +134,9 @@ public class SparkAtomicActionJob { H2020Programme pm = new H2020Programme(); H2020Classification h2020classification = new H2020Classification(); pm.setCode(csvProject.getProgramme()); - h2020classification.setClassification(ocsvProgramme.get().getClassification()); + h2020classification.setClassification(csvProgramme.getClassification()); h2020classification.setH2020Programme(pm); - setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short()); + setLevelsandProgramme(h2020classification, csvProgramme.getClassification_short()); // setProgramme(h2020classification, ocsvProgramme.get().getClassification()); pp.setH2020classification(Arrays.asList(h2020classification)); @@ -145,10 +144,11 @@ public class SparkAtomicActionJob { }) .orElse(null); - }, Encoders.bean(Project.class)); + }, Encoders.bean(Project.class)) + .filter(Objects::nonNull); aaproject - .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code"))) + .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")), "left") .map((MapFunction, Project>) p -> { Optional op = Optional.ofNullable(p._2()); Project rp = p._1();