forked from D-Net/dnet-hadoop
Reusable RunSQLSparkJob for executing SQL in Spark through Oozie Spark Actions
Implements pivots table update oozie workflow
This commit is contained in:
parent
c67467723b
commit
21a14fcd80
|
@ -0,0 +1,75 @@
|
|||
|
||||
package eu.dnetlib.dhp.oozie;
|
||||
|
||||
import com.google.common.io.Resources;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
public class RunSQLSparkJob {
|
||||
private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
|
||||
|
||||
private final ArgumentApplicationParser parser;
|
||||
|
||||
public RunSQLSparkJob(ArgumentApplicationParser parser) {
|
||||
this.parser = parser;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
Map<String, String> params = new HashMap<>();
|
||||
for (int i = 0; i < args.length - 1; i++) {
|
||||
if (args[i].startsWith("--")) {
|
||||
params.put(args[i].substring(2), args[++i]);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class
|
||||
* .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser
|
||||
* parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args);
|
||||
*/
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(params.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
URL url = com.google.common.io.Resources.getResource(params.get("sql"));
|
||||
String raw_sql = Resources.toString(url, StandardCharsets.UTF_8);
|
||||
|
||||
String sql = StringSubstitutor.replace(raw_sql, params);
|
||||
log.info("sql: {}", sql);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", params.get("hiveMetastoreUris"));
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) {
|
||||
log.info("executing: {}", statement);
|
||||
long startTime = System.currentTimeMillis();
|
||||
spark.sql(statement).show();
|
||||
log
|
||||
.info(
|
||||
"executed in {}",
|
||||
DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "hmu",
|
||||
"paramLongName": "hiveMetastoreUris",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "sql",
|
||||
"paramLongName": "sql",
|
||||
"paramDescription": "sql script to execute",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,26 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkSqlWarehouseDir</name>
|
||||
<value>/user/hive/warehouse</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,62 @@
|
|||
|
||||
CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS
|
||||
WITH pivots (
|
||||
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||
LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id
|
||||
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||
UNION
|
||||
SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||
)
|
||||
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||
FROM pivots
|
||||
GROUP BY id; /*EOS*/
|
||||
CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS
|
||||
WITH pivots (
|
||||
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||
LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id
|
||||
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||
UNION
|
||||
SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||
)
|
||||
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||
FROM pivots
|
||||
GROUP BY id; /*EOS*/
|
||||
CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS
|
||||
WITH pivots (
|
||||
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||
LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id
|
||||
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||
UNION
|
||||
SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||
)
|
||||
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||
FROM pivots
|
||||
GROUP BY id; /*EOS*/
|
||||
CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS
|
||||
WITH pivots (
|
||||
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||
LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id
|
||||
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||
UNION
|
||||
SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||
)
|
||||
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||
FROM pivots
|
||||
GROUP BY id; /*EOS*/
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/
|
||||
|
||||
DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/
|
||||
|
||||
DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/
|
||||
|
||||
DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
|
||||
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/
|
|
@ -0,0 +1,95 @@
|
|||
<workflow-app name="Update pivot history" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<!-- properties used in SQL -->
|
||||
<property>
|
||||
<name>pivot_history_db</name>
|
||||
<!-- <value>openaire_beta_pivots_test</value> -->
|
||||
<description>Pivot history DB on hive</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>new_graph_db</name>
|
||||
<!--<value>openaire_beta_20231208</value> -->
|
||||
<description>New graph DB on hive</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>new_graph_date</name>
|
||||
<!-- <value>20231208</value> -->
|
||||
<description>Creation date of new graph db</description>
|
||||
</property>
|
||||
|
||||
<!-- RunSQLSparkJob properties -->
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<description>hive server metastore URIs</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkSqlWarehouseDir</name>
|
||||
</property>
|
||||
<!-- General oozie workflow properties -->
|
||||
<property>
|
||||
<name>sparkClusterOpts</name>
|
||||
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
|
||||
<description>spark cluster-wide options</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkResourceOpts</name>
|
||||
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
|
||||
<description>spark resource options</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkApplicationOpts</name>
|
||||
<value>--conf spark.sql.shuffle.partitions=3840</value>
|
||||
<description>spark resource options</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="UpgradePivotHistory"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="UpgradePivotHistory">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Upgrade Pivot History</name>
|
||||
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
${sparkClusterOpts}
|
||||
${sparkResourceOpts}
|
||||
${sparkApplicationOpts}
|
||||
</spark-opts>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql</arg>
|
||||
<arg>--pivot_history_db</arg><arg>${pivot_history_db}</arg>
|
||||
<arg>--new_graph_db</arg><arg>${new_graph_db}</arg>
|
||||
<arg>--new_graph_date</arg><arg>${new_graph_date}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
Loading…
Reference in New Issue