[DUMP INDICATORS ] refactoring

This commit is contained in:
Miriam Baglioni 2022-11-10 09:40:42 +01:00
parent 0a53c29a8f
commit 31ce13ffb4
1 changed file with 11 additions and 11 deletions

View File

@@ -72,17 +72,17 @@ public class SparkUpdateProjectInfo implements Serializable {
 		Dataset<CommunityResult> result = Utils.readPath(spark, inputPath, CommunityResult.class);
 		Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class);
 		Dataset<CommunityResult> tmp = result
 			.joinWith(
 				resultProject, result.col("id").equalTo(resultProject.col("resultId")),
 				"left")
 			.map((MapFunction<Tuple2<CommunityResult, ResultProject>, CommunityResult>) value -> {
 				CommunityResult r = value._1();
 				Optional.ofNullable(value._2()).ifPresent(rp -> r.setProjects(rp.getProjectsList()));
 				return r;
 			}, Encoders.bean(CommunityResult.class));
-		long
-		count = tmp.count();
-		tmp.write()
+		long count = tmp.count();
+		tmp
+			.write()
 			.option("compression", "gzip")
 			.mode(SaveMode.Append)
 			.json(outputPath);