diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index 802c3ff219..0b86dcdf1a 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -19,6 +19,11 @@ org.apache.spark spark-sql_2.11 + + org.apache.spark + spark-hive_2.11 + test + eu.dnetlib.dhp diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index 95c3cd4800..dbbb88b88c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -18,29 +18,38 @@ public class SparkGraphImporterJob { "/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); parser.parseArgument(args); + new SparkGraphImporterJob().run(parser); + } + + private void run(ArgumentApplicationParser parser) { try(SparkSession spark = getSparkSession(parser)) { - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); final String hiveDbName = parser.get("hive_db_name"); - spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName)); - spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); - - // Read the input file and convert it into RDD of serializable object - GraphMappingUtils.types.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name) - .map(s -> new ObjectMapper().readValue(s, clazz)) - .rdd(), Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + "." + name)); + runWith(spark, inputPath, hiveDbName); } } + // public for testing + public void runWith(SparkSession spark, String inputPath, String hiveDbName) { + + spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName)); + spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + // Read the input file and convert it into RDD of serializable object + GraphMappingUtils.types.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name) + .map(s -> new ObjectMapper().readValue(s, clazz)) + .rdd(), Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .saveAsTable(hiveDbName + "." + name)); + } + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - return SparkSession .builder() .appName(SparkGraphImporterJob.class.getSimpleName()) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json index 86fca71f34..13c7abd517 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json @@ -1,6 +1,6 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true}, - {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true} + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true}, + {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index bbee2f01cb..e63bbbbfba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -59,10 +59,10 @@ --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${sourcePath} - --hive_db_name${hive_db_name} - --hive_metastore_uris${hive_metastore_uris} + -mt yarn + -s${sourcePath} + -db${hive_db_name} + -h${hive_metastore_uris} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java index c7743d6845..511adf3f14 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java @@ -1,52 +1,54 @@ package eu.dnetlib.dhp.graph; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; +import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import scala.Tuple2; import java.nio.file.Path; -import java.util.List; -import java.util.stream.Collectors; public class SparkGraphImporterJobTest { - private static final long MAX = 1000L; + private final static String TEST_DB_NAME = "test"; - @Disabled("must be parametrized to run locally") - public void testImport(@TempDir Path outPath) throws Exception { - SparkGraphImporterJob.main(new String[] { - "-mt", "local[*]", - "-s", getClass().getResource("/eu/dnetlib/dhp/graph/sample").getPath(), - "-h", "", - "-db", "test" - }); + @Test + public void testImport(@TempDir Path outPath) { + try(SparkSession spark = testSparkSession(outPath.toString())) { - countEntities(outPath.toString()).forEach(t -> { - System.out.println(t); - Assertions.assertEquals(MAX, t._2().longValue(), String.format("mapped %s must be %s", t._1(), MAX)); - }); + new SparkGraphImporterJob().runWith( + spark, + getClass().getResource("/eu/dnetlib/dhp/graph/sample").getPath(), + TEST_DB_NAME); + + GraphMappingUtils.types.forEach((name, clazz) -> { + final long count = spark.read().table(TEST_DB_NAME + "." + name).count(); + if (name.equals("relation")) { + Assertions.assertEquals(100, count, String.format("%s should be 100", name)); + } else { + Assertions.assertEquals(10, count, String.format("%s should be 10", name)); + } + }); + } } - public static List> countEntities(final String inputPath) { + private SparkSession testSparkSession(final String inputPath) { + SparkConf conf = new SparkConf(); - final SparkSession spark = SparkSession + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("hive.metastore.warehouse.dir", inputPath + "/warehouse"); + conf.set("spark.sql.warehouse.dir", inputPath); + conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s/junit_metastore_db;create=true", inputPath)); + conf.set("spark.ui.enabled", "false"); + + return SparkSession .builder() .appName(SparkGraphImporterJobTest.class.getSimpleName()) .master("local[*]") + .config(conf) + .enableHiveSupport() .getOrCreate(); - //final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - return GraphMappingUtils.types.entrySet() - .stream() - .map(entry -> { - final Long count = spark.read().load(inputPath + "/" + entry.getKey()).as(Encoders.bean(entry.getValue())).count(); - return new Tuple2(entry.getKey(), count); - }) - .collect(Collectors.toList()); } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/sample/dataset/dataset_10.json.gz b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/sample/dataset/dataset_10.json.gz index ce0b9709be..0da3c4071a 100644 Binary files a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/sample/dataset/dataset_10.json.gz and b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/sample/dataset/dataset_10.json.gz differ diff --git a/pom.xml b/pom.xml index 6594943a8c..861b7a2eae 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,12 @@ ${dhp.spark.version} provided + + org.apache.spark + spark-hive_2.11 + ${dhp.spark.version} + test + org.slf4j