forked from D-Net/dnet-hadoop
More progress on SQL statements and parameters
This commit is contained in:
parent
98ba2d0282
commit
66c7ddfc5e
|
@ -27,17 +27,26 @@ public abstract class ConnectDB {
|
|||
private static String dbURL;
|
||||
private static String dbUsername;
|
||||
private static String dbPassword;
|
||||
private static String defaultDBSchema;
|
||||
private static String usageStatsDBSchema;
|
||||
private static String statsDBSchema;
|
||||
private final static Logger log = Logger.getLogger(ConnectDB.class);
|
||||
|
||||
static void init(Properties properties) throws ClassNotFoundException {
|
||||
|
||||
dbURL = properties.getProperty("Stats_db_Url");
|
||||
dbUsername = properties.getProperty("Stats_db_User");
|
||||
dbPassword = properties.getProperty("Stats_db_Pass");
|
||||
defaultDBSchema = properties.getProperty("Stats_db_Schema");
|
||||
// To be initialized by a property file
|
||||
|
||||
Class.forName(properties.getProperty("Stats_db_Driver"));
|
||||
// dbURL = properties.getProperty("Stats_db_Url");
|
||||
// dbUsername = properties.getProperty("Stats_db_User");
|
||||
// dbPassword = properties.getProperty("Stats_db_Pass");
|
||||
// defaultDBSchema = properties.getProperty("Stats_db_Schema");
|
||||
//
|
||||
// Class.forName(properties.getProperty("Stats_db_Driver"));
|
||||
|
||||
dbURL = "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1";
|
||||
usageStatsDBSchema = "usagestats";
|
||||
statsDBSchema = "stats_wf_db_galexiou_oozie_beta";
|
||||
|
||||
Class.forName("org.apache.hive.jdbc.HiveDriver");
|
||||
}
|
||||
|
||||
public static Connection getConnection() throws SQLException {
|
||||
|
@ -50,16 +59,31 @@ public abstract class ConnectDB {
|
|||
}
|
||||
}
|
||||
|
||||
private static Connection connect() throws SQLException {
|
||||
Connection connection = DriverManager.getConnection(dbURL, dbUsername, dbPassword);
|
||||
Statement stmt = connection.createStatement();
|
||||
String sqlSetSearchPath = "SET search_path TO " + defaultDBSchema + ";";
|
||||
stmt.executeUpdate(sqlSetSearchPath);
|
||||
stmt.close();
|
||||
public static String getUsageStatsDBSchema() {
|
||||
return ConnectDB.usageStatsDBSchema;
|
||||
}
|
||||
|
||||
public static String getStatsDBSchema() {
|
||||
return ConnectDB.statsDBSchema;
|
||||
}
|
||||
|
||||
private static Connection connect() throws SQLException {
|
||||
|
||||
Connection connection = DriverManager.getConnection(dbURL);
|
||||
Statement stmt = connection.createStatement();
|
||||
log.debug("Opened database successfully");
|
||||
|
||||
return connection;
|
||||
|
||||
// Connection connection = DriverManager.getConnection(dbURL, dbUsername, dbPassword);
|
||||
// Statement stmt = connection.createStatement();
|
||||
// String sqlSetSearchPath = "SET search_path TO " + defaultDBSchema + ";";
|
||||
// stmt.executeUpdate(sqlSetSearchPath);
|
||||
// stmt.close();
|
||||
//
|
||||
// log.debug("Opened database successfully");
|
||||
//
|
||||
// return connection;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -69,7 +69,8 @@ public class PiwikDownloadLogs {
|
|||
|
||||
ResultSet rs = statement
|
||||
.executeQuery(
|
||||
"SELECT distinct piwik_id from public.datasource where piwik_id is not null order by piwik_id;");
|
||||
"SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema()
|
||||
+ ".datasource where piwik_id is not null order by piwik_id");
|
||||
while (rs.next()) {
|
||||
int siteId = rs.getInt(1);
|
||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
|
||||
|
@ -85,7 +86,8 @@ public class PiwikDownloadLogs {
|
|||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||
PreparedStatement st = ConnectDB.DB_CONNECTION
|
||||
.prepareStatement(
|
||||
"SELECT max(timestamp) FROM piwiklog WHERE source=? HAVING max(timestamp) is not null;");
|
||||
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".piwiklog WHERE source=? HAVING max(timestamp) is not null");
|
||||
st.setInt(1, siteId);
|
||||
|
||||
ResultSet rs_date = st.executeQuery();
|
||||
|
|
|
@ -70,29 +70,33 @@ public class PiwikStatsDB {
|
|||
private void createTables() throws Exception {
|
||||
try {
|
||||
stmt = ConnectDB.getConnection().createStatement();
|
||||
String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
||||
String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".piwiklog(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||
+ " ON INSERT TO piwiklog "
|
||||
+ " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
|
||||
+ "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
|
||||
+ "FROM piwiklog "
|
||||
+ "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||
String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on piwiklog(source, id_visit, action, entity_id, \"timestamp\");";
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + "piwiklog "
|
||||
+ "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING";
|
||||
String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + "piwiklog(source, id_visit, action, entity_id, \"timestamp\")";
|
||||
stmt.executeUpdate(sqlCreateTablePiwikLog);
|
||||
stmt.executeUpdate(sqlcreateRulePiwikLog);
|
||||
stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
|
||||
// stmt.executeUpdate(sqlcreateRulePiwikLog); --> We need to find a way to eliminate duplicates
|
||||
// stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); --> We probably don't need indexes
|
||||
|
||||
String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
|
||||
String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ "process_portal_log(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||
+ " ON INSERT TO process_portal_log "
|
||||
+ " ON INSERT TO " + ConnectDB.getUsageStatsDBSchema() + "process_portal_log "
|
||||
+ " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit,"
|
||||
+ "process_portal_log.\"timestamp\" "
|
||||
+ "FROM process_portal_log "
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + "process_portal_log "
|
||||
+ "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||
String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on process_portal_log(source, id_visit, \"timestamp\");";
|
||||
String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + "process_portal_log(source, id_visit, \"timestamp\");";
|
||||
stmt.executeUpdate(sqlCreateTablePortalLog);
|
||||
stmt.executeUpdate(sqlcreateRulePortalLog);
|
||||
stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
|
||||
// stmt.executeUpdate(sqlcreateRulePortalLog); --> We need to find a way to eliminate duplicates
|
||||
// stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); --> We probably don't need indexes
|
||||
|
||||
stmt.close();
|
||||
ConnectDB.getConnection().close();
|
||||
|
@ -107,27 +111,28 @@ public class PiwikStatsDB {
|
|||
private void createTmpTables() throws Exception {
|
||||
try {
|
||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||
String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
||||
String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ "piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||
+ " ON INSERT TO piwiklogtmp "
|
||||
+ " ON INSERT TO " + ConnectDB.getUsageStatsDBSchema() + "piwiklogtmp "
|
||||
+ " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit,"
|
||||
+ "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id "
|
||||
+ "FROM piwiklogtmp "
|
||||
+ "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||
stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
|
||||
stmt.executeUpdate(sqlcreateTmpRulePiwikLog);
|
||||
// stmt.executeUpdate(sqlcreateTmpRulePiwikLog); --> We need to find a way to eliminate duplicates
|
||||
|
||||
// String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
|
||||
// stmt.executeUpdate(sqlCopyPublicPiwiklog);
|
||||
String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
|
||||
String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||
+ " ON INSERT TO process_portal_log_tmp "
|
||||
+ " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit,"
|
||||
+ "process_portal_log_tmp.\"timestamp\" "
|
||||
+ "FROM process_portal_log_tmp "
|
||||
+ "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||
stmt.executeUpdate(sqlCreateTmpTablePortalLog);
|
||||
stmt.executeUpdate(sqlcreateTmpRulePortalLog);
|
||||
// stmt.executeUpdate(sqlCreateTmpTablePortalLog); --> We need to find a way to eliminate duplicates
|
||||
// stmt.executeUpdate(sqlcreateTmpRulePortalLog); --> We probably don't need indexes
|
||||
|
||||
stmt.close();
|
||||
log.info("Usage Tmp Tables Created");
|
||||
|
|
|
@ -18,23 +18,31 @@ public class UsageStatsExporter {
|
|||
public void export() throws Exception {
|
||||
|
||||
// read workflow parameters
|
||||
String matomoAuthToken = properties.getProperty("matomo_AuthToken");
|
||||
String matomoBaseURL = properties.getProperty("matomo_BaseUrl");
|
||||
String repoLogPath = properties.getProperty("repo_LogPath");
|
||||
String portalLogPath = properties.getProperty("portal_LogPath");
|
||||
String portalMatomoID = properties.getProperty("portal_MatomoID");
|
||||
String irusUKBaseURL = properties.getProperty("IRUS_UK_BaseUrl");
|
||||
// String matomoAuthToken = properties.getProperty("matomo_AuthToken");
|
||||
// String matomoBaseURL = properties.getProperty("matomo_BaseUrl");
|
||||
// String repoLogPath = properties.getProperty("repo_LogPath");
|
||||
// String portalLogPath = properties.getProperty("portal_LogPath");
|
||||
// String portalMatomoID = properties.getProperty("portal_MatomoID");
|
||||
// String irusUKBaseURL = properties.getProperty("IRUS_UK_BaseUrl");
|
||||
|
||||
String matomoAuthToken = "703bd17d845acdaf795e01bb1e0895b9";
|
||||
String matomoBaseURL = "analytics.openaire.eu";
|
||||
String repoLogPath = "/user/spyros/logs/usage_stats_logs/Repologs";
|
||||
String portalLogPath = "/user/spyros/logs/usage_stats_logs/Portallogs/";
|
||||
String portalMatomoID = "109";
|
||||
String irusUKBaseURL = "https://irus.jisc.ac.uk/api/sushilite/v1_7/";
|
||||
|
||||
// connect to DB
|
||||
ConnectDB.init(properties);
|
||||
|
||||
// Create DB tables - they are also needed to download the statistics
|
||||
PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath);
|
||||
|
||||
// Download the statistics
|
||||
PiwikDownloadLogs piwd = new PiwikDownloadLogs(matomoBaseURL, matomoAuthToken);
|
||||
piwd.GetOpenAIRELogs(repoLogPath, portalLogPath, portalMatomoID);
|
||||
|
||||
/*
|
||||
* Create DB tables, insert/update statistics
|
||||
*/
|
||||
PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath);
|
||||
// Create DB tables, insert/update statistics
|
||||
piwikstatsdb.setCounterRobotsURL(properties.getProperty("COUNTER_robots_Url"));
|
||||
piwikstatsdb.processLogs();
|
||||
log.info("process logs done");
|
||||
|
|
Loading…
Reference in New Issue