Changed programme.description to use the same value used in the classification, instead of the short title or the title

This commit is contained in:
Miriam Baglioni 2020-10-01 10:31:33 +02:00
parent f6587c91f3
commit 416bda6066
1 changed files with 7 additions and 19 deletions

View File

@ -104,14 +104,10 @@ public class SparkAtomicActionJob {
Dataset<Project> aaproject = project Dataset<Project> aaproject = project
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> { .map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
// Tuple2<CSVProject, CSVProgramme> projectprogramme = c._1();
CSVProject csvProject = c._1(); CSVProject csvProject = c._1();
Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2()); Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2());
// String topicdescription = Optional
// .ofNullable(c._2())
// .map(t -> t.getTitle())
// .orElse(null);
return Optional return Optional
.ofNullable(c._2()) .ofNullable(c._2())
@ -126,35 +122,26 @@ public class SparkAtomicActionJob {
H2020Programme pm = new H2020Programme(); H2020Programme pm = new H2020Programme();
H2020Classification h2020classification = new H2020Classification(); H2020Classification h2020classification = new H2020Classification();
pm.setCode(csvProject.getProgramme()); pm.setCode(csvProject.getProgramme());
if (StringUtils.isNotEmpty(csvProgramme.getShortTitle())) {
pm.setDescription(csvProgramme.getShortTitle());
} else {
pm.setDescription(csvProgramme.getTitle());
}
h2020classification.setClassification(ocsvProgramme.get().getClassification()); h2020classification.setClassification(ocsvProgramme.get().getClassification());
setLevels(h2020classification, ocsvProgramme.get().getClassification()); setLevelsAndProgramme(h2020classification, ocsvProgramme.get().getClassification());
h2020classification.setH2020Programme(pm); h2020classification.setH2020Programme(pm);
pp.setH2020classification(Arrays.asList(h2020classification)); pp.setH2020classification(Arrays.asList(h2020classification));
// if (topicdescription != null) {
// pp.setH2020topicdescription(topicdescription);
// }
return pp; return pp;
}) })
.orElse(null); .orElse(null);
// return p;
}, Encoders.bean(Project.class)); }, Encoders.bean(Project.class));
aaproject aaproject
.joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code"))) .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")))
.map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> { .map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
Optional<EXCELTopic> op = Optional.ofNullable(p._2()); Optional<EXCELTopic> op = Optional.ofNullable(p._2());
Project rp = p._1();
if (op.isPresent()) { if (op.isPresent()) {
Project rp = p._1();
rp.setH2020topicdescription(op.get().getTitle()); rp.setH2020topicdescription(op.get().getTitle());
return rp;
} }
return p._1(); return rp;
}, Encoders.bean(Project.class)) }, Encoders.bean(Project.class))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.groupByKey( .groupByKey(
@ -176,7 +163,7 @@ public class SparkAtomicActionJob {
} }
private static void setLevels(H2020Classification h2020Classification, String classification) { private static void setLevelsAndProgramme(H2020Classification h2020Classification, String classification) {
String[] tmp = classification.split(" \\| "); String[] tmp = classification.split(" \\| ");
h2020Classification.setLevel1(tmp[0]); h2020Classification.setLevel1(tmp[0]);
if (tmp.length > 1) { if (tmp.length > 1) {
@ -185,6 +172,7 @@ public class SparkAtomicActionJob {
if (tmp.length > 2) { if (tmp.length > 2) {
h2020Classification.setLevel3(tmp[2]); h2020Classification.setLevel3(tmp[2]);
} }
h2020Classification.getH2020Programme().setDescription(tmp[tmp.length-1]);
} }
public static <R> Dataset<R> readPath( public static <R> Dataset<R> readPath(