More progress - Adding queries to code

This commit is contained in:
Spyros Zoupanos 2020-08-31 23:19:15 +03:00
parent 8db9a7ccdc
commit f3dda9858c
3 changed files with 114 additions and 105 deletions

View File

@ -70,33 +70,32 @@ public class PiwikStatsDB {
private void createTables() throws Exception { private void createTables() throws Exception {
try { try {
stmt = ConnectDB.getConnection().createStatement(); stmt = ConnectDB.getConnection().createStatement();
String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".piwiklog(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets stored as orc tblproperties('transactional'='true')";
String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ " ON INSERT TO piwiklog "
+ " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
+ "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + "piwiklog "
+ "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING";
String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on "
+ ConnectDB.getUsageStatsDBSchema() + "piwiklog(source, id_visit, action, entity_id, \"timestamp\")";
stmt.executeUpdate(sqlCreateTablePiwikLog);
// stmt.executeUpdate(sqlcreateRulePiwikLog); --> We need to find a way to eliminate duplicates
// stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); --> We probably don't need indexes
String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() // Create Piwiklog table - This table should exist
+ "process_portal_log(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')"; String sqlCreateTablePiwikLog =
String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " "CREATE TABLE IF NOT EXISTS"
+ " ON INSERT TO " + ConnectDB.getUsageStatsDBSchema() + "process_portal_log " + ConnectDB.getUsageStatsDBSchema()
+ " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit," + ".piwiklog(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
+ "process_portal_log.\"timestamp\" " + "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + "process_portal_log " + "clustered by (source, id_visit, action, timestamp, entity_id) "
+ "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; + "into 100 buckets stored as orc tblproperties('transactional'='true')";
String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on " stmt.executeUpdate(sqlCreateTablePiwikLog);
+ ConnectDB.getUsageStatsDBSchema() + "process_portal_log(source, id_visit, \"timestamp\");";
/////////////////////////////////////////
// Rule for duplicate inserts @ piwiklog
/////////////////////////////////////////
String sqlCreateTablePortalLog =
"CREATE TABLE IF NOT EXISTS "
+ ConnectDB.getUsageStatsDBSchema()
+ ".process_portal_log(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTablePortalLog); stmt.executeUpdate(sqlCreateTablePortalLog);
// stmt.executeUpdate(sqlcreateRulePortalLog); --> We need to find a way to eliminate duplicates
// stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); --> We probably don't need indexes //////////////////////////////////////////////////
// Rule for duplicate inserts @ process_portal_log
//////////////////////////////////////////////////
stmt.close(); stmt.close();
ConnectDB.getConnection().close(); ConnectDB.getConnection().close();
@ -111,28 +110,38 @@ public class PiwikStatsDB {
private void createTmpTables() throws Exception { private void createTmpTables() throws Exception {
try { try {
Statement stmt = ConnectDB.getConnection().createStatement(); Statement stmt = ConnectDB.getConnection().createStatement();
String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() String sqlCreateTmpTablePiwikLog =
+ "piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets stored as orc tblproperties('transactional'='true')"; "CREATE TABLE IF NOT EXISTS "
String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " + ConnectDB.getUsageStatsDBSchema()
+ " ON INSERT TO " + ConnectDB.getUsageStatsDBSchema() + "piwiklogtmp " + ".piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
+ " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit," + "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+ "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id " + "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
+ "FROM piwiklogtmp " + "stored as orc tblproperties('transactional'='true');";
+ "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
stmt.executeUpdate(sqlCreateTmpTablePiwikLog); stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
// stmt.executeUpdate(sqlcreateTmpRulePiwikLog); --> We need to find a way to eliminate duplicates
//////////////////////////////////////////////////
// Rule for duplicate inserts @ piwiklogtmp
//////////////////////////////////////////////////
//////////////////////////////////////////////////
// Copy from public.piwiklog to piwiklog
//////////////////////////////////////////////////
// String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;"; // String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
// stmt.executeUpdate(sqlCopyPublicPiwiklog); // stmt.executeUpdate(sqlCopyPublicPiwiklog);
String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ " ON INSERT TO process_portal_log_tmp "
+ " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit," String sqlCreateTmpTablePortalLog =
+ "process_portal_log_tmp.\"timestamp\" " "CREATE TABLE IF NOT EXISTS "
+ "FROM process_portal_log_tmp " + ConnectDB.getUsageStatsDBSchema()
+ "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; + ".process_portal_log_tmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTmpTablePortalLog); stmt.executeUpdate(sqlCreateTmpTablePortalLog);
// stmt.executeUpdate(sqlcreateTmpRulePortalLog); --> We need to find a way to eliminate duplicates
//////////////////////////////////////////////////
// Rule for duplicate inserts @ process_portal_log_tmp
//////////////////////////////////////////////////
stmt.close(); stmt.close();
log.info("Usage Tmp Tables Created"); log.info("Usage Tmp Tables Created");

View File

@ -188,62 +188,64 @@ public class SarcStats {
fin.writeChar('\n'); fin.writeChar('\n');
} }
fin.close(); fin.close();
//////////////////
// JSONObject jsonObjectRow = (JSONObject) aJsonArray; // JSONObject jsonObjectRow = (JSONObject) aJsonArray;
// JSONArray itemIdentifier = new JSONArray(); // JSONArray itemIdentifier = new JSONArray();
// obj = jsonObjectRow.get("c:ItemIdentifier"); // obj = jsonObjectRow.get("c:ItemIdentifier");
// if (obj instanceof JSONObject) { // if (obj instanceof JSONObject) {
// itemIdentifier.add(obj); // itemIdentifier.add(obj);
// } else { // } else {
// // JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("c:ItemIdentifier"); // // JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("c:ItemIdentifier");
// itemIdentifier = (JSONArray) obj; // itemIdentifier = (JSONArray) obj;
// }
// for (Object identifier : itemIdentifier) {
// JSONObject doi = (JSONObject) identifier;
// if (doi.get("c:Type").toString().equals("DOI")) {
// rid = doi.get("c:Value").toString();
// // System.out.println("DOI: " + rid);
// break;
// }
// }
// if (rid.isEmpty()) {
// continue;
// }
//
// JSONObject itemPerformance = (JSONObject) jsonObjectRow.get("c:ItemPerformance");
// // for (Object perf : itemPerformance) {
// JSONObject performance = (JSONObject) itemPerformance;
// JSONObject periodObj = (JSONObject) performance.get("c:Period");
// String period = periodObj.get("c:Begin").toString();
// JSONObject instanceObj = (JSONObject) performance.get("c:Instance");
// String type = instanceObj.get("c:MetricType").toString();
// String count = instanceObj.get("c:Count").toString();
// // System.out.println(rid + " : " + period + " : " + count);
//
// preparedStatement.setString(1, "SARC-OJS");
// preparedStatement.setString(2, issn);
// // preparedStatement.setString(2, url);
// preparedStatement.setString(3, rid);
// preparedStatement.setString(4, period);
// preparedStatement.setString(5, type);
// preparedStatement.setInt(6, Integer.parseInt(count));
// preparedStatement.addBatch();
// batch_size++;
// if (batch_size == 10000) {
// preparedStatement.executeBatch();
// ConnectDB.getConnection().commit();
// batch_size = 0;
// }
// // }
//
// // break;
// } // }
// for (Object identifier : itemIdentifier) {
// JSONObject doi = (JSONObject) identifier;
// if (doi.get("c:Type").toString().equals("DOI")) {
// rid = doi.get("c:Value").toString();
// // System.out.println("DOI: " + rid);
// break;
// }
// }
// if (rid.isEmpty()) {
// continue;
// }
//
// JSONObject itemPerformance = (JSONObject) jsonObjectRow.get("c:ItemPerformance");
// // for (Object perf : itemPerformance) {
// JSONObject performance = (JSONObject) itemPerformance;
// JSONObject periodObj = (JSONObject) performance.get("c:Period");
// String period = periodObj.get("c:Begin").toString();
// JSONObject instanceObj = (JSONObject) performance.get("c:Instance");
// String type = instanceObj.get("c:MetricType").toString();
// String count = instanceObj.get("c:Count").toString();
// // System.out.println(rid + " : " + period + " : " + count);
//
// preparedStatement.setString(1, "SARC-OJS");
// preparedStatement.setString(2, issn);
// // preparedStatement.setString(2, url);
// preparedStatement.setString(3, rid);
// preparedStatement.setString(4, period);
// preparedStatement.setString(5, type);
// preparedStatement.setInt(6, Integer.parseInt(count));
// preparedStatement.addBatch();
// batch_size++;
// if (batch_size == 10000) {
// preparedStatement.executeBatch();
// ConnectDB.getConnection().commit();
// batch_size = 0;
// }
// // }
//
// // break;
// }
//////////////////
// break; // break;
} }
preparedStatement.executeBatch(); preparedStatement.executeBatch();
ConnectDB.getConnection().commit(); ConnectDB.getConnection().commit();
ConnectDB.getConnection().close(); ConnectDB.getConnection().close();
} }
private String getJson(String url) throws Exception { private String getJson(String url) throws Exception {

View File

@ -38,22 +38,20 @@ public class UsageStatsExporter {
// connect to DB // connect to DB
ConnectDB.init(properties); ConnectDB.init(properties);
// // Create DB tables - they are also needed to download the statistics too // Create DB tables - they are also needed to download the statistics too
// PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath); PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath);
// //
// // Download the statistics - The following 2 lines are not needed after the download - Commenting them out for // // Download the statistics - The following 2 lines are not needed after the download - Commenting them out for
// // the moment // // the moment
// PiwikDownloadLogs piwd = new PiwikDownloadLogs(matomoBaseURL, matomoAuthToken); PiwikDownloadLogs piwd = new PiwikDownloadLogs(matomoBaseURL, matomoAuthToken);
// piwd.GetOpenAIRELogs(repoLogPath, portalLogPath, portalMatomoID); piwd.GetOpenAIRELogs(repoLogPath, portalLogPath, portalMatomoID);
//
// System.exit(0); // Create DB tables, insert/update statistics
// // String cRobotsUrl = properties.getProperty("COUNTER_robots_Url");
// // Create DB tables, insert/update statistics String cRobotsUrl = "https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json";
//// String cRobotsUrl = properties.getProperty("COUNTER_robots_Url"); piwikstatsdb.setCounterRobotsURL(cRobotsUrl);
// String cRobotsUrl = "https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json"; piwikstatsdb.processLogs();
// piwikstatsdb.setCounterRobotsURL(cRobotsUrl); log.info("process logs done");
// piwikstatsdb.processLogs();
// log.info("process logs done");
// IrusStats irusstats = new IrusStats(irusUKBaseURL); // IrusStats irusstats = new IrusStats(irusUKBaseURL);
// irusstats.processIrusRRReport(irusUKReportPath); // irusstats.processIrusRRReport(irusUKReportPath);