diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index debe4436a..1a380db66 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -21,25 +21,30 @@ public class SparkGraphImporterJob { .builder() .appName(SparkGraphImporterJob.class.getSimpleName()) .master(parser.get("master")) - .config("hive.metastore.uris", parser.get("hive.metastore.uris")) + .config("hive.metastore.uris", parser.get("hive_metastore_uris")) .enableHiveSupport() .getOrCreate(); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); + final String hiveDbName = parser.get("hive_db_name"); + + spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); // Read the input file and convert it into RDD of serializable object GraphMappingUtils.types.forEach((name, clazz) -> { final JavaRDD> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); + spark.createDataset(inputRDD .filter(s -> s._1().equals(clazz.getName())) .map(Tuple2::_2) .map(s -> new ObjectMapper().readValue(s, clazz)) .rdd(), Encoders.bean(clazz)) + .limit(1000) .write() .mode(SaveMode.Overwrite) - .saveAsTable("openaire." + name); + .saveAsTable(hiveDbName + "." + name); }); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json index 7f76c064f..86fca71f3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json @@ -1,5 +1,6 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"h", "paramLongName":"hive.metastore.uris","paramDescription": "the hive metastore uris", "paramRequired": true} + {"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true}, + {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml index a5201a743..fcab9dd00 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml @@ -16,7 +16,11 @@ spark2 - hive.metastore.uris + hive_metastore_uris thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + hive_db_name + openaire + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index ed2369087..24090a245 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -4,6 +4,10 @@ sourcePath the source path + + hive_db_name + the target hive database name + sparkDriverMemory memory for driver process @@ -36,7 +40,8 @@ --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" -mt yarn-cluster --sourcePath${sourcePath} - --hive.metastore.uris${hive.metastore.uris} + --hive_db_name${hive_db_name} + --hive_metastore_uris${hive_metastore_uris}