From 21a14fcd800944d2a7fca1c70ad77726536f2b97 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 15 Jan 2024 00:08:07 +0100 Subject: [PATCH] Reusable RunSQLSparkJob for executing SQL in Spark through Oozie Spark Actions Implements pivots table update oozie workflow --- .../eu/dnetlib/dhp/oozie/RunSQLSparkJob.java | 75 +++++++++++++++ .../dnetlib/dhp/oozie/run_sql_parameters.json | 20 ++++ .../pivothistory/oozie_app/config-default.xml | 26 +++++ .../oa/dedup/pivothistory/oozie_app/sql.sql | 62 ++++++++++++ .../dedup/pivothistory/oozie_app/workflow.xml | 95 +++++++++++++++++++ 5 files changed, 278 insertions(+) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java create mode 100644 dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java new file mode 100644 index 000000000..ef296bfc9 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java @@ -0,0 +1,75 @@ + +package eu.dnetlib.dhp.oozie; + +import com.google.common.io.Resources; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class RunSQLSparkJob { + private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class); + + private final ArgumentApplicationParser parser; + + public RunSQLSparkJob(ArgumentApplicationParser parser) { + this.parser = parser; + } + + public static void main(String[] args) throws Exception { + + Map params = new HashMap<>(); + for (int i = 0; i < args.length - 1; i++) { + if (args[i].startsWith("--")) { + params.put(args[i].substring(2), args[++i]); + } + } + + /* + * String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class + * .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser + * parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); + */ + + Boolean isSparkSessionManaged = Optional + .ofNullable(params.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + URL url = com.google.common.io.Resources.getResource(params.get("sql")); + String raw_sql = Resources.toString(url, StandardCharsets.UTF_8); + + String sql = StringSubstitutor.replace(raw_sql, params); + log.info("sql: {}", sql); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", params.get("hiveMetastoreUris")); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) { + log.info("executing: {}", statement); + long startTime = System.currentTimeMillis(); + spark.sql(statement).show(); + log + .info( + "executed in {}", + DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S")); + } + }); + } + +} diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json new file mode 100644 index 000000000..355f38e2f --- /dev/null +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "sql", + "paramLongName": "sql", + "paramDescription": "sql script to execute", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml new file mode 100644 index 000000000..17bb70647 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + sparkSqlWarehouseDir + /user/hive/warehouse + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql new file mode 100644 index 000000000..86dbda1c9 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql @@ -0,0 +1,62 @@ + +CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ + + +DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/ diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml new file mode 100644 index 000000000..d562f088e --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml @@ -0,0 +1,95 @@ + + + + + pivot_history_db + + Pivot history DB on hive + + + new_graph_db + + New graph DB on hive + + + new_graph_date + + Creation date of new graph db + + + + + hiveMetastoreUris + hive server metastore URIs + + + sparkSqlWarehouseDir + + + + sparkClusterOpts + --conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + + + sparkResourceOpts + --executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4 + spark resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=3840 + spark resource options + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Upgrade Pivot History + eu.dnetlib.dhp.oozie.RunSQLSparkJob + dhp-dedup-openaire-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --hiveMetastoreUris${hiveMetastoreUris} + --sqleu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql + --pivot_history_db${pivot_history_db} + --new_graph_db${new_graph_db} + --new_graph_date${new_graph_date} + + + + + + + \ No newline at end of file