diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java index 68b17c31f..a9d498f90 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java @@ -101,21 +101,20 @@ public class SparkAtomicActionJob { Dataset programme = readPath(spark, programmePath, CSVProgramme.class); Dataset topic = readPath(spark, topicPath, EXCELTopic.class); - project + Dataset aaproject = project .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") - .joinWith(topic, project.col("topics").equalTo(topic.col("code")), "left") - .map((MapFunction, EXCELTopic>, Project>) c -> { - Tuple2 projectprogramme = c._1(); - CSVProject csvProject = projectprogramme._1(); - Optional ocsvProgramme = Optional.ofNullable(projectprogramme._2()); + .map((MapFunction, Project>) c -> { + // Tuple2 projectprogramme = c._1(); + CSVProject csvProject = c._1(); + Optional ocsvProgramme = Optional.ofNullable(c._2()); - String topicdescription = Optional +// String topicdescription = Optional +// .ofNullable(c._2()) +// .map(t -> t.getTitle()) +// .orElse(null); + + return Optional .ofNullable(c._2()) - .map(t -> t.getTitle()) - .orElse(null); - - Project p = Optional - .ofNullable(projectprogramme._2()) .map(csvProgramme -> { Project pp = new Project(); pp @@ -136,14 +135,26 @@ public class SparkAtomicActionJob { setLevels(h2020classification, ocsvProgramme.get().getClassification()); h2020classification.setH2020Programme(pm); pp.setH2020classification(Arrays.asList(h2020classification)); - if (topicdescription != null) { - pp.setH2020topicdescription(topicdescription); - } +// if (topicdescription != null) { +// pp.setH2020topicdescription(topicdescription); +// } return pp; }) .orElse(null); - return p; + // return p; + }, Encoders.bean(Project.class)); + + aaproject + .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code"))) + .map((MapFunction, Project>) p -> { + Optional op = Optional.ofNullable(p._2()); + if (op.isPresent()) { + Project rp = p._1(); + rp.setH2020topicdescription(op.get().getTitle()); + return rp; + } + return p._1(); }, Encoders.bean(Project.class)) .filter(Objects::nonNull) .groupByKey(