From cce21eafc21c66a9dd72f8487e3d119ecc59cd20 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 6 Aug 2020 21:48:29 +0200 Subject: [PATCH] WIP: materialize graph as Hive DB, configured spark actions to include hive support] --- .../oa/graph/clean/CleanGraphSparkJob.java | 67 +++++--- .../oa/graph/merge/MergeGraphSparkJob.java | 83 ++++----- .../raw/GenerateEntitiesApplication.java | 6 +- .../oa/graph/raw/MergeClaimsApplication.java | 14 +- .../graph/clean/oozie_app/config-default.xml | 4 + .../dhp/oa/graph/clean/oozie_app/workflow.xml | 162 ++++++++++++++---- .../graph/generate_entities_parameters.json | 2 +- .../graph/hive/oozie_app/config-default.xml | 8 +- .../dhp/oa/graph/hive/oozie_app/workflow.xml | 4 + .../graph/input_clean_graph_parameters.json | 32 +++- .../dhp/oa/graph/merge/oozie_app/workflow.xml | 28 ++- .../dhp/oa/graph/merge_claims_parameters.json | 6 + .../dhp/oa/graph/merge_graphs_parameters.json | 6 + .../oa/graph/raw_all/oozie_app/workflow.xml | 37 +++- .../eu/dnetlib/dhp/common/GraphSupport.java | 13 +- .../dhp/common/ResetHiveDbApplication.java | 14 +- .../dhp/common/reset_hive_db_parameters.json | 6 + 17 files changed, 361 insertions(+), 131 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index ba2515c5b..692bbe9ec 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -1,6 +1,8 @@ package eu.dnetlib.dhp.oa.graph.clean; +import static eu.dnetlib.dhp.common.GraphSupport.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Objects; @@ -13,7 +15,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,50 +51,64 @@ public class CleanGraphSparkJob { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); + String inputGraph = parser.get("inputGraph"); + log.info("inputGraph: {}", inputGraph); - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + String outputGraph = parser.get("outputGraph"); + log.info("outputGraph: {}", outputGraph); - String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); + GraphFormat inputGraphFormat = Optional + .ofNullable(parser.get("inputGraphFormat")) + .map(GraphFormat::valueOf) + .orElse(GraphFormat.DEFAULT); + log.info("inputGraphFormat: {}", inputGraphFormat); + + GraphFormat outputGraphFormat = Optional + .ofNullable(parser.get("outputGraphFormat")) + .map(GraphFormat::valueOf) + .orElse(GraphFormat.DEFAULT); + log.info("outputGraphFormat: {}", outputGraphFormat); + + String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); String graphTableClassName = parser.get("graphTableClassName"); log.info("graphTableClassName: {}", graphTableClassName); - Class entityClazz = (Class) 
Class.forName(graphTableClassName); - - final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); - final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); + String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); SparkConf conf = new SparkConf(); - runWithSparkSession( + conf.set("hive.metastore.uris", hiveMetastoreUris); + + Class clazz = (Class) Class.forName(graphTableClassName); + + final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl); + final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); + + runWithSparkHiveSession( conf, isSparkSessionManaged, - spark -> { - removeOutputDir(spark, outputPath); - fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath); - }); + spark -> cleanGraphTable(spark, vocs, inputGraph, inputGraphFormat, outputGraph, outputGraphFormat, clazz)); } - private static void fixGraphTable( + private static void cleanGraphTable( SparkSession spark, VocabularyGroup vocs, - String inputPath, - Class clazz, - String outputPath) { + String inputGraph, + GraphFormat inputGraphFormat, + String outputGraph, + GraphFormat outputGraphFormat, + Class clazz) { final CleaningRuleMap mapping = CleaningRuleMap.create(vocs); - readTableFromPath(spark, inputPath, clazz) + Dataset cleaned = readGraph(spark, inputGraph, clazz, inputGraphFormat) .map((MapFunction) value -> fixVocabularyNames(value), Encoders.bean(clazz)) .map((MapFunction) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)) - .map((MapFunction) value -> fixDefaults(value), Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + .map((MapFunction) value -> fixDefaults(value), Encoders.bean(clazz)); + + saveGraphTable(cleaned, clazz, outputGraph, outputGraphFormat); } protected static T fixVocabularyNames(T value) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java index 5e37938c7..e74d75dcb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java @@ -3,13 +3,12 @@ package eu.dnetlib.dhp.oa.graph.merge; import static eu.dnetlib.dhp.common.GraphSupport.deleteGraphTable; import static eu.dnetlib.dhp.common.GraphSupport.saveGraphTable; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Objects; import java.util.Optional; -import eu.dnetlib.dhp.common.GraphFormat; -import eu.dnetlib.dhp.common.GraphSupport; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -24,6 +23,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.GraphFormat; +import eu.dnetlib.dhp.common.GraphSupport; import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -71,15 +72,15 @@ public class MergeGraphSparkJob { log.info("outputGraph: {}", outputGraph); GraphFormat 
inputGraphFormat = Optional - .ofNullable(parser.get("inputGraphFormat")) - .map(GraphFormat::valueOf) - .orElse(GraphFormat.DEFAULT); + .ofNullable(parser.get("inputGraphFormat")) + .map(GraphFormat::valueOf) + .orElse(GraphFormat.DEFAULT); log.info("inputGraphFormat: {}", inputGraphFormat); GraphFormat outputGraphFormat = Optional - .ofNullable(parser.get("outputGraphFormat")) - .map(GraphFormat::valueOf) - .orElse(GraphFormat.DEFAULT); + .ofNullable(parser.get("outputGraphFormat")) + .map(GraphFormat::valueOf) + .orElse(GraphFormat.DEFAULT); log.info("outputGraphFormat: {}", outputGraphFormat); String graphTableClassName = parser.get("graphTableClassName"); @@ -87,45 +88,48 @@ public class MergeGraphSparkJob { Class clazz = (Class) Class.forName(graphTableClassName); + String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", hiveMetastoreUris); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - runWithSparkSession( + runWithSparkHiveSession( conf, isSparkSessionManaged, - spark -> { - deleteGraphTable(spark, clazz, outputGraph, outputGraphFormat); - mergeGraphTable(spark, priority, betaInputGraph, prodInputGraph, clazz, clazz, outputGraph, inputGraphFormat, outputGraphFormat); - }); + spark -> mergeGraphTable( + spark, priority, betaInputGraph, clazz, prodInputGraph, clazz, outputGraph, inputGraphFormat, + outputGraphFormat)); } private static
<P extends Oaf, B extends Oaf>
void mergeGraphTable( - SparkSession spark, - String priority, - String betaInputGraph, - String prodInputGraph, - Class
<P>
p_clazz, - Class b_clazz, - String outputGraph, GraphFormat inputGraphFormat, GraphFormat outputGraphFormat) { + SparkSession spark, + String priority, + String betaInputGraph, + Class b_clazz, + String prodInputGraph, + Class
<P>
p_clazz, + String outputGraph, GraphFormat inputGraphFormat, GraphFormat outputGraphFormat) { Dataset> beta = readGraph(spark, betaInputGraph, b_clazz, inputGraphFormat); Dataset> prod = readGraph(spark, prodInputGraph, p_clazz, inputGraphFormat); Dataset
<P>
merged = prod - .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer") - .map((MapFunction, Tuple2>, P>) value -> { - Optional
<P>
p = Optional.ofNullable(value._1()).map(Tuple2::_2); - Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2); - switch (priority) { - default: - case "BETA": - return mergeWithPriorityToBETA(p, b); - case "PROD": - return mergeWithPriorityToPROD(p, b); - } - }, Encoders.bean(p_clazz)) - .filter((FilterFunction
<P>
) Objects::nonNull); + .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer") + .map((MapFunction, Tuple2>, P>) value -> { + Optional
<P>
p = Optional.ofNullable(value._1()).map(Tuple2::_2); + Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2); + switch (priority) { + default: + case "BETA": + return mergeWithPriorityToBETA(p, b); + case "PROD": + return mergeWithPriorityToPROD(p, b); + } + }, Encoders.bean(p_clazz)) + .filter((FilterFunction
<P>
) Objects::nonNull); saveGraphTable(merged, p_clazz, outputGraph, outputGraphFormat); } @@ -151,14 +155,15 @@ public class MergeGraphSparkJob { } private static Dataset> readGraph( - SparkSession spark, String inputGraph, Class clazz, GraphFormat inputGraphFormat) { + SparkSession spark, String inputGraph, Class clazz, GraphFormat inputGraphFormat) { log.info("Reading Graph table from: {}", inputGraph); - return GraphSupport.readGraph(spark, inputGraph, clazz, inputGraphFormat) - .map((MapFunction>) t -> { - final String id = ModelSupport.idFn().apply(t); - return new Tuple2<>(id, t); - }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); + return GraphSupport + .readGraph(spark, inputGraph, clazz, inputGraphFormat) + .map((MapFunction>) t -> { + final String id = ModelSupport.idFn().apply(t); + return new Tuple2<>(id, t); + }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 8b0af0081..4d2ba80b0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -70,10 +70,10 @@ public class GenerateEntitiesApplication { final String targetPath = parser.get("targetPath"); log.info("targetPath: {}", targetPath); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); - final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); + final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl); final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java index 901252878..bbc38eae1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java @@ -62,22 +62,18 @@ public class MergeClaimsApplication { Class clazz = (Class) Class.forName(graphTableClassName); + String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", hiveMetastoreUris); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); runWithSparkHiveSession( conf, isSparkSessionManaged, - spark -> { - String type = clazz.getSimpleName().toLowerCase(); - - String rawPath = rawGraphPath + "/" + type; - String claimPath = claimsGraphPath + "/" + type; - - deleteGraphTable(spark, clazz, outputGraph, format); - mergeByType(spark, rawPath, claimPath, outputGraph, format, clazz); - }); + spark -> mergeByType(spark, rawGraphPath, claimsGraphPath, outputGraph, format, clazz)); } private static void mergeByType( diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/config-default.xml index 2e0ed9aee..cd29965e3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/config-default.xml @@ -15,4 +15,8 @@ oozie.action.sharelib.for.spark spark2 + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 7329df29a..c00d81669 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -2,18 +2,32 @@ - graphInputPath - the input path to read graph content + inputGraph + the input graph name (or path) - graphOutputPath - the target path to store cleaned graph + outputGraph + the output graph name (or path) - isLookupUrl + isLookUpUrl the address of the lookUp service + + inputGraphFormat + HIVE + the input graph data format + + + outputGraphFormat + HIVE + the output graph data format + + + hiveMetastoreUris + hive server metastore URIs + sparkDriverMemory memory for driver process @@ -48,14 +62,70 @@ spark2EventLogDir spark 2.* event log dir location + + sparkSqlWarehouseDir + spark 2.* db directory location + - + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + ${wf:conf('outputGraphFormat') eq 'JSON'} + ${wf:conf('outputGraphFormat') eq 'HIVE'} + + + + + + + yarn + cluster + reset_DB + eu.dnetlib.dhp.common.ResetHiveDbApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=7680 + + --dbName${outputGraph} + --hiveMetastoreUris${hiveMetastoreUris} + + + + + @@ -82,12 +152,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/publication - --outputPath${graphOutputPath}/publication + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + 
--hiveMetastoreUris${hiveMetastoreUris} @@ -108,12 +182,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/dataset - --outputPath${graphOutputPath}/dataset + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + --hiveMetastoreUris${hiveMetastoreUris} @@ -134,12 +212,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/otherresearchproduct - --outputPath${graphOutputPath}/otherresearchproduct + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + --hiveMetastoreUris${hiveMetastoreUris} @@ -160,12 +242,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/software - --outputPath${graphOutputPath}/software + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + --hiveMetastoreUris${hiveMetastoreUris} @@ -186,12 +272,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/datasource - --outputPath${graphOutputPath}/datasource + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + --hiveMetastoreUris${hiveMetastoreUris} @@ -212,12 +302,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/organization - --outputPath${graphOutputPath}/organization + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - 
--isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + --hiveMetastoreUris${hiveMetastoreUris} @@ -238,12 +332,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/project - --outputPath${graphOutputPath}/project + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + --hiveMetastoreUris${hiveMetastoreUris} @@ -264,12 +362,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/relation - --outputPath${graphOutputPath}/relation + --inputGraph${inputGraph} + --outputGraph${outputGraph} + --inputGraphFormat${inputGraphFormat} + --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} + --hiveMetastoreUris${hiveMetastoreUris} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json index 8342dde95..884528b42 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json @@ -19,7 +19,7 @@ }, { "paramName": "isu", - "paramLongName": "isLookupUrl", + "paramLongName": "isLookUpUrl", "paramDescription": "the url of the ISLookupService", "paramRequired": true } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml index 9608732ed..cb3e6ac0b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml @@ -11,6 +11,10 @@ oozie.use.system.libpath true + + oozie.action.sharelib.for.spark + spark2 + hiveMetastoreUris thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 @@ -19,8 +23,4 @@ hiveJdbcUrl jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 - - hiveDbName - openaire - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml index 09930336a..96404e752 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml @@ -51,6 +51,10 @@ spark2EventLogDir spark 2.* event log dir location + + sparkSqlWarehouseDir 
+ spark 2.* db directory location + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json index 9cfed1e91..4a65e6332 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json @@ -6,20 +6,32 @@ "paramRequired": false }, { - "paramName": "in", - "paramLongName": "inputPath", - "paramDescription": "the path to the graph data dump to read", + "paramName": "ig", + "paramLongName": "inputGraph", + "paramDescription": "the input graph name (or path)", "paramRequired": true }, { - "paramName": "out", - "paramLongName": "outputPath", - "paramDescription": "the path to store the output graph", + "paramName": "og", + "paramLongName": "outputGraph", + "paramDescription": "the output graph name (or path)", + "paramRequired": true + }, + { + "paramName": "igf", + "paramLongName": "inputGraphFormat", + "paramDescription": "the input graph data format", + "paramRequired": true + }, + { + "paramName": "ogf", + "paramLongName": "outputGraphFormat", + "paramDescription": "the output graph data format", "paramRequired": true }, { "paramName": "isu", - "paramLongName": "isLookupUrl", + "paramLongName": "isLookUpUrl", "paramDescription": "url to the ISLookup Service", "paramRequired": true }, @@ -28,5 +40,11 @@ "paramLongName": "graphTableClassName", "paramDescription": "class name moelling the graph table", "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 26d78b928..f9bd7b300 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -28,6 +28,10 @@ the output graph data format + + hiveMetastoreUris + hive server metastore URIs + sparkDriverMemory memory for driver process @@ -62,6 +66,10 @@ spark2EventLogDir spark 2.* event log dir location + + sparkSqlWarehouseDir + spark 2.* db directory location + @@ -93,14 +101,16 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --dbName${outputGraph} + --hiveMetastoreUris${hiveMetastoreUris} - + @@ -127,6 +137,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -136,6 +147,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} @@ -156,6 +168,7 @@ --conf 
spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -165,6 +178,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} @@ -185,6 +199,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -194,6 +209,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} @@ -214,6 +230,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -223,6 +240,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} @@ -243,6 +261,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -252,6 +271,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} @@ -272,6 +292,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -281,6 +302,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} @@ -301,6 +323,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -310,6 +333,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} @@ -330,6 +354,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf 
spark.sql.shuffle.partitions=7680 --betaInputGraph${betaInputGraph} @@ -339,6 +364,7 @@ --outputGraphFormat${outputGraphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation --priority${priority} + --hiveMetastoreUris${hiveMetastoreUris} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json index 2296ae15f..165d8493d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json @@ -34,5 +34,11 @@ "paramLongName": "graphTableClassName", "paramDescription": "class name associated to the input entity path", "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json index 67fc7d42c..7d5b7576e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json @@ -46,5 +46,11 @@ "paramLongName": "priority", "paramDescription": "decides from which infrastructure the content must win in case of ID clash", "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 26a0e6121..f660e1c6c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -45,7 +45,7 @@ mongo database - isLookupUrl + isLookUpUrl the address of the lookUp service @@ -53,6 +53,11 @@ a blacklist of nsprefixes (comma separeted) + + + hiveMetastoreUris + hive server metastore URIs + sparkDriverMemory memory for driver process @@ -87,6 +92,10 @@ spark2EventLogDir spark 2.* event log dir location + + sparkSqlWarehouseDir + spark 2.* db directory location + @@ -137,7 +146,7 @@ --postgresUrl${postgresURL} --postgresUser${postgresUser} --postgresPassword${postgresPassword} - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} --actionclaims --dbschema${dbSchema} --nsPrefixBlacklist${nsPrefixBlacklist} @@ -190,7 +199,7 @@ --postgresUrl${postgresURL} --postgresUser${postgresUser} --postgresPassword${postgresPassword} - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} --dbschema${dbSchema} --nsPrefixBlacklist${nsPrefixBlacklist} @@ -274,7 +283,7 @@ --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims,${contentPath}/oaf_records_invisible --targetPath${workingDir}/entities_claim - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} @@ -321,7 +330,7 @@ 
--sourcePaths${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records --targetPath${workingDir}/entities - --isLookupUrl${isLookupUrl} + --isLookUpUrl${isLookUpUrl} @@ -376,9 +385,11 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --dbName${outputGraph} + --hiveMetastoreUris${hiveMetastoreUris} @@ -410,6 +421,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --rawGraphPath${workingDir}/graph_raw @@ -417,6 +429,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --hiveMetastoreUris${hiveMetastoreUris} @@ -437,6 +450,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=7680 --rawGraphPath${workingDir}/graph_raw @@ -444,6 +458,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --hiveMetastoreUris${hiveMetastoreUris} @@ -464,6 +479,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=3840 --rawGraphPath${workingDir}/graph_raw @@ -471,6 +487,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation + --hiveMetastoreUris${hiveMetastoreUris} @@ -491,6 +508,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=1920 --rawGraphPath${workingDir}/graph_raw @@ -498,6 +516,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --hiveMetastoreUris${hiveMetastoreUris} @@ -518,6 +537,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=1920 --rawGraphPath${workingDir}/graph_raw @@ -525,6 +545,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --hiveMetastoreUris${hiveMetastoreUris} @@ -545,6 +566,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf 
spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=200 --rawGraphPath${workingDir}/graph_raw @@ -552,6 +574,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --hiveMetastoreUris${hiveMetastoreUris} @@ -572,6 +595,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=200 --rawGraphPath${workingDir}/graph_raw @@ -579,6 +603,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --hiveMetastoreUris${hiveMetastoreUris} @@ -599,6 +624,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.shuffle.partitions=200 --rawGraphPath${workingDir}/graph_raw @@ -606,6 +632,7 @@ --outputGraph${outputGraph} --graphFormat${graphFormat} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --hiveMetastoreUris${hiveMetastoreUris} diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java index d27c3c01d..c2952677c 100644 --- a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java @@ -30,7 +30,7 @@ public class GraphSupport { break; case HIVE: String table = ModelSupport.tableIdentifier(outputGraph, clazz); - String sql = String.format("DROP TABLE IF EXISTS %s PURGE;", table); + String sql = String.format("DROP TABLE IF EXISTS %s PURGE", table); log.info("running SQL: '{}'", sql); spark.sql(sql); break; @@ -46,14 +46,14 @@ public class GraphSupport { String type = clazz.getSimpleName().toLowerCase(); String outPath = outputGraph + "/" + type; - log.info("saving graph in {} mode to {}", outputGraph, graphFormat.toString()); + log.info("saving graph to path {},", outPath); writer.option("compression", "gzip").json(outPath); break; case HIVE: final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz); - log.info("saving graph in {} mode to {}", outputGraph, graphFormat.toString()); + log.info("saving graph to '{}'", db_table); writer.saveAsTable(db_table); break; @@ -63,17 +63,22 @@ public class GraphSupport { public static Dataset readGraph( SparkSession spark, String graph, Class clazz, GraphFormat format) { + log.info("reading graph {}, format {}, class {}", graph, format, clazz); + switch (format) { case JSON: + String path = graph + "/" + clazz.getSimpleName().toLowerCase(); + log.info("reading path {}", path); return spark .read() - .textFile(graph) + .textFile(path) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)) .filter((FilterFunction) value -> Objects.nonNull(ModelSupport.idFn().apply(value))); case HIVE: String table = ModelSupport.tableIdentifier(graph, clazz); + log.info("reading table {}", table); return spark.read().table(table).as(Encoders.bean(clazz)); default: throw new IllegalStateException(String.format("format not managed: '%s'", 
format)); diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/ResetHiveDbApplication.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/ResetHiveDbApplication.java index ee85065c3..a8ec39e3a 100644 --- a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/ResetHiveDbApplication.java +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/ResetHiveDbApplication.java @@ -7,6 +7,7 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,15 +37,24 @@ public class ResetHiveDbApplication { .orElseThrow(() -> new IllegalArgumentException("missing DB name")); log.info("dbName: {}", dbName); + String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", hiveMetastoreUris); runWithSparkHiveSession( conf, isSparkSessionManaged, spark -> { - spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName)); - spark.sql(String.format("CREATE DATABASE %s", dbName)); + runSQL(spark, String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName)); + runSQL(spark, String.format("CREATE DATABASE %s", dbName)); }); } + protected static void runSQL(SparkSession spark, String sql) { + log.info("running SQL '{}'", sql); + spark.sqlContext().sql(sql); + } + } diff --git a/dhp-workflows/dhp-workflows-common/src/main/resources/eu/dnetlib/dhp/common/reset_hive_db_parameters.json b/dhp-workflows/dhp-workflows-common/src/main/resources/eu/dnetlib/dhp/common/reset_hive_db_parameters.json index afc531148..629864ec3 100644 --- a/dhp-workflows/dhp-workflows-common/src/main/resources/eu/dnetlib/dhp/common/reset_hive_db_parameters.json +++ b/dhp-workflows/dhp-workflows-common/src/main/resources/eu/dnetlib/dhp/common/reset_hive_db_parameters.json @@ -5,6 +5,12 @@ "paramDescription": "when true will stop SparkSession after job execution", "paramRequired": false }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, { "paramName": "db", "paramLongName": "dbName",
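
Reviewer note: a minimal usage sketch of the GraphSupport API this patch converges on, assuming the signatures visible in the diff above (readGraph, saveGraphTable, GraphFormat, runWithSparkHiveSession). The graph names, metastore URI and the choice of Publication are hypothetical placeholders, not values taken from this patch.

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;

import eu.dnetlib.dhp.common.GraphFormat;
import eu.dnetlib.dhp.common.GraphSupport;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class GraphSupportUsageSketch {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		// workflows pass this via --hiveMetastoreUris; the URI here is a placeholder
		conf.set("hive.metastore.uris", "thrift://example-metastore:9083");

		runWithSparkHiveSession(conf, Boolean.TRUE, spark -> {
			// JSON format resolves the source under <inputGraph>/<entity>, e.g. /tmp/graph_raw/publication
			Dataset<Publication> pubs = GraphSupport
				.readGraph(spark, "/tmp/graph_raw", Publication.class, GraphFormat.JSON);

			// HIVE format resolves the target table as <dbName>.<entity>, e.g. openaire_graph.publication
			GraphSupport.saveGraphTable(pubs, Publication.class, "openaire_graph", GraphFormat.HIVE);
		});
	}
}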