diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index ced57d0970..95c3cd4800 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -3,7 +3,7 @@ package eu.dnetlib.dhp.graph; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -13,7 +13,9 @@ public class SparkGraphImporterJob { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream( + "/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); parser.parseArgument(args); try(SparkSession spark = getSparkSession(parser)) { @@ -26,23 +28,24 @@ public class SparkGraphImporterJob { spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); // Read the input file and convert it into RDD of serializable object - GraphMappingUtils.types.forEach((name, clazz) -> { - spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) - .map(s -> new ObjectMapper().readValue(s._2().toString(), clazz)) - .rdd(), Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + "." + name); - }); + GraphMappingUtils.types.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name) + .map(s -> new ObjectMapper().readValue(s, clazz)) + .rdd(), Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .saveAsTable(hiveDbName + "." + name)); } } private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + return SparkSession .builder() .appName(SparkGraphImporterJob.class.getSimpleName()) .master(parser.get("master")) - .config("hive.metastore.uris", parser.get("hive_metastore_uris")) + .config(conf) .enableHiveSupport() .getOrCreate(); }