diff --git a/dhp-workflows/dhp-usage-stats-update/pom.xml b/dhp-workflows/dhp-usage-stats-update/pom.xml new file mode 100755 index 000000000..20d2f5b76 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/pom.xml @@ -0,0 +1,91 @@ + + + + dhp-workflows + eu.dnetlib.dhp + 1.2.4-SNAPSHOT + + 4.0.0 + dhp-usage-stats-build + + + + pl.project13.maven + git-commit-id-plugin + 2.1.15 + + + + revision + + + + + ${project.basedir}/../.git + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.8 + 1.8 + + + + + + UTF-8 + UTF-8 + 0.13.1-cdh5.2.1 + 2.5.0-cdh5.2.1 + + + + + org.apache.spark + spark-core_2.11 + 2.2.0 + + + org.apache.spark + spark-sql_2.11 + 2.4.5 + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + org.json + json + 20180130 + jar + + + org.apache.hive + hive-jdbc + ${cdh.hive.version} + + + org.apache.hadoop + hadoop-common + ${cdh.hadoop.version} + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + c3p0 + c3p0 + 0.9.1.2 + jar + + + dhp-usage-stats-build + diff --git a/dhp-workflows/dhp-usage-stats-update/runworkflow.sh b/dhp-workflows/dhp-usage-stats-update/runworkflow.sh new file mode 100755 index 000000000..191fb24c6 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/runworkflow.sh @@ -0,0 +1 @@ +mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/usagestatsbuild \ No newline at end of file diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java new file mode 100755 index 000000000..be7ce8afa --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java @@ -0,0 +1,134 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package eu.dnetlib.oa.graph.usagestatsbuild.export; + +import java.sql.Connection; +import java.sql.SQLException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; + +/** + * @author D. Pierrakos, S. 
Zoupanos + */ +import com.mchange.v2.c3p0.ComboPooledDataSource; + +public abstract class ConnectDB { + + public static Connection DB_HIVE_CONNECTION; + public static Connection DB_IMPALA_CONNECTION; + + private static String dbHiveUrl; + private static String dbImpalaUrl; + private static String usageRawDataDBSchema; + private static String usageStatsDBSchema; + private static String usagestatsPermanentDBSchema; + private static String statsDBSchema; + + private ConnectDB() { + } + + static void init() throws ClassNotFoundException { + + dbHiveUrl = ExecuteWorkflow.dbHiveUrl; + dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl; + usageStatsDBSchema = ExecuteWorkflow.usageStatsDBSchema; + statsDBSchema = ExecuteWorkflow.statsDBSchema; + usageRawDataDBSchema = ExecuteWorkflow.usageRawDataDBSchema; + usagestatsPermanentDBSchema = ExecuteWorkflow.usagestatsPermanentDBSchema; + + Class.forName("org.apache.hive.jdbc.HiveDriver"); + } + + public static Connection getHiveConnection() throws SQLException { + if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) { + return DB_HIVE_CONNECTION; + } else { + DB_HIVE_CONNECTION = connectHive(); + + return DB_HIVE_CONNECTION; + } + } + + public static Connection getImpalaConnection() throws SQLException { + if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) { + return DB_IMPALA_CONNECTION; + } else { + DB_IMPALA_CONNECTION = connectImpala(); + + return DB_IMPALA_CONNECTION; + } + } + + public static String getUsageRawDataDBSchema() { + return ConnectDB.usageRawDataDBSchema; + } + + public static String getUsageStatsDBSchema() { +// String datePattern = "YYYYMMdd"; +// DateFormat df = new SimpleDateFormat(datePattern); +//// Get the today date using Calendar object. +// Date today = Calendar.getInstance().getTime(); +// String todayAsString = df.format(today); + +// return ConnectDB.usageStatsDBSchema + todayAsString; + return ConnectDB.usageStatsDBSchema; + } + + public static String getStatsDBSchema() { + return ConnectDB.statsDBSchema; + } + + public static String getUsagestatsPermanentDBSchema() { + return ConnectDB.usagestatsPermanentDBSchema; + } + + private static Connection connectHive() throws SQLException { + ComboPooledDataSource cpds = new ComboPooledDataSource(); + cpds.setJdbcUrl(dbHiveUrl); + cpds.setAcquireIncrement(1); + cpds.setMaxPoolSize(100); + cpds.setMinPoolSize(1); + cpds.setInitialPoolSize(1); + cpds.setMaxIdleTime(300); + cpds.setMaxConnectionAge(36000); + + cpds.setAcquireRetryAttempts(30); + cpds.setAcquireRetryDelay(2000); + cpds.setBreakAfterAcquireFailure(false); + + cpds.setCheckoutTimeout(0); + cpds.setPreferredTestQuery("SELECT 1"); + cpds.setIdleConnectionTestPeriod(60); + return cpds.getConnection(); + + } + + private static Connection connectImpala() throws SQLException { + ComboPooledDataSource cpds = new ComboPooledDataSource(); + cpds.setJdbcUrl(dbImpalaUrl); + cpds.setAcquireIncrement(1); + cpds.setMaxPoolSize(100); + cpds.setMinPoolSize(1); + cpds.setInitialPoolSize(1); + cpds.setMaxIdleTime(300); + cpds.setMaxConnectionAge(36000); + + cpds.setAcquireRetryAttempts(30); + cpds.setAcquireRetryDelay(2000); + cpds.setBreakAfterAcquireFailure(false); + + cpds.setCheckoutTimeout(0); + cpds.setPreferredTestQuery("SELECT 1"); + cpds.setIdleConnectionTestPeriod(60); + + return cpds.getConnection(); + + } + +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ExecuteWorkflow.java 
b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ExecuteWorkflow.java new file mode 100755 index 000000000..a05424f2a --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ExecuteWorkflow.java @@ -0,0 +1,140 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package eu.dnetlib.oa.graph.usagestatsbuild.export; + +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; + +import org.apache.commons.io.IOUtils; +import org.apache.log4j.BasicConfigurator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +/** + * @author D. Pierrakos, S. Zoupanos + */ +public class ExecuteWorkflow { + +// static String matomoAuthToken; + static String portalMatomoID; +// static String irusUKBaseURL; +// static String lareferenciaBaseURL; +// static String lareferenciaAuthToken; + static String dbHiveUrl; + static String dbImpalaUrl; + static String usageRawDataDBSchema; + static String usageStatsDBSchema; + static String usagestatsPermanentDBSchema; + static String statsDBSchema; + static boolean recreateDbAndTables; + + static boolean processPiwikLogs; + static boolean processLaReferenciaLogs; + + static boolean irusProcessStats; + + static boolean sarcProcessStats; + + static boolean finalizeStats; + static boolean finalTablesVisibleToImpala; + + static int numberOfDownloadThreads; + + private static final Logger logger = LoggerFactory.getLogger(ExecuteWorkflow.class); + + public static void main(String[] args) throws Exception { + + // Sending the logs to the console + BasicConfigurator.configure(); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + UsageStatsExporter.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json"))); + parser.parseArgument(args); + + // Setting up the initial parameters +// matomoAuthToken = parser.get("matomoAuthToken"); +// matomoBaseURL = parser.get("matomoBaseURL"); + portalMatomoID = parser.get("portalMatomoID"); +// irusUKBaseURL = parser.get("irusUKBaseURL"); +// lareferenciaBaseURL = parser.get("lareferenciaBaseURL"); +// lareferenciaAuthToken = parser.get("lareferenciaAuthToken"); + + dbHiveUrl = parser.get("dbHiveUrl"); + dbImpalaUrl = parser.get("dbImpalaUrl"); + usageRawDataDBSchema = parser.get("usageRawDataDBSchema"); + usageStatsDBSchema = parser.get("usageStatsDBSchema"); + usagestatsPermanentDBSchema = parser.get("usagestatsPermanentDBSchema"); + statsDBSchema = parser.get("statsDBSchema"); + + if (parser.get("processPiwikLogs").toLowerCase().equals("true")) { + processPiwikLogs = true; + } else { + processPiwikLogs = false; + } + +// String startingLogPeriodStr = parser.get("startingLogPeriod"); +// Date startingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(startingLogPeriodStr); +// startingLogPeriod = startingLogPeriodStr(startingLogPeriodDate); +// +// String endingLogPeriodStr = parser.get("endingLogPeriod"); +// Date endingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(endingLogPeriodStr); +// endingLogPeriod = startingLogPeriodStr(endingLogPeriodDate); + + if (parser.get("recreateDbAndTables").toLowerCase().equals("true")) { + recreateDbAndTables = true; + } else { + recreateDbAndTables = false; + } + + if (parser.get("processLaReferenciaLogs").toLowerCase().equals("true")) { + processLaReferenciaLogs = true; + } else { + processLaReferenciaLogs = false; + } + + if (parser.get("irusProcessStats").toLowerCase().equals("true")) { + irusProcessStats = true; + } else { + irusProcessStats = false; + } + + if (parser.get("sarcProcessStats").toLowerCase().equals("true")) { + sarcProcessStats = true; + } else { + sarcProcessStats = false; + } + + if (parser.get("finalizeStats").toLowerCase().equals("true")) { + finalizeStats = true; + } else { + finalizeStats = false; + } + if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true")) { + finalTablesVisibleToImpala = true; + } else { + finalTablesVisibleToImpala = false; + } + + // needed by the download steps regardless of the Impala flag, so parsed unconditionally + numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads")); + + UsageStatsExporter usagestatsExport = new UsageStatsExporter(); + usagestatsExport.export(); + } + + private static Calendar startingLogPeriodStr(Date date) { + + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + return calendar; + + } +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/IrusStats.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/IrusStats.java new file mode 100755 index 000000000..831a8dde1 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/IrusStats.java @@ -0,0 +1,95 @@ + +package eu.dnetlib.oa.graph.usagestatsbuild.export; + +import java.io.*; +import java.net.URL; +import java.net.URLConnection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author D. Pierrakos, S.
Zoupanos + */ +public class IrusStats { + + private String irusUKURL; + + private static final Logger logger = LoggerFactory.getLogger(IrusStats.class); + + public IrusStats() throws Exception { + } + + public void processIrusStats() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Creating irus_downloads_stats_tmp table"); + String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".irus_downloads_stats_tmp " + + "(`source` string, " + + "`repository_id` string, " + + "`result_id` string, " + + "`date` string, " + + "`count` bigint, " + + "`openaire` bigint)"; + stmt.executeUpdate(createDownloadsStats); + logger.info("Created irus_downloads_stats_tmp table"); + + logger.info("Inserting into irus_downloads_stats_tmp"); + String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_downloads_stats_tmp " + + "SELECT s.source, d.id AS repository_id, " + + "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog s, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'"; + stmt.executeUpdate(insertDStats); + logger.info("Inserted into irus_downloads_stats_tmp"); + + logger.info("Creating irus_R5_stats_tmp table"); + String createR5Stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".irus_R5_stats_tmp " + + "(`source` string, " + + "`repository_id` string, " + + "`result_id` string, " + + "`date` string, " + + "`views` bigint, " + + "`downloads` bigint, " + + "`openaire` bigint)"; + stmt.executeUpdate(createR5Stats); + logger.info("Created irus_R5_stats_tmp table"); + + logger.info("Inserting into irus_R5_stats_tmp"); + String insertR5Stats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_R5_stats_tmp " + + "SELECT s.source, d.id AS repository_id, " + + "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, " + + "(s.total_item_investigations-s.total_item_requests) as views, s.total_item_requests as downloads, '0' " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog_cop_r5 s, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE s.repository=d.oid AND s.rid=ro.oid AND s.source='IRUS-UK'"; + stmt.executeUpdate(insertR5Stats); + logger.info("Inserted into irus_R5_stats_tmp"); + + stmt.close(); + // ConnectDB.getHiveConnection().close(); + } + //// to add create table sushilog_cop_r5 as select * from openaire_prod_usage_raw.sushilog_cop_r5 + +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/LaReferenciaStats.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/LaReferenciaStats.java new file mode 100755 index 000000000..60c4afb30 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/LaReferenciaStats.java @@ -0,0 +1,321 @@ + +package eu.dnetlib.oa.graph.usagestatsbuild.export; + +import java.io.*; +import java.net.URLDecoder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import
java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author D. Pierrakos, S. Zoupanos + */ +public class LaReferenciaStats { + + private static final Logger logger = LoggerFactory.getLogger(LaReferenciaStats.class); + + private String logRepoPath; + + private Statement stmt = null; + + private String CounterRobotsURL; + private ArrayList robotsList; + + public LaReferenciaStats() throws Exception { + } + + public void processLogs() throws Exception { + try { + logger.info("LaReferencia creating viewsStats"); + viewsStats(); + logger.info("LaReferencia created viewsStats"); + + logger.info("LaReferencia creating downloadsStats"); + downloadsStats(); + logger.info("LaReferencia created downloadsStats"); + + logger.info("LaReferencia creating COUNTER CoP R5 metrics"); + createCoPR5TablesForLareferencia(); + logger.info("LaReferencia created COUNTER CoP R5 metrics"); + +// logger.info("LaReferencia updating Production Tables"); +// updateProdTables(); +// logger.info("LaReferencia updated Production Tables"); + + } catch (Exception e) { + logger.error("Failed to process logs: " + e); + throw new Exception("Failed to process logs: " + e.toString(), e); + } + } + + public void viewsStats() throws Exception { + + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Creating la_result_views_monthly_tmp view"); + String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS " + + + "SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' " + + "THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog where action='action' and " + + "(source_item_type='oaItem' or source_item_type='repItem') " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " + + "source ORDER BY source, entity_id"; + stmt.executeUpdate(sql); + logger.info("Created la_result_views_monthly_tmp view"); + + logger.info("Dropping la_views_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".la_views_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Dropped la_views_stats_tmp table"); + + logger.info("Creating la_views_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp " + + "AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.oid AND p.id=ro.oid " + + "GROUP BY d.id, ro.id, month " + + "ORDER BY d.id, ro.id, month"; + 
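// la_result_views_monthly_tmp has one row per (entity_id, month, source), so the max(views) in the CREATE TABLE just built effectively carries that monthly value through the join to the stats-DB identifiers (assuming one matching oid per result); "openaire" counts the views that arrived via an OpenAIRE referrer. +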
stmt.executeUpdate(sql); + logger.info("Created la_views_stats_tmp table"); + + stmt.close(); + // ConnectDB.getHiveConnection().close(); + } + + private void downloadsStats() throws Exception { + + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Creating la_result_downloads_monthly_tmp view"); + String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + + ".la_result_downloads_monthly_tmp AS " + + "SELECT entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' " + + "THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog where action='download' and " + + "(source_item_type='oaItem' or source_item_type='repItem') " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " + + "source ORDER BY source, entity_id"; + stmt.executeUpdate(sql); + logger.info("Created la_result_downloads_monthly_tmp view"); + + logger.info("Dropping la_downloads_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".la_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Dropped la_downloads_stats_tmp table"); + + logger.info("Creating la_downloads_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp " + + "AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(downloads) AS count, max(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_downloads_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.oid AND p.id=ro.oid " + + "GROUP BY d.id, ro.id, month " + + "ORDER BY d.id, ro.id, month"; + stmt.executeUpdate(sql); + logger.info("Created la_downloads_stats_tmp table"); + + stmt.close(); + // ConnectDB.getHiveConnection().close(); + } + + private void createCoPR5TablesForLareferencia() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + // Unique Item Investigations + + logger.info("Create View Unique_Item_Investigations"); + String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + + ".lr_view_unique_item_investigations " + + "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "CASE WHEN COUNT(entity_id)>1 THEN 1 ELSE 1 END AS unique_item_investigations, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog " + + "WHERE (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View Unique_Item_Investigations"); + + logger.info("Drop Table Unique_Item_Investigations"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_investigations "; + stmt.executeUpdate(sql); + logger.info("Dropped Table Unique_Item_Investigations"); + + 
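// COUNTER R5 counts at most one "unique item investigation" per item and visit, which is why lr_view_unique_item_investigations collapses every (id_visit, entity_id, month) group to a single unit (its CASE expression yields 1 on both branches by design); the table created next sums those units per repository, result and month. +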
logger.info("Create Table tbl_unique_item_investigations"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_investigations as " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(unique_item_investigations) AS unique_item_investigations, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_unique_item_investigations p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table tbl_unique_item_investigations"); + + // Total Item Investigations + + logger.info("Create View lr_view_total_item_investigations"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_total_item_investigations " + + "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) AS total_item_investigations, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog " + + "WHERE (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View lr_view_total_item_investigations"); + + logger.info("Drop Table lr_tbl_total_item_investigations"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_investigations "; + stmt.executeUpdate(sql); + logger.info("Dropped Table lr_tbl_total_item_investigations"); + + logger.info("Create Table lr_tbl_total_item_investigations"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_investigations as " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(total_item_investigations) AS total_item_investigations, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_total_item_investigations p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table lr_tbl_total_item_investigations"); + + // Unique Item Requests + + logger.info("Create View lr_view_unique_item_requests"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_unique_item_requests AS " + + "SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "CASE WHEN COUNT(entity_id)>1 THEN 1 ELSE 1 END AS unique_item_requests, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + 
ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog " + + "WHERE action='download' AND (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View lr_view_unique_item_requests"); + + logger.info("Drop Table Unique_Item_Requests"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_requests "; + stmt.executeUpdate(sql); + logger.info("Dropped Table Unique_Item_Requests"); + + logger.info("Create Table lr_tbl_unique_item_requests"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_requests as " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(unique_item_requests) AS unique_item_requests, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_unique_item_requests p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table lr_tbl_unique_item_requests"); + + // Total Item Requests + + logger.info("Create View lr_view_total_item_requests"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_total_item_requests " + + "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) AS total_item_requests, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog " + + "WHERE action='download' AND (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View lr_view_total_item_requests"); + + logger.info("Drop Table lr_tbl_total_item_requests"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_requests "; + stmt.executeUpdate(sql); + logger.info("Dropped Table lr_tbl_total_item_requests"); + + logger.info("Create Table lr_tbl_total_item_requests"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_requests as " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(total_item_requests) AS total_item_requests, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_total_item_requests p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table lr_tbl_total_item_requests"); + + // All CoP R5 metrics Table + logger.info("Drop Table
lr_tbl_all_r5_metrics"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_all_r5_metrics "; + stmt.executeUpdate(sql); + logger.info("Dropped Table lr_tbl_all_r5_metrics"); + + logger.info("Create Table lr_tbl_all_r5_metrics"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_all_r5_metrics as " + + "WITH tmp1 as (SELECT coalesce(ds.repository_id, vs.repository_id) as repository_id, " + + "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " + + "coalesce(vs.unique_item_investigations, 0) as unique_item_investigations, " + + "coalesce(ds.total_item_investigations, 0) as total_item_investigations " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_investigations AS vs " + + "FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_investigations AS ds " + + " ON ds.source=vs.source AND ds.result_id=vs.result_id AND ds.date=vs.date), " + + "tmp2 AS (select coalesce (ds.repository_id, vs.repository_id) as repository_id, " + + "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " + + "coalesce(ds.total_item_investigations, 0) as total_item_investigations, " + + "coalesce(ds.unique_item_investigations, 0) as unique_item_investigations, " + + " coalesce(vs.unique_item_requests, 0) as unique_item_requests FROM tmp1 " + + "AS ds FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_requests AS vs " + + "ON ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date) " + + "SELECT 'LaReferencia' as source, coalesce (ds.repository_id, vs.repository_id) as repository_id, " + + "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " + + "coalesce(ds.unique_item_investigations, 0) as unique_item_investigations, " + + "coalesce(ds.total_item_investigations, 0) as total_item_investigations, " + + "coalesce(ds.unique_item_requests, 0) as unique_item_requests, " + + "coalesce(vs.total_item_requests, 0) as total_item_requests " + + "FROM tmp2 AS ds FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_requests " + + "AS vs ON ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date"; + stmt.executeUpdate(sql); + logger.info("Created Table tbl_all_r5_metrics"); + + stmt.close(); + ConnectDB.getHiveConnection().close(); + + } + +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java new file mode 100755 index 000000000..d20f37363 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java @@ -0,0 +1,1112 @@ + +package eu.dnetlib.oa.graph.usagestatsbuild.export; + +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author D. Pierrakos, S. 
Zoupanos + */ +public class PiwikStatsDB { + + private String logPath; + + private Statement stmt = null; + + private static final Logger logger = LoggerFactory.getLogger(PiwikStatsDB.class); + + public PiwikStatsDB() throws Exception { + + } + + public void recreateDBAndTables() throws Exception { + this.createDatabase(); + // The piwiklog table is not needed since it is built + // on top of JSON files + //////////// this.createTmpTables(); + } + + private void createDatabase() throws Exception { + + try { + + stmt = ConnectDB.getHiveConnection().createStatement(); + + logger.info("Dropping usagestats DB: " + ConnectDB.getUsageStatsDBSchema()); + String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + " CASCADE"; + stmt.executeUpdate(dropDatabase); + } catch (Exception e) { + logger.error("Failed to drop database: " + e); + throw new Exception("Failed to drop database: " + e.toString(), e); + } + + try { + + logger.info("Creating usagestats DB: " + ConnectDB.getUsageStatsDBSchema()); + String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema(); + stmt.executeUpdate(createDatabase); + logger.info("Usagestats DB created: " + ConnectDB.getUsageStatsDBSchema()); + + } catch (Exception e) { + logger.error("Failed to create database: " + e); + throw new Exception("Failed to create database: " + e.toString(), e); + } + + try { + stmt = ConnectDB.getHiveConnection().createStatement(); + + logger.info("Creating permanent usagestats DB: " + ConnectDB.getUsagestatsPermanentDBSchema()); + String createPermanentDatabase = "CREATE DATABASE IF NOT EXISTS " + + ConnectDB.getUsagestatsPermanentDBSchema(); + stmt.executeUpdate(createPermanentDatabase); + logger.info("Created permanent usagestats DB: " + ConnectDB.getUsagestatsPermanentDBSchema()); + + } catch (Exception e) { + logger.error("Failed to create database: " + e); + throw new Exception("Failed to create database: " + e.toString(), e); + } + } + + public void createDistinctPiwikLog() throws Exception { + logger.info("Initialising DB properties"); + ConnectDB.init(); + + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Dropping piwiklogdistinct"); + String sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct"; + stmt.executeUpdate(sql); + logger.info("Dropped piwiklogdistinct"); + + logger.info("Creating piwiklogdistinct table"); + // Create Piwiklogdistinct table - This table should exist + String sqlCreateTablePiwikLogDistinct = "CREATE TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".piwiklogdistinct(source INT, id_visit STRING, country STRING, action STRING, url STRING, " + + "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " + + "clustered by (source, id_visit, action, timestamp, entity_id) " + + "into 100 buckets stored as orc tblproperties('transactional'='true')"; + stmt.executeUpdate(sqlCreateTablePiwikLogDistinct); + logger.info("Created piwiklogdistinct table"); + + logger.info("Inserting data to piwiklogdistinct"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "SELECT DISTINCT * FROM " + ConnectDB.getUsageRawDataDBSchema() + ".piwiklog WHERE entity_id is not null"; + stmt.executeUpdate(sql); + logger.info("Inserted data to piwiklogdistinct"); + } + + public void processLogs() throws Exception { + try { + + logger.info("ViewsStats processing starts 
at: " + new Timestamp(System.currentTimeMillis())); + viewsStats(); + logger.info("ViewsStats processing ends at: " + new Timestamp(System.currentTimeMillis())); + + logger.info("DownloadsStats processing starts at: " + new Timestamp(System.currentTimeMillis())); + downloadsStats(); + logger.info("DownloadsStats processing ends at: " + new Timestamp(System.currentTimeMillis())); + + logger.info("COUNTER CoP R5 metrics processing starts at: " + new Timestamp(System.currentTimeMillis())); + createCoPR5Tables(); + logger.info("COUNTER CoP R5 metrics processing ends at: " + new Timestamp(System.currentTimeMillis())); + + } catch (Exception e) { + logger.error("Failed to process logs: " + e); + throw new Exception("Failed to process logs: " + e.toString(), e); + } + } + + public void processEpisciencesLogs() throws Exception { + try { + + logger.info("Views Episciences processing starts at: " + new Timestamp(System.currentTimeMillis())); + episciencesViewsStats(); + logger.info("Views Episciences processing ends at: " + new Timestamp(System.currentTimeMillis())); + + logger.info("Downloads Episciences processing starts at: " + new Timestamp(System.currentTimeMillis())); + episciencesDownloadsStats(); + logger.info("Downloads Episciences processing ends at: " + new Timestamp(System.currentTimeMillis())); + + } catch (Exception e) { + logger.error("Failed to process logs: " + e); + throw new Exception("Failed to process logs: " + e.toString(), e); + } + } + + public void viewsStats() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Dropping openaire_result_views_monthly_tmp view"); + String drop_result_views_monthly = "DROP VIEW IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".openaire_result_views_monthly_tmp"; + stmt.executeUpdate(drop_result_views_monthly); + logger.info("Dropped openaire_result_views_monthly_tmp view"); + + logger.info("Creating openaire_result_views_monthly_tmp view"); + String create_result_views_monthly = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + + ".openaire_result_views_monthly_tmp " + + "AS SELECT entity_id, " + + "reflect('java.net.URLDecoder', 'decode', entity_id) AS id," + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) " + + "AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + + ".piwiklogdistinct where action='action' and (source_item_type='oaItem' or " + + "source_item_type='repItem') " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " + + "source ORDER BY source, entity_id"; + stmt.executeUpdate(create_result_views_monthly); + logger.info("Created openaire_result_views_monthly_tmp view"); + + logger.info("Dropping openaire_views_stats_tmp table"); + String drop_views_stats = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".openaire_views_stats_tmp"; + stmt.executeUpdate(drop_views_stats); + logger.info("Dropped openaire_views_stats_tmp table"); + + logger.info("Creating openaire_views_stats_tmp table"); + String create_views_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".openaire_views_stats_tmp " + + "AS SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' AND ro.oid!='400' AND ro.oid!='503' " + + "AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month " + + "ORDER BY d.id, ro.id, month "; + stmt.executeUpdate(create_views_stats); + logger.info("Created openaire_views_stats_tmp table"); + + logger.info("Creating openaire_pageviews_stats_tmp table"); + String create_pageviews_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".openaire_pageviews_stats_tmp AS SELECT " + + "'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=" + ExecuteWorkflow.portalMatomoID + + " AND p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' AND ro.oid!='400' AND ro.oid!='503' " + + "AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month " + + "ORDER BY d.id, ro.id, month "; + stmt.executeUpdate(create_pageviews_stats); + logger.info("Created openaire_pageviews_stats_tmp table"); + + stmt.close(); + // ConnectDB.getHiveConnection().close(); + } + + private void downloadsStats() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Dropping openaire_result_downloads_monthly_tmp view"); + String drop_result_downloads_monthly = "DROP VIEW IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".openaire_result_downloads_monthly_tmp"; + stmt.executeUpdate(drop_result_downloads_monthly); + logger.info("Dropped openaire_result_downloads_monthly_tmp view"); + + logger.info("Creating openaire_result_downloads_monthly_tmp view"); + String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + + ".openaire_result_downloads_monthly_tmp " + + "AS SELECT entity_id, " + + "reflect('java.net.URLDecoder', 'decode', entity_id) AS id," + + "COUNT(entity_id) as downloads, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct where action='download' " + + "AND (source_item_type='oaItem' OR source_item_type='repItem') " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) , source " + + "ORDER BY source, entity_id, month"; + stmt.executeUpdate(sql); + logger.info("Created openaire_result_downloads_monthly_tmp view"); + + logger.info("Dropping openaire_downloads_stats_tmp table"); + String drop_views_stats = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".openaire_downloads_stats_tmp"; + stmt.executeUpdate(drop_views_stats); + logger.info("Dropped openaire_downloads_stats_tmp table"); + + logger.info("Creating openaire_downloads_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".openaire_downloads_stats_tmp AS " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(downloads) AS count, max(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' AND ro.oid!='400' AND ro.oid!='503' " + + "AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month " + + "ORDER BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created openaire_downloads_stats_tmp table"); + + logger.info("Dropping openaire_result_downloads_monthly_tmp view"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp"; + logger.info("Dropped openaire_result_downloads_monthly_tmp view "); + stmt.executeUpdate(sql); + + stmt.close(); + // ConnectDB.getHiveConnection().close(); + } + + public void uploadOldPedocs() throws Exception { + stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + // Dropping Pedocs pedocs_views_stats_tmp table + logger.info("Dropping Pedocs pedocs_views_stats_tmp table"); + String sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pedocs_views_stats_tmp"; + logger.info("Dropped pedocs_views_stats_tmp table "); + stmt.executeUpdate(sql); + + // Dropping Pedocs pedocs_downloads_stats_tmp table + logger.info("Dropping pedocs_downloads_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pedocs_downloads_stats_tmp"; + logger.info("Dropped pedocs_downloads_stats_tmp table "); + stmt.executeUpdate(sql); + + // Creating Pedocs pedocs_views_stats_tmp table + logger.info("Creating Pedocs pedocs_views_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pedocs_views_stats_tmp AS " + + "SELECT 'OpenAIRE' as source, 'opendoar____::ab1a4d0dd4d48a2ba1077c4494791306' as repository_id," + + "r.id as result_id,date,counter_abstract as count, 0 as openaire " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".pedocsoldviews p, " + ConnectDB.getStatsDBSchema() + + ".result_oids r where r.oid=p.identifier"; + stmt.executeUpdate(sql); + logger.info("Created pedocs_views_stats_tmp table "); + + // Creating Pedocs pedocs_downloads_stats_tmp table + logger.info("Creating Pedocs pedocs_downloads_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pedocs_downloads_stats_tmp AS " + + "SELECT 'OpenAIRE' as source, 'opendoar____::ab1a4d0dd4d48a2ba1077c4494791306' as repository_id," + + "r.id as result_id, date, counter as count, 0 as openaire " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".pedocsolddownloads p, " + ConnectDB.getStatsDBSchema() + + ".result_oids r where r.oid=p.identifier"; + stmt.executeUpdate(sql); + logger.info("Created pedocs_downloads_stats_tmp table "); + + } + + public void uploadTUDELFTStats() throws Exception { + stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + // Dropping TUDELFT tudelft_result_views_monthly_tmp view + logger.info("Dropping TUDELFT tudelft_result_views_monthly_tmp view"); + String sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_result_views_monthly_tmp"; + logger.info("Dropped tudelft_result_views_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping TUDELFT
tudelft_result_views_monthly_tmp view + logger.info("Dropping TUDELFT tudelft_result_downloads_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_result_downloads_monthly_tmp"; + logger.info("Dropped tudelft_result_downloads_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping TUDELFT tudelft_views_stats_tmp table + logger.info("Dropping TUDELFT tudelft_views_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_views_stats_tmp"; + logger.info("Dropped tudelft_views_stats_tmp table "); + stmt.executeUpdate(sql); + + // Dropping TUDELFT tudelft_downloads_stats_tmp table + logger.info("Dropping TUDELFT tudelft_downloads_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_downloads_stats_tmp"; + logger.info("Dropped tudelft_downloads_stats_tmp table "); + stmt.executeUpdate(sql); + + // Creating TUDELFT tudelft_result_views_monthly_tmp view + logger.info("Creating TUDELFT tudelft_result_views_monthly_tmp view"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_result_views_monthly_tmp " + + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE action='action' and (source_item_type='oaItem' or source_item_type='repItem') and source=252 " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id"; + stmt.executeUpdate(sql); + logger.info("Created tudelft_result_views_monthly_tmp view "); + + // Creating TUDELFT tudelft_views_stats_tmp table + logger.info("Creating TUDELFT tudelft_views_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_views_stats_tmp AS " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema() + + ".tudelft_result_views_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE concat('tud:',p.id)=ro.oid and d.id='opendoar____::c9892a989183de32e976c6f04e700201' " + + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id"; + stmt.executeUpdate(sql); + logger.info("Created TUDELFT tudelft_views_stats_tmp table"); + + // Creating TUDELFT tudelft_result_downloads_monthly_tmp view + logger.info("Creating TUDELFT tudelft_result_downloads_monthly_tmp view"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_result_downloads_monthly_tmp " + + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE action='download' and (source_item_type='oaItem' or source_item_type='repItem') and source=252 " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id"; + 
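// Note: source=252 appears to be the Matomo site id of the TU Delft repository; the alias "views" is kept here although the rows are download actions, so that the tudelft_downloads_stats_tmp table below can reuse the same max(views) aggregation pattern. +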
stmt.executeUpdate(sql); + logger.info("Created tudelft_result_downloads_monthly_tmp view "); + + // Creating TUDELFT tudelft_downloads_stats_tmp table + logger.info("Creating TUDELFT tudelft_downloads_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_downloads_stats_tmp AS " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema() + + ".tudelft_result_downloads_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE concat('tud:',p.id)=ro.oid and d.id='opendoar____::c9892a989183de32e976c6f04e700201' " + + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id"; + stmt.executeUpdate(sql); + logger.info("Created TUDELFT tudelft_downloads_stats_tmp table"); + + // Dropping TUDELFT tudelft_result_views_monthly_tmp view + logger.info("Dropping TUDELFT tudelft_result_views_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_result_views_monthly_tmp"; + logger.info("Dropped tudelft_result_views_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping TUDELFT tudelft_result_views_monthly_tmp view + logger.info("Dropping TUDELFT tudelft_result_downloads_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_result_downloads_monthly_tmp"; + logger.info("Dropped tudelft_result_downloads_monthly_tmp view "); + stmt.executeUpdate(sql); + + } + + public void uploadB2SHAREStats() throws Exception { + stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping B2SHARE b2share_result_views_monthly_tmp view"); + String sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp"; + logger.info("Dropped b2share_result_views_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping b2SHARE b2share_result_downloads_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp"; + logger.info("Dropped b2share_result_downloads_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_views_stats_tmp table + logger.info("Dropping B2SHARE b2share_views_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp"; + logger.info("Dropped b2share_views_stats_tmp table "); + stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_downloads_stats_tmp table + logger.info("Dropping B2SHARE b2share_downloads_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp"; + logger.info("Dropped b2share_downloads_stats_tmp table "); + stmt.executeUpdate(sql); + + // Creating B2SHARE b2share_result_views_monthly_tmp view + logger.info("Creating B2SHARE b2share_result_views_monthly_tmp view"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp " + + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + 
"CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE action='action' and (source_item_type='oaItem' or source_item_type='repItem') and source=412 " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id"; + stmt.executeUpdate(sql); + logger.info("Created b2share_result_views_monthly_tmp view "); + + // Creating B2SHARE b2share_views_stats_tmp table + logger.info("Creating B2SHARE b2share_views_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp AS " + + "SELECT 'B2SHARE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema() + + ".b2share_result_views_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.id=ro.oid and d.id='re3data_____::ad3609c351bd520edf6f10f5e0d9b877' " + + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id"; + stmt.executeUpdate(sql); + logger.info("Created B2SHARE b2share_views_stats_tmp table"); + + // Creating B2SHARE b2share_result_downloads_monthly_tmp view + logger.info("Creating B2SHARE b2share_result_downloads_monthly_tmp view"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp " + + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE action='download' and (source_item_type='oaItem' or source_item_type='repItem') and source=412 " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id"; + stmt.executeUpdate(sql); + logger.info("Created b2share_result_downloads_monthly_tmp view "); + + // Creating B2SHARE b2share_downloads_stats_tmp table + logger.info("Creating B2SHARE b2share_downloads_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp AS " + + "SELECT 'B2SHARE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema() + + ".b2share_result_downloads_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.id=ro.oid and d.id='re3data_____::ad3609c351bd520edf6f10f5e0d9b877' " + + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id"; + stmt.executeUpdate(sql); + logger.info("Created B2SHARE b2share_downloads_stats_tmp table"); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping B2SHARE b2share_result_views_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp"; + logger.info("Dropped b2share_result_views_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping B2SHARE b2share_result_downloads_monthly_tmp view"); + sql = "DROP view IF EXISTS " + 
ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp"; + stmt.executeUpdate(sql); + logger.info("Dropped b2share_result_downloads_monthly_tmp view"); + + } + + public void episciencesViewsStats() throws Exception { + logger.info("Creating episciences Views"); + + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Dropping Episciences Views Table"); + String dropEpisciencesViewsTable = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".episciencesviews "; + stmt.executeUpdate(dropEpisciencesViewsTable); + logger.info("Dropped Episciences Views Table"); + + logger.info("Creating Episciences Views Table"); + String createEpisciencesViewsTable = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".episciencesviews (source STRING, repository_id STRING, result_id STRING, date STRING, count INT, openaire INT)" + + " clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true') "; + + stmt.executeUpdate(createEpisciencesViewsTable); + + String returnEpisciencesJournals = "SELECT id, substring(regexp_extract(websiteurl,'^([^\\.]+)\\.?',1),9) FROM " + + ConnectDB.getStatsDBSchema() + + ".datasource where websiteurl like '%episciences%' and (dateofvalidation is not null or harvested=true)"; +
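// Worked example of the suffix extraction above (hypothetical journal URL): + // websiteurl = 'https://epijournal.episciences.org' + // regexp_extract(websiteurl,'^([^\\.]+)\\.?',1) -> 'https://epijournal' (everything before the first dot) + // substring(..., 9) -> 'epijournal' (the 1-based offset 9 drops the 8-character 'https://' prefix) + // Note: the hard-coded offset assumes an https:// websiteurl; an http:// URL would lose the first character of the suffix. +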
PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION + .prepareStatement(returnEpisciencesJournals); + ResultSet rs = st.executeQuery(); + while (rs.next()) { + String journal_openaire_id = rs.getString(1); + String episciencesSuffix = rs.getString(2); + + logger.info("Working on journal_id:" + journal_openaire_id + " suffix:" + episciencesSuffix); + logger.info("Dropping episciencesSuffix_result_views_monthly_tmp view"); + String dropepisciencesSuffixView = "DROP VIEW IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + + "." + episciencesSuffix.replace("-", "_") + "_result_views_monthly_tmp"; + // Statement stmtRS = ConnectDB.getHiveConnection().createStatement(); + stmt.executeUpdate(dropepisciencesSuffixView); + logger.info("Dropped episciencesSuffix_result_views_monthly_tmp view"); + + logger.info("Creating episciencesSuffix_result_views_monthly_tmp view"); + + String create_result_views_monthly = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + + "." + episciencesSuffix.replace("-", "_") + "_result_views_monthly_tmp " + + "AS SELECT entity_id, " + + "reflect('java.net.URLDecoder', 'decode', entity_id) AS id," + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) " + + "AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + + ".episcienceslog where action='action' and (source_item_type='oaItem' or " + + "source_item_type='repItem') and entity_id like '%" + episciencesSuffix + "%'" + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " + + "source ORDER BY source, entity_id"; + + stmt.executeUpdate(create_result_views_monthly); + logger.info("Created episciencesSuffix_result_views_monthly_tmp view"); + + logger.info("Inserting episciencesSuffix_result_views_monthly_tmp into EpisciencesViews Table"); + String insertIntoEpisciencesViewsTable = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + + ".episciencesviews SELECT 'Episciences' as source, '" + + journal_openaire_id + "' as repository_id, ro.id as result_id, month as date," + + " max(views) AS count, max(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + + "." + episciencesSuffix.replace("-", "_") + "_result_views_monthly_tmp p," + + ConnectDB.getStatsDBSchema() + + ".result_oids ro WHERE p.id=ro.oid GROUP BY ro.id, month ORDER BY ro.id, month"; + + stmt.executeUpdate(insertIntoEpisciencesViewsTable); + logger.info("Inserted episciencesSuffix_result_views_monthly_tmp into EpisciencesViews Table"); + + stmt.executeUpdate(dropepisciencesSuffixView); + logger.info("Dropped episciencesSuffix_result_views_monthly_tmp view"); + } + rs.close(); + + logger.info("Episciences Views Created"); + } + + public void episciencesDownloadsStats() throws Exception { + logger.info("Creating episciences Downloads"); + + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Dropping Episciences Downloads Table"); + String dropEpisciencesDownloadsTable = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".episciencesdownloads "; + stmt.executeUpdate(dropEpisciencesDownloadsTable); + logger.info("Dropped Episciences Downloads Table"); + + logger.info("Creating Episciences Downloads Table"); + String createEpisciencesDownloadsTable = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".episciencesdownloads (source STRING, repository_id STRING, result_id STRING, date STRING, count INT, openaire INT)" + + " clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true') "; + + stmt.executeUpdate(createEpisciencesDownloadsTable); + + String returnEpisciencesJournals = "SELECT id, substring(regexp_extract(websiteurl,'^([^\\.]+)\\.?',1),9) FROM " + + ConnectDB.getStatsDBSchema() + + ".datasource where websiteurl like '%episciences%' and (dateofvalidation is not null or harvested=true)"; + + PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION + .prepareStatement(returnEpisciencesJournals); + ResultSet rs = st.executeQuery(); + while (rs.next()) { + String journal_openaire_id = rs.getString(1); + String episciencesSuffix = rs.getString(2); + + logger.info("Working on journal_id:" + journal_openaire_id + " suffix:" + episciencesSuffix); + logger.info("Dropping episciencesSuffix_result_downloads_monthly_tmp view"); +
String dropepisciencesSuffixDownloads = "DROP VIEW IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + + "." + episciencesSuffix.replace("-", "_") + "_result_downloads_monthly_tmp"; + stmt.executeUpdate(dropepisciencesSuffixDownloads); + + logger.info("Creating episciencesSuffix_result_downloads_monthly_tmp view"); + + String create_result_downloads_monthly = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + + "." + episciencesSuffix.replace("-", "_") + "_result_downloads_monthly_tmp " + + "AS SELECT entity_id, " + + "reflect('java.net.URLDecoder', 'decode', entity_id) AS id," + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) " + + "AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + + ".episcienceslog where action='download' and (source_item_type='oaItem' or " + + "source_item_type='repItem') and entity_id like '%" + episciencesSuffix + "%'" + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " + + "source ORDER BY source, entity_id"; + + stmt.executeUpdate(create_result_downloads_monthly); + logger.info("Created episciencesSuffix_result_downloads_monthly_tmp view"); + + logger.info("Inserting episciencesSuffix_result_downloads_monthly_tmp into EpisciencesDownloads Table"); + String insertIntoEpisciencesDownloadsTable = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + + ".episciencesdownloads SELECT 'Episciences' as source, '" + + journal_openaire_id + "' as repository_id, ro.id as result_id, month as date," + + " max(views) AS count, max(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + + "." + episciencesSuffix.replace("-", "_") + "_result_downloads_monthly_tmp p," + + ConnectDB.getStatsDBSchema() + + ".result_oids ro WHERE p.id=ro.oid GROUP BY ro.id, month ORDER BY ro.id, month"; + + stmt.executeUpdate(insertIntoEpisciencesDownloadsTable); + logger.info("Inserted episciencesSuffix_result_downloads_monthly_tmp into EpisciencesDownloads Table"); + + stmt.executeUpdate(dropepisciencesSuffixDownloads); + logger.info("Dropped episciencesSuffix_result_downloads_monthly_tmp view"); + + } + rs.close(); + } + + private void createCoPR5Tables() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + // Unique Item Investigations: each session (id_visit) counts at most once per item and month, + // so repeated views within a single session do not inflate the COUNTER R5 metric + logger.info("Create View Unique_Item_Investigations"); + String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".view_unique_item_investigations " + + "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "CASE WHEN COUNT(entity_id)>1 THEN 1 ELSE 1 END AS unique_item_investigations, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View Unique_Item_Investigations"); + + logger.info("Drop Table Unique_Item_Investigations"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tbl_unique_item_investigations "; +
stmt.executeUpdate(sql); + logger.info("Dropped Table Unique_Item_Investigations"); + + logger.info("Create Table tbl_unique_item_investigations"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".tbl_unique_item_investigations as " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(unique_item_investigations) AS unique_item_investigations, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".view_unique_item_investigations p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table tbl_unique_item_investigations"); + + // Total Item Investigations + + logger.info("Create View view_total_item_investigations"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".view_total_item_investigations " + + "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) AS total_item_investigations, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View view_total_item_investigations"); + + logger.info("Drop Table tbl_total_item_investigations"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tbl_total_item_investigations "; + stmt.executeUpdate(sql); + logger.info("Dropped Table tbl_total_item_investigations"); + + logger.info("Create Table tbl_total_item_investigations"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".tbl_total_item_investigations AS " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(total_item_investigations) AS total_item_investigations, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".view_total_item_investigations p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table tbl_total_item_investigations"); + + // Unique Item Requests + + logger.info("Create View view_unique_item_requests"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".view_unique_item_requests AS " + + "SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "CASE WHEN COUNT(entity_id)>1 THEN 1 ELSE 1 END AS unique_item_requests, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " 
+ + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE action='download' AND (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View view_unique_item_requests"); + + logger.info("Drop Table Unique_Item_Requests"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tbl_unique_item_requests "; + stmt.executeUpdate(sql); + logger.info("Dropped Table Unique_Item_Requests"); + + logger.info("Create Table tbl_unique_item_requests"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".tbl_unique_item_requests as " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(unique_item_requests) AS unique_item_requests, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".view_unique_item_requests p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table tbl_unique_item_requests"); + + // Total Item Requests + + logger.info("Create View view_total_item_requests"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".view_total_item_requests " + + "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) AS total_item_requests, " + + "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogdistinct " + + "WHERE action='download' AND (source_item_type='oaItem' or source_item_type='repItem') " + + "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source "; + stmt.executeUpdate(sql); + logger.info("Created View view_total_item_requests"); + + logger.info("Drop Table tbl_total_item_requests"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tbl_total_item_requests "; + stmt.executeUpdate(sql); + logger.info("Dropped Table tbl_total_item_requests"); + + logger.info("Create Table tbl_total_item_requests"); + sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".tbl_total_item_requests as " + + "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "sum(total_item_requests) AS total_item_requests, sum(openaire_referrer) AS openaire " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".view_total_item_requests p, " + + ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' " + + "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + + "GROUP BY d.id, ro.id, month "; + stmt.executeUpdate(sql); + logger.info("Created Table tbl_total_item_requests"); + + // All CoP R5 metrics Table + logger.info("Drop Table tbl_all_r5_metrics"); + sql = "DROP TABLE IF 
EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".tbl_all_r5_metrics "; + stmt.executeUpdate(sql); + logger.info("Dropped Table tbl_all_r5_metrics"); + + logger.info("Create Table tbl_all_r5_metrics"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".tbl_all_r5_metrics as " + + "WITH tmp1 as (SELECT coalesce(ds.repository_id, vs.repository_id) as repository_id, " + + "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " + + "coalesce(vs.unique_item_investigations, 0) as unique_item_investigations, " + + "coalesce(ds.total_item_investigations, 0) as total_item_investigations " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".tbl_unique_item_investigations AS vs " + + "FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".tbl_total_item_investigations AS ds " + + " ON ds.source=vs.source AND ds.result_id=vs.result_id AND ds.date=vs.date), " + + "tmp2 AS (select coalesce (ds.repository_id, vs.repository_id) as repository_id, " + + "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " + + "coalesce(ds.total_item_investigations, 0) as total_item_investigations, " + + "coalesce(ds.unique_item_investigations, 0) as unique_item_investigations, " + + " coalesce(vs.unique_item_requests, 0) as unique_item_requests FROM tmp1 " + + "AS ds FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".tbl_unique_item_requests AS vs " + + "ON ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date) " + + "SELECT 'OpenAIRE' as source, coalesce (ds.repository_id, vs.repository_id) as repository_id, " + + "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " + + "coalesce(ds.unique_item_investigations, 0) as unique_item_investigations, " + + "coalesce(ds.total_item_investigations, 0) as total_item_investigations, " + + "coalesce(ds.unique_item_requests, 0) as unique_item_requests, " + + "coalesce(vs.total_item_requests, 0) as total_item_requests " + + "FROM tmp2 AS ds FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".tbl_total_item_requests " + + "AS vs ON ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date"; + stmt.executeUpdate(sql); + logger.info("Created Table tbl_all_r5_metrics"); + stmt.close(); + ConnectDB.getHiveConnection().close(); + + } + + public void finalizeStats() throws Exception { + stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + // Dropping views_stats table + logger.info("Dropping views_stats table"); + String sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".views_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped views_stats table"); + + // Dropping downloads_stats table + logger.info("Dropping downloads_stats table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped downloads_stats table"); + + // Dropping pageviews_stats table + logger.info("Dropping pageviews_stats table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped pageviews_stats table"); + + // Dropping usage_stats table + logger.info("Dropping usage_stats table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped usage_stats table"); + +
// Creating views_stats table + logger.info("Creating views_stats table"); + String createViewsStats = "CREATE TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".views_stats " + + "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".openaire_views_stats_tmp STORED AS PARQUET"; + stmt.executeUpdate(createViewsStats); + logger.info("Created views_stats table"); + + // Inserting OpenAIRE views stats + logger.info("Inserting OpenAIRE data to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_views_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("OpenAIRE views updated to views_stats"); + + // Inserting Episciences views stats + logger.info("Inserting Episciences data to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".episciencesviews"; + stmt.executeUpdate(sql); + logger.info("Episciences views updated to views_stats"); + + // Inserting Pedocs old views stats + logger.info("Inserting Pedocs old data to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".pedocs_views_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Pedocs views updated to views_stats"); + + // Inserting TUDELFT views stats + logger.info("Inserting TUDELFT data to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_views_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("TUDELFT views updated to views_stats"); + + // Inserting LaReferencia views stats + logger.info("Inserting LaReferencia data to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("LaReferencia views updated to views_stats"); + + // Inserting B2SHARE views stats + logger.info("Inserting B2SHARE data to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("B2SHARE views updated to views_stats"); + + logger.info("Creating downloads_stats table"); + String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats " + + "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".openaire_downloads_stats_tmp STORED AS PARQUET"; + stmt.executeUpdate(createDownloadsStats); + logger.info("Created downloads_stats table"); + + // Inserting OpenAIRE downloads stats + logger.info("Inserting OpenAIRE data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Inserted OpenAIRE data to downloads_stats"); + + // Inserting Episciences downloads stats + logger.info("Inserting Episciences data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".episciencesdownloads"; + stmt.executeUpdate(sql); + logger.info("Episciences downloads updated to downloads_stats"); + +
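// The INSERT blocks in this method all share the same shape; a hypothetical helper (not part of + // this patch) could collapse the repetition, e.g.: + // private void appendSources(Statement stmt, String target, String... tmpTables) throws SQLException { + // for (String tmp : tmpTables) { + // stmt.executeUpdate("INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + "." + target + // + " SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + "." + tmp); + // } + // } +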
// Inserting Pedocs old downloads stats + logger.info("Inserting PeDocs old data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".pedocs_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Inserted Pedocs data to downloads_stats"); + + // Inserting TUDELFT downloads stats + logger.info("Inserting TUDELFT data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Inserted TUDELFT data to downloads_stats"); + + // Inserting B2SHARE downloads stats + logger.info("Inserting B2SHARE data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Inserted B2SHARE data to downloads_stats"); + + // Inserting LaReferencia downloads stats + logger.info("Inserting LaReferencia data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("LaReferencia downloads updated to downloads_stats"); + + // Inserting IRUS downloads stats + logger.info("Inserting IRUS data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("IRUS downloads updated to downloads_stats"); + + // Inserting IRUS_R5 views stats + logger.info("Inserting IRUS_R5 views to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT source, repository_id, result_id, `date`, views, openaire FROM " + + ConnectDB.getUsageStatsDBSchema() + + ".irus_R5_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("IRUS_R5 views updated to views_stats"); + + // Inserting IRUS_R5 downloads stats + logger.info("Inserting IRUS_R5 data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT source, repository_id, result_id, `date`, downloads, openaire FROM " + + ConnectDB.getUsageStatsDBSchema() + + ".irus_R5_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("IRUS_R5 downloads updated to downloads_stats"); + + // Inserting SARC-OJS downloads stats + logger.info("Inserting SARC data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("SARC-OJS downloads updated to downloads_stats"); + + // Inserting Datacite views stats + logger.info("Inserting Datacite views to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageRawDataDBSchema() + ".datacite_views"; + stmt.executeUpdate(sql); + logger.info("Datacite views updated to views_stats"); + + // Inserting Datacite downloads stats + logger.info("Inserting Datacite downloads to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageRawDataDBSchema() + ".datacite_downloads"; +
stmt.executeUpdate(sql); + logger.info("Datacite downloads updated to downloads_stats"); + + logger.info("Creating pageviews_stats table"); + String create_pageviews_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".pageviews_stats " + + "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".openaire_pageviews_stats_tmp STORED AS PARQUET"; + stmt.executeUpdate(create_pageviews_stats); + logger.info("Created pageviews_stats table"); + + // Inserting OpenAIRE views stats from Portal + logger.info("Inserting data to pageviews_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_pageviews_stats_tmp"; + stmt.executeUpdate(sql); + + logger.info("Dropping full_dates table"); + String dropFullDates = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".full_dates"; + stmt.executeUpdate(dropFullDates); + logger.info("Dropped full_dates table"); + + Calendar startCalendar = Calendar.getInstance(); + startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01")); + Calendar endCalendar = Calendar.getInstance(); + int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR); + int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH); + + logger.info("Creating full_dates table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".full_dates AS " + + "SELECT from_unixtime(unix_timestamp(cast(add_months(from_date,i) AS DATE)), 'yyyy/MM') AS txn_date " + + "FROM (SELECT DATE '2016-01-01' AS from_date) p " + + "LATERAL VIEW " + + "posexplode(split(space(" + diffMonth + "),' ')) pe AS i,x"; + stmt.executeUpdate(sql); + logger.info("Created full_dates table"); + +
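// How the statement above generates one row per month (worked example, assuming diffMonth = 3): + // space(3) -> "   " (three spaces), split(..., ' ') -> four empty strings, + // posexplode(...) -> positions i = 0, 1, 2, 3 (one row per element), + // add_months('2016-01-01', i) -> 2016-01-01 ... 2016-04-01, each formatted as 'yyyy/MM', + // so full_dates holds every 'yyyy/MM' value from 2016/01 up to the current month. +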
logger.info("Inserting data to usage_stats"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats AS " + + "SELECT coalesce(ds.source, vs.source) as source, " + + "coalesce(ds.repository_id, vs.repository_id) as repository_id, " + + "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " + + "coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, " + + "coalesce(ds.openaire, 0) as openaire_downloads, " + + "coalesce(vs.openaire, 0) as openaire_views " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats AS ds FULL OUTER JOIN " + + ConnectDB.getUsageStatsDBSchema() + ".views_stats AS vs ON ds.source=vs.source " + + "AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date"; + stmt.executeUpdate(sql); + logger.info("Inserted data to usage_stats"); + + // Inserting LaReferencia CoP R5 Metrics + logger.info("Inserting LaReferencia data to tbl_all_r5_metrics"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".tbl_all_r5_metrics " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_all_r5_metrics"; + stmt.executeUpdate(sql); + + // Inserting IRUS-UK CoP R5 Metrics + logger.info("Inserting IRUS-UK data into tbl_all_r5_metrics"); + String insertR5Stats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".tbl_all_r5_metrics " + + "SELECT s.source, d.id AS repository_id, " + + "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, " + + "s.unique_item_investigations , s.total_item_investigations, " + + "s.unique_item_requests, s.total_item_requests " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog_cop_r5 s, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE s.repository=d.oid AND s.rid=ro.oid AND s.source='IRUS-UK'"; + stmt.executeUpdate(insertR5Stats); + logger.info("Inserted IRUS-UK data into tbl_all_r5_metrics"); + + logger.info("Building views at permanent DB starts at: " + new Timestamp(System.currentTimeMillis())); + + logger.info("Dropping view views_stats on permanent usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".views_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped view views_stats on permanent usagestats DB"); + + logger.info("Create view views_stats on permanent usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".views_stats" + + " AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".views_stats"; + stmt.executeUpdate(sql); + logger.info("Created view views_stats on permanent usagestats DB"); + + logger.info("Dropping view pageviews_stats on permanent usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".pageviews_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped view pageviews_stats on permanent usagestats DB"); + + logger.info("Create view pageviews_stats on permanent usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".pageviews_stats" + + " AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats"; + stmt.executeUpdate(sql); + logger.info("Created view pageviews_stats on permanent usagestats DB"); + + logger.info("Dropping view downloads_stats on permanent usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".downloads_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped view on downloads_stats on permanent usagestats DB"); + + logger.info("Create view on downloads_stats on permanent usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".downloads_stats" + + " AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats"; + stmt.executeUpdate(sql); + logger.info("Created view on downloads_stats on permanent usagestats DB"); + + logger.info("Dropping view usage_stats on permanent usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".usage_stats"; + stmt.executeUpdate(sql); + logger.info("Dropped view on usage_stats on permanent usagestats DB"); + + logger.info("Create view on usage_stats on permanent usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".usage_stats" + + " AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats"; + stmt.executeUpdate(sql); + logger.info("Created view on usage_stats on permanent usagestats DB"); + + logger.info("Dropping view COUNTER_R5_Metrics on permanent usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + ".counter_r5_stats_with_metrics"; + stmt.executeUpdate(sql); + logger.info("Dropped view COUNTER_R5_Metrics on permanent usagestats DB"); + + logger.info("Create view on COUNTER_R5_Metrics on permanent usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getUsagestatsPermanentDBSchema() + + ".counter_r5_stats_with_metrics" + + " AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".tbl_all_r5_metrics"; + stmt.executeUpdate(sql); + logger.info("Created view on 
COUNTER_R5_Metrics on permanent usagestats DB"); + + logger.info("Building views at permanent DB ends at: " + new Timestamp(System.currentTimeMillis())); + + stmt.close(); + ConnectDB.getHiveConnection().close(); + } + + private Connection getConnection() throws SQLException { + return ConnectDB.getHiveConnection(); + } +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/SarcStats.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/SarcStats.java new file mode 100755 index 000000000..880233f00 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/SarcStats.java @@ -0,0 +1,107 @@ + +package eu.dnetlib.oa.graph.usagestatsbuild.export; + +import java.io.*; +// import java.io.BufferedReader; +// import java.io.InputStreamReader; +import java.net.URL; +import java.net.URLConnection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author D. Pierrakos, S. Zoupanos + */ +public class SarcStats { + + private Statement stmtHive = null; + private Statement stmtImpala = null; + + private static final Logger logger = LoggerFactory.getLogger(SarcStats.class); + + public SarcStats() throws Exception { +// createTables(); + } + + private void createTables() throws Exception { + try { + + stmtHive = ConnectDB.getHiveConnection().createStatement(); + String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));"; + stmtHive.executeUpdate(sqlCreateTableSushiLog); + + // String sqlCopyPublicSushiLog="INSERT INTO sushilog SELECT * FROM public.sushilog;"; + // stmt.executeUpdate(sqlCopyPublicSushiLog); + String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " + + " ON INSERT TO sushilog " + + " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository," + + "sushilog.rid, sushilog.date " + + "FROM sushilog " + + "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;"; + stmtHive.executeUpdate(sqlcreateRuleSushiLog); + String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);"; + stmtHive.executeUpdate(createSushiIndex); + + stmtHive.close(); + ConnectDB.getHiveConnection().close(); + logger.info("Sushi Tables Created"); + } catch (Exception e) { + logger.error("Failed to create tables: " + e); + throw new Exception("Failed to create tables: " + e.toString(), e); + } + } + + public void processSarc() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + 
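// The INSERT below matches raw sushilog rows to graph entities: repositories by substring + // (d.oid LIKE CONCAT('%', s.repository, '%')), results by DOI via result_pids + // (ro.type='Digital Object Identifier'), and the SUSHI date is reduced to a 'yyyy/MM' key, + // e.g. 2020-07-15 -> '2020/07', so SARC rows line up with the Piwik-based monthly tables. +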
ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Creating sarc_downloads_stats_tmp table"); + String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sarc_downloads_stats_tmp " + + "(`source` string, " + + "`repository_id` string, " + + "`result_id` string, " + + "`date` string, " + + "`count` bigint, " + + "`openaire` bigint)"; + stmt.executeUpdate(createDownloadsStats); + logger.info("Created sarc_downloads_stats_tmp table"); + + logger.info("Inserting into sarc_downloads_stats_tmp"); + String insertSarcStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_downloads_stats_tmp " + + "SELECT s.source, d.id AS repository_id, " + + "ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', " + + "LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog s, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + + ConnectDB.getStatsDBSchema() + ".result_pids ro " + + "WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND d.id like CONCAT('%', 'sarcservicod', '%') " + + "AND s.rid=ro.pid AND ro.type='Digital Object Identifier' AND s.metric_type='ft_total' AND s.source='SARC-OJS'"; + stmt.executeUpdate(insertSarcStats); + logger.info("Inserted into sarc_downloads_stats_tmp"); + + stmt.close(); + // ConnectDB.getHiveConnection().close(); + } + +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java new file mode 100755 index 000000000..886ebca23 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java @@ -0,0 +1,137 @@ + +package eu.dnetlib.oa.graph.usagestatsbuild.export; + +import java.io.IOException; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main class for downloading and processing Usage statistics + * + * @author D. Pierrakos, S. 
Zoupanos + */ +public class UsageStatsExporter { + + public UsageStatsExporter() { + + } + + private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class); + + public void export() throws Exception { + + logger.info("Initialising DB properties"); + ConnectDB.init(); + +// runImpalaQuery(); + PiwikStatsDB piwikstatsdb = new PiwikStatsDB(); + if (ExecuteWorkflow.recreateDbAndTables) { + logger.info("Re-creating database and tables"); + piwikstatsdb.recreateDBAndTables(); + logger.info("DB-Tables are created"); + } +// else { +// piwikstatsdb.createTmpTables(); +// logger.info("TmpTables are created "); +// } + if (ExecuteWorkflow.processPiwikLogs) { + logger.info("Creating distinct piwik log"); + piwikstatsdb.createDistinctPiwikLog(); + logger.info("Processing OpenAIRE logs"); + piwikstatsdb.processLogs(); + logger.info("OpenAIRE logs Done"); + logger.info("Processing Episciences logs"); + piwikstatsdb.processEpisciencesLogs(); + logger.info("Episciences logs Done"); + logger.info("Processing Pedocs Old Stats"); + piwikstatsdb.uploadOldPedocs(); + logger.info("Processing Pedocs Old Stats Done"); + logger.info("Processing TUDELFT Stats"); + piwikstatsdb.uploadTUDELFTStats(); + logger.info("Processing TUDELFT Stats Done"); + logger.info("Processing B2SHARE Stats"); + piwikstatsdb.uploadB2SHAREStats(); + logger.info("Processing B2SHARE Stats Done"); + + } + + LaReferenciaStats lastats = new LaReferenciaStats(); + + if (ExecuteWorkflow.processLaReferenciaLogs) { + logger.info("Processing LaReferencia logs"); + lastats.processLogs(); + logger.info("LaReferencia logs done"); + } + + IrusStats irusstats = new IrusStats(); + + if (ExecuteWorkflow.irusProcessStats) { + logger.info("Processing IRUS"); + irusstats.processIrusStats(); + logger.info("Irus done"); + } + + SarcStats sarcStats = new SarcStats(); + + if (ExecuteWorkflow.sarcProcessStats) { + sarcStats.processSarc(); + logger.info("Sarc done"); + } + + // finalize usagestats + if (ExecuteWorkflow.finalizeStats) { + piwikstatsdb.finalizeStats(); + logger.info("Finalized stats"); + } + + // Make the tables available to Impala + if (ExecuteWorkflow.finalTablesVisibleToImpala) { + logger.info("Making tables visible to Impala"); + invalidateMetadata(); + } + + logger.info("End"); + } + + private void invalidateMetadata() throws SQLException { + Statement stmt = ConnectDB.getImpalaConnection().createStatement(); + + String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".views_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".downloads_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".views_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".usage_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".pageviews_stats"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".counter_r5_stats_with_metrics"; + stmt.executeUpdate(sql); + + 
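// The nine INVALIDATE METADATA statements above differ only in schema and table name; an + // equivalent, more compact form (sketch, not applied here) would be: + // for (String t : new String[] {"downloads_stats", "views_stats", "usage_stats", "pageviews_stats"}) { + // stmt.executeUpdate("INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + "." + t); + // stmt.executeUpdate("INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + "." + t); + // } + // plus the single counter_r5_stats_with_metrics statement on the permanent schema. +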
stmt.close(); + ConnectDB.getImpalaConnection().close(); + } +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json new file mode 100755 index 000000000..242e5a477 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json @@ -0,0 +1,92 @@ +[ + { + "paramName": "pmi", + "paramLongName": "portalMatomoID", + "paramDescription": "the Matomo (Piwik) site ID of the OpenAIRE portal", + "paramRequired": true + }, + { + "paramName": "dbhu", + "paramLongName": "dbHiveUrl", + "paramDescription": "the Hive server JDBC URL", + "paramRequired": true + }, + { + "paramName": "dbiu", + "paramLongName": "dbImpalaUrl", + "paramDescription": "the Impala server JDBC URL", + "paramRequired": true + }, + { + "paramName": "urdbs", + "paramLongName": "usageRawDataDBSchema", + "paramDescription": "the DB schema holding the raw usage data", + "paramRequired": true + }, + { + "paramName": "usdbs", + "paramLongName": "usageStatsDBSchema", + "paramDescription": "the DB schema where the usage stats tables are built", + "paramRequired": true + }, + { + "paramName": "sdbs", + "paramLongName": "statsDBSchema", + "paramDescription": "the DB schema of the graph stats database", + "paramRequired": true + }, + { + "paramName": "uspdbs", + "paramLongName": "usagestatsPermanentDBSchema", + "paramDescription": "the permanent DB schema exposing the final usage stats views", + "paramRequired": true + }, + { + "paramName": "rdbt", + "paramLongName": "recreateDbAndTables", + "paramDescription": "Re-create database and initial tables?", + "paramRequired": true + }, + { + "paramName": "ppwl", + "paramLongName": "processPiwikLogs", + "paramDescription": "Process the piwiklogs (create & fill in the needed tables and process the data) based on the downloaded data", + "paramRequired": true + }, + { + "paramName": "plrl", + "paramLongName": "processLaReferenciaLogs", + "paramDescription": "Process the La Referencia logs (create & fill in the needed tables and process the data) based on the downloaded data", + "paramRequired": true + }, + { + "paramName": "ipr", + "paramLongName": "irusProcessStats", + "paramDescription": "Irus section: Process stats?", + "paramRequired": true + }, + { + "paramName": "spr", + "paramLongName": "sarcProcessStats", + "paramDescription": "Sarc section: Process stats?", + "paramRequired": true + }, + { + "paramName": "fs", + "paramLongName": "finalizeStats", + "paramDescription": "Create the usage_stats table?", + "paramRequired": true + }, + { + "paramName": "ftvi", + "paramLongName": "finalTablesVisibleToImpala", + "paramDescription": "Make the usage_stats, views_stats and downloads_stats tables visible to Impala", + "paramRequired": true + }, + { + "paramName": "nodt", + "paramLongName": "numberOfDownloadThreads", + "paramDescription": "Number of download threads", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/config-default.xml b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/config-default.xml new file mode 100755 index 
000000000..7310ec70a --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/config-default.xml @@ -0,0 +1,38 @@ + + + jobTracker + ${jobTracker} + + + nameNode + ${nameNode} + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1;?spark.executor.memory=19166291558;spark.yarn.executor.memoryOverhead=3225;spark.driver.memory=11596411699;spark.yarn.driver.memoryOverhead=1228 + + + impalaJdbcUrl + jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl; + + + oozie.wf.workflow.notification.url + {serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status + + + oozie.use.system.libpath + true + + diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml new file mode 100755 index 000000000..488578b24 --- /dev/null +++ b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml @@ -0,0 +1,83 @@ + + + + hiveMetastoreUris + Hive server metastore URIs + + + hiveJdbcUrl + Hive server jdbc url + + + impalaJdbcUrl + Impala server jdbc url + + + + + ${jobTracker} + ${nameNode} + + + hive.metastore.uris + ${hiveMetastoreUris} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + spark.executor.memory + 19166291558 + + + spark.yarn.executor.memoryOverhead + 3225 + + + spark.driver.memory + 11596411699 + + + spark.yarn.driver.memoryOverhead + 1228 + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + eu.dnetlib.oa.graph.usagestatsbuild.export.ExecuteWorkflow + --portalMatomoID${portalMatomoID} + --dbHiveUrl${hiveJdbcUrl} + --dbImpalaUrl${impalaJdbcUrl} + --usageRawDataDBSchema${usageRawDataDBSchema} + --usageStatsDBSchema${usageStatsDBSchema} + --usagestatsPermanentDBSchema${usagestatsPermanentDBSchema} + --statsDBSchema${statsDBSchema} + --recreateDbAndTables${recreateDbAndTables} + --processPiwikLogs${processPiwikLogs} + --processLaReferenciaLogs${processLaReferenciaLogs} + --irusProcessStats${irusProcessStats} + --sarcProcessStats${sarcProcessStats} + --finalizeStats${finalizeStats} + --finalTablesVisibleToImpala${finalTablesVisibleToImpala} + --numberOfDownloadThreads${numberOfDownloadThreads} + + + + + + + +
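Note: the --paramLongName values wired in the workflow above are consumed by eu.dnetlib.oa.graph.usagestatsbuild.export.ExecuteWorkflow, which this excerpt does not show. A minimal sketch of such a driver, assuming each flag and its value arrive as alternating argv entries and that the static fields match the names referenced by ConnectDB.init() and UsageStatsExporter.export() (the exact field set below is therefore an assumption):

public static void main(String[] args) throws Exception {
	// Collect "--name value" pairs into a map, e.g. "recreateDbAndTables" -> "true"
	java.util.Map<String, String> argMap = new java.util.HashMap<>();
	for (int i = 0; i + 1 < args.length; i += 2) {
		argMap.put(args[i].replaceFirst("^--", ""), args[i + 1]);
	}
	// Connection settings read by ConnectDB.init()
	ExecuteWorkflow.dbHiveUrl = argMap.get("dbHiveUrl");
	ExecuteWorkflow.dbImpalaUrl = argMap.get("dbImpalaUrl");
	ExecuteWorkflow.usageRawDataDBSchema = argMap.get("usageRawDataDBSchema");
	ExecuteWorkflow.usageStatsDBSchema = argMap.get("usageStatsDBSchema");
	ExecuteWorkflow.statsDBSchema = argMap.get("statsDBSchema");
	ExecuteWorkflow.usagestatsPermanentDBSchema = argMap.get("usagestatsPermanentDBSchema");
	// Step toggles read by UsageStatsExporter.export()
	ExecuteWorkflow.recreateDbAndTables = Boolean.parseBoolean(argMap.get("recreateDbAndTables"));
	ExecuteWorkflow.processPiwikLogs = Boolean.parseBoolean(argMap.get("processPiwikLogs"));
	ExecuteWorkflow.processLaReferenciaLogs = Boolean.parseBoolean(argMap.get("processLaReferenciaLogs"));
	ExecuteWorkflow.irusProcessStats = Boolean.parseBoolean(argMap.get("irusProcessStats"));
	ExecuteWorkflow.sarcProcessStats = Boolean.parseBoolean(argMap.get("sarcProcessStats"));
	ExecuteWorkflow.finalizeStats = Boolean.parseBoolean(argMap.get("finalizeStats"));
	ExecuteWorkflow.finalTablesVisibleToImpala = Boolean.parseBoolean(argMap.get("finalTablesVisibleToImpala"));
	new UsageStatsExporter().export();
}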