[stats wf] indicators across stats dbs & updates in the org ids #248

dimitris.pierrakos wants to merge 1742 commits from beta into beta2master_sept_2022
5 changed files with 278 additions and 0 deletions
Showing only changes of commit bd187ec6e7 - Show all commits

View File

@ -0,0 +1,75 @@
package eu.dnetlib.dhp.oozie;
import com.google.common.io.Resources;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class RunSQLSparkJob {
private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
private final ArgumentApplicationParser parser;
public RunSQLSparkJob(ArgumentApplicationParser parser) {
this.parser = parser;
public static void main(String[] args) throws Exception {
Map<String, String> params = new HashMap<>();
for (int i = 0; i < args.length - 1; i++) {
if (args[i].startsWith("--")) {
params.put(args[i].substring(2), args[++i]);
* String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class
* .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser
* parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
URL url = com.google.common.io.Resources.getResource(params.get("sql"));
String raw_sql = Resources.toString(url, StandardCharsets.UTF_8);
String sql = StringSubstitutor.replace(raw_sql, params);
log.info("sql: {}", sql);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", params.get("hiveMetastoreUris"));
spark -> {
for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) {
log.info("executing: {}", statement);
long startTime = System.currentTimeMillis();
"executed in {}",
DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S"));

View File

@ -0,0 +1,20 @@
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
"paramName": "sql",
"paramLongName": "sql",
"paramDescription": "sql script to execute",
"paramRequired": true

View File

@ -0,0 +1,26 @@

View File

@ -0,0 +1,62 @@
CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/

View File

@ -0,0 +1,95 @@
<workflow-app name="Update pivot history" xmlns="uri:oozie:workflow:0.5">
<!-- properties used in SQL -->
<!-- <value>openaire_beta_pivots_test</value> -->
<description>Pivot history DB on hive</description>
<!--<value>openaire_beta_20231208</value> -->
<description>New graph DB on hive</description>
<!-- <value>20231208</value> -->
<description>Creation date of new graph db</description>
<!-- RunSQLSparkJob properties -->
<description>hive server metastore URIs</description>
<!-- General oozie workflow properties -->
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
<value>--conf spark.sql.shuffle.partitions=3840</value>
<description>spark resource options</description>
<start to="UpgradePivotHistory"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
<action name="UpgradePivotHistory">
<spark xmlns="uri:oozie:spark-action:0.2">
<name>Upgrade Pivot History</name>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
<ok to="End"/>
<error to="Kill"/>
<end name="End"/>