From 5309a99a70bf6e81ca5d55e6d0062be1d75e05f4 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 28 May 2020 17:29:53 +0200
Subject: [PATCH] modified the PrepareProjects to consider those in the db

---
 .../project/PrepareProjects.java              | 54 +++++++++++++------
 1 file changed, 35 insertions(+), 19 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
index 7ca50b219..3d8226f4d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
@@ -9,9 +9,11 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +57,9 @@ public class PrepareProjects {
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath {}: ", outputPath);
 
+		final String dbProjectPath = parser.get("dbProjectPath");
+		log.info("dbProjectPath {}: ", dbProjectPath);
+
 		SparkConf conf = new SparkConf();
 
 		runWithSparkSession(
@@ -62,7 +67,7 @@ isSparkSessionManaged,
 			isSparkSessionManaged,
 			spark -> {
 				removeOutputDir(spark, outputPath);
-				exec(spark, projectPath, outputPath);
+				exec(spark, projectPath, dbProjectPath, outputPath);
 			});
 	}
 
@@ -70,27 +75,38 @@ public class PrepareProjects {
 		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
 
-	private static void exec(SparkSession spark, String progjectPath, String outputPath) {
+	private static void exec(SparkSession spark, String progjectPath, String dbProjectPath, String outputPath) {
 		Dataset<CSVProject> project = readPath(spark, progjectPath, CSVProject.class);
+		Dataset<ProjectSubset> dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class);
 
-		project
-			.toJavaRDD()
-			.flatMap(p -> {
-				List<CSVProject> csvProjectList = new ArrayList<>();
-				String[] programme = p.getProgramme().split(";");
-				Arrays
-					.stream(programme)
-					.forEach(value -> {
-						CSVProject csvProject = new CSVProject();
-						csvProject.setProgramme(value);
-						csvProject.setId(p.getId());
-						csvProjectList.add(csvProject);
-					});
-				return csvProjectList.iterator();
-			})
-			.map(p -> OBJECT_MAPPER.writeValueAsString(p))
-			.saveAsTextFile(outputPath);
+		dbProjects
+			.joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left")
+			.flatMap((FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject>) value -> {
+				// the CSV side of the left join is null when the db project
+				// has no match in the CSV dump: skip those records
+				Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
+				if (!csvProject.isPresent()) {
+					return new ArrayList<CSVProject>().iterator();
+				}
+				// emit one CSVProject per programme value
+				List<CSVProject> csvProjectList = new ArrayList<>();
+				String[] programme = csvProject.get().getProgramme().split(";");
+				Arrays
+					.stream(programme)
+					.forEach(p -> {
+						CSVProject proj = new CSVProject();
+						proj.setProgramme(p);
+						proj.setId(csvProject.get().getId());
+						csvProjectList.add(proj);
+					});
+
+				return csvProjectList.iterator();
+			}, Encoders.bean(CSVProject.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
 	}
 
 	public static <R> Dataset<R> readPath(
 		SparkSession spark, String inputPath, Class<R> clazz) {
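
Note on ProjectSubset: the class is not included in this patch. From readPath(spark, dbProjectPath, ProjectSubset.class) and the join condition on dbProjects.col("code"), it can only be assumed to be a serializable bean exposing at least a "code" property holding the project code read from the db. A minimal sketch under that assumption, not the actual class, which may carry more fields:

import java.io.Serializable;

public class ProjectSubset implements Serializable {

	// project code as read from the db; compared with the CSV project id
	private String code;

	public String getCode() {
		return code;
	}

	public void setCode(String code) {
		this.code = code;
	}
}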
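
Note on the join direction: joinWith(..., "left") keeps every db project and pairs it in a Tuple2 whose second element is null when no CSV project has a matching id, so the flatMap has to null-check value._2() and return an empty iterator rather than null (a FlatMapFunction must always return an iterator). A self-contained sketch of the same pattern with toy data; the class and dataset names are illustrative and not from the codebase:

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class LeftJoinSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.appName("left-join-sketch")
			.master("local[*]")
			.getOrCreate();

		// stand-ins for the db project codes and the CSV project ids
		Dataset<String> dbCodes = spark.createDataset(Arrays.asList("p1", "p2"), Encoders.STRING());
		Dataset<String> csvIds = spark.createDataset(Arrays.asList("p1"), Encoders.STRING());

		// left join: every db code survives; the right side is null for "p2"
		Dataset<String> kept = dbCodes
			.joinWith(csvIds, dbCodes.col("value").equalTo(csvIds.col("value")), "left")
			.flatMap(
				(FlatMapFunction<Tuple2<String, String>, String>) t -> t._2() == null
					? Collections.<String>emptyIterator()
					: Collections.singletonList(t._2()).iterator(),
				Encoders.STRING());

		kept.show(); // prints only p1
		spark.stop();
	}
}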