Corrections for Sarc stats. Heavy queries run on Impala.

This commit is contained in:
Spyros Zoupanos 2020-10-10 12:06:31 +03:00
parent d9cc70d334
commit 8826684130
1 changed files with 65 additions and 19 deletions

View File

@ -33,7 +33,8 @@ import org.slf4j.LoggerFactory;
*/
public class SarcStats {
private Statement stmt = null;
private Statement stmtHive = null;
private Statement stmtImpala = null;
private static final Logger logger = LoggerFactory.getLogger(SarcStats.class);
@ -44,9 +45,9 @@ public class SarcStats {
private void createTables() throws Exception {
try {
stmt = ConnectDB.getHiveConnection().createStatement();
stmtHive = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
stmt.executeUpdate(sqlCreateTableSushiLog);
stmtHive.executeUpdate(sqlCreateTableSushiLog);
// String sqlCopyPublicSushiLog="INSERT INTO sushilog SELECT * FROM public.sushilog;";
// stmt.executeUpdate(sqlCopyPublicSushiLog);
@ -56,11 +57,11 @@ public class SarcStats {
+ "sushilog.rid, sushilog.date "
+ "FROM sushilog "
+ "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
stmt.executeUpdate(sqlcreateRuleSushiLog);
stmtHive.executeUpdate(sqlcreateRuleSushiLog);
String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
stmt.executeUpdate(createSushiIndex);
stmtHive.executeUpdate(createSushiIndex);
stmt.close();
stmtHive.close();
ConnectDB.getHiveConnection().close();
logger.info("Sushi Tables Created");
} catch (Exception e) {
@ -283,8 +284,9 @@ public class SarcStats {
}
public void finalizeSarcStats() throws Exception {
stmt = ConnectDB.getHiveConnection().createStatement();
stmtHive = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
stmtImpala = ConnectDB.getImpalaConnection().createStatement();
logger.info("Creating downloads_stats table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
@ -295,21 +297,65 @@ public class SarcStats {
"`date` string, " +
"`count` bigint, " +
"`openaire` bigint)";
stmt.executeUpdate(createDownloadsStats);
stmtHive.executeUpdate(createDownloadsStats);
logger.info("Created downloads_stats table");
// Insert into downloads_stats
logger.info("Inserting into downloads_stats");
String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats SELECT s.source, d.id AS repository_id, " +
"ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) AS date, s.count, '0' " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp s, " +
logger.info("Dropping sarc_sushilogtmp_impala table");
String drop_sarc_sushilogtmp_impala = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp_impala";
stmtHive.executeUpdate(drop_sarc_sushilogtmp_impala);
logger.info("Dropped sarc_sushilogtmp_impala table");
logger.info("Creating sarc_sushilogtmp_impala, a table readable by impala");
String createSarcSushilogtmpImpala = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp_impala " +
"STORED AS PARQUET AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
stmtHive.executeUpdate(createSarcSushilogtmpImpala);
logger.info("Created sarc_sushilogtmp_impala");
logger.info("Making sarc_sushilogtmp visible to impala");
String invalidateMetadata = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp_impala;";
stmtImpala.executeUpdate(invalidateMetadata);
logger.info("Dropping downloads_stats_impala table");
String drop_downloads_stats_impala = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".downloads_stats_impala";
stmtHive.executeUpdate(drop_downloads_stats_impala);
logger.info("Dropped downloads_stats_impala table");
logger.info("Making downloads_stats_impala deletion visible to impala");
try {
String invalidateMetadataDownloadsStatsImpala = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats_impala;";
stmtImpala.executeUpdate(invalidateMetadataDownloadsStatsImpala);
} catch (SQLException sqle) {
}
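// The join below is executed through the Impala statement because it is faster there;
// its result is materialised as downloads_stats_impala and copied into downloads_stats afterwards.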
logger.info("Creating downloads_stats_impala");
String createDownloadsStatsImpala = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats_impala AS " +
"SELECT s.source, d.id AS repository_id, " +
"ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', " +
"LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_impala s, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " +
ConnectDB.getStatsDBSchema() + ".datasource_results dr, " +
ConnectDB.getStatsDBSchema() + ".result_pids ro \n" +
ConnectDB.getStatsDBSchema() + ".result_pids ro " +
"WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id AND " +
"s.rid=ro.pid AND ro.type='doi' AND metric_type='ft_total' AND s.source='SARC-OJS'";
stmt.executeUpdate(insertDStats);
stmtImpala.executeUpdate(createDownloadsStatsImpala);
logger.info("Creating downloads_stats_impala");
// Copy the rows from downloads_stats_impala into downloads_stats (executed through the Hive statement)
logger.info("Inserting data from downloads_stats_impala into downloads_stats");
String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats SELECT * " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_impala";
stmtHive.executeUpdate(insertDStats);
logger.info("Inserted into downloads_stats");
logger.info("Creating sushilog table");
@ -321,17 +367,17 @@ public class SarcStats {
"`date` string, " +
"`metric_type` string, " +
"`count` int)";
stmt.executeUpdate(createSushilog);
stmtHive.executeUpdate(createSushilog);
logger.info("Created sushilog table");
// Insert into sushilog
logger.info("Inserting into sushilog");
String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog SELECT * " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
stmt.executeUpdate(insertSushiLog);
stmtHive.executeUpdate(insertSushiLog);
logger.info("Inserted into sushilog");
stmt.close();
stmtHive.close();
ConnectDB.getHiveConnection().close();
}