diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java index a9d498f90..19b22f18e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java @@ -104,14 +104,10 @@ public class SparkAtomicActionJob { Dataset aaproject = project .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") .map((MapFunction, Project>) c -> { - // Tuple2 projectprogramme = c._1(); + CSVProject csvProject = c._1(); Optional ocsvProgramme = Optional.ofNullable(c._2()); -// String topicdescription = Optional -// .ofNullable(c._2()) -// .map(t -> t.getTitle()) -// .orElse(null); return Optional .ofNullable(c._2()) @@ -126,35 +122,26 @@ public class SparkAtomicActionJob { H2020Programme pm = new H2020Programme(); H2020Classification h2020classification = new H2020Classification(); pm.setCode(csvProject.getProgramme()); - if (StringUtils.isNotEmpty(csvProgramme.getShortTitle())) { - pm.setDescription(csvProgramme.getShortTitle()); - } else { - pm.setDescription(csvProgramme.getTitle()); - } h2020classification.setClassification(ocsvProgramme.get().getClassification()); - setLevels(h2020classification, ocsvProgramme.get().getClassification()); + setLevelsAndProgramme(h2020classification, ocsvProgramme.get().getClassification()); h2020classification.setH2020Programme(pm); pp.setH2020classification(Arrays.asList(h2020classification)); -// if (topicdescription != null) { -// pp.setH2020topicdescription(topicdescription); -// } + return pp; }) .orElse(null); - // return p; }, Encoders.bean(Project.class)); aaproject .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code"))) .map((MapFunction, Project>) p -> { Optional op = Optional.ofNullable(p._2()); + Project rp = p._1(); if (op.isPresent()) { - Project rp = p._1(); rp.setH2020topicdescription(op.get().getTitle()); - return rp; } - return p._1(); + return rp; }, Encoders.bean(Project.class)) .filter(Objects::nonNull) .groupByKey( @@ -176,7 +163,7 @@ public class SparkAtomicActionJob { } - private static void setLevels(H2020Classification h2020Classification, String classification) { + private static void setLevelsAndProgramme(H2020Classification h2020Classification, String classification) { String[] tmp = classification.split(" \\| "); h2020Classification.setLevel1(tmp[0]); if (tmp.length > 1) { @@ -185,6 +172,7 @@ public class SparkAtomicActionJob { if (tmp.length > 2) { h2020Classification.setLevel3(tmp[2]); } + h2020Classification.getH2020Programme().setDescription(tmp[tmp.length-1]); } public static Dataset readPath(