package eu.dnetlib.oa.graph.usagestats.export;

// NOTE(review): this copy of the file appears damaged by a text-extraction step, and several
// import lines below are truncated. Judging by the names used later in the class, `import*;`
// was presumably `import java.io.*;` (BufferedReader, InputStreamReader). One of the bare
// `import;` lines was presumably `import java.net.URLDecoder;` (used in processPortalURL).
// TODO restore from version control; the file does not compile as shown.
import*;
import;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds the usage-statistics Hive database.
 *
 * <p>The pipeline:
 * <ul>
 * <li>creates the database and its log tables;</li>
 * <li>loads the repository and portal Piwik log exports, exposed as external Hive tables over
 * JSON files, into {@code piwiklogtmp};</li>
 * <li>post-processes those rows;</li>
 * <li>finally appends the temporary tables to the production tables.</li>
 * </ul>
 *
 * <p>NOTE(review): this copy appears to have lost text in several places:
 * <ul>
 * <li>Many statements read {@code "message");} with no method call in front of them.
 * Presumably a {@code logger.info(} prefix was removed.</li>
 * <li>Generic type arguments may have been stripped: there are raw {@code ArrayList} and
 * {@code RemoteIterator} declarations.</li>
 * <li>Several SQL literals are empty where a value is clearly expected.</li>
 * <li>{@code viewsStats()}, {@code downloadsStats()} and {@code processPortalLog()} are called
 * but not defined.</li>
 * </ul>
 * See the per-method notes below.
 *
 * @author D. Pierrakos, S. Zoupanos
 */
public class PiwikStatsDB {

	// NOTE(review): never assigned in this class. It is only read in listHdfsDir's error path,
	// where it is therefore always null.
	private String logPath;
	// NOTE(review): logRepoPath and logPortalPath are set by the constructor but not read
	// anywhere in this copy. processRepositoryLog reads ExecuteWorkflow.repoLogPath instead.
	private String logRepoPath;
	private String logPortalPath;

	// Shared Statement used by createDatabase, createTables and cleanOAI. The other methods
	// declare a local `stmt` that shadows this field.
	private Statement stmt = null;
	private static final Logger logger = LoggerFactory.getLogger(PiwikStatsDB.class);
	// URL handed to ReadCounterRobotsList in processLogs.
	private String CounterRobotsURL;
	// Robot patterns loaded in processLogs (or injected through setRobotsList).
	private ArrayList robotsList;

	/**
	 * Stores the two log paths.
	 *
	 * @param logRepoPath   presumably the repository log location (not read in this copy)
	 * @param logPortalPath presumably the portal log location (not read in this copy)
	 * @throws Exception declared, but nothing in this constructor throws it
	 */
	public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
		this.logRepoPath = logRepoPath;
		this.logPortalPath = logPortalPath;
	}

	/**
	 * Drops and recreates the usage-stats schema, then creates the permanent and temporary log
	 * tables.
	 *
	 * <p>The drop uses CASCADE, so every existing table in that schema is lost.
	 *
	 * @throws Exception if any of the DDL steps fails
	 */
	public void recreateDBAndTables() throws Exception {
		this.createDatabase();
		this.createTables();
		// The piwiklog table is not needed since it is built
		// on top of JSON files
		this.createTmpTables();
	}

	/** Returns the robot patterns loaded by processLogs (or set via setRobotsList). */
	public ArrayList getRobotsList() {
		return robotsList;
	}

	/** Replaces the robot patterns; processLogs overwrites them when it runs. */
	public void setRobotsList(ArrayList robotsList) {
		this.robotsList = robotsList;
	}

	/** Returns the URL that processLogs passes to ReadCounterRobotsList. */
	public String getCounterRobotsURL() {
		return CounterRobotsURL;
	}

	/** Sets the URL that processLogs passes to ReadCounterRobotsList. */
	public void setCounterRobotsURL(String CounterRobotsURL) {
		this.CounterRobotsURL = CounterRobotsURL;
	}

	/**
	 * Drops the schema named by ConnectDB.getUsageStatsDBSchema() (DROP DATABASE ... CASCADE)
	 * and then creates it again (CREATE DATABASE IF NOT EXISTS).
	 *
	 * <p>NOTE(review): the two Statements created here are never closed.
	 *
	 * @throws Exception wrapping any failure of the drop or the create step
	 */
	private void createDatabase() throws Exception {
		try {
			stmt = ConnectDB.getHiveConnection().createStatement();

			"Dropping usagestats DB: " + ConnectDB.getUsageStatsDBSchema());
			String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + " CASCADE";
			stmt.executeUpdate(dropDatabase);
		} catch (Exception e) {
			logger.error("Failed to drop database: " + e);
			throw new Exception("Failed to drop database: " + e.toString(), e);
		}

		try {
			stmt = ConnectDB.getHiveConnection().createStatement();

			"Creating usagestats DB: " + ConnectDB.getUsageStatsDBSchema());
			String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema();
			stmt.executeUpdate(createDatabase);
		} catch (Exception e) {
			logger.error("Failed to create database: " + e);
			throw new Exception("Failed to create database: " + e.toString(), e);
		}
	}

	/**
	 * Creates the permanent piwiklog and process_portal_log tables if they do not exist.
	 *
	 * <p>Both are transactional ORC tables with 100 buckets. Afterwards the method closes the
	 * statement and the connection returned by ConnectDB.getHiveConnection().
	 *
	 * @throws Exception wrapping any DDL failure
	 */
	private void createTables() throws Exception {
		try {
			stmt = ConnectDB.getHiveConnection().createStatement();

			// Create Piwiklog table - This table should exist
			String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS "
				+ ConnectDB.getUsageStatsDBSchema()
				+ ".piwiklog(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
				+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
				+ "clustered by (source, id_visit, action, timestamp, entity_id) "
				+ "into 100 buckets stored as orc tblproperties('transactional'='true')";
			stmt.executeUpdate(sqlCreateTablePiwikLog);

			/////////////////////////////////////////
			// Rule for duplicate inserts @ piwiklog
			/////////////////////////////////////////
			// NOTE(review): no rule or statement follows this heading.

			String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS "
				+ ConnectDB.getUsageStatsDBSchema()
				+ ".process_portal_log(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
				+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
				+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
			stmt.executeUpdate(sqlCreateTablePortalLog);

			//////////////////////////////////////////////////
			// Rule for duplicate inserts @ process_portal_log
			//////////////////////////////////////////////////

			stmt.close();
			// NOTE(review): this closes whatever ConnectDB.getHiveConnection() returns. If that is a
			// shared/cached connection, later steps rely on ConnectDB reopening it - confirm.
			ConnectDB.getHiveConnection().close();
		} catch (Exception e) {
			logger.error("Failed to create tables: " + e);
			throw new Exception("Failed to create tables: " + e.toString(), e);
		}
	}

	/**
	 * Creates piwiklogtmp and process_portal_log_tmp if they do not exist.
	 *
	 * <p>NOTE(review): both tables are later dropped and recreated with a different layout
	 * (source BIGINT, different clustering):
	 * <ul>
	 * <li>processRepositoryLog does this for piwiklogtmp;</li>
	 * <li>the damaged removeDoubleClicks body does it for process_portal_log_tmp.</li>
	 * </ul>
	 * So the definitions here only apply until those steps run.
	 *
	 * @throws Exception wrapping any DDL failure
	 */
	private void createTmpTables() throws Exception {
		try {
			Statement stmt = ConnectDB.getHiveConnection().createStatement();
			String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS "
				+ ConnectDB.getUsageStatsDBSchema()
				+ ".piwiklogtmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
				+ "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
				+ "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
				+ "stored as orc tblproperties('transactional'='true')";
			stmt.executeUpdate(sqlCreateTmpTablePiwikLog);

			//////////////////////////////////////////////////
			// Rule for duplicate inserts @ piwiklogtmp
			//////////////////////////////////////////////////

			//////////////////////////////////////////////////
			// Copy from public.piwiklog to piwiklog
			//////////////////////////////////////////////////
			// String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
			// stmt.executeUpdate(sqlCopyPublicPiwiklog);

			String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS "
				+ ConnectDB.getUsageStatsDBSchema()
				+ ".process_portal_log_tmp(source INT, id_visit STRING, country STRING, action STRING, url STRING, "
				+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
				+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
			stmt.executeUpdate(sqlCreateTmpTablePortalLog);

			//////////////////////////////////////////////////
			// Rule for duplicate inserts @ process_portal_log_tmp
			//////////////////////////////////////////////////

			stmt.close();
		} catch (Exception e) {
			logger.error("Failed to create tmptables: " + e);
			throw new Exception("Failed to create tmp tables: " + e.toString(), e);
			// System.exit(0);
		}
	}

	/**
	 * Runs the whole log pipeline.
	 *
	 * <p>Steps, in order:
	 * <ol>
	 * <li>load the robot patterns via ReadCounterRobotsList(getCounterRobotsURL());</li>
	 * <li>process the repository logs;</li>
	 * <li>remove double clicks;</li>
	 * <li>rewrite entity ids (cleanOAI);</li>
	 * <li>build the views and downloads statistics;</li>
	 * <li>process the portal logs and the portal statistics;</li>
	 * <li>update the production tables.</li>
	 * </ol>
	 *
	 * <p>NOTE(review): viewsStats(), downloadsStats() and processPortalLog() are not defined in
	 * this copy of the class.
	 *
	 * @throws Exception wrapping the first failing step
	 */
	public void processLogs() throws Exception {
		try {
			ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
			this.robotsList = counterRobots.getRobotsPatterns();

			"Processing repository logs");
			processRepositoryLog();
			"Repository logs process done");

			"Removing double clicks");
			removeDoubleClicks();
			"Removing double clicks done");

			"Cleaning oai");
			cleanOAI();
			"Cleaning oai done");

			"ViewsStats processing starts");
			viewsStats();
			"ViewsStats processing ends");

			"DownloadsStats processing starts");
			downloadsStats();
			// NOTE(review): repeats the "starts" message. "DownloadsStats processing ends" was
			// presumably intended (compare the ViewsStats pair above).
			"DownloadsStats processing starts");

			"Processing portal logs");
			processPortalLog();
			"Portal logs process done");

			"Processing portal usagestats");
			portalStats();
			"Portal usagestats process done");

			"Updating Production Tables");
			updateProdTables();
			"Updated Production Tables");

		} catch (Exception e) {
			logger.error("Failed to process logs: " + e);
			throw new Exception("Failed to process logs: " + e.toString(), e);
		}
	}

	// public void usageStats() throws Exception {
	// try {
	// viewsStats();
	// downloadsStats();
	//"stat tables and views done");
	// } catch (Exception e) {
	// log.error("Failed to create usage usagestats: " + e);
	// throw new Exception("Failed to create usage usagestats: " + e.toString(), e);
	// }
	// }

	/**
	 * Loads the repository Piwik logs into piwiklogtmp.
	 *
	 * <p>Steps:
	 * <ol>
	 * <li>register the hive-hcatalog-core jar;</li>
	 * <li>recreate the external table piwiklogtmp_json over ExecuteWorkflow.repoLogPath;</li>
	 * <li>recreate piwiklogtmp;</li>
	 * <li>insert one row per element of each visit's actionDetails array (LATERAL VIEW explode),
	 * taking entity_id from custom variable 1 and tagging every row as 'repItem'.</li>
	 * </ol>
	 *
	 * <p>NOTE(review): the SERDE class literal is empty in this copy. Given the jar added just
	 * before it, it was presumably the HCatalog JSON SerDe - TODO restore. The Statement is not
	 * closed if any step throws.
	 *
	 * @throws Exception if any Hive statement fails
	 */
	public void processRepositoryLog() throws Exception {
		Statement stmt = ConnectDB.getHiveConnection().createStatement();
		ConnectDB.getHiveConnection().setAutoCommit(false);

		"Adding JSON Serde jar");
		stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
		"Added JSON Serde jar");

		"Dropping piwiklogtmp_json table");
		String drop_piwiklogtmp_json = "DROP TABLE IF EXISTS "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp_json";
		stmt.executeUpdate(drop_piwiklogtmp_json);
		"Dropped piwiklogtmp_json table");

		"Creating piwiklogtmp_json");
		String create_piwiklogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp_json(\n"
			+ " `idSite` STRING,\n"
			+ " `idVisit` STRING,\n"
			+ " `country` STRING,\n"
			+ " `referrerName` STRING,\n"
			+ " `browser` STRING,\n"
			+ " `actionDetails` ARRAY<\n"
			+ " struct<\n"
			+ " type: STRING,\n"
			+ " url: STRING,\n"
			+ " `customVariables`: struct<\n"
			+ " `1`: struct<\n"
			+ " `customVariablePageValue1`: STRING\n"
			+ " >\n"
			+ " >,\n"
			+ " timestamp: String\n"
			+ " >\n"
			+ " >\n"
			+ ")\n"
			+ "ROW FORMAT SERDE ''\n"
			+ "LOCATION '" + ExecuteWorkflow.repoLogPath + "'\n"
			+ "TBLPROPERTIES (\"transactional\"=\"false\")";
		stmt.executeUpdate(create_piwiklogtmp_json);
		"Created piwiklogtmp_json");

		"Dropping piwiklogtmp table");
		String drop_piwiklogtmp = "DROP TABLE IF EXISTS "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp";
		stmt.executeUpdate(drop_piwiklogtmp);
		"Dropped piwiklogtmp");

		"Creating piwiklogtmp");
		String create_piwiklogtmp = "CREATE TABLE "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, "
			+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
			+ "clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true')";
		stmt.executeUpdate(create_piwiklogtmp);
		"Created piwiklogtmp");

		"Inserting into piwiklogtmp");
		String insert_piwiklogtmp = "INSERT INTO "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp "
			+ "SELECT DISTINCT cast(idSite as BIGINT) as source, idVisit as id_Visit, country, "
			+ "actiondetail.type as action, actiondetail.url as url, "
			+ "actiondetail.customVariables.`1`.`customVariablePageValue1` as entity_id, "
			+ "'repItem' as source_item_type, from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, "
			+ "referrerName as referrer_name, browser as agent\n"
			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp_json\n"
			+ "LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail";
		stmt.executeUpdate(insert_piwiklogtmp);
		"Inserted into piwiklogtmp");

		stmt.close();
	}

	/**
	 * Intended to delete "double click" download rows from piwiklogtmp (see the DELETE below).
	 *
	 * <p>NOTE(review): this method is damaged in this copy; restore it from version control
	 * rather than editing it in place. Specifically:
	 * <ul>
	 * <li>The DELETE text breaks off after "AND p1.timestamp".</li>
	 * <li>The literals that follow look like the end of a CREATE EXTERNAL TABLE statement.</li>
	 * <li>create_process_portal_log_tmp_json is used but never declared.</li>
	 * <li>Everything after that (drop, create and fill process_portal_log_tmp) appears to be the
	 * body of the missing processPortalLog().</li>
	 * <li>The missing viewsStats() and downloadsStats() presumably sat in the same gap.</li>
	 * </ul>
	 *
	 * @throws Exception if any Hive statement fails
	 */
	public void removeDoubleClicks() throws Exception {
		Statement stmt = ConnectDB.getHiveConnection().createStatement();
		ConnectDB.getHiveConnection().setAutoCommit(false);

		"Cleaning download double clicks");
		// clean download double clicks
		String sql = "DELETE from "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp "
			+ "WHERE EXISTS (\n"
			+ "SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n"
			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp p1, "
			+ ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp p2\n"
			+ "WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id \n"
			+ "AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp \n"
			// NOTE(review): text is lost right after "AND p1.timestamp"; the literals below belong
			// to a different (table-creating) statement. If they come from
			// process_portal_log_tmp_json, that table's LOCATION points at
			// ExecuteWorkflow.repoLogPath rather than a portal log path - confirm.
			+ "AND p1.timestamp\n"
			+ " >\n"
			+ ")\n"
			+ "ROW FORMAT SERDE ''\n"
			+ "LOCATION '" + ExecuteWorkflow.repoLogPath + "'\n"
			+ "TBLPROPERTIES (\"transactional\"=\"false\")";
		stmt.executeUpdate(create_process_portal_log_tmp_json);
		"Created process_portal_log_tmp_json");

		"Droping process_portal_log_tmp table");
		String drop_process_portal_log_tmp = "DROP TABLE IF EXISTS "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".process_portal_log_tmp";
		stmt.executeUpdate(drop_process_portal_log_tmp);
		"Dropped process_portal_log_tmp");

		"Creating process_portal_log_tmp");
		String create_process_portal_log_tmp = "CREATE TABLE "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".process_portal_log_tmp (source BIGINT, id_visit STRING, country STRING, action STRING, url STRING, "
			+ "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
			+ "clustered by (source, id_visit, timestamp) into 100 buckets stored as orc tblproperties('transactional'='true')";
		stmt.executeUpdate(create_process_portal_log_tmp);
		"Created process_portal_log_tmp");

		"Inserting into process_portal_log_tmp");
		// entity_id and source_item_type come from the first URL parameter that matches, in this
		// order: datasourceId, datasource, datasourceFilter, articleId, datasetId, projectId,
		// organizationId. Rows matching none get '' for both. entity_id is everything after the
		// parameter name (split(...)[1]), so any trailing '&...' parameters are kept as well.
		String insert_process_portal_log_tmp = "INSERT INTO "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".process_portal_log_tmp "
			+ "SELECT DISTINCT cast(idSite as BIGINT) as source, idVisit as id_Visit, country, actiondetail.type as action, "
			+ "actiondetail.url as url, "
			+ "CASE\n"
			+ " WHEN (actiondetail.url like '%datasourceId=%') THEN split(actiondetail.url,'datasourceId=')[1] "
			+ " WHEN (actiondetail.url like '%datasource=%') THEN split(actiondetail.url,'datasource=')[1] "
			+ " WHEN (actiondetail.url like '%datasourceFilter=%') THEN split(actiondetail.url,'datasourceFilter=')[1] "
			+ " WHEN (actiondetail.url like '%articleId=%') THEN split(actiondetail.url,'articleId=')[1] "
			+ " WHEN (actiondetail.url like '%datasetId=%') THEN split(actiondetail.url,'datasetId=')[1] "
			+ " WHEN (actiondetail.url like '%projectId=%') THEN split(actiondetail.url,'projectId=')[1] "
			+ " WHEN (actiondetail.url like '%organizationId=%') THEN split(actiondetail.url,'organizationId=')[1] "
			+ " ELSE '' "
			+ "END AS entity_id, "
			+ "CASE "
			+ " WHEN (actiondetail.url like '%datasourceId=%') THEN 'datasource' "
			+ " WHEN (actiondetail.url like '%datasource=%') THEN 'datasource' "
			+ " WHEN (actiondetail.url like '%datasourceFilter=%') THEN 'datasource' "
			+ " WHEN (actiondetail.url like '%articleId=%') THEN 'result' "
			+ " WHEN (actiondetail.url like '%datasetId=%') THEN 'result' "
			+ " WHEN (actiondetail.url like '%projectId=%') THEN 'project' "
			+ " WHEN (actiondetail.url like '%organizationId=%') THEN 'organization' "
			+ " ELSE '' "
			+ "END AS source_item_type, "
			+ "from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, referrerName as referrer_name, "
			+ "browser as agent "
			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp_json "
			+ "LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail";
		stmt.executeUpdate(insert_process_portal_log_tmp);
		"Inserted into process_portal_log_tmp");

		stmt.close();
	}

	/**
	 * Copies portal log rows from process_portal_log_tmp into piwiklogtmp when their entity_id
	 * appears in the project_oids table of ConnectDB.getStatsDBSchema().
	 *
	 * <p>This is done once per label: 'oaItem', 'datasource', 'organization' and 'project'.
	 * The connection is closed at the end.
	 *
	 * <p>NOTE(review): this looks like a copy-paste error:
	 * <ul>
	 * <li>Every step filters against project_oids, although the rows are labelled 'oaItem',
	 * 'datasource' or 'organization'.</li>
	 * <li>The comment above step 1 shows result_oids for the 'oaItem' case. Confirm which oid
	 * table each step should use.</li>
	 * <li>As written, steps 1, 2 and 4 insert the same matching rows three times under different
	 * labels.</li>
	 * <li>The 'organization' insert (step 3) is disabled.</li>
	 * </ul>
	 *
	 * @throws SQLException if any statement fails (the open Statement and connection are then not
	 *                      closed)
	 */
	public void portalStats() throws SQLException {
		Connection con = ConnectDB.getHiveConnection();
		Statement stmt = con.createStatement();
		con.setAutoCommit(false);

		// Original queries were of the style
		//
		// SELECT DISTINCT source, id_visit, country, action, url, roid.oid, 'oaItem', `timestamp`, referrer_name, agent
		// FROM usagestats_20200907.process_portal_log_tmp2,
		// openaire_prod_stats_20200821.result_oids roid
		// WHERE entity_id IS NOT null AND entity_id=roid.oid AND roid.oid IS NOT null
		//
		// The following query is an example of how queries should be
		//
		//
		// INSERT INTO usagestats_20200907.piwiklogtmp
		// SELECT DISTINCT source, id_visit, country, action, url, entity_id, 'oaItem', `timestamp`, referrer_name, agent
		// FROM usagestats_20200907.process_portal_log_tmp
		// WHERE process_portal_log_tmp.entity_id IS NOT NULL AND process_portal_log_tmp.entity_id
		// IN (SELECT roid.oid FROM openaire_prod_stats_20200821.result_oids roid WHERE roid.oid IS NOT NULL);
		//
		// We should consider if we would like the queries to be as the following
		//
		// INSERT INTO usagestats_20200907.piwiklogtmp
		// SELECT DISTINCT source, id_visit, country, action, url, entity_id, 'oaItem', `timestamp`, referrer_name, agent
		// FROM usagestats_20200907.process_portal_log_tmp
		// WHERE process_portal_log_tmp.entity_id IS NOT NULL AND process_portal_log_tmp.entity_id != '' AND process_portal_log_tmp.entity_id
		// IN (SELECT roid.oid FROM openaire_prod_stats_20200821.result_oids roid WHERE roid.oid IS NOT NULL AND
		// roid.oid != '');

		"PortalStats - Step 1");
		String sql = "INSERT INTO "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp "
			+ "SELECT DISTINCT source, id_visit, country, action, url, entity_id, 'oaItem', `timestamp`, referrer_name, agent "
			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp "
			+ "WHERE process_portal_log_tmp.entity_id IS NOT NULL AND process_portal_log_tmp.entity_id "
			+ "IN (SELECT roid.oid FROM " + ConnectDB.getStatsDBSchema() + ".project_oids roid WHERE roid.oid IS NOT NULL)";
		stmt.executeUpdate(sql);
		stmt.close();

		"PortalStats - Step 2");
		stmt = con.createStatement();
		sql = "INSERT INTO "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp "
			+ "SELECT DISTINCT source, id_visit, country, action, url, entity_id, 'datasource', `timestamp`, referrer_name, agent "
			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp "
			+ "WHERE process_portal_log_tmp.entity_id IS NOT NULL AND process_portal_log_tmp.entity_id "
			+ "IN (SELECT roid.oid FROM " + ConnectDB.getStatsDBSchema() + ".project_oids roid WHERE roid.oid IS NOT NULL)";
		stmt.executeUpdate(sql);
		stmt.close();

		"PortalStats - Step 3");
		stmt = con.createStatement();
		sql = "INSERT INTO "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp "
			+ "SELECT DISTINCT source, id_visit, country, action, url, entity_id, 'organization', `timestamp`, referrer_name, agent "
			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp "
			+ "WHERE process_portal_log_tmp.entity_id IS NOT NULL AND process_portal_log_tmp.entity_id "
			+ "IN (SELECT roid.oid FROM " + ConnectDB.getStatsDBSchema() + ".project_oids roid WHERE roid.oid IS NOT NULL)";
		// stmt.executeUpdate(sql);
		stmt.close();

		"PortalStats - Step 4");
		stmt = con.createStatement();
		sql = "INSERT INTO "
			+ ConnectDB.getUsageStatsDBSchema()
			+ ".piwiklogtmp "
			+ "SELECT DISTINCT source, id_visit, country, action, url, entity_id, 'project', `timestamp`, referrer_name, agent "
			+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp "
			+ "WHERE process_portal_log_tmp.entity_id IS NOT NULL AND process_portal_log_tmp.entity_id "
			+ "IN (SELECT roid.oid FROM " + ConnectDB.getStatsDBSchema() + ".project_oids roid WHERE roid.oid IS NOT NULL)";
		stmt.executeUpdate(sql);
		stmt.close();

		con.close();
	}

	/**
	 * Runs 29 UPDATE statements against piwiklogtmp.entity_id, then closes the connection.
	 *
	 * <p>Each statement has the form
	 * {@code SET entity_id = regexp_replace(entity_id, '^...', '...') WHERE entity_id LIKE '...'}.
	 *
	 * <p>NOTE(review): in this copy every pattern, replacement and LIKE literal is empty, so:
	 * <ul>
	 * <li>all 29 steps are identical;</li>
	 * <li>each can only match rows whose entity_id is the empty string;</li>
	 * <li>each replaces nothing.</li>
	 * </ul>
	 * The original values appear to have been stripped - TODO restore from version control.
	 * Because the steps differ only in those literals, they could be driven from a table.
	 *
	 * @throws Exception if any UPDATE fails
	 */
	private void cleanOAI() throws Exception {
		ConnectDB.getHiveConnection().setAutoCommit(false);

		"Cleaning oai - Step 1");
		stmt = ConnectDB.getHiveConnection().createStatement();
		String sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 2");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 3");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 4");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 5");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 6");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 7");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 8");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 9");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 10");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 11");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 12");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 13");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 14");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 15");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 16");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 17");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 18");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 19");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 20");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 21");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 22");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 23");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 24");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 25");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 26");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 27");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 28");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Step 29");
		stmt = ConnectDB.getHiveConnection().createStatement();
		sql = "UPDATE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
			+ "SET entity_id = regexp_replace(entity_id, '^',"
			+ "'') WHERE entity_id LIKE ''";
		stmt.executeUpdate(sql);
		stmt.close();

		"Cleaning oai - Done, closing connection");
		ConnectDB.getHiveConnection().close();
	}

	/**
	 * Maps a portal URL to "type|id", where id is the 46 characters that follow the first
	 * matching query parameter.
	 *
	 * <p>Parameters are checked in this order:
	 * <ul>
	 * <li>datasourceId, datasource, datasourceFilter: "datasource";</li>
	 * <li>articleId, datasetId: "result";</li>
	 * <li>projectId: "project", unless the URL contains "oai:dnet:corda";</li>
	 * <li>organizationId: "organization".</li>
	 * </ul>
	 * The URL is first URL-decoded as UTF-8; decoding errors are caught (the handler's log call
	 * is garbled in this copy). Returns "" when nothing matches.
	 *
	 * <p>NOTE(review): the host string in the outer indexOf("") check is empty in this copy.
	 * Because String.indexOf("") returns 0, the "> 0" test is always false and this method
	 * always returns "". No caller is visible in this copy.
	 *
	 * @param url raw portal URL
	 * @return "type|id", or ""
	 */
	private String processPortalURL(String url) {

		if (url.indexOf("") > 0) {
			try {
				url = URLDecoder.decode(url, "UTF-8");
			} catch (Exception e) {
				"Error when decoding the following URL: " + url);
			}
			if (url.indexOf("datasourceId=") > 0 && url.substring(url.indexOf("datasourceId=") + 13).length() >= 46) {
				url = "datasource|"
					+ url.substring(url.indexOf("datasourceId=") + 13, url.indexOf("datasourceId=") + 59);
			} else if (url.indexOf("datasource=") > 0
				&& url.substring(url.indexOf("datasource=") + 11).length() >= 46) {
				url = "datasource|" + url.substring(url.indexOf("datasource=") + 11, url.indexOf("datasource=") + 57);
			} else if (url.indexOf("datasourceFilter=") > 0
				&& url.substring(url.indexOf("datasourceFilter=") + 17).length() >= 46) {
				url = "datasource|"
					+ url.substring(url.indexOf("datasourceFilter=") + 17, url.indexOf("datasourceFilter=") + 63);
			} else if (url.indexOf("articleId=") > 0
				&& url.substring(url.indexOf("articleId=") + 10).length() >= 46) {
				url = "result|" + url.substring(url.indexOf("articleId=") + 10, url.indexOf("articleId=") + 56);
			} else if (url.indexOf("datasetId=") > 0
				&& url.substring(url.indexOf("datasetId=") + 10).length() >= 46) {
				url = "result|" + url.substring(url.indexOf("datasetId=") + 10, url.indexOf("datasetId=") + 56);
			} else if (url.indexOf("projectId=") > 0
				&& url.substring(url.indexOf("projectId=") + 10).length() >= 46
				&& !url.contains("oai:dnet:corda")) {
				url = "project|" + url.substring(url.indexOf("projectId=") + 10, url.indexOf("projectId=") + 56);
			} else if (url.indexOf("organizationId=") > 0
				&& url.substring(url.indexOf("organizationId=") + 15).length() >= 46) {
				url = "organization|"
					+ url.substring(url.indexOf("organizationId=") + 15, url.indexOf("organizationId=") + 61);
			} else {
				url = "";
			}
		} else {
			url = "";
		}

		return url;
	}

	/**
	 * Appends the temporary tables to the production tables, drops the temporary tables and
	 * closes the connection.
	 *
	 * <p>Copies made:
	 * <ul>
	 * <li>piwiklogtmp into piwiklog;</li>
	 * <li>views_stats_tmp into views_stats;</li>
	 * <li>downloads_stats_tmp into downloads_stats;</li>
	 * <li>pageviews_stats_tmp into pageviews_stats.</li>
	 * </ul>
	 * It then drops views_stats_tmp, downloads_stats_tmp, pageviews_stats_tmp and
	 * process_portal_log_tmp. piwiklogtmp itself is not dropped here.
	 *
	 * <p>NOTE(review): views_stats, downloads_stats, pageviews_stats and their _tmp counterparts
	 * are not created anywhere in this copy (presumably in the missing viewsStats() and
	 * downloadsStats()).
	 *
	 * @throws SQLException if any statement fails
	 */
	private void updateProdTables() throws SQLException {
		Statement stmt = ConnectDB.getHiveConnection().createStatement();
		ConnectDB.getHiveConnection().setAutoCommit(false);

		"Inserting data to piwiklog");
		String sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog "
			+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp";
		stmt.executeUpdate(sql);

		"Inserting data to views_stats");
		sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats "
			+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".views_stats_tmp";
		stmt.executeUpdate(sql);

		"Inserting data to downloads_stats");
		sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats "
			+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_tmp";
		stmt.executeUpdate(sql);

		"Inserting data to pageviews_stats");
		sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats "
			+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats_tmp";
		stmt.executeUpdate(sql);

		"Dropping table views_stats_tmp");
		sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".views_stats_tmp";
		stmt.executeUpdate(sql);

		"Dropping table downloads_stats_tmp");
		sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_tmp";
		stmt.executeUpdate(sql);

		"Dropping table pageviews_stats_tmp");
		sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats_tmp";
		stmt.executeUpdate(sql);

		"Dropping table process_portal_log_tmp");
		sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp";
		stmt.executeUpdate(sql);

		stmt.close();
		ConnectDB.getHiveConnection().close();
	}

	/**
	 * Lists the files directly under dir (non-recursive: listFiles(..., false)) on the
	 * FileSystem obtained from a default Configuration, and collects one name per file.
	 *
	 * <p>NOTE(review):
	 * <ul>
	 * <li>The right-hand side of "String fileName =" is missing in this copy (presumably the path
	 * of Files.next() - TODO restore).</li>
	 * <li>The error path reports logPath, which is never assigned (always null), instead of
	 * dir.</li>
	 * <li>The FileSystem is closed only on success.</li>
	 * <li>No caller is visible in this copy.</li>
	 * </ul>
	 *
	 * @param dir path appended to the FileSystem URI
	 * @return the collected file names
	 * @throws Exception if the listing fails
	 */
	private ArrayList listHdfsDir(String dir) throws Exception {

		FileSystem hdfs = FileSystem.get(new Configuration());
		RemoteIterator Files;
		ArrayList fileNames = new ArrayList<>();

		try {
			Path exportPath = new Path(hdfs.getUri() + dir);
			Files = hdfs.listFiles(exportPath, false);
			while (Files.hasNext()) {
				String fileName =;
				fileNames.add(fileName);
			}

			hdfs.close();
		} catch (Exception e) {
			logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
			throw new Exception("HDFS file path with exported data does not exist : " + logPath, e);
		}

		return fileNames;
	}

	/**
	 * Reads a text file line by line and returns the lines concatenated, skipping any line that
	 * is exactly "[]".
	 *
	 * <p>Adjacent JSON arrays are then merged by replacing every ][{"idSite" with ,{"idSite".
	 * An empty result becomes "[]".
	 *
	 * <p>NOTE(review):
	 * <ul>
	 * <li>The reader construction is truncated in this copy: "Path(filename)" has no opening
	 * call; presumably fs.open(new Path(filename)) - TODO restore.</li>
	 * <li>The reader is never closed.</li>
	 * <li>No caller is visible in this copy.</li>
	 * </ul>
	 *
	 * @param filename path of the file to read
	 * @return the concatenated content, or "[]" if nothing was kept
	 * @throws Exception wrapping any read failure
	 */
	private String readHDFSFile(String filename) throws Exception {
		String result;
		try {

			FileSystem fs = FileSystem.get(new Configuration());
			//"reading file : " + filename);

			BufferedReader br = new BufferedReader(new InputStreamReader( Path(filename))));

			StringBuilder sb = new StringBuilder();
			String line = br.readLine();

			while (line != null) {
				// Lines that are just an empty JSON array carry no visits.
				if (!line.equals("[]")) {
					sb.append(line);
				}
				// sb.append(line);
				line = br.readLine();
			}
			// Turn "...][{"idSite"..." into "...,{"idSite"..." so that the concatenated arrays
			// form a single array.
			result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
			if (result.equals("")) {
				result = "[]";
			}

			// fs.close();
		} catch (Exception e) {
			logger.error(e.getMessage());
			throw new Exception(e);
		}

		return result;
	}

	/** Returns ConnectDB.getHiveConnection(); no caller is visible in this copy. */
	private Connection getConnection() throws SQLException {
		return ConnectDB.getHiveConnection();
	}
}