forked from D-Net/dnet-hadoop
Changes for better connection management. Now we can keep the connection alive all the time or to create a new connection every time that we request the connection object
This commit is contained in:
parent
8b591937e1
commit
6ff5436991
|
@ -31,6 +31,8 @@ public abstract class ConnectDB {
|
||||||
private static String statsDBSchema;
|
private static String statsDBSchema;
|
||||||
private final static Logger log = Logger.getLogger(ConnectDB.class);
|
private final static Logger log = Logger.getLogger(ConnectDB.class);
|
||||||
|
|
||||||
|
private final static boolean createAlwaysNewConnection = true;
|
||||||
|
|
||||||
static void init() throws ClassNotFoundException {
|
static void init() throws ClassNotFoundException {
|
||||||
|
|
||||||
dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
|
dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
|
||||||
|
@ -42,21 +44,37 @@ public abstract class ConnectDB {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Connection getHiveConnection() throws SQLException {
|
public static Connection getHiveConnection() throws SQLException {
|
||||||
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
|
if (!createAlwaysNewConnection) {
|
||||||
return DB_HIVE_CONNECTION;
|
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
|
||||||
} else {
|
return DB_HIVE_CONNECTION;
|
||||||
DB_HIVE_CONNECTION = connectHive();
|
} else {
|
||||||
|
DB_HIVE_CONNECTION = connectHive();
|
||||||
|
|
||||||
|
return DB_HIVE_CONNECTION;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed())
|
||||||
|
DB_HIVE_CONNECTION.close();
|
||||||
|
|
||||||
|
DB_HIVE_CONNECTION = connectHive();
|
||||||
return DB_HIVE_CONNECTION;
|
return DB_HIVE_CONNECTION;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Connection getImpalaConnection() throws SQLException {
|
public static Connection getImpalaConnection() throws SQLException {
|
||||||
if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
|
if (!createAlwaysNewConnection) {
|
||||||
return DB_IMPALA_CONNECTION;
|
if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
|
||||||
} else {
|
return DB_IMPALA_CONNECTION;
|
||||||
DB_IMPALA_CONNECTION = connectImpala();
|
} else {
|
||||||
|
DB_IMPALA_CONNECTION = connectImpala();
|
||||||
|
|
||||||
|
return DB_IMPALA_CONNECTION;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed())
|
||||||
|
DB_IMPALA_CONNECTION.close();
|
||||||
|
|
||||||
|
DB_IMPALA_CONNECTION = connectImpala();
|
||||||
return DB_IMPALA_CONNECTION;
|
return DB_IMPALA_CONNECTION;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,7 +110,6 @@ public class IrusStats {
|
||||||
|
|
||||||
public void processIrusStats() throws Exception {
|
public void processIrusStats() throws Exception {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Adding JSON Serde jar");
|
logger.info("Adding JSON Serde jar");
|
||||||
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
||||||
|
@ -289,8 +288,6 @@ public class IrusStats {
|
||||||
|
|
||||||
logger.info("(getIrusIRReport) Getting report(s) with opendoar: " + opendoar);
|
logger.info("(getIrusIRReport) Getting report(s) with opendoar: " + opendoar);
|
||||||
|
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
|
||||||
|
|
||||||
// Setting the starting period
|
// Setting the starting period
|
||||||
|
|
|
@ -134,7 +134,6 @@ public class LaReferenciaStats {
|
||||||
|
|
||||||
public void processlaReferenciaLog() throws Exception {
|
public void processlaReferenciaLog() throws Exception {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Adding JSON Serde jar");
|
logger.info("Adding JSON Serde jar");
|
||||||
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
||||||
|
@ -215,7 +214,6 @@ public class LaReferenciaStats {
|
||||||
public void removeDoubleClicks() throws Exception {
|
public void removeDoubleClicks() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Cleaning download double clicks");
|
logger.info("Cleaning download double clicks");
|
||||||
// clean download double clicks
|
// clean download double clicks
|
||||||
|
@ -255,7 +253,6 @@ public class LaReferenciaStats {
|
||||||
public void viewsStats() throws Exception {
|
public void viewsStats() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Creating la_result_views_monthly_tmp view");
|
logger.info("Creating la_result_views_monthly_tmp view");
|
||||||
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS "
|
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS "
|
||||||
|
@ -296,7 +293,6 @@ public class LaReferenciaStats {
|
||||||
private void downloadsStats() throws Exception {
|
private void downloadsStats() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Creating la_result_downloads_monthly_tmp view");
|
logger.info("Creating la_result_downloads_monthly_tmp view");
|
||||||
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
|
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
|
||||||
|
@ -337,7 +333,6 @@ public class LaReferenciaStats {
|
||||||
private void updateProdTables() throws SQLException, Exception {
|
private void updateProdTables() throws SQLException, Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Updating lareferencialog");
|
logger.info("Updating lareferencialog");
|
||||||
String sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog " +
|
String sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog " +
|
||||||
|
|
|
@ -202,7 +202,7 @@ public class PiwikDownloadLogs {
|
||||||
piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
|
piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Downloading from repos with the followins piwikIds: " + piwikIdToVisit);
|
logger.info("Downloading from repos with the following piwikIds: " + piwikIdToVisit);
|
||||||
|
|
||||||
// Setting the starting period
|
// Setting the starting period
|
||||||
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
|
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
|
||||||
|
|
|
@ -228,7 +228,6 @@ public class PiwikStatsDB {
|
||||||
public void processRepositoryLog() throws Exception {
|
public void processRepositoryLog() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Adding JSON Serde jar");
|
logger.info("Adding JSON Serde jar");
|
||||||
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
||||||
|
@ -302,7 +301,6 @@ public class PiwikStatsDB {
|
||||||
|
|
||||||
public void removeDoubleClicks() throws Exception {
|
public void removeDoubleClicks() throws Exception {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Cleaning download double clicks");
|
logger.info("Cleaning download double clicks");
|
||||||
// clean download double clicks
|
// clean download double clicks
|
||||||
|
@ -340,7 +338,6 @@ public class PiwikStatsDB {
|
||||||
|
|
||||||
public void viewsStats() throws Exception {
|
public void viewsStats() throws Exception {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Dropping result_views_monthly_tmp table");
|
logger.info("Dropping result_views_monthly_tmp table");
|
||||||
String drop_result_views_monthly_tmp = "DROP TABLE IF EXISTS " +
|
String drop_result_views_monthly_tmp = "DROP TABLE IF EXISTS " +
|
||||||
|
@ -436,7 +433,6 @@ public class PiwikStatsDB {
|
||||||
|
|
||||||
private void downloadsStats() throws Exception {
|
private void downloadsStats() throws Exception {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Dropping result_downloads_monthly_tmp view");
|
logger.info("Dropping result_downloads_monthly_tmp view");
|
||||||
String drop_result_views_monthly_tmp = "DROP VIEW IF EXISTS " +
|
String drop_result_views_monthly_tmp = "DROP VIEW IF EXISTS " +
|
||||||
|
@ -501,7 +497,6 @@ public class PiwikStatsDB {
|
||||||
|
|
||||||
public void finalizeStats() throws Exception {
|
public void finalizeStats() throws Exception {
|
||||||
stmt = ConnectDB.getHiveConnection().createStatement();
|
stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Dropping full_dates table");
|
logger.info("Dropping full_dates table");
|
||||||
String dropFullDates = "DROP TABLE IF EXISTS " +
|
String dropFullDates = "DROP TABLE IF EXISTS " +
|
||||||
|
@ -570,7 +565,6 @@ public class PiwikStatsDB {
|
||||||
// Create repository Views statistics
|
// Create repository Views statistics
|
||||||
private void repositoryViewsStats() throws Exception {
|
private void repositoryViewsStats() throws Exception {
|
||||||
stmt = ConnectDB.getHiveConnection().createStatement();
|
stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
// String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
// String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
||||||
String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
||||||
|
@ -633,7 +627,6 @@ public class PiwikStatsDB {
|
||||||
// Create repository downloads statistics
|
// Create repository downloads statistics
|
||||||
private void repositoryDownloadsStats() throws Exception {
|
private void repositoryDownloadsStats() throws Exception {
|
||||||
stmt = ConnectDB.getHiveConnection().createStatement();
|
stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
// String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
// String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
||||||
String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
||||||
|
@ -700,7 +693,6 @@ public class PiwikStatsDB {
|
||||||
|
|
||||||
public void processPortalLog() throws Exception {
|
public void processPortalLog() throws Exception {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Adding JSON Serde jar");
|
logger.info("Adding JSON Serde jar");
|
||||||
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
||||||
|
@ -791,7 +783,6 @@ public class PiwikStatsDB {
|
||||||
public void portalStats() throws SQLException {
|
public void portalStats() throws SQLException {
|
||||||
Connection con = ConnectDB.getHiveConnection();
|
Connection con = ConnectDB.getHiveConnection();
|
||||||
Statement stmt = con.createStatement();
|
Statement stmt = con.createStatement();
|
||||||
con.setAutoCommit(false);
|
|
||||||
|
|
||||||
// Original queries where of the style
|
// Original queries where of the style
|
||||||
//
|
//
|
||||||
|
@ -869,7 +860,6 @@ public class PiwikStatsDB {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanOAI() throws Exception {
|
private void cleanOAI() throws Exception {
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Cleaning oai - Step 1");
|
logger.info("Cleaning oai - Step 1");
|
||||||
stmt = ConnectDB.getHiveConnection().createStatement();
|
stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
|
@ -1148,7 +1138,6 @@ public class PiwikStatsDB {
|
||||||
|
|
||||||
private void updateProdTables() throws SQLException {
|
private void updateProdTables() throws SQLException {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Inserting data to piwiklog");
|
logger.info("Inserting data to piwiklog");
|
||||||
String sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog " +
|
String sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog " +
|
||||||
|
|
|
@ -88,7 +88,6 @@ public class SarcStats {
|
||||||
|
|
||||||
public void processSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
|
public void processSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Adding JSON Serde jar");
|
logger.info("Adding JSON Serde jar");
|
||||||
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
|
||||||
|
@ -192,7 +191,6 @@ public class SarcStats {
|
||||||
public void getAndProcessSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
|
public void getAndProcessSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
logger.info("Creating sushilog table");
|
logger.info("Creating sushilog table");
|
||||||
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||||
|
@ -284,7 +282,6 @@ public class SarcStats {
|
||||||
|
|
||||||
public void finalizeSarcStats() throws Exception {
|
public void finalizeSarcStats() throws Exception {
|
||||||
stmtHive = ConnectDB.getHiveConnection().createStatement();
|
stmtHive = ConnectDB.getHiveConnection().createStatement();
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
stmtImpala = ConnectDB.getImpalaConnection().createStatement();
|
stmtImpala = ConnectDB.getImpalaConnection().createStatement();
|
||||||
|
|
||||||
logger.info("Creating downloads_stats table");
|
logger.info("Creating downloads_stats table");
|
||||||
|
@ -383,7 +380,6 @@ public class SarcStats {
|
||||||
public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray,
|
public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray,
|
||||||
String url, String issn) throws Exception {
|
String url, String issn) throws Exception {
|
||||||
logger.info("Processing SARC! issn: " + issn + " with url: " + url);
|
logger.info("Processing SARC! issn: " + issn + " with url: " + url);
|
||||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
|
||||||
|
|
||||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
|
||||||
// Setting the starting period
|
// Setting the starting period
|
||||||
|
|
Loading…
Reference in New Issue