More progress on adding queries to the code. Initial database and table creation seems OK. Downloading logs from available piwik_ids

Spyros Zoupanos 2020-09-02 21:02:56 +03:00
parent 637e61bb0f
commit cf7b9c6db3
4 changed files with 86 additions and 83 deletions

ConnectDB.java

@@ -43,7 +43,7 @@ public abstract class ConnectDB {
 		// Class.forName(properties.getProperty("Stats_db_Driver"));
 		dbURL = "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1";
-		usageStatsDBSchema = "usagestats";
+		usageStatsDBSchema = "usagestats_20200902";
 		statsDBSchema = "openaire_prod_stats_shadow_20200821";
 		Class.forName("org.apache.hive.jdbc.HiveDriver");

PiwikDownloadLogs.java

@@ -86,9 +86,9 @@ public class PiwikDownloadLogs {
 		Statement statement = ConnectDB.getConnection().createStatement();
 		ResultSet rs = statement
 			.executeQuery(
 				"SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema()
 					+ ".datasource where piwik_id is not null and piwik_id <> 0 order by piwik_id");
 		while (rs.next()) {
 			int siteId = rs.getInt(1);
 			SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
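One thing worth flagging in the hunk above: the pattern "YYYY-MM" uses YYYY, which in Java's SimpleDateFormat is the week-based year, not the calendar year (yyyy). Around New Year the two diverge, so month keys such as 2019-12 can silently become 2020-12. A self-contained demo (the class name is just for illustration):

    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Locale;

    public class WeekYearDemo {
        public static void main(String[] args) {
            Calendar cal = Calendar.getInstance(Locale.US);
            cal.set(2019, Calendar.DECEMBER, 30);
            // week-based year: Dec 30, 2019 already falls in week 1 of 2020 (US rules)
            System.out.println(new SimpleDateFormat("YYYY-MM", Locale.US).format(cal.getTime())); // 2020-12
            // calendar year: what month-based log naming almost certainly wants
            System.out.println(new SimpleDateFormat("yyyy-MM", Locale.US).format(cal.getTime())); // 2019-12
        }
    }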

PiwikStatsDB.java

@@ -39,6 +39,7 @@ public class PiwikStatsDB {
 	public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
 		this.logRepoPath = logRepoPath;
 		this.logPortalPath = logPortalPath;
+		this.createDatabase();
 		this.createTables();
 		// The piwiklog table is not needed since it is built
 		// on top of JSON files
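With this change a brand-new schema bootstraps itself: createDatabase() runs before createTables(), so every schema-qualified CREATE TABLE has somewhere to land. Hypothetical call site, using the path values set in UsageStatsExporter below:

    // order matters: the constructor now creates the usage-stats database
    // before any CREATE TABLE statements are issued against it
    PiwikStatsDB piwikstatsdb = new PiwikStatsDB(
        "/user/spyros/logs/usage_stats_logs2/Repologs",
        "/user/spyros/logs/usage_stats_logs2/Portallogs/");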
@@ -69,36 +70,46 @@ public class PiwikStatsDB {
 		this.CounterRobotsURL = CounterRobotsURL;
 	}
+	private void createDatabase() throws Exception {
+		try {
+			stmt = ConnectDB.getConnection().createStatement();
+			String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema();
+			stmt.executeUpdate(createDatabase);
+		} catch (Exception e) {
+			log.error("Failed to create database: " + e);
+			throw new Exception("Failed to create database: " + e.toString(), e);
+		}
+	}
 	private void createTables() throws Exception {
 		try {
 			stmt = ConnectDB.getConnection().createStatement();
 			// Create Piwiklog table - This table should exist
-			String sqlCreateTablePiwikLog =
-				"CREATE TABLE IF NOT EXISTS"
-					+ ConnectDB.getUsageStatsDBSchema()
-					+ ".piwiklog(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
-					+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
-					+ "clustered by (source, id_visit, action, timestamp, entity_id) "
-					+ "into 100 buckets stored as orc tblproperties('transactional'='true')";
+			String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS "
+				+ ConnectDB.getUsageStatsDBSchema()
+				+ ".piwiklog(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
+				+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+				+ "clustered by (source, id_visit, action, timestamp, entity_id) "
+				+ "into 100 buckets stored as orc tblproperties('transactional'='true')";
 			stmt.executeUpdate(sqlCreateTablePiwikLog);
 			/////////////////////////////////////////
 			// Rule for duplicate inserts @ piwiklog
 			/////////////////////////////////////////
-			String sqlCreateTablePortalLog =
-				"CREATE TABLE IF NOT EXISTS "
-					+ ConnectDB.getUsageStatsDBSchema()
-					+ ".process_portal_log(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
-					+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
-					+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
+			String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS "
+				+ ConnectDB.getUsageStatsDBSchema()
+				+ ".process_portal_log(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
+				+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+				+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
 			stmt.executeUpdate(sqlCreateTablePortalLog);
 			//////////////////////////////////////////////////
 			// Rule for duplicate inserts @ process_portal_log
 			//////////////////////////////////////////////////
 			stmt.close();
 			ConnectDB.getConnection().close();
 			log.info("Usage Tables Created");
@@ -112,35 +123,31 @@ public class PiwikStatsDB {
 	private void createTmpTables() throws Exception {
 		try {
 			Statement stmt = ConnectDB.getConnection().createStatement();
-			String sqlCreateTmpTablePiwikLog =
-				"CREATE TABLE IF NOT EXISTS "
-					+ ConnectDB.getUsageStatsDBSchema()
-					+ ".piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
-					+ "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
-					+ "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
-					+ "stored as orc tblproperties('transactional'='true');";
+			String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS "
+				+ ConnectDB.getUsageStatsDBSchema()
+				+ ".piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
+				+ "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+				+ "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
+				+ "stored as orc tblproperties('transactional'='true');";
 			stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
 			//////////////////////////////////////////////////
 			// Rule for duplicate inserts @ piwiklogtmp
 			//////////////////////////////////////////////////
 			//////////////////////////////////////////////////
 			// Copy from public.piwiklog to piwiklog
 			//////////////////////////////////////////////////
 			// String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
 			// stmt.executeUpdate(sqlCopyPublicPiwiklog);
-			String sqlCreateTmpTablePortalLog =
-				"CREATE TABLE IF NOT EXISTS "
-					+ ConnectDB.getUsageStatsDBSchema()
-					+ ".process_portal_log_tmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
-					+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
-					+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
+			String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS "
+				+ ConnectDB.getUsageStatsDBSchema()
+				+ ".process_portal_log_tmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
+				+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+				+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
 			stmt.executeUpdate(sqlCreateTmpTablePortalLog);
 			//////////////////////////////////////////////////
 			// Rule for duplicate inserts @ process_portal_log_tmp
 			//////////////////////////////////////////////////
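A small inconsistency this hunk keeps: sqlCreateTmpTablePiwikLog still ends with a semicolon inside the string literal, while the statements in createTables() do not. Depending on the HiveServer2 version, a trailing ';' passed through JDBC can fail with a parse error, so a defensive strip is cheap (a sketch, not in the commit):

    // strip one trailing semicolon before handing the DDL to HiveServer2
    String ddl = sqlCreateTmpTablePiwikLog.trim();
    if (ddl.endsWith(";")) {
        ddl = ddl.substring(0, ddl.length() - 1);
    }
    stmt.executeUpdate(ddl);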
@@ -199,47 +206,43 @@ public class PiwikStatsDB {
 	public void processRepositoryLog() throws Exception {
 		Statement stmt = ConnectDB.getConnection().createStatement();
 		ConnectDB.getConnection().setAutoCommit(false);
-		String stm_piwiklogtmp_json =
-			"CREATE EXTERNAL TABLE IF NOT EXISTS " +
-				ConnectDB.getUsageStatsDBSchema() +
-				".piwiklogtmp_json(\n" +
-				"  `idSite` STRING,\n" +
-				"  `idVisit` STRING,\n" +
-				"  `country` STRING,\n" +
-				"  `referrerName` STRING,\n" +
-				"  `browser` STRING,\n" +
-				"  `actionDetails` ARRAY<\n" +
-				"    struct<\n" +
-				"      type: STRING,\n" +
-				"      url: STRING,\n" +
-				"      `customVariables`: struct<\n" +
-				"        `1`: struct<\n" +
-				"          `customVariablePageValue1`: STRING\n" +
-				"        >\n" +
-				"      >,\n" +
-				"      timestamp: String\n" +
-				"    >\n" +
-				"  >\n" +
-				")\n" +
-				"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
-				"LOCATION '/user/spyros/logs/usage_stats_logs/Repologs2/'\n" +
-				"TBLPROPERTIES (\"transactional\"=\"false\");\n" +
-				"";
+		String stm_piwiklogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
+			ConnectDB.getUsageStatsDBSchema() +
+			".piwiklogtmp_json(\n" +
+			"  `idSite` STRING,\n" +
+			"  `idVisit` STRING,\n" +
+			"  `country` STRING,\n" +
+			"  `referrerName` STRING,\n" +
+			"  `browser` STRING,\n" +
+			"  `actionDetails` ARRAY<\n" +
+			"    struct<\n" +
+			"      type: STRING,\n" +
+			"      url: STRING,\n" +
+			"      `customVariables`: struct<\n" +
+			"        `1`: struct<\n" +
+			"          `customVariablePageValue1`: STRING\n" +
+			"        >\n" +
+			"      >,\n" +
+			"      timestamp: String\n" +
+			"    >\n" +
+			"  >\n" +
+			")\n" +
+			"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
+			"LOCATION '/user/spyros/logs/usage_stats_logs/Repologs2/'\n" +
+			"TBLPROPERTIES (\"transactional\"=\"false\");\n" +
+			"";
 		stmt.executeUpdate(stm_piwiklogtmp_json);
-		String stm_piwiklogtmp =
-			"CREATE TABLE " +
-			ConnectDB.getUsageStatsDBSchema() +
-			".piwiklogtmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, " +
-			"entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
-			"clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true');";
-		stmt.executeUpdate(processRepositoryLog);
+		String stm_piwiklogtmp = "CREATE TABLE " +
+			ConnectDB.getUsageStatsDBSchema() +
+			".piwiklogtmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, " +
+			"entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
+			"clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true');";
+		stmt.executeUpdate(stm_piwiklogtmp);
 		stmt.close();
 		// ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
 		//// File dir = new File(this.logRepoPath);
 		//// File[] jsonFiles = dir.listFiles();
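Besides re-wrapping the string literals, this hunk fixes a real bug: the old code called stmt.executeUpdate(processRepositoryLog), passing the enclosing method's name instead of the stm_piwiklogtmp DDL string. With the JsonSerDe external table in place, the raw JSON logs under Repologs2 become directly queryable; a hypothetical sanity check, not in the commit:

    // peek at the JSON-backed external table to confirm the SerDe mapping works
    ResultSet check = stmt.executeQuery(
        "SELECT `idSite`, `idVisit`, `country` FROM "
            + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp_json LIMIT 10");
    while (check.next()) {
        System.out.println(check.getString(1) + " | " + check.getString(2) + " | " + check.getString(3));
    }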

UsageStatsExporter.java

@@ -27,13 +27,13 @@ public class UsageStatsExporter {
 		String matomoAuthToken = "703bd17d845acdaf795e01bb1e0895b9";
 		String matomoBaseURL = "analytics.openaire.eu";
-		String repoLogPath = "/user/spyros/logs/usage_stats_logs/Repologs";
-		String portalLogPath = "/user/spyros/logs/usage_stats_logs/Portallogs/";
+		String repoLogPath = "/user/spyros/logs/usage_stats_logs2/Repologs";
+		String portalLogPath = "/user/spyros/logs/usage_stats_logs2/Portallogs/";
 		String portalMatomoID = "109";
 		String irusUKBaseURL = "https://irus.jisc.ac.uk/api/sushilite/v1_7/";
-		String irusUKReportPath = "/user/spyros/logs/usage_stats_logs/irusUKReports";
-		String sarcsReportPath = "/user/spyros/logs/usage_stats_logs/sarcReports";
+		String irusUKReportPath = "/user/spyros/logs/usage_stats_logs2/irusUKReports";
+		String sarcsReportPath = "/user/spyros/logs/usage_stats_logs2/sarcReports";
 		// connect to DB
 		ConnectDB.init(properties);
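All HDFS working paths move from usage_stats_logs to usage_stats_logs2, matching the fresh usagestats_20200902 schema. ConnectDB.init(properties) is the only entry point shown; the keys it reads are not visible in this commit apart from the commented-out Stats_db_Driver lookup in ConnectDB. A hypothetical bootstrap (the class and properties file name are assumptions for illustration):

    import java.io.InputStream;
    import java.util.Properties;

    public class ExporterBootstrap {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            // "usagestats.properties" is an assumed resource name, not from the commit
            try (InputStream in = ExporterBootstrap.class.getResourceAsStream("/usagestats.properties")) {
                if (in != null) {
                    properties.load(in);
                }
            }
            ConnectDB.init(properties);
        }
    }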