diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java index d0fe95289..bec3810f9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java @@ -19,6 +19,8 @@ public class GraphHiveImporterJob { private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = @@ -37,12 +39,12 @@ public class GraphHiveImporterJob { String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); - String hiveMetastoreUris = parser.get("hiveMetastoreUris"); - log.info("hiveMetastoreUris: {}", hiveMetastoreUris); - String hiveDbName = parser.get("hiveDbName"); log.info("hiveDbName: {}", hiveDbName); + String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", hiveMetastoreUris); @@ -58,13 +60,13 @@ public class GraphHiveImporterJob { spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName)); spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); // Read the input file and convert it into RDD of serializable object ModelSupport.oafTypes.forEach( (name, clazz) -> spark.createDataset( sc.textFile(inputPath + "/" + name) - .map(s -> new ObjectMapper().readValue(s, clazz)) + .map(s -> OBJECT_MAPPER.readValue(s, clazz)) .rdd(), Encoders.bean(clazz)) .write() diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml index 8d8766283..2c9a92d7a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml @@ -12,11 +12,7 @@ true - oozie.action.sharelib.for.spark - spark2 - - - hive_metastore_uris + hiveMetastoreUris thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 @@ -24,7 +20,7 @@ jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 - hive_db_name + hiveDbName openaire \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml index 67ca6a64a..4e9e95c83 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml @@ -2,11 +2,11 @@ - sourcePath + inputPath the source path - hive_db_name + hiveDbName the target hive database name @@ -87,9 +87,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --sourcePath${sourcePath} - --hive_db_name${hive_db_name} - --hive_metastore_uris${hive_metastore_uris} + --inputPath${inputPath} + --hiveDbName${hiveDbName} + --hiveMetastoreUris${hiveMetastoreUris} @@ -102,7 +102,7 @@ hive.metastore.uris - ${hive_metastore_uris} + ${hiveMetastoreUris} ${hive_jdbc_url}/${hive_db_name}