From 25ceec29abf51868f8b46997b2e4dcf6334f24fc Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 4 Mar 2020 10:44:24 +0100
Subject: [PATCH 1/4] code formatting

---
 .../java/eu/dnetlib/dhp/graph/GraphMappingUtils.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
index 7c0967b2e7..0291be47ef 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
@@ -18,13 +18,13 @@ public class GraphMappingUtils {
 
     public final static Map<String, Class> types = Maps.newHashMap();
 
     static {
-        types.put("datasource", Datasource.class);
-        types.put("organization", Organization.class);
+        types.put("datasource", Datasource.class);
+        types.put("organization", Organization.class);
         types.put("project", Project.class);
-        types.put("dataset", Dataset.class);
-        types.put("otherresearchproduct", OtherResearchProduct.class);
-        types.put("software", Software.class);
-        types.put("publication", Publication.class);
+        types.put("dataset", Dataset.class);
+        types.put("otherresearchproduct", OtherResearchProduct.class);
+        types.put("software", Software.class);
+        types.put("publication", Publication.class);
         types.put("relation", Relation.class);
     }
 

From 9af3e904be92636860bd7a2d5b9e770c1cfec8cd Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 4 Mar 2020 10:53:31 +0100
Subject: [PATCH 2/4] close the SparkSession at the end

---
 .../dhp/graph/SparkGraphImporterJob.java | 43 +++++++++++--------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
index 6335589060..ced57d0970 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
@@ -15,30 +15,35 @@ public class SparkGraphImporterJob {
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
         parser.parseArgument(args);
-        final SparkSession spark = SparkSession
+
+        try(SparkSession spark = getSparkSession(parser)) {
+
+            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+            final String inputPath = parser.get("sourcePath");
+            final String hiveDbName = parser.get("hive_db_name");
+
+            spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
+            spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
+
+            // Read the input file and convert it into RDD of serializable object
+            GraphMappingUtils.types.forEach((name, clazz) -> {
+                spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
+                        .map(s -> new ObjectMapper().readValue(s._2().toString(), clazz))
+                        .rdd(), Encoders.bean(clazz))
+                        .write()
+                        .mode(SaveMode.Overwrite)
+                        .saveAsTable(hiveDbName + "." + name);
+            });
+        }
+    }
+
+    private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
+        return SparkSession
                 .builder()
                 .appName(SparkGraphImporterJob.class.getSimpleName())
                 .master(parser.get("master"))
                 .config("hive.metastore.uris", parser.get("hive_metastore_uris"))
                 .enableHiveSupport()
                 .getOrCreate();
-
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        final String inputPath = parser.get("sourcePath");
-        final String hiveDbName = parser.get("hive_db_name");
-
-        spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
-        spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
-
-        // Read the input file and convert it into RDD of serializable object
-        GraphMappingUtils.types.forEach((name, clazz) -> {
-            spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
-                    .map(s -> new ObjectMapper().readValue(s._2().toString(), clazz))
-                    .rdd(), Encoders.bean(clazz))
-                    .write()
-                    .mode(SaveMode.Overwrite)
-                    .saveAsTable(hiveDbName + "." + name);
-        });
-    }
 }
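
Note on [PATCH 2/4] (editorial, not part of the patch): moving the SparkSession into a
try-with-resources block guarantees that close() — and therefore stop() — runs even when
the import fails midway. A minimal standalone sketch of the same pattern, assuming only
spark-sql on the classpath; the "local[*]" master and the query are illustrative, not
taken from the patch:

    import org.apache.spark.sql.SparkSession;

    public class SessionLifecycleSketch {
        public static void main(String[] args) {
            // SparkSession implements java.io.Closeable, so try-with-resources
            // invokes close() (which delegates to stop()) on every exit path,
            // including exceptions thrown by the body.
            try (SparkSession spark = SparkSession
                    .builder()
                    .appName(SessionLifecycleSketch.class.getSimpleName())
                    .master("local[*]") // illustrative master, not from the patch
                    .getOrCreate()) {
                spark.sql("SHOW DATABASES").show();
            } // spark.close() has already run here, on success or failure
        }
    }

Without this pattern, an exception between getOrCreate() and the final stop() would leak
the session, which is exactly what the extracted getSparkSession(parser) helper avoids.
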
From 1e563bc15e4448c1d4bc41004ad06a99f9680d03 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 4 Mar 2020 10:55:11 +0100
Subject: [PATCH 3/4] introduced distinct properties driving the resource usage
 for the XML record creation and the indexing phase

---
 .../eu/dnetlib/dhp/graph/oozie_app/workflow.xml | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
index e3d0ca192c..3132ae9407 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -50,9 +50,9 @@
             <class>eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob</class>
             <jar>dhp-graph-provision-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-memory ${sparkExecutorMemory}
-                --executor-cores ${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
+                --executor-cores ${sparkExecutorCoresForJoining}
+                --executor-memory ${sparkExecutorMemoryForJoining}
+                --driver-memory=${sparkDriverMemoryForJoining}
                 --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForJoining}
                 --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
                 --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
@@ -79,8 +79,9 @@
             <class>eu.dnetlib.dhp.graph.SparkXmlIndexingJob</class>
             <jar>dhp-graph-provision-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-memory ${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
+                --executor-cores ${sparkExecutorCoresForIndexing}
+                --executor-memory ${sparkExecutorMemoryForIndexing}
+                --driver-memory=${sparkDriverMemoryForIndexing}
                 --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
                 --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
                 --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
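
Note on [PATCH 3/4] (editorial, not part of the patch): splitting the shared
${sparkExecutorMemory}/${sparkExecutorCores} parameters into *ForJoining and *ForIndexing
variants lets the memory-hungry XML record join and the lighter indexing phase be sized
independently. One thing worth flagging for review: spark.dynamicAllocation.maxExecutors
is bound to ${sparkExecutorCoresForJoining} / ${sparkExecutorCoresForIndexing}, i.e. to
cores properties; if the intent was an executor count, a dedicated property would be
clearer. A sketch of how such per-phase options land in a SparkConf — the parameter names
mirror the workflow, but the mapping code itself is illustrative, not from the repository:

    import org.apache.spark.SparkConf;

    public class PhaseConfSketch {
        // Builds the per-phase configuration that the <spark-opts> above express.
        // The three arguments mirror the workflow parameters; passing a cores
        // value as maxExecutors reproduces the binding used in the patch.
        static SparkConf phaseConf(String cores, String memory, String maxExecutors) {
            return new SparkConf()
                    .set("spark.executor.cores", cores)
                    .set("spark.executor.memory", memory)
                    .set("spark.dynamicAllocation.maxExecutors", maxExecutors);
        }

        public static void main(String[] args) {
            // e.g. the joining phase, with hypothetical values for the
            // sparkExecutorCoresForJoining / sparkExecutorMemoryForJoining params
            SparkConf joining = phaseConf("4", "16g", "200");
            System.out.println(joining.toDebugString());
        }
    }
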
From 023398760384ac010ad2eecc72b3f127f81834d9 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 4 Mar 2020 10:56:50 +0100
Subject: [PATCH 4/4] introduced post processing step following the hive DB
 creation/population

---
 .../dnetlib/dhp/graph/hive/postprocessing.sql | 8 ++++++++
 .../dnetlib/dhp/graph/oozie_app/workflow.xml | 19 ++++++++++++++++++-
 2 files changed, 26 insertions(+), 1 deletion(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/hive/postprocessing.sql

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/hive/postprocessing.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/hive/postprocessing.sql
new file mode 100644
index 0000000000..26fcbacf5f
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/hive/postprocessing.sql
@@ -0,0 +1,8 @@
+CREATE view result as
+    select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
+    union all
+    select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
+    union all
+    select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
+    union all
+    select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
index 24090a2458..b420384f5c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -37,12 +37,29 @@
             <name>MapGraphIntoDataFrame</name>
             <class>eu.dnetlib.dhp.graph.SparkGraphImporterJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores ${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
+                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
+                --conf spark.sql.warehouse.dir="/user/hive/warehouse"
+            </spark-opts>
             <arg>-mt</arg> <arg>yarn-cluster</arg>
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
             <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
         </spark>
         <ok to="PostProcessing"/>
         <error to="Kill"/>
     </action>
+
+    <action name="PostProcessing">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <param>hive_db_name=${hive_db_name}</param>
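
Note on [PATCH 4/4] (editorial, not part of the patch): the patch text breaks off inside
the new hive action; the surrounding Oozie element names (<action>, <hive>, <job-tracker>,
<name-node>, <param>) follow the standard hive-action schema, and the action name
"PostProcessing" is inferred rather than preserved from the original. Also worth a review
comment: postprocessing.sql creates the result view without a database qualifier, so it
lands in whatever database is current when the script runs; CREATE VIEW
${hive_db_name}.result would pin it explicitly. Once the view exists, downstream code can
treat the four result tables as a single relation — a sketch, assuming a Hive-enabled
session and a hypothetical database name "openaire":

    import org.apache.spark.sql.SparkSession;

    public class ResultViewQuerySketch {
        public static void main(String[] args) {
            try (SparkSession spark = SparkSession
                    .builder()
                    .appName(ResultViewQuerySketch.class.getSimpleName())
                    .enableHiveSupport() // required to resolve Hive views
                    .getOrCreate()) {
                // The view unions publication, dataset, software and
                // otherresearchproduct, so one query covers all result types.
                spark.sql("SELECT count(*) AS results FROM openaire.result").show();
            }
        }
    }
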