From 32bf943979f5e111f0841bdeb7c61a2aa7cbeb1b Mon Sep 17 00:00:00 2001 From: Dimitris Date: Mon, 2 Nov 2020 09:08:25 +0200 Subject: [PATCH] Changes to download only updates --- .../oa/graph/usagestats/export/ConnectDB.java | 52 +- .../oa/graph/usagestats/export/IrusStats.java | 600 +++++------ .../export/LaReferenciaDownloadLogs.java | 324 +++--- .../usagestats/export/PiwikDownloadLogs.java | 145 +-- .../graph/usagestats/export/PiwikStatsDB.java | 85 +- .../oa/graph/usagestats/export/SarcStats.java | 929 +++++++++--------- .../usagestats/export/UsageStatsExporter.java | 2 +- 7 files changed, 1072 insertions(+), 1065 deletions(-) diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java index ffc7c74cd..29dd5648b 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ConnectDB.java @@ -78,20 +78,20 @@ public abstract class ConnectDB { */ ComboPooledDataSource cpds = new ComboPooledDataSource(); cpds.setJdbcUrl(dbHiveUrl); - cpds.setAcquireIncrement(1); - cpds.setMaxPoolSize(100); - cpds.setMinPoolSize(1); - cpds.setInitialPoolSize(1); - cpds.setMaxIdleTime(300); - cpds.setMaxConnectionAge(36000); + cpds.setAcquireIncrement(1); + cpds.setMaxPoolSize(100); + cpds.setMinPoolSize(1); + cpds.setInitialPoolSize(1); + cpds.setMaxIdleTime(300); + cpds.setMaxConnectionAge(36000); - cpds.setAcquireRetryAttempts(5); - cpds.setAcquireRetryDelay(2000); - cpds.setBreakAfterAcquireFailure(false); + cpds.setAcquireRetryAttempts(5); + cpds.setAcquireRetryDelay(2000); + cpds.setBreakAfterAcquireFailure(false); - cpds.setCheckoutTimeout(30000); - cpds.setPreferredTestQuery("SELECT 1"); - cpds.setIdleConnectionTestPeriod(60); + cpds.setCheckoutTimeout(0); + cpds.setPreferredTestQuery("SELECT 1"); + cpds.setIdleConnectionTestPeriod(60); return cpds.getConnection(); } @@ -103,23 +103,23 @@ public abstract class ConnectDB { */ ComboPooledDataSource cpds = new ComboPooledDataSource(); cpds.setJdbcUrl(dbImpalaUrl); - cpds.setAcquireIncrement(1); - cpds.setMaxPoolSize(100); - cpds.setMinPoolSize(1); - cpds.setInitialPoolSize(1); - cpds.setMaxIdleTime(300); - cpds.setMaxConnectionAge(36000); + cpds.setAcquireIncrement(1); + cpds.setMaxPoolSize(100); + cpds.setMinPoolSize(1); + cpds.setInitialPoolSize(1); + cpds.setMaxIdleTime(300); + cpds.setMaxConnectionAge(36000); - cpds.setAcquireRetryAttempts(5); - cpds.setAcquireRetryDelay(2000); - cpds.setBreakAfterAcquireFailure(false); + cpds.setAcquireRetryAttempts(5); + cpds.setAcquireRetryDelay(2000); + cpds.setBreakAfterAcquireFailure(false); - cpds.setCheckoutTimeout(30000); - cpds.setPreferredTestQuery("SELECT 1"); - cpds.setIdleConnectionTestPeriod(60); + cpds.setCheckoutTimeout(0); + cpds.setPreferredTestQuery("SELECT 1"); + cpds.setIdleConnectionTestPeriod(60); - return cpds.getConnection(); + return cpds.getConnection(); } -} +} diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/IrusStats.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/IrusStats.java index 749687ec5..6947381c9 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/IrusStats.java +++ 
b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/IrusStats.java @@ -1,4 +1,3 @@ - package eu.dnetlib.oa.graph.usagestats.export; import java.io.*; @@ -28,38 +27,38 @@ import org.slf4j.LoggerFactory; */ public class IrusStats { - private String irusUKURL; + private String irusUKURL; - private static final Logger logger = LoggerFactory.getLogger(IrusStats.class); + private static final Logger logger = LoggerFactory.getLogger(IrusStats.class); - public IrusStats(String irusUKURL) throws Exception { - this.irusUKURL = irusUKURL; - // The following may not be needed - It will be created when JSON tables are created + public IrusStats(String irusUKURL) throws Exception { + this.irusUKURL = irusUKURL; + // The following may not be needed - It will be created when JSON tables are created // createTmpTables(); - } + } - public void reCreateLogDirs() throws Exception { - FileSystem dfs = FileSystem.get(new Configuration()); + public void reCreateLogDirs() throws Exception { + FileSystem dfs = FileSystem.get(new Configuration()); - logger.info("Deleting irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath); - dfs.delete(new Path(ExecuteWorkflow.irusUKReportPath), true); + logger.info("Deleting irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath); + dfs.delete(new Path(ExecuteWorkflow.irusUKReportPath), true); - logger.info("Creating irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath); - dfs.mkdirs(new Path(ExecuteWorkflow.irusUKReportPath)); - } + logger.info("Creating irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath); + dfs.mkdirs(new Path(ExecuteWorkflow.irusUKReportPath)); + } - public void createTables() throws Exception { - try { - logger.info("Creating sushilog"); - Statement stmt = ConnectDB.getHiveConnection().createStatement(); - String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + ".sushilog(source STRING, " + - "repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, " + - "repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')"; - stmt.executeUpdate(sqlCreateTableSushiLog); - logger.info("Created sushilog"); + public void createTables() throws Exception { + try { + logger.info("Creating sushilog"); + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sushilog(source STRING, " + + "repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, " + + "repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')"; + stmt.executeUpdate(sqlCreateTableSushiLog); + logger.info("Created sushilog"); - // To see how to apply to the ignore duplicate rules and indexes + // To see how to apply to the ignore duplicate rules and indexes // stmt.executeUpdate(sqlCreateTableSushiLog); // String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " // + " ON INSERT TO sushilog " @@ -70,15 +69,14 @@ public class IrusStats { // stmt.executeUpdate(sqlcreateRuleSushiLog); // String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);"; // stmt.executeUpdate(createSushiIndex); - - stmt.close(); - ConnectDB.getHiveConnection().close(); - logger.info("Sushi Tables Created"); - } catch (Exception e) { - 
logger.error("Failed to create tables: " + e); - throw new Exception("Failed to create tables: " + e.toString(), e); - } - } + stmt.close(); + ConnectDB.getHiveConnection().close(); + logger.info("Sushi Tables Created"); + } catch (Exception e) { + logger.error("Failed to create tables: " + e); + throw new Exception("Failed to create tables: " + e.toString(), e); + } + } // // The following may not be needed - It will be created when JSON tables are created // private void createTmpTables() throws Exception { @@ -107,311 +105,315 @@ public class IrusStats { // throw new Exception("Failed to create tables: " + e.toString(), e); // } // } + public void processIrusStats() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); - public void processIrusStats() throws Exception { - Statement stmt = ConnectDB.getHiveConnection().createStatement(); - ConnectDB.getHiveConnection().setAutoCommit(false); + logger.info("Adding JSON Serde jar"); + stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar"); + logger.info("Added JSON Serde jar"); - logger.info("Adding JSON Serde jar"); - stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar"); - logger.info("Added JSON Serde jar"); + logger.info("Dropping sushilogtmp_json table"); + String dropSushilogtmpJson = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".sushilogtmp_json"; + stmt.executeUpdate(dropSushilogtmpJson); + logger.info("Dropped sushilogtmp_json table"); - logger.info("Dropping sushilogtmp_json table"); - String dropSushilogtmpJson = "DROP TABLE IF EXISTS " + - ConnectDB.getUsageStatsDBSchema() + - ".sushilogtmp_json"; - stmt.executeUpdate(dropSushilogtmpJson); - logger.info("Dropped sushilogtmp_json table"); + logger.info("Creating irus_sushilogtmp_json table"); + String createSushilogtmpJson = "CREATE EXTERNAL TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json(\n" + + " `ItemIdentifier` ARRAY<\n" + + " struct<\n" + + " Type: STRING,\n" + + " Value: STRING\n" + + " >\n" + + " >,\n" + + " `ItemPerformance` ARRAY<\n" + + " struct<\n" + + " `Period`: struct<\n" + + " `Begin`: STRING,\n" + + " `End`: STRING\n" + + " >,\n" + + " `Instance`: struct<\n" + + " `Count`: STRING,\n" + + " `MetricType`: STRING\n" + + " >\n" + + " >\n" + + " >\n" + + ")\n" + + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + + "LOCATION '" + ExecuteWorkflow.irusUKReportPath + "'\n" + + "TBLPROPERTIES (\"transactional\"=\"false\")"; + stmt.executeUpdate(createSushilogtmpJson); + logger.info("Created irus_sushilogtmp_json table"); - logger.info("Creating irus_sushilogtmp_json table"); - String createSushilogtmpJson = "CREATE EXTERNAL TABLE IF NOT EXISTS " + - ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json(\n" + - " `ItemIdentifier` ARRAY<\n" + - " struct<\n" + - " Type: STRING,\n" + - " Value: STRING\n" + - " >\n" + - " >,\n" + - " `ItemPerformance` ARRAY<\n" + - " struct<\n" + - " `Period`: struct<\n" + - " `Begin`: STRING,\n" + - " `End`: STRING\n" + - " >,\n" + - " `Instance`: struct<\n" + - " `Count`: STRING,\n" + - " `MetricType`: STRING\n" + - " >\n" + - " >\n" + - " >\n" + - ")\n" + - "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + - "LOCATION '" + ExecuteWorkflow.irusUKReportPath + "'\n" + - "TBLPROPERTIES (\"transactional\"=\"false\")"; - stmt.executeUpdate(createSushilogtmpJson); - logger.info("Created 
irus_sushilogtmp_json table"); + logger.info("Dropping irus_sushilogtmp table"); + String dropSushilogtmp = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".irus_sushilogtmp"; + stmt.executeUpdate(dropSushilogtmp); + logger.info("Dropped irus_sushilogtmp table"); - logger.info("Dropping irus_sushilogtmp table"); - String dropSushilogtmp = "DROP TABLE IF EXISTS " + - ConnectDB.getUsageStatsDBSchema() + - ".irus_sushilogtmp"; - stmt.executeUpdate(dropSushilogtmp); - logger.info("Dropped irus_sushilogtmp table"); + logger.info("Creating irus_sushilogtmp table"); + String createSushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + + ".irus_sushilogtmp(source STRING, repository STRING, " + + "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc " + + "tblproperties('transactional'='true')"; + stmt.executeUpdate(createSushilogtmp); + logger.info("Created irus_sushilogtmp table"); - logger.info("Creating irus_sushilogtmp table"); - String createSushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() - + ".irus_sushilogtmp(source STRING, repository STRING, " + - "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc " - + - "tblproperties('transactional'='true')"; - stmt.executeUpdate(createSushilogtmp); - logger.info("Created irus_sushilogtmp table"); + logger.info("Inserting to irus_sushilogtmp table"); + String insertSushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp " + + "SELECT 'IRUS-UK', CONCAT('opendoar____::', split(split(INPUT__FILE__NAME,'IrusIRReport_')[1],'_')[0]), " + + "`ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, " + + "`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json " + + "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " + + "LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf " + + "WHERE `ItemIdent`.`Type`= 'OAI'"; + stmt.executeUpdate(insertSushilogtmp); + logger.info("Inserted to irus_sushilogtmp table"); - logger.info("Inserting to irus_sushilogtmp table"); - String insertSushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp " + - "SELECT 'IRUS-UK', CONCAT('opendoar____::', split(split(INPUT__FILE__NAME,'IrusIRReport_')[1],'_')[0]), " + - "`ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, " + - "`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json " + - "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " + - "LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf " + - "WHERE `ItemIdent`.`Type`= 'OAI'"; - stmt.executeUpdate(insertSushilogtmp); - logger.info("Inserted to irus_sushilogtmp table"); + logger.info("Creating downloads_stats table"); + String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats " + + "(`source` string, " + + "`repository_id` string, " + + "`result_id` string, " + + "`date` string, " + + "`count` bigint, " + + "`openaire` bigint)"; + stmt.executeUpdate(createDownloadsStats); + logger.info("Created downloads_stats table"); - logger.info("Creating downloads_stats table"); - String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + 
".downloads_stats " + - "(`source` string, " + - "`repository_id` string, " + - "`result_id` string, " + - "`date` string, " + - "`count` bigint, " + - "`openaire` bigint)"; - stmt.executeUpdate(createDownloadsStats); - logger.info("Created downloads_stats table"); + logger.info("Inserting into downloads_stats"); + String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT s.source, d.id AS repository_id, " + + "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp s, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'"; + stmt.executeUpdate(insertDStats); + logger.info("Inserted into downloads_stats"); - logger.info("Inserting into downloads_stats"); - String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + - "SELECT s.source, d.id AS repository_id, " + - "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp s, " + - ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + - ConnectDB.getStatsDBSchema() + ".result_oids ro " + - "WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'"; - stmt.executeUpdate(insertDStats); - logger.info("Inserted into downloads_stats"); + logger.info("Creating sushilog table"); + String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sushilog " + + "(`source` string, " + + "`repository_id` string, " + + "`rid` string, " + + "`date` string, " + + "`metric_type` string, " + + "`count` int)"; + stmt.executeUpdate(createSushilog); + logger.info("Created sushilog table"); - logger.info("Creating sushilog table"); - String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + ".sushilog " + - "(`source` string, " + - "`repository_id` string, " + - "`rid` string, " + - "`date` string, " + - "`metric_type` string, " + - "`count` int)"; - stmt.executeUpdate(createSushilog); - logger.info("Created sushilog table"); + logger.info("Inserting to sushilog table"); + String insertToShushilog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilog SELECT * FROM " + + ConnectDB.getUsageStatsDBSchema() + + ".irus_sushilogtmp"; + stmt.executeUpdate(insertToShushilog); + logger.info("Inserted to sushilog table"); - logger.info("Inserting to sushilog table"); - String insertToShushilog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilog SELECT * FROM " + - ConnectDB.getUsageStatsDBSchema() - + ".irus_sushilogtmp"; - stmt.executeUpdate(insertToShushilog); - logger.info("Inserted to sushilog table"); + ConnectDB.getHiveConnection().close(); + } - ConnectDB.getHiveConnection().close(); - } + public void getIrusRRReport(String irusUKReportPath) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM"); + // Setting the starting period + Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); + logger.info("(getIrusRRReport) Starting period for log download: " + sdf.format(start.getTime())); - public void getIrusRRReport(String irusUKReportPath) throws Exception { - SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM"); - // Setting the starting period - 
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); - logger.info("(getIrusRRReport) Starting period for log download: " + sdf.format(start.getTime())); + // Setting the ending period (last day of the month) + Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); + end.add(Calendar.MONTH, +1); + end.add(Calendar.DAY_OF_MONTH, -1); + logger.info("(getIrusRRReport) Ending period for log download: " + sdf.format(end.getTime())); - // Setting the ending period (last day of the month) - Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); - end.add(Calendar.MONTH, +1); - end.add(Calendar.DAY_OF_MONTH, -1); - logger.info("(getIrusRRReport) Ending period for log download: " + sdf.format(end.getTime())); + String reportUrl = irusUKURL + "GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate=" + + sdf.format(start.getTime()) + "&EndDate=" + sdf.format(end.getTime()) + + "&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback="; - String reportUrl = irusUKURL + "GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate=" + - sdf.format(start.getTime()) + "&EndDate=" + sdf.format(end.getTime()) + - "&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback="; + logger.info("(getIrusRRReport) Getting report: " + reportUrl); - logger.info("(getIrusRRReport) Getting report: " + reportUrl); + String text = getJson(reportUrl, "", ""); - String text = getJson(reportUrl, "", ""); + List opendoarsToVisit = new ArrayList(); + JSONParser parser = new JSONParser(); + JSONObject jsonObject = (JSONObject) parser.parse(text); + jsonObject = (JSONObject) jsonObject.get("ReportResponse"); + jsonObject = (JSONObject) jsonObject.get("Report"); + jsonObject = (JSONObject) jsonObject.get("Report"); + jsonObject = (JSONObject) jsonObject.get("Customer"); + JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems"); + int i = 0; + for (Object aJsonArray : jsonArray) { + JSONObject jsonObjectRow = (JSONObject) aJsonArray; + JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("ItemIdentifier"); + for (Object identifier : itemIdentifier) { + JSONObject opendoar = (JSONObject) identifier; + if (opendoar.get("Type").toString().equals("OpenDOAR")) { + i++; + opendoarsToVisit.add(opendoar.get("Value").toString()); + break; + } + } + // break; + } - List opendoarsToVisit = new ArrayList(); - JSONParser parser = new JSONParser(); - JSONObject jsonObject = (JSONObject) parser.parse(text); - jsonObject = (JSONObject) jsonObject.get("ReportResponse"); - jsonObject = (JSONObject) jsonObject.get("Report"); - jsonObject = (JSONObject) jsonObject.get("Report"); - jsonObject = (JSONObject) jsonObject.get("Customer"); - JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems"); - int i = 0; - for (Object aJsonArray : jsonArray) { - JSONObject jsonObjectRow = (JSONObject) aJsonArray; - JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("ItemIdentifier"); - for (Object identifier : itemIdentifier) { - JSONObject opendoar = (JSONObject) identifier; - if (opendoar.get("Type").toString().equals("OpenDOAR")) { - i++; - opendoarsToVisit.add(opendoar.get("Value").toString()); - break; - } - } - // break; - } + logger.info("(getIrusRRReport) Found the following opendoars for download: " + opendoarsToVisit); - logger.info("(getIrusRRReport) Found the following opendoars for download: " + opendoarsToVisit); + if (ExecuteWorkflow.irusNumberOfOpendoarsToDownload > 0 + && ExecuteWorkflow.irusNumberOfOpendoarsToDownload <= 
opendoarsToVisit.size()) { + logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.irusNumberOfOpendoarsToDownload); + opendoarsToVisit = opendoarsToVisit.subList(0, ExecuteWorkflow.irusNumberOfOpendoarsToDownload); + } - if (ExecuteWorkflow.irusNumberOfOpendoarsToDownload > 0 && - ExecuteWorkflow.irusNumberOfOpendoarsToDownload <= opendoarsToVisit.size()) { - logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.irusNumberOfOpendoarsToDownload); - opendoarsToVisit = opendoarsToVisit.subList(0, ExecuteWorkflow.irusNumberOfOpendoarsToDownload); - } + logger.info("(getIrusRRReport) Downloading the following opendoars: " + opendoarsToVisit); - logger.info("(getIrusRRReport) Downloading the followins opendoars: " + opendoarsToVisit); + for (String opendoar : opendoarsToVisit) { + logger.info("Now working on openDoar: " + opendoar); + this.getIrusIRReport(opendoar, irusUKReportPath); + } - for (String opendoar : opendoarsToVisit) { - logger.info("Now working on openDoar: " + opendoar); - this.getIrusIRReport(opendoar, irusUKReportPath); - } + logger.info("(getIrusRRReport) Finished with report: " + reportUrl); + } - logger.info("(getIrusRRReport) Finished with report: " + reportUrl); - } + private void getIrusIRReport(String opendoar, String irusUKReportPath) throws Exception { - private void getIrusIRReport(String opendoar, String irusUKReportPath) throws Exception { + logger.info("(getIrusIRReport) Getting report(s) with opendoar: " + opendoar); - logger.info("(getIrusIRReport) Getting report(s) with opendoar: " + opendoar); + ConnectDB.getHiveConnection().setAutoCommit(false); - ConnectDB.getHiveConnection().setAutoCommit(false); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM"); - SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM"); + // Setting the starting period + Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); + logger.info("(getIrusIRReport) Starting period for log download: " + simpleDateFormat.format(start.getTime())); - // Setting the starting period - Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); - logger.info("(getIrusIRReport) Starting period for log download: " + simpleDateFormat.format(start.getTime())); + // Setting the ending period (last day of the month) + Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); + end.add(Calendar.MONTH, +1); + end.add(Calendar.DAY_OF_MONTH, -1); + logger.info("(getIrusIRReport) Ending period for log download: " + simpleDateFormat.format(end.getTime())); - // Setting the ending period (last day of the month) - Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); - end.add(Calendar.MONTH, +1); - end.add(Calendar.DAY_OF_MONTH, -1); - logger.info("(getIrusIRReport) Ending period for log download: " + simpleDateFormat.format(end.getTime())); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + PreparedStatement st = ConnectDB + .getHiveConnection() + .prepareStatement( + "SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?"); + st.setString(1, "opendoar____::" + opendoar); + ResultSet rs_date = st.executeQuery(); + Date dateMax = null; + while (rs_date.next()) { + if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") + && !rs_date.getString(1).equals("")) { + start.setTime(sdf.parse(rs_date.getString(1))); + dateMax = sdf.parse(rs_date.getString(1)); + } + } + rs_date.close(); + int batch_size = 0; - SimpleDateFormat sdf = new
SimpleDateFormat("yyyy-MM-dd"); - PreparedStatement st = ConnectDB - .getHiveConnection() - .prepareStatement( - "SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?"); - st.setString(1, "opendoar____::" + opendoar); - ResultSet rs_date = st.executeQuery(); - while (rs_date.next()) { - if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") - && !rs_date.getString(1).equals("")) { - start.setTime(sdf.parse(rs_date.getString(1))); - } - } - rs_date.close(); - int batch_size = 0; + if (dateMax != null && start.getTime().compareTo(dateMax) <= 0) { + logger.info("Date found in logs " + dateMax + " and not downloading logs for " + opendoar); + } else { + while (start.before(end)) { + logger.info("date: " + simpleDateFormat.format(start.getTime())); + String reportUrl = this.irusUKURL + "GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate=" + + simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime()) + + "&RepositoryIdentifier=opendoar%3A" + opendoar + + "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback="; + start.add(Calendar.MONTH, 1); - while (start.before(end)) { - // log.info("date: " + simpleDateFormat.format(start.getTime())); - String reportUrl = this.irusUKURL + "GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate=" - + simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime()) - + "&RepositoryIdentifier=opendoar%3A" + opendoar - + "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback="; - start.add(Calendar.MONTH, 1); + logger.info("Downloading file: " + reportUrl); + String text = getJson(reportUrl, "", ""); + if (text == null) { + continue; + } - logger.info("Downloading file: " + reportUrl); - String text = getJson(reportUrl, "", ""); - if (text == null) { - continue; - } + FileSystem fs = FileSystem.get(new Configuration()); + String filePath = irusUKReportPath + "/" + "IrusIRReport_" + + opendoar + "_" + simpleDateFormat.format(start.getTime()) + ".json"; + logger.info("Storing to file: " + filePath); + FSDataOutputStream fin = fs.create(new Path(filePath), true); - FileSystem fs = FileSystem.get(new Configuration()); - String filePath = irusUKReportPath + "/" + "IrusIRReport_" + - opendoar + "_" + simpleDateFormat.format(start.getTime()) + ".json"; - logger.info("Storing to file: " + filePath); - FSDataOutputStream fin = fs.create(new Path(filePath), true); + JSONParser parser = new JSONParser(); + JSONObject jsonObject = (JSONObject) parser.parse(text); + jsonObject = (JSONObject) jsonObject.get("ReportResponse"); + jsonObject = (JSONObject) jsonObject.get("Report"); + jsonObject = (JSONObject) jsonObject.get("Report"); + jsonObject = (JSONObject) jsonObject.get("Customer"); + JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems"); + if (jsonArray == null) { + continue; + } + String oai = ""; + for (Object aJsonArray : jsonArray) { + JSONObject jsonObjectRow = (JSONObject) aJsonArray; + fin.write(jsonObjectRow.toJSONString().getBytes()); + fin.writeChar('\n'); + } - JSONParser parser = new JSONParser(); - JSONObject jsonObject = (JSONObject) parser.parse(text); - jsonObject = (JSONObject) jsonObject.get("ReportResponse"); - jsonObject = (JSONObject) jsonObject.get("Report"); - jsonObject = (JSONObject) jsonObject.get("Report"); - jsonObject = (JSONObject) jsonObject.get("Customer"); - JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems"); - if (jsonArray == null) { -
continue; - } - String oai = ""; - for (Object aJsonArray : jsonArray) { - JSONObject jsonObjectRow = (JSONObject) aJsonArray; - fin.write(jsonObjectRow.toJSONString().getBytes()); - fin.writeChar('\n'); - } + fin.close(); + } - fin.close(); - } + } + //ConnectDB.getHiveConnection().close(); - ConnectDB.getHiveConnection().close(); + logger.info("(getIrusIRReport) Finished downloading report(s) with opendoar: " + opendoar); + } - logger.info("(getIrusIRReport) Finished downloading report(s) with opendoar: " + opendoar); - } + private String getJson(String url) throws Exception { + try { + System.out.println("===> Connecting to: " + url); + URL website = new URL(url); + System.out.println("Connection url -----> " + url); + URLConnection connection = website.openConnection(); - private String getJson(String url) throws Exception { - try { - System.out.println("===> Connecting to: " + url); - URL website = new URL(url); - System.out.println("Connection url -----> " + url); - URLConnection connection = website.openConnection(); - - // connection.setRequestProperty ("Authorization", "Basic "+encoded); - StringBuilder response; - try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - response = new StringBuilder(); - String inputLine; - while ((inputLine = in.readLine()) != null) { - response.append(inputLine); + // connection.setRequestProperty ("Authorization", "Basic "+encoded); + StringBuilder response; + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + response = new StringBuilder(); + String inputLine; + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); // response.append("\n"); - } - } + } + } - System.out.println("response ====> " + response.toString()); + System.out.println("response ====> " + response.toString()); - return response.toString(); - } catch (Exception e) { - logger.error("Failed to get URL: " + e); - System.out.println("Failed to get URL: " + e); - throw new Exception("Failed to get URL: " + e.toString(), e); - } - } + return response.toString(); + } catch (Exception e) { + logger.error("Failed to get URL: " + e); + System.out.println("Failed to get URL: " + e); + throw new Exception("Failed to get URL: " + e.toString(), e); + } + } - private String getJson(String url, String username, String password) throws Exception { - // String cred=username+":"+password; - // String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes()); - try { - URL website = new URL(url); - URLConnection connection = website.openConnection(); - // connection.setRequestProperty ("Authorization", "Basic "+encoded); - StringBuilder response; - try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - response = new StringBuilder(); - String inputLine; - while ((inputLine = in.readLine()) != null) { - response.append(inputLine); - response.append("\n"); - } - } - return response.toString(); - } catch (Exception e) { - logger.error("Failed to get URL", e); - return null; - } - } + private String getJson(String url, String username, String password) throws Exception { + // String cred=username+":"+password; + // String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes()); + try { + URL website = new URL(url); + URLConnection connection = website.openConnection(); + // connection.setRequestProperty ("Authorization", "Basic "+encoded); + StringBuilder response; + try (BufferedReader in = new BufferedReader(new 
InputStreamReader(connection.getInputStream()))) { + response = new StringBuilder(); + String inputLine; + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + response.append("\n"); + } + } + return response.toString(); + } catch (Exception e) { + logger.error("Failed to get URL", e); + return null; + } + } } diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java index 0e0e013cf..7a61b1f46 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java @@ -1,4 +1,3 @@ - package eu.dnetlib.oa.graph.usagestats.export; import java.io.*; @@ -28,49 +27,49 @@ import org.slf4j.LoggerFactory; */ public class LaReferenciaDownloadLogs { - private final String piwikUrl; - private Date startDate; - private final String tokenAuth; + private final String piwikUrl; + private Date startDate; + private final String tokenAuth; - /* + /* * The Piwik's API method - */ - private final String APImethod = "?module=API&method=Live.getLastVisitsDetails"; - private final String format = "&format=json"; - private final String ApimethodGetAllSites = "?module=API&method=SitesManager.getSitesWithViewAccess"; + */ + private final String APImethod = "?module=API&method=Live.getLastVisitsDetails"; + private final String format = "&format=json"; + private final String ApimethodGetAllSites = "?module=API&method=SitesManager.getSitesWithViewAccess"; - private static final Logger logger = LoggerFactory.getLogger(LaReferenciaDownloadLogs.class); + private static final Logger logger = LoggerFactory.getLogger(LaReferenciaDownloadLogs.class); - public LaReferenciaDownloadLogs(String piwikUrl, String tokenAuth) throws Exception { - this.piwikUrl = piwikUrl; - this.tokenAuth = tokenAuth; - this.createTables(); + public LaReferenciaDownloadLogs(String piwikUrl, String tokenAuth) throws Exception { + this.piwikUrl = piwikUrl; + this.tokenAuth = tokenAuth; + this.createTables(); // this.createTmpTables(); - } + } - public void reCreateLogDirs() throws IllegalArgumentException, IOException { - FileSystem dfs = FileSystem.get(new Configuration()); + public void reCreateLogDirs() throws IllegalArgumentException, IOException { + FileSystem dfs = FileSystem.get(new Configuration()); - logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath); - dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true); + logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath); + dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true); - logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath); - dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath)); - } + logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath); + dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath)); + } - private void createTables() throws Exception { - try { - Statement stmt = ConnectDB.getHiveConnection().createStatement(); + private void createTables() throws Exception { + try { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); - logger.info("Creating LaReferencia tables"); - String 
sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " + - ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, " + - "source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " + - "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " + - "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " + - "stored as orc tblproperties('transactional'='true')"; - stmt.executeUpdate(sqlCreateTableLareferenciaLog); - logger.info("Created LaReferencia tables"); + logger.info("Creating LaReferencia tables"); + String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, " + + "source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " + + "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " + + "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " + + "stored as orc tblproperties('transactional'='true')"; + stmt.executeUpdate(sqlCreateTableLareferenciaLog); + logger.info("Created LaReferencia tables"); // String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " // + " ON INSERT TO lareferencialog " // + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit," @@ -81,16 +80,16 @@ public class LaReferenciaDownloadLogs { // stmt.executeUpdate(sqlcreateRuleLaReferenciaLog); // stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog); - stmt.close(); - ConnectDB.getHiveConnection().close(); - logger.info("Lareferencia Tables Created"); + stmt.close(); + ConnectDB.getHiveConnection().close(); + logger.info("Lareferencia Tables Created"); - } catch (Exception e) { - logger.error("Failed to create tables: " + e); - throw new Exception("Failed to create tables: " + e.toString(), e); - // System.exit(0); - } - } + } catch (Exception e) { + logger.error("Failed to create tables: " + e); + throw new Exception("Failed to create tables: " + e.toString(), e); + // System.exit(0); + } + } // private void createTmpTables() throws Exception { // @@ -115,147 +114,152 @@ public class LaReferenciaDownloadLogs { // // System.exit(0); // } // } + private String getPiwikLogUrl() { + return piwikUrl + "/"; + } - private String getPiwikLogUrl() { - return piwikUrl + "/"; - } + private String getJson(String url) throws Exception { + try { + URL website = new URL(url); + URLConnection connection = website.openConnection(); - private String getJson(String url) throws Exception { - try { - URL website = new URL(url); - URLConnection connection = website.openConnection(); - - StringBuilder response; - try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - response = new StringBuilder(); - String inputLine; - while ((inputLine = in.readLine()) != null) { - response.append(inputLine); + StringBuilder response; + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + response = new StringBuilder(); + String inputLine; + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); // response.append("\n"); - } - } + } + } - return response.toString(); - } catch (Exception e) { - logger.error("Failed to get URL: " + e); - throw new Exception("Failed to get URL: " + e.toString(), e); - } - } + return response.toString(); + } catch (Exception e) { + logger.error("Failed to get 
URL: " + e); + throw new Exception("Failed to get URL: " + e.toString(), e); + } + } - public void GetLaReferenciaRepos(String repoLogsPath) throws Exception { + public void GetLaReferenciaRepos(String repoLogsPath) throws Exception { - String baseApiUrl = getPiwikLogUrl() + ApimethodGetAllSites + format + "&token_auth=" + this.tokenAuth; - String content = ""; + String baseApiUrl = getPiwikLogUrl() + ApimethodGetAllSites + format + "&token_auth=" + this.tokenAuth; + String content = ""; - List siteIdsToVisit = new ArrayList(); + List siteIdsToVisit = new ArrayList(); - // Getting all the siteIds in a list for logging reasons & limiting the list - // to the max number of siteIds - content = getJson(baseApiUrl); - JSONParser parser = new JSONParser(); - JSONArray jsonArray = (JSONArray) parser.parse(content); - for (Object aJsonArray : jsonArray) { - JSONObject jsonObjectRow = (JSONObject) aJsonArray; - siteIdsToVisit.add(Integer.parseInt(jsonObjectRow.get("idsite").toString())); - } - logger.info("Found the following siteIds for download: " + siteIdsToVisit); + // Getting all the siteIds in a list for logging reasons & limiting the list + // to the max number of siteIds + content = getJson(baseApiUrl); + JSONParser parser = new JSONParser(); + JSONArray jsonArray = (JSONArray) parser.parse(content); + for (Object aJsonArray : jsonArray) { + JSONObject jsonObjectRow = (JSONObject) aJsonArray; + siteIdsToVisit.add(Integer.parseInt(jsonObjectRow.get("idsite").toString())); + } + logger.info("Found the following siteIds for download: " + siteIdsToVisit); - if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0 && - ExecuteWorkflow.numberOfPiwikIdsToDownload <= siteIdsToVisit.size()) { - logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload); - siteIdsToVisit = siteIdsToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload); - } + if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0 + && ExecuteWorkflow.numberOfPiwikIdsToDownload <= siteIdsToVisit.size()) { + logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload); + siteIdsToVisit = siteIdsToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload); + } - logger.info("Downloading from repos with the followins siteIds: " + siteIdsToVisit); + logger.info("Downloading from repos with the followins siteIds: " + siteIdsToVisit); - for (int siteId : siteIdsToVisit) { - logger.info("Now working on piwikId: " + siteId); - this.GetLaReFerenciaLogs(repoLogsPath, siteId); - } - } + for (int siteId : siteIdsToVisit) { + logger.info("Now working on LaReferencia MatomoId: " + siteId); + this.GetLaReFerenciaLogs(repoLogsPath, siteId); + } + } - public void GetLaReFerenciaLogs(String repoLogsPath, - int laReferencialMatomoID) throws Exception { + public void GetLaReFerenciaLogs(String repoLogsPath, + int laReferencialMatomoID) throws Exception { - logger.info("Downloading logs for LaReferencia repoid " + laReferencialMatomoID); + logger.info("Downloading logs for LaReferencia repoid " + laReferencialMatomoID); - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - // Setting the starting period - Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); - logger.info("Starting period for log download: " + sdf.format(start.getTime())); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + // Setting the starting period + Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); + logger.info("Starting period for log download: " + 
sdf.format(start.getTime())); - // Setting the ending period (last day of the month) - Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); - end.add(Calendar.MONTH, +1); - end.add(Calendar.DAY_OF_MONTH, -1); - logger.info("Ending period for log download: " + sdf.format(end.getTime())); + // Setting the ending period (last day of the month) + Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); + end.add(Calendar.MONTH, +1); + end.add(Calendar.DAY_OF_MONTH, -1); + logger.info("Ending period for log download: " + sdf.format(end.getTime())); - PreparedStatement st = ConnectDB - .getHiveConnection() - .prepareStatement( - "SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() + - ".lareferencialog WHERE matomoid=? GROUP BY timestamp HAVING max(timestamp) is not null"); - st.setInt(1, laReferencialMatomoID); + PreparedStatement st = ConnectDB + .getHiveConnection() + .prepareStatement( + "SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() + + ".lareferencialog WHERE matomoid=?"); + st.setInt(1, laReferencialMatomoID); + Date dateMax = null; - ResultSet rs_date = st.executeQuery(); - while (rs_date.next()) { - if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") - && !rs_date.getString(1).equals("")) { - start.setTime(sdf.parse(rs_date.getString(1))); - } - } - rs_date.close(); + ResultSet rs_date = st.executeQuery(); + while (rs_date.next()) { + if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") + && !rs_date.getString(1).equals("")) { + start.setTime(sdf.parse(rs_date.getString(1))); + dateMax = sdf.parse(rs_date.getString(1)); + } + } + rs_date.close(); - for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) { - Date date = currDay.getTime(); - logger - .info( - "Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for " - + sdf.format(date)); + for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) { + Date date = currDay.getTime(); + if (dateMax != null && currDay.getTime().compareTo(dateMax) <= 0) { + logger.info("Date found in logs " + dateMax + " and not downloading Matomo logs for " + laReferencialMatomoID); + } else { + logger + .info( + "Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for " + + sdf.format(date)); - String period = "&period=day&date=" + sdf.format(date); - String outFolder = ""; - outFolder = repoLogsPath; + String period = "&period=day&date=" + sdf.format(date); + String outFolder = ""; + outFolder = repoLogsPath; - FileSystem fs = FileSystem.get(new Configuration()); - FSDataOutputStream fin = fs - .create( - new Path(outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog" + sdf.format((date)) + ".json"), - true); + FileSystem fs = FileSystem.get(new Configuration()); + FSDataOutputStream fin = fs + .create( + new Path(outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog" + sdf.format((date)) + ".json"), + true); - String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format - + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; - String content = ""; - int i = 0; + String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format + + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; + String content = ""; + int i = 0; - JSONParser parser = new JSONParser(); - do { - String apiUrl = baseApiUrl; + JSONParser parser = new
JSONParser(); + do { + String apiUrl = baseApiUrl; - if (i > 0) { - apiUrl += "&filter_offset=" + (i * 1000); - } + if (i > 0) { + apiUrl += "&filter_offset=" + (i * 1000); + } - content = getJson(apiUrl); - if (content.length() == 0 || content.equals("[]")) - break; + content = getJson(apiUrl); + if (content.length() == 0 || content.equals("[]")) { + break; + } - JSONArray jsonArray = (JSONArray) parser.parse(content); - for (Object aJsonArray : jsonArray) { - JSONObject jsonObjectRaw = (JSONObject) aJsonArray; - fin.write(jsonObjectRaw.toJSONString().getBytes()); - fin.writeChar('\n'); - } + JSONArray jsonArray = (JSONArray) parser.parse(content); + for (Object aJsonArray : jsonArray) { + JSONObject jsonObjectRaw = (JSONObject) aJsonArray; + fin.write(jsonObjectRaw.toJSONString().getBytes()); + fin.writeChar('\n'); + } - logger - .info( - "Downloaded part " + i + " of logs for LaReferencia repoid " + laReferencialMatomoID - + " and for " - + sdf.format(date)); - i++; - } while (true); - fin.close(); - - } - } + logger + .info( + "Downloaded part " + i + " of logs for LaReferencia repoid " + laReferencialMatomoID + + " and for " + + sdf.format(date)); + i++; + } while (true); + fin.close(); + } + } + } } diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java index 65816518f..7a64e48d2 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java @@ -204,6 +204,9 @@ public class PiwikDownloadLogs { logger.info("Downloading from repos with the followins piwikIds: " + piwikIdToVisit); + + // ExecutorService executor = Executors.newFixedThreadPool(ExecuteWorkflow.numberOfDownloadThreads); + for (int siteId : piwikIdToVisit) { // Setting the starting period Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); logger.info("Starting period for log download: " + sdf.format(start.getTime())); @@ -214,9 +217,6 @@ public class PiwikDownloadLogs { end.add(Calendar.DAY_OF_MONTH, -1); logger.info("Ending period for log download: " + sdf.format(end.getTime())); - //ExecutorService executor = Executors.newFixedThreadPool(ExecuteWorkflow.numberOfDownloadThreads); - for (int siteId : piwikIdToVisit) { - logger.info("Now working on piwikId: " + siteId); PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION @@ -224,7 +224,7 @@ public class PiwikDownloadLogs { "SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog WHERE source=?"); st.setInt(1, siteId); - + Date dateMax=null; ResultSet rs_date = st.executeQuery(); while (rs_date.next()) { logger.info("Found max date: " + rs_date.getString(1) + " for repository " + siteId); @@ -232,85 +232,92 @@ public class PiwikDownloadLogs { if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") && !rs_date.getString(1).equals("")) { start.setTime(sdf.parse(rs_date.getString(1))); + dateMax = sdf.parse(rs_date.getString(1)); } - } + } rs_date.close(); for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) { - //logger.info("Date used " + currDay.toString()); - //Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID); - //executor.execute(worker);// calling execute method 
of ExecutorService - GetOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID); + // logger.info("Date used " + currDay.toString()); + // Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID); + // executor.execute(worker);// calling execute method of ExecutorService + logger.info("Date used " + currDay.getTime().toString()); + + if(dateMax!=null && currDay.getTime().compareTo(dateMax)<=0) + logger.info("Date found in logs "+dateMax+ " and not downloading Matomo logs for "+siteId); + else + GetOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID); + } } - //executor.shutdown(); - //while (!executor.isTerminated()) { - //} - //System.out.println("Finished all threads"); + // executor.shutdown(); + // while (!executor.isTerminated()) { + // } + // System.out.println("Finished all threads"); } - - public void GetOpenAIRELogsForDate(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath, - String portalMatomoID) throws Exception { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - Date date = currDay.getTime(); - logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date)); + public void GetOpenAIRELogsForDate(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath, + String portalMatomoID) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - String period = "&period=day&date=" + sdf.format(date); - String outFolder = ""; - if (siteId == Integer.parseInt(portalMatomoID)) { - outFolder = portalLogPath; - } else { - outFolder = repoLogsPath; + Date date = currDay.getTime(); + logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date)); + + String period = "&period=day&date=" + sdf.format(date); + String outFolder = ""; + if (siteId == Integer.parseInt(portalMatomoID)) { + outFolder = portalLogPath; + } else { + outFolder = repoLogsPath; + } + + String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format + + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; + String content = ""; + + int i = 0; + + JSONParser parser = new JSONParser(); + StringBuffer totalContent = new StringBuffer(); + FileSystem fs = FileSystem.get(new Configuration()); + + do { + int writtenBytes = 0; + String apiUrl = baseApiUrl; + + if (i > 0) { + apiUrl += "&filter_offset=" + (i * 1000); } - String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format - + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; - String content = ""; + content = getJson(apiUrl); + if (content.length() == 0 || content.equals("[]")) + break; - int i = 0; + FSDataOutputStream fin = fs + .create( + new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i + + ".json"), + true); + JSONArray jsonArray = (JSONArray) parser.parse(content); + for (Object aJsonArray : jsonArray) { + JSONObject jsonObjectRaw = (JSONObject) aJsonArray; + byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes(); + fin.write(jsonObjectRawBytes); + fin.writeChar('\n'); - JSONParser parser = new JSONParser(); - StringBuffer totalContent = new StringBuffer(); - FileSystem fs = FileSystem.get(new Configuration()); + writtenBytes += jsonObjectRawBytes.length + 1; + } - do { - int writtenBytes = 0; - String apiUrl = baseApiUrl; + fin.close(); + System.out + .println( + Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes + + "
bytes. Filename: " + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i + + ".json"); - if (i > 0) { - apiUrl += "&filter_offset=" + (i * 1000); - } + i++; + } while (true); - content = getJson(apiUrl); - if (content.length() == 0 || content.equals("[]")) - break; - - FSDataOutputStream fin = fs - .create( - new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i - + ".json"), - true); - JSONArray jsonArray = (JSONArray) parser.parse(content); - for (Object aJsonArray : jsonArray) { - JSONObject jsonObjectRaw = (JSONObject) aJsonArray; - byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes(); - fin.write(jsonObjectRawBytes); - fin.writeChar('\n'); - - writtenBytes += jsonObjectRawBytes.length + 1; - } - - fin.close(); - System.out - .println( - Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes - + " bytes. Filename: " + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i - + ".json"); - - i++; - } while (true); - - fs.close(); - } + fs.close(); + } } diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java index 6e015acf4..6625c381b 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikStatsDB.java @@ -199,14 +199,14 @@ public class PiwikStatsDB { cleanOAI(); logger.info("Cleaning oai done"); - logger.info("Processing portal logs"); + logger.info("Processing portal logs"); processPortalLog(); logger.info("Portal logs process done"); logger.info("Processing portal usagestats"); portalStats(); logger.info("Portal usagestats process done"); - + logger.info("ViewsStats processing starts"); viewsStats(); logger.info("ViewsStats processing ends"); @@ -215,8 +215,6 @@ public class PiwikStatsDB { downloadsStats(); logger.info("DownloadsStats processing starts"); - - logger.info("Updating Production Tables"); updateProdTables(); logger.info("Updated Production Tables"); @@ -313,7 +311,7 @@ public class PiwikStatsDB { "SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n" + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp p1, " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp p2\n" + - "WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id \n" + "WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id \n" + "AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp \n" + "AND p1.timestamp\n" + - " >,\n" + - " `ItemPerformance` struct<\n" + - " `Period`: struct<\n" + - " `Begin`: STRING,\n" + - " `End`: STRING\n" + - " >,\n" + - " `Instance`: struct<\n" + - " `Count`: STRING,\n" + - " `MetricType`: STRING\n" + - " >\n" + - " >\n" + - ")" + - "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + - "LOCATION '" + sarcsReportPathArray + "/'\n" + - "TBLPROPERTIES (\"transactional\"=\"false\")"; - stmt.executeUpdate(create_sarc_sushilogtmp_json_array); - logger.info("Created sarc_sushilogtmp_json_array table"); + logger.info("Creating sarc_sushilogtmp_json_array table"); + String create_sarc_sushilogtmp_json_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array(\n" + + " 
`ItemIdentifier` ARRAY<\n" + + " struct<\n" + + " `Type`: STRING,\n" + + " `Value`: STRING\n" + + " >\n" + + " >,\n" + + " `ItemPerformance` struct<\n" + + " `Period`: struct<\n" + + " `Begin`: STRING,\n" + + " `End`: STRING\n" + + " >,\n" + + " `Instance`: struct<\n" + + " `Count`: STRING,\n" + + " `MetricType`: STRING\n" + + " >\n" + + " >\n" + + ")" + + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + + "LOCATION '" + sarcsReportPathArray + "/'\n" + + "TBLPROPERTIES (\"transactional\"=\"false\")"; + stmt.executeUpdate(create_sarc_sushilogtmp_json_array); + logger.info("Created sarc_sushilogtmp_json_array table"); - logger.info("Dropping sarc_sushilogtmp_json_non_array table"); - String drop_sarc_sushilogtmp_json_non_array = "DROP TABLE IF EXISTS " + - ConnectDB.getUsageStatsDBSchema() + - ".sarc_sushilogtmp_json_non_array"; - stmt.executeUpdate(drop_sarc_sushilogtmp_json_non_array); - logger.info("Dropped sarc_sushilogtmp_json_non_array table"); + logger.info("Dropping sarc_sushilogtmp_json_non_array table"); + String drop_sarc_sushilogtmp_json_non_array = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".sarc_sushilogtmp_json_non_array"; + stmt.executeUpdate(drop_sarc_sushilogtmp_json_non_array); + logger.info("Dropped sarc_sushilogtmp_json_non_array table"); - logger.info("Creating sarc_sushilogtmp_json_non_array table"); - String create_sarc_sushilogtmp_json_non_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " + - ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array (\n" + - " `ItemIdentifier` struct<\n" + - " `Type`: STRING,\n" + - " `Value`: STRING\n" + - " >,\n" + - " `ItemPerformance` struct<\n" + - " `Period`: struct<\n" + - " `Begin`: STRING,\n" + - " `End`: STRING\n" + - " >,\n" + - " `Instance`: struct<\n" + - " `Count`: STRING,\n" + - " `MetricType`: STRING\n" + - " >\n" + - " >" + - ")" + - "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + - "LOCATION '" + sarcsReportPathNonArray + "/'\n" + - "TBLPROPERTIES (\"transactional\"=\"false\")"; - stmt.executeUpdate(create_sarc_sushilogtmp_json_non_array); - logger.info("Created sarc_sushilogtmp_json_non_array table"); + logger.info("Creating sarc_sushilogtmp_json_non_array table"); + String create_sarc_sushilogtmp_json_non_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array (\n" + + " `ItemIdentifier` struct<\n" + + " `Type`: STRING,\n" + + " `Value`: STRING\n" + + " >,\n" + + " `ItemPerformance` struct<\n" + + " `Period`: struct<\n" + + " `Begin`: STRING,\n" + + " `End`: STRING\n" + + " >,\n" + + " `Instance`: struct<\n" + + " `Count`: STRING,\n" + + " `MetricType`: STRING\n" + + " >\n" + + " >" + + ")" + + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + + "LOCATION '" + sarcsReportPathNonArray + "/'\n" + + "TBLPROPERTIES (\"transactional\"=\"false\")"; + stmt.executeUpdate(create_sarc_sushilogtmp_json_non_array); + logger.info("Created sarc_sushilogtmp_json_non_array table"); - logger.info("Creating sarc_sushilogtmp table"); - String create_sarc_sushilogtmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + ".sarc_sushilogtmp(source STRING, repository STRING, " + - "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc " - + - "tblproperties('transactional'='true')"; - stmt.executeUpdate(create_sarc_sushilogtmp); - logger.info("Created sarc_sushilogtmp table"); + logger.info("Creating sarc_sushilogtmp 
table"); + String create_sarc_sushilogtmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sarc_sushilogtmp(source STRING, repository STRING, " + + "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc " + + "tblproperties('transactional'='true')"; + stmt.executeUpdate(create_sarc_sushilogtmp); + logger.info("Created sarc_sushilogtmp table"); - logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)"); - String insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " + - "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " + - " `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + - "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array " + - "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " + - "WHERE `ItemIdent`.`Type`='DOI'"; - stmt.executeUpdate(insert_sarc_sushilogtmp); - logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)"); + logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)"); + String insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " + + "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " + + " `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + + "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array " + + "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " + + "WHERE `ItemIdent`.`Type`='DOI'"; + stmt.executeUpdate(insert_sarc_sushilogtmp); + logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)"); - logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)"); - insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " + - "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " + - "`ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + - "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array"; - stmt.executeUpdate(insert_sarc_sushilogtmp); - logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)"); + logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)"); + insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " + + "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " + + "`ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + + "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array"; + stmt.executeUpdate(insert_sarc_sushilogtmp); + logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)"); - ConnectDB.getHiveConnection().close(); - } + ConnectDB.getHiveConnection().close(); + } - public void getAndProcessSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception { + public void getAndProcessSarc(String sarcsReportPathArray, String 
sarcsReportPathNonArray) throws Exception { - Statement stmt = ConnectDB.getHiveConnection().createStatement(); - ConnectDB.getHiveConnection().setAutoCommit(false); + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); - logger.info("Creating sushilog table"); - String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + ".sushilog " + - "(`source` string, " + - "`repository` string, " + - "`rid` string, " + - "`date` string, " + - "`metric_type` string, " + - "`count` int)"; - stmt.executeUpdate(createSushilog); - logger.info("Created sushilog table"); + logger.info("Creating sushilog table"); + String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sushilog " + + "(`source` string, " + + "`repository` string, " + + "`rid` string, " + + "`date` string, " + + "`metric_type` string, " + + "`count` int)"; + stmt.executeUpdate(createSushilog); + logger.info("Created sushilog table"); - logger.info("Dropping sarc_sushilogtmp table"); - String drop_sarc_sushilogtmp = "DROP TABLE IF EXISTS " + - ConnectDB.getUsageStatsDBSchema() + - ".sarc_sushilogtmp"; - stmt.executeUpdate(drop_sarc_sushilogtmp); - logger.info("Dropped sarc_sushilogtmp table"); - ConnectDB.getHiveConnection().close(); + logger.info("Dropping sarc_sushilogtmp table"); + String drop_sarc_sushilogtmp = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".sarc_sushilogtmp"; + stmt.executeUpdate(drop_sarc_sushilogtmp); + logger.info("Dropped sarc_sushilogtmp table"); + ConnectDB.getHiveConnection().close(); - List issnAndUrls = new ArrayList(); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/motricidade/sushiLite/v1_7/", "1646-107X" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/antropologicas/sushiLite/v1_7/", "0873-819X" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/interaccoes/sushiLite/v1_7/", "1646-2335" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/cct/sushiLite/v1_7/", "2182-3030" - }); - issnAndUrls.add(new String[] { - "https://actapediatrica.spp.pt/sushiLite/v1_7/", "0873-9781" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/sociologiapp/sushiLite/v1_7/", "0873-6529" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/finisterra/sushiLite/v1_7/", "0430-5027" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/sisyphus/sushiLite/v1_7/", "2182-8474" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/anestesiologia/sushiLite/v1_7/", "0871-6099" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/rpe/sushiLite/v1_7/", "0871-9187" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/psilogos/sushiLite/v1_7/", "1646-091X" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/juridica/sushiLite/v1_7/", "2183-5799" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/ecr/sushiLite/v1_7/", "1647-2098" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/nascercrescer/sushiLite/v1_7/", "0872-0754" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/cea/sushiLite/v1_7/", "1645-3794" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/proelium/sushiLite/v1_7/", "1645-8826" - }); - issnAndUrls.add(new String[] { - "https://revistas.rcaap.pt/millenium/sushiLite/v1_7/", "0873-3015" - }); + List issnAndUrls = new ArrayList(); + 
issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/motricidade/sushiLite/v1_7/", "1646-107X" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/antropologicas/sushiLite/v1_7/", "0873-819X" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/interaccoes/sushiLite/v1_7/", "1646-2335" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/cct/sushiLite/v1_7/", "2182-3030" + }); + issnAndUrls.add(new String[]{ + "https://actapediatrica.spp.pt/sushiLite/v1_7/", "0873-9781" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/sociologiapp/sushiLite/v1_7/", "0873-6529" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/finisterra/sushiLite/v1_7/", "0430-5027" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/sisyphus/sushiLite/v1_7/", "2182-8474" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/anestesiologia/sushiLite/v1_7/", "0871-6099" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/rpe/sushiLite/v1_7/", "0871-9187" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/psilogos/sushiLite/v1_7/", "1646-091X" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/juridica/sushiLite/v1_7/", "2183-5799" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/ecr/sushiLite/v1_7/", "1647-2098" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/nascercrescer/sushiLite/v1_7/", "0872-0754" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/cea/sushiLite/v1_7/", "1645-3794" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/proelium/sushiLite/v1_7/", "1645-8826" + }); + issnAndUrls.add(new String[]{ + "https://revistas.rcaap.pt/millenium/sushiLite/v1_7/", "0873-3015" + }); - if (ExecuteWorkflow.sarcNumberOfIssnToDownload > 0 && - ExecuteWorkflow.sarcNumberOfIssnToDownload <= issnAndUrls.size()) { - logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.sarcNumberOfIssnToDownload); - issnAndUrls = issnAndUrls.subList(0, ExecuteWorkflow.sarcNumberOfIssnToDownload); - } + if (ExecuteWorkflow.sarcNumberOfIssnToDownload > 0 + && ExecuteWorkflow.sarcNumberOfIssnToDownload <= issnAndUrls.size()) { + logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.sarcNumberOfIssnToDownload); + issnAndUrls = issnAndUrls.subList(0, ExecuteWorkflow.sarcNumberOfIssnToDownload); + } - logger.info("(getAndProcessSarc) Downloading the followins opendoars: " + issnAndUrls); + logger.info("(getAndProcessSarc) Downloading the followins opendoars: " + issnAndUrls); - for (String[] issnAndUrl : issnAndUrls) { - logger.info("Now working on ISSN: " + issnAndUrl[1]); - getARReport(sarcsReportPathArray, sarcsReportPathNonArray, issnAndUrl[0], issnAndUrl[1]); - } + for (String[] issnAndUrl : issnAndUrls) { + logger.info("Now working on ISSN: " + issnAndUrl[1]); + getARReport(sarcsReportPathArray, sarcsReportPathNonArray, issnAndUrl[0], issnAndUrl[1]); + } - } + } - public void finalizeSarcStats() throws Exception { - stmtHive = ConnectDB.getHiveConnection().createStatement(); - ConnectDB.getHiveConnection().setAutoCommit(false); - stmtImpala = ConnectDB.getImpalaConnection().createStatement(); + public void finalizeSarcStats() throws Exception { + stmtHive = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + stmtImpala = ConnectDB.getImpalaConnection().createStatement(); - logger.info("Creating downloads_stats table"); - String 
createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + ".downloads_stats " + - "(`source` string, " + - "`repository_id` string, " + - "`result_id` string, " + - "`date` string, " + - "`count` bigint, " + - "`openaire` bigint)"; - stmtHive.executeUpdate(createDownloadsStats); - logger.info("Created downloads_stats table"); + logger.info("Creating downloads_stats table"); + String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats " + + "(`source` string, " + + "`repository_id` string, " + + "`result_id` string, " + + "`date` string, " + + "`count` bigint, " + + "`openaire` bigint)"; + stmtHive.executeUpdate(createDownloadsStats); + logger.info("Created downloads_stats table"); - logger.info("Dropping sarc_sushilogtmp_impala table"); - String drop_sarc_sushilogtmp_impala = "DROP TABLE IF EXISTS " + - ConnectDB.getUsageStatsDBSchema() + - ".sarc_sushilogtmp_impala"; - stmtHive.executeUpdate(drop_sarc_sushilogtmp_impala); - logger.info("Dropped sarc_sushilogtmp_impala table"); + logger.info("Dropping sarc_sushilogtmp_impala table"); + String drop_sarc_sushilogtmp_impala = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".sarc_sushilogtmp_impala"; + stmtHive.executeUpdate(drop_sarc_sushilogtmp_impala); + logger.info("Dropped sarc_sushilogtmp_impala table"); - logger.info("Creating sarc_sushilogtmp_impala, a table readable by impala"); - String createSarcSushilogtmpImpala = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + ".sarc_sushilogtmp_impala " + - "STORED AS PARQUET AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp"; - stmtHive.executeUpdate(createSarcSushilogtmpImpala); - logger.info("Created sarc_sushilogtmp_impala"); + logger.info("Creating sarc_sushilogtmp_impala, a table readable by impala"); + String createSarcSushilogtmpImpala = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sarc_sushilogtmp_impala " + + "STORED AS PARQUET AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp"; + stmtHive.executeUpdate(createSarcSushilogtmpImpala); + logger.info("Created sarc_sushilogtmp_impala"); - logger.info("Making sarc_sushilogtmp visible to impala"); - String invalidateMetadata = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() - + ".sarc_sushilogtmp_impala;"; - stmtImpala.executeUpdate(invalidateMetadata); + logger.info("Making sarc_sushilogtmp visible to impala"); + String invalidateMetadata = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + + ".sarc_sushilogtmp_impala;"; + stmtImpala.executeUpdate(invalidateMetadata); - logger.info("Dropping downloads_stats_impala table"); - String drop_downloads_stats_impala = "DROP TABLE IF EXISTS " + - ConnectDB.getUsageStatsDBSchema() + - ".downloads_stats_impala"; - stmtHive.executeUpdate(drop_downloads_stats_impala); - logger.info("Dropped downloads_stats_impala table"); + logger.info("Dropping downloads_stats_impala table"); + String drop_downloads_stats_impala = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats_impala"; + stmtHive.executeUpdate(drop_downloads_stats_impala); + logger.info("Dropped downloads_stats_impala table"); - logger.info("Making downloads_stats_impala deletion visible to impala"); - try { - String invalidateMetadataDownloadsStatsImpala = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() - + ".downloads_stats_impala;"; - 
stmtImpala.executeUpdate(invalidateMetadataDownloadsStatsImpala); - } catch (SQLException sqle) { - } + logger.info("Making downloads_stats_impala deletion visible to impala"); + try { + String invalidateMetadataDownloadsStatsImpala = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats_impala;"; + stmtImpala.executeUpdate(invalidateMetadataDownloadsStatsImpala); + } catch (SQLException sqle) { + } - // We run the following query in Impala because it is faster - logger.info("Creating downloads_stats_impala"); - String createDownloadsStatsImpala = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() - + ".downloads_stats_impala AS " + - "SELECT s.source, d.id AS repository_id, " + - "ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', " + - "LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_impala s, " + - ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + - ConnectDB.getStatsDBSchema() + ".datasource_results dr, " + - ConnectDB.getStatsDBSchema() + ".result_pids ro " + - "WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id AND " + - "s.rid=ro.pid AND ro.type='Digital Object Identifier' AND metric_type='ft_total' AND s.source='SARC-OJS'"; - stmtImpala.executeUpdate(createDownloadsStatsImpala); - logger.info("Creating downloads_stats_impala"); + // We run the following query in Impala because it is faster + logger.info("Creating downloads_stats_impala"); + String createDownloadsStatsImpala = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats_impala AS " + + "SELECT s.source, d.id AS repository_id, " + + "ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', " + + "LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_impala s, " + + ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + + ConnectDB.getStatsDBSchema() + ".datasource_results dr, " + + ConnectDB.getStatsDBSchema() + ".result_pids ro " + + "WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id AND " + + "s.rid=ro.pid AND ro.type='Digital Object Identifier' AND metric_type='ft_total' AND s.source='SARC-OJS'"; + stmtImpala.executeUpdate(createDownloadsStatsImpala); + logger.info("Creating downloads_stats_impala"); - // Insert into downloads_stats - logger.info("Inserting data from downloads_stats_impala into downloads_stats"); - String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() - + ".downloads_stats SELECT * " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_impala"; - stmtHive.executeUpdate(insertDStats); - logger.info("Inserted into downloads_stats"); + // Insert into downloads_stats + logger.info("Inserting data from downloads_stats_impala into downloads_stats"); + String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats SELECT * " + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_impala"; + stmtHive.executeUpdate(insertDStats); + logger.info("Inserted into downloads_stats"); - logger.info("Creating sushilog table"); - String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() - + ".sushilog " + - "(`source` string, " + - "`repository_id` string, " + - "`rid` string, " + - "`date` string, " + - "`metric_type` string, " + - "`count` int)"; - stmtHive.executeUpdate(createSushilog); - 
logger.info("Created sushilog table"); + logger.info("Creating sushilog table"); + String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sushilog " + + "(`source` string, " + + "`repository_id` string, " + + "`rid` string, " + + "`date` string, " + + "`metric_type` string, " + + "`count` int)"; + stmtHive.executeUpdate(createSushilog); + logger.info("Created sushilog table"); - // Insert into sushilog - logger.info("Inserting into sushilog"); - String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() - + ".sushilog SELECT * " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp"; - stmtHive.executeUpdate(insertSushiLog); - logger.info("Inserted into sushilog"); + // Insert into sushilog + logger.info("Inserting into sushilog"); + String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + + ".sushilog SELECT * " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp"; + stmtHive.executeUpdate(insertSushiLog); + logger.info("Inserted into sushilog"); - stmtHive.close(); - ConnectDB.getHiveConnection().close(); - } + stmtHive.close(); + ConnectDB.getHiveConnection().close(); + } - public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray, - String url, String issn) throws Exception { - logger.info("Processing SARC! issn: " + issn + " with url: " + url); - ConnectDB.getHiveConnection().setAutoCommit(false); + public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray, + String url, String issn) throws Exception { + logger.info("Processing SARC! issn: " + issn + " with url: " + url); + ConnectDB.getHiveConnection().setAutoCommit(false); - SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM"); - // Setting the starting period - Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); - logger.info("(getARReport) Starting period for log download: " + simpleDateFormat.format(start.getTime())); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM"); + // Setting the starting period + Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); + logger.info("(getARReport) Starting period for log download: " + simpleDateFormat.format(start.getTime())); - // Setting the ending period (last day of the month) - Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); - end.add(Calendar.MONTH, +1); - end.add(Calendar.DAY_OF_MONTH, -1); - logger.info("(getARReport) Ending period for log download: " + simpleDateFormat.format(end.getTime())); + // Setting the ending period (last day of the month) + Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); + end.add(Calendar.MONTH, +1); + end.add(Calendar.DAY_OF_MONTH, -1); + logger.info("(getARReport) Ending period for log download: " + simpleDateFormat.format(end.getTime())); - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - PreparedStatement st = ConnectDB - .getHiveConnection() - .prepareStatement( - "SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?"); - st.setString(1, issn); - ResultSet rs_date = st.executeQuery(); - while (rs_date.next()) { - if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") - && !rs_date.getString(1).equals("")) { - start.setTime(sdf.parse(rs_date.getString(1))); - } - } - rs_date.close(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + PreparedStatement st = ConnectDB + .getHiveConnection() + .prepareStatement( + 
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?"); + st.setString(1, issn); + ResultSet rs_date = st.executeQuery(); + Date dateMax = null; + while (rs_date.next()) { + if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") + && !rs_date.getString(1).equals("")) { + start.setTime(sdf.parse(rs_date.getString(1))); + dateMax = sdf.parse(rs_date.getString(1)); + } + } + rs_date.close(); - // Creating the needed configuration for the correct storing of data - Configuration config = new Configuration(); - config.addResource(new Path("/etc/hadoop/conf/core-site.xml")); - config.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml")); - config - .set( - "fs.hdfs.impl", - org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - config - .set( - "fs.file.impl", - org.apache.hadoop.fs.LocalFileSystem.class.getName()); - FileSystem dfs = FileSystem.get(config); + // Creating the needed configuration for the correct storing of data + Configuration config = new Configuration(); + config.addResource(new Path("/etc/hadoop/conf/core-site.xml")); + config.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml")); + config + .set( + "fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + config + .set( + "fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName()); + FileSystem dfs = FileSystem.get(config); - while (start.before(end)) { - String reportUrl = url + "GetReport/?Report=AR1&Format=json&BeginDate=" - + simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime()); - start.add(Calendar.MONTH, 1); + if (dateMax != null && start.getTime().compareTo(dateMax) <= 0) { + logger.info("Date found in logs " + dateMax + " and not downloanding logs for " + issn); + } else { + + while (start.before(end)) { + String reportUrl = url + "GetReport/?Report=AR1&Format=json&BeginDate=" + + simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime()); + start.add(Calendar.MONTH, 1); - logger.info("(getARReport) Getting report: " + reportUrl); - String text = getJson(reportUrl); - if (text == null) { - continue; - } + logger.info("(getARReport) Getting report: " + reportUrl); + String text = getJson(reportUrl); + if (text == null) { + continue; + } - JSONParser parser = new JSONParser(); - JSONObject jsonObject = null; - try { - jsonObject = (JSONObject) parser.parse(text); - } - // if there is a parsing error continue with the next url - catch (ParseException pe) { - continue; - } + JSONParser parser = new JSONParser(); + JSONObject jsonObject = null; + try { + jsonObject = (JSONObject) parser.parse(text); + } // if there is a parsing error continue with the next url + catch (ParseException pe) { + continue; + } - jsonObject = (JSONObject) jsonObject.get("sc:ReportResponse"); - jsonObject = (JSONObject) jsonObject.get("sc:Report"); - if (jsonObject == null) { - continue; - } - jsonObject = (JSONObject) jsonObject.get("c:Report"); - jsonObject = (JSONObject) jsonObject.get("c:Customer"); - Object obj = jsonObject.get("c:ReportItems"); - JSONArray jsonArray = new JSONArray(); - if (obj instanceof JSONObject) { - jsonArray.add(obj); - } else { - jsonArray = (JSONArray) obj; - // jsonArray = (JSONArray) jsonObject.get("c:ReportItems"); - } - if (jsonArray == null) { - continue; - } + jsonObject = (JSONObject) jsonObject.get("sc:ReportResponse"); + jsonObject = (JSONObject) jsonObject.get("sc:Report"); + if (jsonObject == null) { + continue; + } 
+ jsonObject = (JSONObject) jsonObject.get("c:Report"); + jsonObject = (JSONObject) jsonObject.get("c:Customer"); + Object obj = jsonObject.get("c:ReportItems"); + JSONArray jsonArray = new JSONArray(); + if (obj instanceof JSONObject) { + jsonArray.add(obj); + } else { + jsonArray = (JSONArray) obj; + // jsonArray = (JSONArray) jsonObject.get("c:ReportItems"); + } + if (jsonArray == null) { + continue; + } - // Creating the file in the filesystem for the ItemIdentifier as array object - String filePathArray = sarcsReportPathArray + "/SarcsARReport_" + issn + "_" + - simpleDateFormat.format(start.getTime()) + ".json"; - logger.info("Storing to file: " + filePathArray); - FSDataOutputStream finArray = dfs.create(new Path(filePathArray), true); + // Creating the file in the filesystem for the ItemIdentifier as array object + String filePathArray = sarcsReportPathArray + "/SarcsARReport_" + issn + "_" + + simpleDateFormat.format(start.getTime()) + ".json"; + logger.info("Storing to file: " + filePathArray); + FSDataOutputStream finArray = dfs.create(new Path(filePathArray), true); - // Creating the file in the filesystem for the ItemIdentifier as array object - String filePathNonArray = sarcsReportPathNonArray + "/SarcsARReport_" + issn + "_" + - simpleDateFormat.format(start.getTime()) + ".json"; - logger.info("Storing to file: " + filePathNonArray); - FSDataOutputStream finNonArray = dfs.create(new Path(filePathNonArray), true); + // Creating the file in the filesystem for the ItemIdentifier as array object + String filePathNonArray = sarcsReportPathNonArray + "/SarcsARReport_" + issn + "_" + + simpleDateFormat.format(start.getTime()) + ".json"; + logger.info("Storing to file: " + filePathNonArray); + FSDataOutputStream finNonArray = dfs.create(new Path(filePathNonArray), true); - for (Object aJsonArray : jsonArray) { + for (Object aJsonArray : jsonArray) { - JSONObject jsonObjectRow = (JSONObject) aJsonArray; - renameKeysRecursively(":", jsonObjectRow); + JSONObject jsonObjectRow = (JSONObject) aJsonArray; + renameKeysRecursively(":", jsonObjectRow); - if (jsonObjectRow.get("ItemIdentifier") instanceof JSONObject) { - finNonArray.write(jsonObjectRow.toJSONString().getBytes()); - finNonArray.writeChar('\n'); - } else { - finArray.write(jsonObjectRow.toJSONString().getBytes()); - finArray.writeChar('\n'); - } - } + if (jsonObjectRow.get("ItemIdentifier") instanceof JSONObject) { + finNonArray.write(jsonObjectRow.toJSONString().getBytes()); + finNonArray.writeChar('\n'); + } else { + finArray.write(jsonObjectRow.toJSONString().getBytes()); + finArray.writeChar('\n'); + } + } - finArray.close(); - finNonArray.close(); + finArray.close(); + finNonArray.close(); - // Check the file size and if it is too big, delete it - File fileArray = new File(filePathArray); + // Check the file size and if it is too big, delete it + File fileArray = new File(filePathArray); if (fileArray.length() == 0) - fileArray.delete(); - File fileNonArray = new File(filePathNonArray); + fileArray.delete(); + File fileNonArray = new File(filePathNonArray); if (fileNonArray.length() == 0) - fileNonArray.delete(); + fileNonArray.delete(); - } + } - dfs.close(); + dfs.close(); + } + //ConnectDB.getHiveConnection().close(); + } - ConnectDB.getHiveConnection().close(); - } - - private void renameKeysRecursively(String delimiter, JSONArray givenJsonObj) throws Exception { - for (Object jjval : givenJsonObj) { - if (jjval instanceof JSONArray) - renameKeysRecursively(delimiter, (JSONArray) jjval); - else if (jjval instanceof 
JSONObject) - renameKeysRecursively(delimiter, (JSONObject) jjval); - // All other types of vals - else + private void renameKeysRecursively(String delimiter, JSONArray givenJsonObj) throws Exception { + for (Object jjval : givenJsonObj) { + if (jjval instanceof JSONArray) { + renameKeysRecursively(delimiter, (JSONArray) jjval); + } else if (jjval instanceof JSONObject) { + renameKeysRecursively(delimiter, (JSONObject) jjval); + } // All other types of vals + else ; - } - } + } + } - private void renameKeysRecursively(String delimiter, JSONObject givenJsonObj) throws Exception { - Set jkeys = new HashSet(givenJsonObj.keySet()); - for (String jkey : jkeys) { + private void renameKeysRecursively(String delimiter, JSONObject givenJsonObj) throws Exception { + Set jkeys = new HashSet(givenJsonObj.keySet()); + for (String jkey : jkeys) { - String[] splitArray = jkey.split(delimiter); - String newJkey = splitArray[splitArray.length - 1]; + String[] splitArray = jkey.split(delimiter); + String newJkey = splitArray[splitArray.length - 1]; - Object jval = givenJsonObj.get(jkey); - givenJsonObj.remove(jkey); - givenJsonObj.put(newJkey, jval); + Object jval = givenJsonObj.get(jkey); + givenJsonObj.remove(jkey); + givenJsonObj.put(newJkey, jval); - if (jval instanceof JSONObject) - renameKeysRecursively(delimiter, (JSONObject) jval); + if (jval instanceof JSONObject) { + renameKeysRecursively(delimiter, (JSONObject) jval); + } - if (jval instanceof JSONArray) { - renameKeysRecursively(delimiter, (JSONArray) jval); - } - } - } + if (jval instanceof JSONArray) { + renameKeysRecursively(delimiter, (JSONArray) jval); + } + } + } - private String getJson(String url) throws Exception { - // String cred=username+":"+password; - // String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes()); - try { - URL website = new URL(url); - URLConnection connection = website.openConnection(); - // connection.setRequestProperty ("Authorization", "Basic "+encoded); - StringBuilder response; - try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - response = new StringBuilder(); - String inputLine; - while ((inputLine = in.readLine()) != null) { - response.append(inputLine); - response.append("\n"); - } - } - return response.toString(); - } catch (Exception e) { + private String getJson(String url) throws Exception { + // String cred=username+":"+password; + // String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes()); + try { + URL website = new URL(url); + URLConnection connection = website.openConnection(); + // connection.setRequestProperty ("Authorization", "Basic "+encoded); + StringBuilder response; + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + response = new StringBuilder(); + String inputLine; + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + response.append("\n"); + } + } + return response.toString(); + } catch (Exception e) { - // Logging error and silently continuing - logger.error("Failed to get URL: " + e); - System.out.println("Failed to get URL: " + e); + // Logging error and silently continuing + logger.error("Failed to get URL: " + e); + System.out.println("Failed to get URL: " + e); // return null; // throw new Exception("Failed to get URL: " + e.toString(), e); - } - return ""; - } + } + return ""; + } } diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java 
b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java index 405b58bd5..ae901dfa5 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java @@ -173,7 +173,7 @@ public class UsageStatsExporter { sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats"; stmt.executeUpdate(sql); - stmt.close(); + stmt.close(); ConnectDB.getHiveConnection().close(); } }
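For readers following the intent of this patch (fetching only new usage data on re-runs), the check added in SarcStats.getARReport can be sketched in isolation as follows. This is a minimal sketch, assuming a JDBC Hive connection and the sushilog table created above; the class and method names (IncrementalDownloadCheck, latestLoggedDate, needsDownload) are illustrative and not part of the patch.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/*
 * Illustrative sketch (not part of the patch) of the "download only updates" check:
 * read the newest date already stored in sushilog for a repository and skip report
 * periods that are not newer than it.
 */
public class IncrementalDownloadCheck {

	// Latest date already recorded for the given repository/ISSN, or null if none.
	static Date latestLoggedDate(Connection hive, String schema, String repositoryId) throws Exception {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
		Date dateMax = null;
		try (PreparedStatement st = hive.prepareStatement(
			"SELECT max(date) FROM " + schema + ".sushilog WHERE repository=?")) {
			st.setString(1, repositoryId);
			try (ResultSet rs = st.executeQuery()) {
				while (rs.next()) {
					String value = rs.getString(1);
					if (value != null && !value.isEmpty() && !value.equals("null")) {
						dateMax = sdf.parse(value);
					}
				}
			}
		}
		return dateMax;
	}

	// True when the period starting at 'start' has not been ingested yet and must be downloaded.
	static boolean needsDownload(Calendar start, Date dateMax) {
		return dateMax == null || start.getTime().compareTo(dateMax) > 0;
	}
}

Only months later than the newest date already recorded in sushilog are fetched again, so re-running the workflow does not re-download periods that a previous run has already ingested.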
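Similarly, the Piwik download loop restructured earlier in this patch pages through the Piwik API using filter_offset until an empty page is returned. Below is a minimal sketch of that paging scheme; the helper and class names are illustrative, a page size of 1000 (via filter_limit in the base URL) is an assumption, and only the offset/termination logic mirrors the original loop.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;

/*
 * Illustrative sketch (not part of the patch) of offset-based paging for Piwik logs:
 * keep requesting pages via filter_offset until the API returns an empty result.
 */
public class PiwikPagingSketch {

	// Plain, unauthenticated GET returning the response body as a single string.
	static String getJson(String url) throws Exception {
		StringBuilder response = new StringBuilder();
		URLConnection connection = new URL(url).openConnection();
		try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
			String line;
			while ((line = in.readLine()) != null) {
				response.append(line); // concatenate the payload without separators
			}
		}
		return response.toString();
	}

	static List<String> fetchAllPages(String baseApiUrl) throws Exception {
		List<String> pages = new ArrayList<>();
		int i = 0;
		do {
			String apiUrl = baseApiUrl;
			if (i > 0) {
				apiUrl += "&filter_offset=" + (i * 1000); // next page of (at most) 1000 rows
			}
			String content = getJson(apiUrl);
			if (content.length() == 0 || content.equals("[]")) {
				break; // empty page: nothing left to download
			}
			pages.add(content);
			i++;
		} while (true);
		return pages;
	}
}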