From 47f3d9b7574c927b38d6d729b534d27ae0cad477 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Apr 2020 13:24:43 +0200 Subject: [PATCH] unit test for GraphHiveImporterJob --- .../dhp/common/SparkSessionSupport.java | 14 +++ ...moteActionPayloadForGraphTableJobTest.java | 30 +----- .../dhp/oa/graph/GraphHiveImporterJob.java | 69 ++++++++++++++ .../dhp/oa/graph/SparkGraphImporterJob.java | 62 ------------- .../{ => hive}/oozie_app/config-default.xml | 0 .../oozie_app/lib/scripts/postprocessing.sql | 0 .../graph/{ => hive}/oozie_app/workflow.xml | 2 +- .../oa/graph/input_graph_hive_parameters.json | 26 ++++++ .../dhp/oa/graph/input_graph_parameters.json | 6 -- .../oa/graph/GraphHiveImporterJobTest.java | 92 +++++++++++++++++++ .../oa/graph/SparkGraphImporterJobTest.java | 54 ----------- 11 files changed, 204 insertions(+), 151 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/{ => hive}/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/{ => hive}/oozie_app/lib/scripts/postprocessing.sql (100%) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/{ => hive}/oozie_app/workflow.xml (97%) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java index f42ee1c581..43c18a9560 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java @@ -29,6 +29,20 @@ public class SparkSessionSupport { runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn); } + /** + * Runs a given function using SparkSession created with hive support and using default builder and supplied SparkConf. + * Stops SparkSession when SparkSession is managed. Allows to reuse SparkSession created externally. + * + * @param conf SparkConf instance + * @param isSparkSessionManaged When true will stop SparkSession + * @param fn Consumer to be applied to constructed SparkSession + */ + public static void runWithSparkHiveSession(SparkConf conf, + Boolean isSparkSessionManaged, + ThrowingConsumer fn) { + runWithSparkSession(c -> SparkSession.builder().config(c).enableHiveSupport().getOrCreate(), conf, isSparkSessionManaged, fn); + } + /** * Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession * when SparkSession is managed. Allows to reuse SparkSession created externally. diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java index 755679903b..6f53fbec25 100644 --- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.actionmanager.promote; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; @@ -45,34 +46,7 @@ public class PromoteActionPayloadForGraphTableJobTest { conf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName()); conf.setMaster("local"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(new Class[]{ - Author.class, - Context.class, - Country.class, - DataInfo.class, - eu.dnetlib.dhp.schema.oaf.Dataset.class, - Datasource.class, - ExternalReference.class, - ExtraInfo.class, - Field.class, - GeoLocation.class, - Instance.class, - Journal.class, - KeyValue.class, - Oaf.class, - OafEntity.class, - OAIProvenance.class, - Organization.class, - OriginDescription.class, - OtherResearchProduct.class, - Project.class, - Publication.class, - Qualifier.class, - Relation.class, - Result.class, - Software.class, - StructuredProperty.class - }); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); spark = SparkSession.builder().config(conf).getOrCreate(); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java new file mode 100644 index 0000000000..0270076dd4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java @@ -0,0 +1,69 @@ +package eu.dnetlib.dhp.oa.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class GraphHiveImporterJob { + + private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class); + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(GraphHiveImporterJob.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); + + String hiveDbName = parser.get("hiveDbName"); + log.info("hiveDbName: {}", hiveDbName); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", hiveMetastoreUris); + + runWithSparkHiveSession(conf, isSparkSessionManaged, + spark -> loadGraphAsHiveDB(spark, inputPath, hiveDbName)); + } + + // protected for testing + private static void loadGraphAsHiveDB(SparkSession spark, String inputPath, String hiveDbName) { + + spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName)); + spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + // Read the input file and convert it into RDD of serializable object + ModelSupport.oafTypes.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name) + .map(s -> new ObjectMapper().readValue(s, clazz)) + .rdd(), Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .saveAsTable(hiveDbName + "." + name)); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java deleted file mode 100644 index 44b534028b..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJob.java +++ /dev/null @@ -1,62 +0,0 @@ -package eu.dnetlib.dhp.oa.graph; - -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; - -public class SparkGraphImporterJob { - - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json"))); - parser.parseArgument(args); - - new SparkGraphImporterJob().run(parser); - } - - private void run(ArgumentApplicationParser parser) { - try(SparkSession spark = getSparkSession(parser)) { - - final String inputPath = parser.get("sourcePath"); - final String hiveDbName = parser.get("hive_db_name"); - - runWith(spark, inputPath, hiveDbName); - } - } - - // protected for testing - protected void runWith(SparkSession spark, String inputPath, String hiveDbName) { - - spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName)); - spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - // Read the input file and convert it into RDD of serializable object - ModelSupport.oafTypes.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name) - .map(s -> new ObjectMapper().readValue(s, clazz)) - .rdd(), Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + "." + name)); - } - - private static SparkSession getSparkSession(ArgumentApplicationParser parser) { - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - return SparkSession - .builder() - .appName(SparkGraphImporterJob.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/lib/scripts/postprocessing.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/lib/scripts/postprocessing.sql rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml similarity index 97% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/workflow.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml index b523ca17a3..271c7040fb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml @@ -49,7 +49,7 @@ yarn cluster MapGraphAsHiveDB - eu.dnetlib.dhp.oa.graph.SparkGraphImporterJob + eu.dnetlib.dhp.oa.graph.GraphHiveImporterJob dhp-graph-mapper-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json new file mode 100644 index 0000000000..d6c13773ae --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path to the graph data dump to read", + "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "hiveDbName", + "paramDescription": "the target hive database name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json deleted file mode 100644 index 13c7abd517..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json +++ /dev/null @@ -1,6 +0,0 @@ -[ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true}, - {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true} -] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java new file mode 100644 index 0000000000..29ca46d1df --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java @@ -0,0 +1,92 @@ +package eu.dnetlib.dhp.oa.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; + +public class GraphHiveImporterJobTest { + + private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJobTest.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ClassLoader cl = GraphHiveImporterJobTest.class.getClassLoader(); + + public static final String JDBC_DERBY_TEMPLATE = "jdbc:derby:;databaseName=%s/junit_metastore_db;create=true"; + + private static SparkSession spark; + + private static Path workingDir; + + private static String dbName; + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(GraphHiveImporterJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + dbName = RandomStringUtils.randomAlphabetic(5); + log.info("using DB name {}", "test_" + dbName); + + SparkConf conf = new SparkConf(); + conf.setAppName(GraphHiveImporterJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.set("javax.jdo.option.ConnectionURL", String.format(JDBC_DERBY_TEMPLATE, workingDir.resolve("warehouse").toString())); + + spark = SparkSession + .builder() + .appName(GraphHiveImporterJobTest.class.getSimpleName()) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testImportGraphAsHiveDB() throws Exception { + + GraphHiveImporterJob.main(new String[]{ + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-inputPath", getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(), + "-hiveMetastoreUris", "", + "-hiveDbName", dbName + }); + + ModelSupport.oafTypes.forEach((name, clazz) -> { + long count = spark.read().table(dbName + "." + name).count(); + int expected = name.equals("relation") ? 100 : 10; + + Assertions.assertEquals(expected, count, String.format("%s should be %s", name, expected)); + }); + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java deleted file mode 100644 index 302cef8d65..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/SparkGraphImporterJobTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package eu.dnetlib.dhp.oa.graph; - -import org.apache.spark.SparkConf; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.nio.file.Path; - -public class SparkGraphImporterJobTest { - - private final static String TEST_DB_NAME = "test"; - - @Test - public void testImport(@TempDir Path outPath) { - try(SparkSession spark = testSparkSession(outPath.toString())) { - - new SparkGraphImporterJob().runWith( - spark, - getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(), - TEST_DB_NAME); - - GraphMappingUtils.types.forEach((name, clazz) -> { - final long count = spark.read().table(TEST_DB_NAME + "." + name).count(); - if (name.equals("relation")) { - Assertions.assertEquals(100, count, String.format("%s should be 100", name)); - } else { - Assertions.assertEquals(10, count, String.format("%s should be 10", name)); - } - }); - } - } - - private SparkSession testSparkSession(final String inputPath) { - SparkConf conf = new SparkConf(); - - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("hive.metastore.warehouse.dir", inputPath + "/warehouse"); - conf.set("spark.sql.warehouse.dir", inputPath); - conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s/junit_metastore_db;create=true", inputPath)); - conf.set("spark.ui.enabled", "false"); - - return SparkSession - .builder() - .appName(SparkGraphImporterJobTest.class.getSimpleName()) - .master("local[*]") - .config(conf) - .enableHiveSupport() - .getOrCreate(); - } - -}