diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java index 3d8226f4d..78aed1a69 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java @@ -57,7 +57,7 @@ public class PrepareProjects { final String outputPath = parser.get("outputPath"); log.info("outputPath {}: ", outputPath); - final String dbProjectPath = parser.get("dbProjectPath"); + final String dbProjectPath = parser.get("dbProjectPath"); log.info("dbProjectPath {}: ", dbProjectPath); SparkConf conf = new SparkConf(); @@ -75,56 +75,39 @@ public class PrepareProjects { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static void exec(SparkSession spark, String progjectPath, String dbProjectPath, String outputPath) { - Dataset project = readPath(spark, progjectPath, CSVProject.class); + private static void exec(SparkSession spark, String projectPath, String dbProjectPath, String outputPath) { + Dataset project = readPath(spark, projectPath, CSVProject.class); Dataset dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class); - dbProjects.joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left") - .flatMap((FlatMapFunction, CSVProject>) value -> { - Optional csvProject = Optional.ofNullable(value._2()); - if(! csvProject.isPresent()){ - return null; - } - List csvProjectList = new ArrayList<>(); - String[] programme = csvProject.get().getProgramme().split(";"); - Arrays - .stream(programme) - .forEach(p -> { - CSVProject proj = new CSVProject(); - proj.setProgramme(p); - proj.setId(csvProject.get().getId()); - csvProjectList.add(proj); - }); + dbProjects + .joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left") + .flatMap(getTuple2CSVProjectFlatMapFunction(), Encoders.bean(CSVProject.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); - return csvProjectList.iterator(); - }, Encoders.bean(CSVProject.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); -// -// .map(value -> { -// Optional csvProject = Optional.ofNullable(value._2()); -// }, Encoders.bean(CSVProject.class)) -// .filter(Objects::nonNull) -// .toJavaRDD() -// .flatMap(p -> { -// List csvProjectList = new ArrayList<>(); -// String[] programme = p.getProgramme().split(";"); -// Arrays -// .stream(programme) -// .forEach(value -> { -// CSVProject csvProject = new CSVProject(); -// csvProject.setProgramme(value); -// csvProject.setId(p.getId()); -// csvProjectList.add(csvProject); -// }); -// -// return csvProjectList.iterator(); -// }) -// .map(p -> OBJECT_MAPPER.writeValueAsString(p)) -// .saveAsTextFile(outputPath); + } + private static FlatMapFunction, CSVProject> getTuple2CSVProjectFlatMapFunction() { + return (FlatMapFunction, CSVProject>) value -> { + Optional csvProject = Optional.ofNullable(value._2()); + List csvProjectList = new ArrayList<>(); + if (csvProject.isPresent()) { + + String[] programme = csvProject.get().getProgramme().split(";"); + Arrays + .stream(programme) + .forEach(p -> { + CSVProject proj = new CSVProject(); + proj.setProgramme(p); + proj.setId(csvProject.get().getId()); + csvProjectList.add(proj); + }); + } + return csvProjectList.iterator(); + }; } public static Dataset readPath( diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/dbProject b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/dbProject new file mode 100644 index 000000000..e69de29bb