forked from D-Net/dnet-hadoop
changed the logic to add the topic description to the project
This commit is contained in:
parent
0a035e3630
commit
7e73bb88b3
|
@ -101,21 +101,20 @@ public class SparkAtomicActionJob {
|
||||||
Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
|
Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
|
||||||
Dataset<EXCELTopic> topic = readPath(spark, topicPath, EXCELTopic.class);
|
Dataset<EXCELTopic> topic = readPath(spark, topicPath, EXCELTopic.class);
|
||||||
|
|
||||||
project
|
Dataset<Project> aaproject = project
|
||||||
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
|
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
|
||||||
.joinWith(topic, project.col("topics").equalTo(topic.col("code")), "left")
|
.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
|
||||||
.map((MapFunction<Tuple2<Tuple2<CSVProject, CSVProgramme>, EXCELTopic>, Project>) c -> {
|
// Tuple2<CSVProject, CSVProgramme> projectprogramme = c._1();
|
||||||
Tuple2<CSVProject, CSVProgramme> projectprogramme = c._1();
|
CSVProject csvProject = c._1();
|
||||||
CSVProject csvProject = projectprogramme._1();
|
Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2());
|
||||||
Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(projectprogramme._2());
|
|
||||||
|
|
||||||
String topicdescription = Optional
|
// String topicdescription = Optional
|
||||||
|
// .ofNullable(c._2())
|
||||||
|
// .map(t -> t.getTitle())
|
||||||
|
// .orElse(null);
|
||||||
|
|
||||||
|
return Optional
|
||||||
.ofNullable(c._2())
|
.ofNullable(c._2())
|
||||||
.map(t -> t.getTitle())
|
|
||||||
.orElse(null);
|
|
||||||
|
|
||||||
Project p = Optional
|
|
||||||
.ofNullable(projectprogramme._2())
|
|
||||||
.map(csvProgramme -> {
|
.map(csvProgramme -> {
|
||||||
Project pp = new Project();
|
Project pp = new Project();
|
||||||
pp
|
pp
|
||||||
|
@ -136,14 +135,26 @@ public class SparkAtomicActionJob {
|
||||||
setLevels(h2020classification, ocsvProgramme.get().getClassification());
|
setLevels(h2020classification, ocsvProgramme.get().getClassification());
|
||||||
h2020classification.setH2020Programme(pm);
|
h2020classification.setH2020Programme(pm);
|
||||||
pp.setH2020classification(Arrays.asList(h2020classification));
|
pp.setH2020classification(Arrays.asList(h2020classification));
|
||||||
if (topicdescription != null) {
|
// if (topicdescription != null) {
|
||||||
pp.setH2020topicdescription(topicdescription);
|
// pp.setH2020topicdescription(topicdescription);
|
||||||
}
|
// }
|
||||||
return pp;
|
return pp;
|
||||||
})
|
})
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
|
|
||||||
return p;
|
// return p;
|
||||||
|
}, Encoders.bean(Project.class));
|
||||||
|
|
||||||
|
aaproject
|
||||||
|
.joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")))
|
||||||
|
.map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
|
||||||
|
Optional<EXCELTopic> op = Optional.ofNullable(p._2());
|
||||||
|
if (op.isPresent()) {
|
||||||
|
Project rp = p._1();
|
||||||
|
rp.setH2020topicdescription(op.get().getTitle());
|
||||||
|
return rp;
|
||||||
|
}
|
||||||
|
return p._1();
|
||||||
}, Encoders.bean(Project.class))
|
}, Encoders.bean(Project.class))
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.groupByKey(
|
.groupByKey(
|
||||||
|
|
Loading…
Reference in New Issue