From 66c7ddfc5e5ea70ea000cbd5132ec9ec9b106f88 Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Thu, 14 May 2020 22:27:18 +0300 Subject: [PATCH] More progress on SQL statements and parameters --- .../oa/graph/usagestats/export/ConnectDB.java | 48 ++++++++++++++----- .../usagestats/export/PiwikDownloadLogs.java | 6 ++- .../graph/usagestats/export/PiwikStatsDB.java | 41 +++++++++------- .../usagestats/export/UsageStatsExporter.java | 28 +++++++---- 4 files changed, 81 insertions(+), 42 deletions(-) diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java index eab84222aa..05d79d854d 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java @@ -27,17 +27,26 @@ public abstract class ConnectDB { private static String dbURL; private static String dbUsername; private static String dbPassword; - private static String defaultDBSchema; + private static String usageStatsDBSchema; + private static String statsDBSchema; private final static Logger log = Logger.getLogger(ConnectDB.class); static void init(Properties properties) throws ClassNotFoundException { - dbURL = properties.getProperty("Stats_db_Url"); - dbUsername = properties.getProperty("Stats_db_User"); - dbPassword = properties.getProperty("Stats_db_Pass"); - defaultDBSchema = properties.getProperty("Stats_db_Schema"); + // To be initialized by a property file - Class.forName(properties.getProperty("Stats_db_Driver")); +// dbURL = properties.getProperty("Stats_db_Url"); +// dbUsername = properties.getProperty("Stats_db_User"); +// dbPassword = properties.getProperty("Stats_db_Pass"); +// defaultDBSchema = properties.getProperty("Stats_db_Schema"); +// +// Class.forName(properties.getProperty("Stats_db_Driver")); + + dbURL = "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1"; + usageStatsDBSchema = "usagestats"; + statsDBSchema = "stats_wf_db_galexiou_oozie_beta"; + + Class.forName("org.apache.hive.jdbc.HiveDriver"); } public static Connection getConnection() throws SQLException { @@ -50,16 +59,31 @@ public abstract class ConnectDB { } } - private static Connection connect() throws SQLException { - Connection connection = DriverManager.getConnection(dbURL, dbUsername, dbPassword); - Statement stmt = connection.createStatement(); - String sqlSetSearchPath = "SET search_path TO " + defaultDBSchema + ";"; - stmt.executeUpdate(sqlSetSearchPath); - stmt.close(); + public static String getUsageStatsDBSchema() { + return ConnectDB.usageStatsDBSchema; + } + public static String getStatsDBSchema() { + return ConnectDB.statsDBSchema; + } + + private static Connection connect() throws SQLException { + + Connection connection = DriverManager.getConnection(dbURL); + Statement stmt = connection.createStatement(); log.debug("Opened database successfully"); return connection; + +// Connection connection = DriverManager.getConnection(dbURL, dbUsername, dbPassword); +// Statement stmt = connection.createStatement(); +// String sqlSetSearchPath = "SET search_path TO " + defaultDBSchema + ";"; +// stmt.executeUpdate(sqlSetSearchPath); +// stmt.close(); +// +// log.debug("Opened database successfully"); +// +// return connection; } } diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java index 4a5f53c491..4db8ae98bd 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java @@ -69,7 +69,8 @@ public class PiwikDownloadLogs { ResultSet rs = statement .executeQuery( - "SELECT distinct piwik_id from public.datasource where piwik_id is not null order by piwik_id;"); + "SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema() + + ".datasource where piwik_id is not null order by piwik_id"); while (rs.next()) { int siteId = rs.getInt(1); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM"); @@ -85,7 +86,8 @@ public class PiwikDownloadLogs { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); PreparedStatement st = ConnectDB.DB_CONNECTION .prepareStatement( - "SELECT max(timestamp) FROM piwiklog WHERE source=? HAVING max(timestamp) is not null;"); + "SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() + + ".piwiklog WHERE source=? HAVING max(timestamp) is not null"); st.setInt(1, siteId); ResultSet rs_date = st.executeQuery(); diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java index 0d26ec661d..047f11b678 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java @@ -70,29 +70,33 @@ public class PiwikStatsDB { private void createTables() throws Exception { try { stmt = ConnectDB.getConnection().createStatement(); - String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));"; + String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".piwiklog(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets stored as orc tblproperties('transactional'='true')"; String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " + " ON INSERT TO piwiklog " + " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit," + "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id " - + "FROM piwiklog " - + "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; - String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on piwiklog(source, id_visit, action, entity_id, \"timestamp\");"; + + "FROM " + ConnectDB.getUsageStatsDBSchema() + "piwiklog " + + "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING"; + String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on " + + ConnectDB.getUsageStatsDBSchema() + "piwiklog(source, id_visit, action, entity_id, \"timestamp\")"; stmt.executeUpdate(sqlCreateTablePiwikLog); - stmt.executeUpdate(sqlcreateRulePiwikLog); - stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); +// stmt.executeUpdate(sqlcreateRulePiwikLog); --> We need to find a way to eliminate duplicates +// stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); --> We probably don't need indexes - String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));"; + String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + "process_portal_log(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')"; String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " - + " ON INSERT TO process_portal_log " + + " ON INSERT TO " + ConnectDB.getUsageStatsDBSchema() + "process_portal_log " + " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit," + "process_portal_log.\"timestamp\" " - + "FROM process_portal_log " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + "process_portal_log " + "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; - String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on process_portal_log(source, id_visit, \"timestamp\");"; + String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on " + + ConnectDB.getUsageStatsDBSchema() + "process_portal_log(source, id_visit, \"timestamp\");"; stmt.executeUpdate(sqlCreateTablePortalLog); - stmt.executeUpdate(sqlcreateRulePortalLog); - stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); +// stmt.executeUpdate(sqlcreateRulePortalLog); --> We need to find a way to eliminate duplicates +// stmt.executeUpdate(sqlCreateRuleIndexPiwikLog); --> We probably don't need indexes stmt.close(); ConnectDB.getConnection().close(); @@ -107,27 +111,28 @@ public class PiwikStatsDB { private void createTmpTables() throws Exception { try { Statement stmt = ConnectDB.getConnection().createStatement(); - String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));"; + String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + "piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets stored as orc tblproperties('transactional'='true')"; String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " - + " ON INSERT TO piwiklogtmp " + + " ON INSERT TO " + ConnectDB.getUsageStatsDBSchema() + "piwiklogtmp " + " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit," + "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id " + "FROM piwiklogtmp " + "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; stmt.executeUpdate(sqlCreateTmpTablePiwikLog); - stmt.executeUpdate(sqlcreateTmpRulePiwikLog); +// stmt.executeUpdate(sqlcreateTmpRulePiwikLog); --> We need to find a way to eliminate duplicates // String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;"; // stmt.executeUpdate(sqlCopyPublicPiwiklog); - String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));"; + String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')"; String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " + " ON INSERT TO process_portal_log_tmp " + " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit," + "process_portal_log_tmp.\"timestamp\" " + "FROM process_portal_log_tmp " + "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; - stmt.executeUpdate(sqlCreateTmpTablePortalLog); - stmt.executeUpdate(sqlcreateTmpRulePortalLog); +// stmt.executeUpdate(sqlCreateTmpTablePortalLog); --> We need to find a way to eliminate duplicates +// stmt.executeUpdate(sqlcreateTmpRulePortalLog); --> We probably don't need indexes stmt.close(); log.info("Usage Tmp Tables Created"); diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java index 2934fd7044..b7d03ff562 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java @@ -18,23 +18,31 @@ public class UsageStatsExporter { public void export() throws Exception { // read workdflow parameters - String matomoAuthToken = properties.getProperty("matomo_AuthToken"); - String matomoBaseURL = properties.getProperty("matomo_BaseUrl"); - String repoLogPath = properties.getProperty("repo_LogPath"); - String portalLogPath = properties.getProperty("portal_LogPath"); - String portalMatomoID = properties.getProperty("portal_MatomoID"); - String irusUKBaseURL = properties.getProperty("IRUS_UK_BaseUrl"); +// String matomoAuthToken = properties.getProperty("matomo_AuthToken"); +// String matomoBaseURL = properties.getProperty("matomo_BaseUrl"); +// String repoLogPath = properties.getProperty("repo_LogPath"); +// String portalLogPath = properties.getProperty("portal_LogPath"); +// String portalMatomoID = properties.getProperty("portal_MatomoID"); +// String irusUKBaseURL = properties.getProperty("IRUS_UK_BaseUrl"); + + String matomoAuthToken = "703bd17d845acdaf795e01bb1e0895b9"; + String matomoBaseURL = "analytics.openaire.eu"; + String repoLogPath = "/user/spyros/logs/usage_stats_logs/Repologs"; + String portalLogPath = "/user/spyros/logs/usage_stats_logs/Portallogs/"; + String portalMatomoID = "109"; + String irusUKBaseURL = "https://irus.jisc.ac.uk/api/sushilite/v1_7/"; // connect to DB ConnectDB.init(properties); + // Create DB tables - they are also needed to download the statistics too + PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath); + + // Download the statistics PiwikDownloadLogs piwd = new PiwikDownloadLogs(matomoBaseURL, matomoAuthToken); piwd.GetOpenAIRELogs(repoLogPath, portalLogPath, portalMatomoID); - /* - * Create DB tables, insert/update statistics - */ - PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath); + // Create DB tables, insert/update statistics piwikstatsdb.setCounterRobotsURL(properties.getProperty("COUNTER_robots_Url")); piwikstatsdb.processLogs(); log.info("process logs done");