processIrusStats done

This commit is contained in:
Spyros Zoupanos 2020-09-13 16:00:40 +03:00
parent 3d5904fb41
commit 08a102a76c
3 changed files with 109 additions and 44 deletions

View File

@ -83,38 +83,108 @@ public class IrusStats {
}
}
// NOTE(review): this span comes from a diff rendering — it interleaves the
// pre-existing live implementation of createTmpTables() with a commented-out
// copy of the same logic added by this commit. Code below is kept verbatim;
// only review comments are added.
// The following may not be needed - It will be created when JSON tables are created
private void createTmpTables() throws Exception {
try {
// // The following may not be needed - It will be created when JSON tables are created
// private void createTmpTables() throws Exception {
// try {
//
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilogtmp(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
// stmt.executeUpdate(sqlCreateTableSushiLog);
//
// // stmt.executeUpdate("CREATE TABLE IF NOT EXISTS public.sushilog AS TABLE sushilog;");
// // String sqlCopyPublicSushiLog = "INSERT INTO sushilog SELECT * FROM public.sushilog;";
// // stmt.executeUpdate(sqlCopyPublicSushiLog);
// String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO sushilogtmp "
// + " WHERE (EXISTS ( SELECT sushilogtmp.source, sushilogtmp.repository,"
// + "sushilogtmp.rid, sushilogtmp.date "
// + "FROM sushilogtmp "
// + "WHERE sushilogtmp.source = new.source AND sushilogtmp.repository = new.repository AND sushilogtmp.rid = new.rid AND sushilogtmp.date = new.date AND sushilogtmp.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlcreateRuleSushiLog);
//
// stmt.close();
// ConnectDB.getConnection().close();
// log.info("Sushi Tmp Tables Created");
// } catch (Exception e) {
// log.error("Failed to create tables: " + e);
// throw new Exception("Failed to create tables: " + e.toString(), e);
// }
// }
// Live implementation: creates the sushilogtmp staging table if absent.
// NOTE(review): if an executeUpdate below throws, stmt is never closed —
// consider try-with-resources. TODO confirm against project conventions.
Statement stmt = ConnectDB.getConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilogtmp(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
stmt.executeUpdate(sqlCreateTableSushiLog);
// stmt.executeUpdate("CREATE TABLE IF NOT EXISTS public.sushilog AS TABLE sushilog;");
// String sqlCopyPublicSushiLog = "INSERT INTO sushilog SELECT * FROM public.sushilog;";
// stmt.executeUpdate(sqlCopyPublicSushiLog);
// PostgreSQL-style RULE: silently discards an INSERT into sushilogtmp when a
// row with the same (source, repository, rid, date, metric_type) already
// exists ("DO INSTEAD NOTHING"), i.e. de-duplication at insert time.
String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ " ON INSERT TO sushilogtmp "
+ " WHERE (EXISTS ( SELECT sushilogtmp.source, sushilogtmp.repository,"
+ "sushilogtmp.rid, sushilogtmp.date "
+ "FROM sushilogtmp "
+ "WHERE sushilogtmp.source = new.source AND sushilogtmp.repository = new.repository AND sushilogtmp.rid = new.rid AND sushilogtmp.date = new.date AND sushilogtmp.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
stmt.executeUpdate(sqlcreateRuleSushiLog);
stmt.close();
// NOTE(review): if ConnectDB.getConnection() hands out a NEW connection on
// each call, this closes a different connection than the one stmt was created
// on, leaking the original — verify ConnectDB's behavior.
ConnectDB.getConnection().close();
log.info("Sushi Tmp Tables Created");
} catch (Exception e) {
// Cause is preserved in the rethrow; callers see a generic Exception.
log.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
}
}
public void irusStats() throws Exception {
public void processIrusStats() throws Exception {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
System.out.println("====> Added JSON Serde jar");
System.out.println("====> Dropping sushilogtmp_json table");
String drop_sushilogtmp_json = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sushilogtmp_json";
stmt.executeUpdate(drop_sushilogtmp_json);
System.out.println("====> Dropped sushilogtmp_json table");
System.out.println("====> Creating sushilogtmp_json table");
String create_sushilogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".sushilogtmp_json(\n" +
" `ItemIdentifier` ARRAY<\n" +
" struct<\n" +
" Type: STRING,\n" +
" Value: STRING\n" +
" >\n" +
" >,\n" +
" `ItemPerformance` ARRAY<\n" +
" struct<\n" +
" `Period`: struct<\n" +
" `Begin`: STRING,\n" +
" `End`: STRING\n" +
" >,\n" +
" `Instance`: struct<\n" +
" `Count`: STRING,\n" +
" `MetricType`: STRING\n" +
" >\n" +
" >\n" +
" >\n" +
")\n" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + UsageStatsExporter.irusUKReportPath + "'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(create_sushilogtmp_json);
System.out.println("====> Created sushilogtmp_json table");
System.out.println("====> Dropping sushilogtmp table");
String drop_sushilogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sushilogtmp";
stmt.executeUpdate(drop_sushilogtmp);
System.out.println("====> Dropped sushilogtmp table");
System.out.println("====> Creating sushilogtmp table");
String create_sushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilogtmp(source STRING, repository STRING, " +
"rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
+
"tblproperties('transactional'='true')";
stmt.executeUpdate(create_sushilogtmp);
System.out.println("====> Created sushilogtmp table");
System.out.println("====> Inserting to sushilogtmp table");
String insert_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilogtmp " +
"SELECT 'IRUS-UK', 'opendoar____::', `ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, " +
"`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilogtmp_json " +
"LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " +
"LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf " +
"WHERE `ItemIdent`.`Type`= 'OAI'";
stmt.executeUpdate(insert_sushilogtmp);
System.out.println("====> Inserted to sushilogtmp table");
ConnectDB.getConnection().close();
System.exit(0);
// String sql = "INSERT INTO sushi_result_downloads SELECT s.source, d.id AS repository, ro.id, s.date, s.count
// FROM sushilog s, datasource_oids d, result_oids ro WHERE s.repository=d.orid AND s.oai=ro.orid AND
// metric_type='ft_total'";
@ -132,7 +202,6 @@ public class IrusStats {
sql = "Insert into sushilog select * from sushilogtmp;";
stmt.executeUpdate(sql);
ConnectDB.getConnection().commit();
ConnectDB.getConnection().close();
}

View File

@ -191,7 +191,7 @@ public class PiwikStatsDB {
System.out.println("====> DownloadsStats processing starts");
System.out.println("====> Processing portal logs");
// processPortalLog();
processPortalLog();
System.out.println("====> Portal logs process done");
log.info("portal process done");
@ -717,6 +717,10 @@ public class PiwikStatsDB {
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
System.out.println("====> Added JSON Serde jar");
System.out.println("====> Dropping process_portal_log_tmp_json table");
String drop_process_portal_log_tmp_json = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +

View File

@ -44,14 +44,6 @@ public class UsageStatsExporter {
// piwd.GetOpenAIRELogs(repoLogPath, portalLogPath, portalMatomoID);
System.out.println("====> Downloaded logs");
// Adding JSON Serde jar needed for creating tables over JSON files
Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false);
System.out.println("====> Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
System.out.println("====> Added JSON Serde jar");
stmt.close();
// Create DB tables, insert/update statistics
// String cRobotsUrl = properties.getProperty("COUNTER_robots_Url");
String cRobotsUrl = "https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json";
@ -61,15 +53,15 @@ public class UsageStatsExporter {
log.info("process logs done");
IrusStats irusstats = new IrusStats(irusUKBaseURL);
// irusstats.processIrusRRReport(irusUKReportPath);
// irusstats.getIrusRRReport(irusUKReportPath);
irusstats.irusStats();
irusstats.processIrusStats();
// log.info("irus done");
System.exit(0);
SarcStats sarcStats = new SarcStats();
sarcStats.processSarc(sarcsReportPath);
// SarcStats sarcStats = new SarcStats();
// sarcStats.processSarc(sarcsReportPath);
// sarcStats.sarcStats();
log.info("sarc done");