fix issue with flatMap - the return type must not be null

This commit is contained in:
Miriam Baglioni 2020-05-28 23:50:32 +02:00
parent 773735f870
commit 01f7876595
2 changed files with 30 additions and 47 deletions

View File

@ -57,7 +57,7 @@ public class PrepareProjects {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath); log.info("outputPath {}: ", outputPath);
final String dbProjectPath = parser.get("dbProjectPath"); final String dbProjectPath = parser.get("dbProjectPath");
log.info("dbProjectPath {}: ", dbProjectPath); log.info("dbProjectPath {}: ", dbProjectPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
@ -75,56 +75,39 @@ public class PrepareProjects {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }
private static void exec(SparkSession spark, String progjectPath, String dbProjectPath, String outputPath) { private static void exec(SparkSession spark, String projectPath, String dbProjectPath, String outputPath) {
Dataset<CSVProject> project = readPath(spark, progjectPath, CSVProject.class); Dataset<CSVProject> project = readPath(spark, projectPath, CSVProject.class);
Dataset<ProjectSubset> dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class); Dataset<ProjectSubset> dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class);
dbProjects.joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left") dbProjects
.flatMap((FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject>) value -> { .joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left")
Optional<CSVProject> csvProject = Optional.ofNullable(value._2()); .flatMap(getTuple2CSVProjectFlatMapFunction(), Encoders.bean(CSVProject.class))
if(! csvProject.isPresent()){ .filter(Objects::nonNull)
return null; .write()
} .mode(SaveMode.Overwrite)
List<CSVProject> csvProjectList = new ArrayList<>(); .option("compression", "gzip")
String[] programme = csvProject.get().getProgramme().split(";"); .json(outputPath);
Arrays
.stream(programme)
.forEach(p -> {
CSVProject proj = new CSVProject();
proj.setProgramme(p);
proj.setId(csvProject.get().getId());
csvProjectList.add(proj);
});
return csvProjectList.iterator(); }
}, Encoders.bean(CSVProject.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
//
// .map(value -> {
// Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
// }, Encoders.bean(CSVProject.class))
// .filter(Objects::nonNull)
// .toJavaRDD()
// .flatMap(p -> {
// List<CSVProject> csvProjectList = new ArrayList<>();
// String[] programme = p.getProgramme().split(";");
// Arrays
// .stream(programme)
// .forEach(value -> {
// CSVProject csvProject = new CSVProject();
// csvProject.setProgramme(value);
// csvProject.setId(p.getId());
// csvProjectList.add(csvProject);
// });
//
// return csvProjectList.iterator();
// })
// .map(p -> OBJECT_MAPPER.writeValueAsString(p))
// .saveAsTextFile(outputPath);
private static FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject> getTuple2CSVProjectFlatMapFunction() {
return (FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject>) value -> {
Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
List<CSVProject> csvProjectList = new ArrayList<>();
if (csvProject.isPresent()) {
String[] programme = csvProject.get().getProgramme().split(";");
Arrays
.stream(programme)
.forEach(p -> {
CSVProject proj = new CSVProject();
proj.setProgramme(p);
proj.setId(csvProject.get().getId());
csvProjectList.add(proj);
});
}
return csvProjectList.iterator();
};
} }
public static <R> Dataset<R> readPath( public static <R> Dataset<R> readPath(