enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
1 changed files with 52 additions and 19 deletions
Showing only changes of commit 5309a99a70 - Show all commits

View File

@ -9,9 +9,11 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -55,6 +57,9 @@ public class PrepareProjects {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath); log.info("outputPath {}: ", outputPath);
final String dbProjectPath = parser.get("dbProjectPath");
log.info("dbProjectPath {}: ", dbProjectPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -62,7 +67,7 @@ public class PrepareProjects {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
exec(spark, projectPath, outputPath); exec(spark, projectPath, dbProjectPath, outputPath);
}); });
} }
@ -70,27 +75,55 @@ public class PrepareProjects {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }
private static void exec(SparkSession spark, String progjectPath, String outputPath) { private static void exec(SparkSession spark, String progjectPath, String dbProjectPath, String outputPath) {
Dataset<CSVProject> project = readPath(spark, progjectPath, CSVProject.class); Dataset<CSVProject> project = readPath(spark, progjectPath, CSVProject.class);
Dataset<ProjectSubset> dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class);
project dbProjects.joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left")
.toJavaRDD() .flatMap((FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject>) value -> {
.flatMap(p -> { Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
List<CSVProject> csvProjectList = new ArrayList<>(); if(! csvProject.isPresent()){
String[] programme = p.getProgramme().split(";"); return null;
Arrays }
.stream(programme) List<CSVProject> csvProjectList = new ArrayList<>();
.forEach(value -> { String[] programme = csvProject.get().getProgramme().split(";");
CSVProject csvProject = new CSVProject(); Arrays
csvProject.setProgramme(value); .stream(programme)
csvProject.setId(p.getId()); .forEach(p -> {
csvProjectList.add(csvProject); CSVProject proj = new CSVProject();
}); proj.setProgramme(p);
proj.setId(csvProject.get().getId());
csvProjectList.add(proj);
});
return csvProjectList.iterator(); return csvProjectList.iterator();
}) }, Encoders.bean(CSVProject.class))
.map(p -> OBJECT_MAPPER.writeValueAsString(p)) .write()
.saveAsTextFile(outputPath); .mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
//
// .map(value -> {
// Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
// }, Encoders.bean(CSVProject.class))
// .filter(Objects::nonNull)
// .toJavaRDD()
// .flatMap(p -> {
// List<CSVProject> csvProjectList = new ArrayList<>();
// String[] programme = p.getProgramme().split(";");
// Arrays
// .stream(programme)
// .forEach(value -> {
// CSVProject csvProject = new CSVProject();
// csvProject.setProgramme(value);
// csvProject.setId(p.getId());
// csvProjectList.add(csvProject);
// });
//
// return csvProjectList.iterator();
// })
// .map(p -> OBJECT_MAPPER.writeValueAsString(p))
// .saveAsTextFile(outputPath);
} }