Introducing Impala connections and using the correct connection string

This commit is contained in:
Spyros Zoupanos 2020-09-27 13:19:45 +03:00
parent 73656f7f31
commit dc6114a24e
8 changed files with 177 additions and 129 deletions

View File

@ -22,9 +22,11 @@ import org.apache.log4j.Logger;
public abstract class ConnectDB {
public static Connection DB_CONNECTION;
public static Connection DB_HIVE_CONNECTION;
public static Connection DB_IMPALA_CONNECTION;
private static String dbURL;
private static String dbHiveUrl;
private static String dbImpalaUrl;
private static String dbUsername;
private static String dbPassword;
private static String usageStatsDBSchema;
@ -42,20 +44,37 @@ public abstract class ConnectDB {
//
// Class.forName(properties.getProperty("Stats_db_Driver"));
dbURL = "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1";
dbHiveUrl = "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1";
// dbImpalaUrl = "jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;UseNativeQuery=1";
// dbImpalaUrl = "jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;UseNativeQuery=1";
// dbImpalaUrl = "jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/openaire_prod_stats_20200731;UID=spyros;PWD=RU78N9sqQndnH3SQ;UseNativeQuery=1";
// dbImpalaUrl = "jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/openaire_prod_stats_20200731;UID=spyros;PWD=RU78N9sqQndnH3SQ;UseNativeQuery=1";
// dbImpalaUrl = "jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/openaire_prod_stats_shadow_20200821;user=spyros;PWD=RU78N9sqQndnH3SQ";
// dbImpalaUrl = "jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:28000/;transportMode=http";
dbImpalaUrl = "jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl";
usageStatsDBSchema = "usagestats_20200913";
statsDBSchema = "openaire_prod_stats_shadow_20200821";
Class.forName("org.apache.hive.jdbc.HiveDriver");
}
public static Connection getConnection() throws SQLException {
if (DB_CONNECTION != null && !DB_CONNECTION.isClosed()) {
return DB_CONNECTION;
public static Connection getHiveConnection() throws SQLException {
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
return DB_HIVE_CONNECTION;
} else {
DB_CONNECTION = connect();
DB_HIVE_CONNECTION = connectHive();
return DB_CONNECTION;
return DB_HIVE_CONNECTION;
}
}
public static Connection getImpalaConnection() throws SQLException {
    // Lazily open the shared Impala connection; reuse it for as long as it stays open.
    if (DB_IMPALA_CONNECTION == null || DB_IMPALA_CONNECTION.isClosed()) {
        DB_IMPALA_CONNECTION = connectImpala();
    }
    return DB_IMPALA_CONNECTION;
}
@ -67,23 +86,22 @@ public abstract class ConnectDB {
return ConnectDB.statsDBSchema;
}
private static Connection connect() throws SQLException {
private static Connection connectHive() throws SQLException {
Connection connection = DriverManager.getConnection(dbURL);
Connection connection = DriverManager.getConnection(dbHiveUrl);
Statement stmt = connection.createStatement();
log.debug("Opened database successfully");
return connection;
}
// Connection connection = DriverManager.getConnection(dbURL, dbUsername, dbPassword);
// Statement stmt = connection.createStatement();
// String sqlSetSearchPath = "SET search_path TO " + defaultDBSchema + ";";
// stmt.executeUpdate(sqlSetSearchPath);
// stmt.close();
//
// log.debug("Opened database successfully");
//
// return connection;
/**
 * Opens a fresh JDBC connection to Impala using {@code dbImpalaUrl}.
 *
 * @return the newly opened {@link Connection}; the caller owns it and is responsible for closing it
 * @throws SQLException if the connection cannot be established
 */
private static Connection connectImpala() throws SQLException {
    // Fix: the original created a Statement here that was never used and never
    // closed, holding server-side resources for no reason. Removed.
    Connection connection = DriverManager.getConnection(dbImpalaUrl);
    log.debug("Opened database successfully");
    return connection;
}
}

View File

@ -54,7 +54,7 @@ public class IrusStats {
try {
System.out.println("====> Creating sushilog");
Statement stmt = ConnectDB.getConnection().createStatement();
Statement stmt = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog(source STRING, " +
"repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, " +
@ -75,7 +75,7 @@ public class IrusStats {
// stmt.executeUpdate(createSushiIndex);
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
log.info("Sushi Tables Created");
} catch (Exception e) {
log.error("Failed to create tables: " + e);
@ -112,8 +112,8 @@ public class IrusStats {
// }
public void processIrusStats() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
@ -181,7 +181,7 @@ public class IrusStats {
stmt.executeUpdate(insert_sushilogtmp);
System.out.println("====> Inserted to sushilogtmp table");
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
// // !!!!!!!!!!!!!!!!!!!!!
// // To do the following
@ -252,7 +252,7 @@ public class IrusStats {
System.out.println("====> (processIrusIRReport) Getting report(s) with opendoar: " + opendoar);
ConnectDB.getConnection().setAutoCommit(false);
ConnectDB.getHiveConnection().setAutoCommit(false);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
@ -266,7 +266,7 @@ public class IrusStats {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB
.getConnection()
.getHiveConnection()
.prepareStatement(
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
st.setString(1, "opendoar____::" + opendoar);
@ -279,7 +279,7 @@ public class IrusStats {
}
rs_date.close();
PreparedStatement preparedStatement = ConnectDB
.getConnection()
.getHiveConnection()
.prepareStatement(
"INSERT INTO sushilogtmp (source, repository, rid, date, metric_type, count) VALUES (?,?,?,?,?,?)");
int batch_size = 0;
@ -325,7 +325,7 @@ public class IrusStats {
}
preparedStatement.executeBatch();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
System.out.println("====> (processIrusIRReport) Finished downloading report(s) with opendoar: " + opendoar);
}

View File

@ -44,7 +44,7 @@ public class LaReferenciaDownloadLogs {
private void createTables() throws Exception {
try {
Statement stmt = ConnectDB.getConnection().createStatement();
Statement stmt = ConnectDB.getHiveConnection().createStatement();
System.out.println("====> Creating LaReferencia tables");
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " +
@ -66,7 +66,7 @@ public class LaReferenciaDownloadLogs {
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
log.info("Lareferencia Tables Created");
} catch (Exception e) {
@ -158,7 +158,7 @@ public class LaReferenciaDownloadLogs {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB
.getConnection()
.getHiveConnection()
.prepareStatement(
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() +
".lareferencialog WHERE matomoid=? GROUP BY timestamp HAVING max(timestamp) is not null");

View File

@ -45,7 +45,7 @@ public class LaReferenciaStats {
*/
private void createTables() throws Exception {
try {
Statement stmt = ConnectDB.getConnection().createStatement();
Statement stmt = ConnectDB.getHiveConnection().createStatement();
System.out.println("====> Creating LaReferencia tables");
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " +
@ -67,7 +67,7 @@ public class LaReferenciaStats {
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
log.info("Lareferencia Tables Created");
} catch (Exception e) {
@ -133,8 +133,8 @@ public class LaReferenciaStats {
}
public void processlaReferenciaLog() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
@ -214,8 +214,8 @@ public class LaReferenciaStats {
public void removeDoubleClicks() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Cleaning download double clicks");
// clean download double clicks
@ -233,7 +233,7 @@ public class LaReferenciaStats {
stmt.close();
System.out.println("====> Cleaned download double clicks");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
System.out.println("====> Cleaning action double clicks");
// clean view double clicks
sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp WHERE EXISTS (" +
@ -254,8 +254,8 @@ public class LaReferenciaStats {
public void viewsStats() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Creating la_result_views_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS "
@ -290,13 +290,13 @@ public class LaReferenciaStats {
System.out.println("====> Created la_views_stats_tmp table");
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}
private void downloadsStats() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Creating la_result_downloads_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
@ -331,13 +331,13 @@ public class LaReferenciaStats {
System.out.println("====> Created la_downloads_stats_tmp table");
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}
private void updateProdTables() throws SQLException, Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Updating lareferencialog");
String sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog " +
@ -361,7 +361,7 @@ public class LaReferenciaStats {
// stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}

View File

@ -82,7 +82,7 @@ public class PiwikDownloadLogs {
public void GetOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception {
Statement statement = ConnectDB.getConnection().createStatement();
Statement statement = ConnectDB.getHiveConnection().createStatement();
ResultSet rs = statement
.executeQuery(
@ -101,7 +101,7 @@ public class PiwikDownloadLogs {
end.add(Calendar.DAY_OF_MONTH, -1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB.DB_CONNECTION
PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION
.prepareStatement(
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
+ ".piwiklog WHERE source=? GROUP BY timestamp HAVING max(timestamp) is not null");

View File

@ -72,7 +72,7 @@ public class PiwikStatsDB {
private void createDatabase() throws Exception {
try {
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema();
stmt.executeUpdate(createDatabase);
@ -84,7 +84,7 @@ public class PiwikStatsDB {
private void createTables() throws Exception {
try {
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
// Create Piwiklog table - This table should exist
String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS "
@ -111,7 +111,7 @@ public class PiwikStatsDB {
//////////////////////////////////////////////////
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
log.info("Usage Tables Created");
} catch (Exception e) {
@ -122,7 +122,7 @@ public class PiwikStatsDB {
private void createTmpTables() throws Exception {
try {
Statement stmt = ConnectDB.getConnection().createStatement();
Statement stmt = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS "
+ ConnectDB.getUsageStatsDBSchema()
+ ".piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
@ -225,8 +225,8 @@ public class PiwikStatsDB {
public void processRepositoryLog() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
@ -299,8 +299,8 @@ public class PiwikStatsDB {
}
public void removeDoubleClicks() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Cleaning download double clicks");
// clean download double clicks
@ -337,8 +337,8 @@ public class PiwikStatsDB {
}
public void viewsStats() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Dropping result_views_monthly_tmp table");
String drop_result_views_monthly_tmp = "DROP TABLE IF EXISTS " +
@ -428,12 +428,12 @@ public class PiwikStatsDB {
System.out.println("====> Created pageviews_stats table");
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}
private void downloadsStats() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Dropping result_downloads_monthly_tmp view");
String drop_result_views_monthly_tmp = "DROP VIEW IF EXISTS " +
@ -493,12 +493,12 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}
public void finalizeStats() throws Exception {
stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
Calendar startCalendar = Calendar.getInstance();
startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
@ -582,14 +582,14 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getConnection().commit();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().commit();
ConnectDB.getHiveConnection().close();
}
// Create repository Views statistics
private void repositoryViewsStats() throws Exception {
stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
// String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
@ -645,14 +645,14 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getConnection().commit();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().commit();
ConnectDB.getHiveConnection().close();
}
// Create repository downloads statistics
private void repositoryDownloadsStats() throws Exception {
stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
// String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
@ -713,13 +713,13 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getConnection().commit();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().commit();
ConnectDB.getHiveConnection().close();
}
public void processPortalLog() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
@ -808,7 +808,7 @@ public class PiwikStatsDB {
}
public void portalStats() throws SQLException {
Connection con = ConnectDB.getConnection();
Connection con = ConnectDB.getHiveConnection();
Statement stmt = con.createStatement();
con.setAutoCommit(false);
@ -836,7 +836,7 @@ public class PiwikStatsDB {
// WHERE process_portal_log_tmp.entity_id IS NOT NULL AND process_portal_log_tmp.entity_id != '' AND process_portal_log_tmp.entity_id
// IN (SELECT roid.oid FROM openaire_prod_stats_20200821.result_oids roid WHERE roid.oid IS NOT NULL AND
// roid.oid != '');
System.out.println("====> PortalStats - Step 1");
String sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SELECT DISTINCT source, id_visit, country, action, url, entity_id, 'oaItem', `timestamp`, referrer_name, agent "
@ -888,10 +888,10 @@ public class PiwikStatsDB {
}
private void cleanOAI() throws Exception {
ConnectDB.getConnection().setAutoCommit(false);
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Cleaning oai - Step 1");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
String sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chlc.min-saude.pt/'," +
"'oai:repositorio.chlc.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.chlc.min-saude.pt/%'";
@ -899,7 +899,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 2");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hospitaldebraga.pt/'," +
"'oai:repositorio.hospitaldebraga.pt:') WHERE entity_id LIKE 'oai:repositorio.hospitaldebraga.pt/%'";
@ -907,7 +907,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 3");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipl.pt/'," +
"'oai:repositorio.ipl.pt:') WHERE entity_id LIKE 'oai:repositorio.ipl.pt/%'";
@ -915,7 +915,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 4");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:bibliotecadigital.ipb.pt/'," +
"'oai:bibliotecadigital.ipb.pt:') WHERE entity_id LIKE 'oai:bibliotecadigital.ipb.pt/%'";
@ -923,7 +923,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 5");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ismai.pt/'," +
"'oai:repositorio.ismai.pt:') WHERE entity_id LIKE 'oai:repositorio.ismai.pt/%'";
@ -931,7 +931,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 6");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorioaberto.uab.pt/'," +
"'oai:repositorioaberto.uab.pt:') WHERE entity_id LIKE 'oai:repositorioaberto.uab.pt/%'";
@ -939,7 +939,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 7");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.uac.pt/'," +
"'oai:repositorio.uac.pt:') WHERE entity_id LIKE 'oai:repositorio.uac.pt/%'";
@ -947,7 +947,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 8");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.insa.pt/'," +
"'oai:repositorio.insa.pt:') WHERE entity_id LIKE 'oai:repositorio.insa.pt/%'";
@ -955,7 +955,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 9");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipcb.pt/'," +
"'oai:repositorio.ipcb.pt:') WHERE entity_id LIKE 'oai:repositorio.ipcb.pt/%'";
@ -963,7 +963,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 10");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ispa.pt/'," +
"'oai:repositorio.ispa.pt:') WHERE entity_id LIKE 'oai:repositorio.ispa.pt/%'";
@ -971,7 +971,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 11");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chporto.pt/'," +
"'oai:repositorio.chporto.pt:') WHERE entity_id LIKE 'oai:repositorio.chporto.pt/%'";
@ -979,7 +979,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 12");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ucp.pt/'," +
"'oai:repositorio.ucp.pt:') WHERE entity_id LIKE 'oai:repositorio.ucp.pt/%'";
@ -987,7 +987,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 13");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:rihuc.huc.min-saude.pt/'," +
"'oai:rihuc.huc.min-saude.pt:') WHERE entity_id LIKE 'oai:rihuc.huc.min-saude.pt/%'";
@ -995,7 +995,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 14");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipv.pt/'," +
"'oai:repositorio.ipv.pt:') WHERE entity_id LIKE 'oai:repositorio.ipv.pt/%'";
@ -1003,7 +1003,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 15");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:www.repository.utl.pt/'," +
"'oai:www.repository.utl.pt:') WHERE entity_id LIKE 'oai:www.repository.utl.pt/%'";
@ -1011,7 +1011,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 16");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:run.unl.pt/'," +
"'oai:run.unl.pt:') WHERE entity_id LIKE 'oai:run.unl.pt/%'";
@ -1019,7 +1019,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 17");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:sapientia.ualg.pt/'," +
"'oai:sapientia.ualg.pt:') WHERE entity_id LIKE 'oai:sapientia.ualg.pt/%'";
@ -1027,7 +1027,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 18");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipsantarem.pt/'," +
"'oai:repositorio.ipsantarem.pt:') WHERE entity_id LIKE 'oai:repositorio.ipsantarem.pt/%'";
@ -1035,7 +1035,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 19");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:arca.igc.gulbenkian.pt/'," +
"'oai:arca.igc.gulbenkian.pt:') WHERE entity_id LIKE 'oai:arca.igc.gulbenkian.pt/%'";
@ -1043,7 +1043,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 20");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:ubibliorum.ubi.pt/'," +
"'oai:ubibliorum.ubi.pt:') WHERE entity_id LIKE 'oai:ubibliorum.ubi.pt/%'";
@ -1051,7 +1051,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 21");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:digituma.uma.pt/'," +
"'oai:digituma.uma.pt:') WHERE entity_id LIKE 'oai:digituma.uma.pt/%'";
@ -1059,7 +1059,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 22");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ul.pt/'," +
"'oai:repositorio.ul.pt:') WHERE entity_id LIKE 'oai:repositorio.ul.pt/%'";
@ -1067,7 +1067,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 23");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hff.min-saude.pt/'," +
"'oai:repositorio.hff.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.hff.min-saude.pt/%'";
@ -1075,7 +1075,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 24");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorium.sdum.uminho.pt/'," +
"'oai:repositorium.sdum.uminho.pt:') WHERE entity_id LIKE 'oai:repositorium.sdum.uminho.pt/%'";
@ -1083,7 +1083,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 25");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:recipp.ipp.pt/'," +
"'oai:recipp.ipp.pt:') WHERE entity_id LIKE 'oai:recipp.ipp.pt/%'";
@ -1091,7 +1091,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 26");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:bdigital.ufp.pt/'," +
"'oai:bdigital.ufp.pt:') WHERE entity_id LIKE 'oai:bdigital.ufp.pt/%'";
@ -1099,7 +1099,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 27");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:repositorio.lneg.pt/'," +
"'oai:repositorio.lneg.pt:') WHERE entity_id LIKE 'oai:repositorio.lneg.pt/%'";
@ -1107,7 +1107,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 28");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:iconline.ipleiria.pt/'," +
"'oai:iconline.ipleiria.pt:') WHERE entity_id LIKE 'oai:iconline.ipleiria.pt/%'";
@ -1115,7 +1115,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Step 29");
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"SET entity_id = regexp_replace(entity_id, '^oai:comum.rcaap.pt/'," +
"'oai:comum.rcaap.pt:') WHERE entity_id LIKE 'oai:comum.rcaap.pt/%'";
@ -1123,7 +1123,7 @@ public class PiwikStatsDB {
stmt.close();
System.out.println("====> Cleaning oai - Done, closing connection");
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}
private String processPortalURL(String url) {
@ -1166,8 +1166,8 @@ public class PiwikStatsDB {
}
private void updateProdTables() throws SQLException {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Inserting data to piwiklog");
String sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog " +
@ -1206,7 +1206,7 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
log.info("updateProdTables done");
}
@ -1268,6 +1268,6 @@ public class PiwikStatsDB {
}
private Connection getConnection() throws SQLException {
return ConnectDB.getConnection();
return ConnectDB.getHiveConnection();
}
}

View File

@ -43,7 +43,7 @@ public class SarcStats {
private void createTables() throws Exception {
try {
stmt = ConnectDB.getConnection().createStatement();
stmt = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
stmt.executeUpdate(sqlCreateTableSushiLog);
@ -60,7 +60,7 @@ public class SarcStats {
stmt.executeUpdate(createSushiIndex);
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
log.info("Sushi Tables Created");
} catch (Exception e) {
log.error("Failed to create tables: " + e);
@ -70,8 +70,8 @@ public class SarcStats {
public void processSarc(String sarcsReportPathArray, String sarcsReportPathNonArray,
String url, String issn) throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
@ -168,7 +168,7 @@ public class SarcStats {
stmt.executeUpdate(insert_sarc_sushilogtmp);
System.out.println("====> Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)");
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
////////////////////////////////////
// Add everything to the sushilog table!!!!
@ -177,15 +177,15 @@ public class SarcStats {
public void getAndProcessSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
System.out.println("====> Dropping sarc_sushilogtmp table");
String drop_sarc_sushilogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp";
stmt.executeUpdate(drop_sarc_sushilogtmp);
System.out.println("====> Dropped sarc_sushilogtmp table");
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
List<String[]> issnAndUrls = new ArrayList<String[]>();
issnAndUrls.add(new String[] {
@ -248,8 +248,8 @@ public class SarcStats {
}
public void finalizeSarcStats() throws Exception {
stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
// Insert into downloads_stats
System.out.println("====> Inserting into downloads_stats");
@ -274,13 +274,13 @@ public class SarcStats {
System.out.println("====> Inserted into sushilog");
stmt.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}
public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray,
String url, String issn) throws Exception {
log.info("Processing SARC! issn: " + issn + " with url: " + url);
ConnectDB.getConnection().setAutoCommit(false);
ConnectDB.getHiveConnection().setAutoCommit(false);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
@ -294,7 +294,7 @@ public class SarcStats {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB
.getConnection()
.getHiveConnection()
.prepareStatement(
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
st.setString(1, issn);
@ -308,7 +308,7 @@ public class SarcStats {
rs_date.close();
PreparedStatement preparedStatement = ConnectDB
.getConnection()
.getHiveConnection()
.prepareStatement(
"INSERT INTO sushilog (source, repository, rid, date, metric_type, count) VALUES (?,?,?,?,?,?)");
int batch_size = 0;
@ -425,7 +425,7 @@ public class SarcStats {
dfs.close();
ConnectDB.getConnection().close();
ConnectDB.getHiveConnection().close();
}
private void renameKeysRecursively(String delimiter, JSONArray givenJsonObj) throws Exception {

View File

@ -1,6 +1,8 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
@ -30,6 +32,31 @@ public class UsageStatsExporter {
this.properties = properties;
}
public void runImpalaQuery() throws Exception {
    // Smoke-test helper: runs a fixed CTAS statement over the Impala JDBC
    // connection to verify that ConnectDB.getImpalaConnection() actually works.
    // NOTE(review): the schema/table names are hard-coded debug values
    // ("usagestats_20200913", "spyros_tmp6") — parameterize or remove this
    // method before production use.
    ConnectDB.getImpalaConnection().setAutoCommit(false);

    System.out.println("====> Executing Impala query");

    // try-with-resources closes the Statement (and its implicit ResultSet)
    // even when executeQuery throws. The original created two Statements,
    // executed on the unclosed one, closed the unused one, and leaked the
    // ResultSet.
    try (Statement statement = ConnectDB.getImpalaConnection().createStatement()) {
        statement
            .executeQuery(
                "CREATE TABLE usagestats_20200913.spyros_tmp6 AS\n" +
                    "SELECT * \n" +
                    "FROM usagestats_20200913.sarc_sushilogtmp2");
    }
}
// public void export() throws Exception {
public void export() throws Exception {
@ -37,6 +64,9 @@ public class UsageStatsExporter {
System.out.println("====> Initialising DB properties");
ConnectDB.init(properties);
runImpalaQuery();
System.exit(0);
// Create DB tables - they are also needed to download the statistics too
System.out.println("====> Creating database and tables");
PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath);