From 9af3e904be92636860bd7a2d5b9e770c1cfec8cd Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 4 Mar 2020 10:53:31 +0100 Subject: [PATCH] close the SparkSession at the end --- .../dhp/graph/SparkGraphImporterJob.java | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index 633558906..ced57d097 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -15,30 +15,35 @@ public class SparkGraphImporterJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); parser.parseArgument(args); - final SparkSession spark = SparkSession + + try(SparkSession spark = getSparkSession(parser)) { + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String hiveDbName = parser.get("hive_db_name"); + + spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName)); + spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); + + // Read the input file and convert it into RDD of serializable object + GraphMappingUtils.types.forEach((name, clazz) -> { + spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) + .map(s -> new ObjectMapper().readValue(s._2().toString(), clazz)) + .rdd(), Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .saveAsTable(hiveDbName + "." + name); + }); + } + } + + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + return SparkSession .builder() .appName(SparkGraphImporterJob.class.getSimpleName()) .master(parser.get("master")) .config("hive.metastore.uris", parser.get("hive_metastore_uris")) .enableHiveSupport() .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String hiveDbName = parser.get("hive_db_name"); - - spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName)); - spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); - - // Read the input file and convert it into RDD of serializable object - GraphMappingUtils.types.forEach((name, clazz) -> { - spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) - .map(s -> new ObjectMapper().readValue(s._2().toString(), clazz)) - .rdd(), Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + "." + name); - }); - } }