forked from antonis.lempesis/dnet-hadoop
refactoring and added "left" as join type to be 100% sure to get the whole set of projects
This commit is contained in:
parent
e07c3ba089
commit
1ee8f13580
|
@ -120,7 +120,6 @@ public class SparkAtomicActionJob {
|
|||
.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
|
||||
|
||||
CSVProject csvProject = c._1();
|
||||
Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2());
|
||||
|
||||
return Optional
|
||||
.ofNullable(c._2())
|
||||
|
@ -135,9 +134,9 @@ public class SparkAtomicActionJob {
|
|||
H2020Programme pm = new H2020Programme();
|
||||
H2020Classification h2020classification = new H2020Classification();
|
||||
pm.setCode(csvProject.getProgramme());
|
||||
h2020classification.setClassification(ocsvProgramme.get().getClassification());
|
||||
h2020classification.setClassification(csvProgramme.getClassification());
|
||||
h2020classification.setH2020Programme(pm);
|
||||
setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short());
|
||||
setLevelsandProgramme(h2020classification, csvProgramme.getClassification_short());
|
||||
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
|
||||
pp.setH2020classification(Arrays.asList(h2020classification));
|
||||
|
||||
|
@ -145,10 +144,11 @@ public class SparkAtomicActionJob {
|
|||
})
|
||||
.orElse(null);
|
||||
|
||||
}, Encoders.bean(Project.class));
|
||||
}, Encoders.bean(Project.class))
|
||||
.filter(Objects::nonNull);
|
||||
|
||||
aaproject
|
||||
.joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")))
|
||||
.joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")), "left")
|
||||
.map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
|
||||
Optional<EXCELTopic> op = Optional.ofNullable(p._2());
|
||||
Project rp = p._1();
|
||||
|
|
Loading…
Reference in New Issue