package eu.dnetlib.oa.graph.usagestats.export;

import java.io.*;
// import java.io.BufferedReader;
// import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Harvests SUSHI-Lite AR1 (Article Report) usage reports from the SARC-OJS
 * journal endpoints, stores them on HDFS and loads them into the usage-stats
 * Hive/Impala tables.
 *
 * @author D. Pierrakos, S. Zoupanos
 */
public class SarcStats {
2020-10-10 11:06:31 +02:00
private Statement stmtHive = null ;
private Statement stmtImpala = null ;
2020-05-07 18:00:03 +02:00
2020-10-02 15:25:21 +02:00
private static final Logger logger = LoggerFactory . getLogger ( SarcStats . class ) ;
2020-05-07 18:00:03 +02:00
public SarcStats ( ) throws Exception {
2020-07-22 18:22:04 +02:00
// createTables();
2020-05-07 18:00:03 +02:00
}
private void createTables ( ) throws Exception {
try {
2020-10-10 11:06:31 +02:00
stmtHive = ConnectDB . getHiveConnection ( ) . createStatement ( ) ;
2020-05-07 18:00:03 +02:00
String sqlCreateTableSushiLog = " CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type)); " ;
2020-10-10 11:06:31 +02:00
stmtHive . executeUpdate ( sqlCreateTableSushiLog ) ;
2020-05-07 18:00:03 +02:00
// String sqlCopyPublicSushiLog="INSERT INTO sushilog SELECT * FROM public.sushilog;";
// stmt.executeUpdate(sqlCopyPublicSushiLog);
String sqlcreateRuleSushiLog = " CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ " ON INSERT TO sushilog "
+ " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository, "
+ " sushilog.rid, sushilog.date "
+ " FROM sushilog "
+ " WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING; " ;
2020-10-10 11:06:31 +02:00
stmtHive . executeUpdate ( sqlcreateRuleSushiLog ) ;
2020-05-07 18:00:03 +02:00
String createSushiIndex = " create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type); " ;
2020-10-10 11:06:31 +02:00
stmtHive . executeUpdate ( createSushiIndex ) ;
2020-05-07 18:00:03 +02:00
2020-10-10 11:06:31 +02:00
stmtHive . close ( ) ;
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . close ( ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Sushi Tables Created " ) ;
2020-05-07 18:00:03 +02:00
} catch ( Exception e ) {
2020-10-02 15:25:21 +02:00
logger . error ( " Failed to create tables: " + e ) ;
2020-05-07 18:00:03 +02:00
throw new Exception ( " Failed to create tables: " + e . toString ( ) , e ) ;
}
}
2020-10-07 23:24:58 +02:00
public void reCreateLogDirs ( ) throws IOException {
FileSystem dfs = FileSystem . get ( new Configuration ( ) ) ;
2020-10-08 17:53:23 +02:00
2020-10-07 23:24:58 +02:00
logger . info ( " Deleting sarcsReport (Array) directory: " + ExecuteWorkflow . sarcsReportPathArray ) ;
dfs . delete ( new Path ( ExecuteWorkflow . sarcsReportPathArray ) , true ) ;
logger . info ( " Deleting sarcsReport (NonArray) directory: " + ExecuteWorkflow . sarcsReportPathNonArray ) ;
dfs . delete ( new Path ( ExecuteWorkflow . sarcsReportPathNonArray ) , true ) ;
2020-10-08 17:53:23 +02:00
2020-10-07 23:24:58 +02:00
logger . info ( " Creating sarcsReport (Array) directory: " + ExecuteWorkflow . sarcsReportPathArray ) ;
dfs . mkdirs ( new Path ( ExecuteWorkflow . sarcsReportPathArray ) ) ;
logger . info ( " Creating sarcsReport (NonArray) directory: " + ExecuteWorkflow . sarcsReportPathNonArray ) ;
dfs . mkdirs ( new Path ( ExecuteWorkflow . sarcsReportPathNonArray ) ) ;
}
2020-10-08 17:53:23 +02:00
2020-10-09 20:48:52 +02:00
public void processSarc ( String sarcsReportPathArray , String sarcsReportPathNonArray ) throws Exception {
2020-09-27 12:19:45 +02:00
Statement stmt = ConnectDB . getHiveConnection ( ) . createStatement ( ) ;
ConnectDB . getHiveConnection ( ) . setAutoCommit ( false ) ;
2020-09-16 20:46:32 +02:00
2020-10-02 15:25:21 +02:00
logger . info ( " Adding JSON Serde jar " ) ;
2020-09-16 20:46:32 +02:00
stmt . executeUpdate ( " add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar " ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Added JSON Serde jar " ) ;
2020-09-16 20:46:32 +02:00
2020-10-09 20:48:52 +02:00
logger . info ( " Dropping sarc_sushilogtmp_json_array table " ) ;
2020-09-16 20:46:32 +02:00
String drop_sarc_sushilogtmp_json_array = " DROP TABLE IF EXISTS " +
2020-10-09 20:48:52 +02:00
ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp_json_array " ;
2020-09-16 20:46:32 +02:00
stmt . executeUpdate ( drop_sarc_sushilogtmp_json_array ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " Dropped sarc_sushilogtmp_json_array table " ) ;
2020-09-16 20:46:32 +02:00
2020-10-09 20:48:52 +02:00
logger . info ( " Creating sarc_sushilogtmp_json_array table " ) ;
2020-09-16 20:46:32 +02:00
String create_sarc_sushilogtmp_json_array = " CREATE EXTERNAL TABLE IF NOT EXISTS " +
2020-10-09 20:48:52 +02:00
ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp_json_array( \ n " +
2020-09-16 20:46:32 +02:00
" `ItemIdentifier` ARRAY< \ n " +
" struct< \ n " +
" `Type`: STRING, \ n " +
" `Value`: STRING \ n " +
" > \ n " +
" >, \ n " +
" `ItemPerformance` struct< \ n " +
" `Period`: struct< \ n " +
" `Begin`: STRING, \ n " +
" `End`: STRING \ n " +
" >, \ n " +
" `Instance`: struct< \ n " +
" `Count`: STRING, \ n " +
" `MetricType`: STRING \ n " +
" > \ n " +
" > \ n " +
" ) " +
" ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' \ n " +
2020-10-09 20:48:52 +02:00
" LOCATION ' " + sarcsReportPathArray + " /' \ n " +
2020-09-16 20:46:32 +02:00
" TBLPROPERTIES ( \" transactional \" = \" false \" ) " ;
stmt . executeUpdate ( create_sarc_sushilogtmp_json_array ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " Created sarc_sushilogtmp_json_array table " ) ;
2020-09-16 20:46:32 +02:00
2020-10-09 20:48:52 +02:00
logger . info ( " Dropping sarc_sushilogtmp_json_non_array table " ) ;
2020-09-16 20:46:32 +02:00
String drop_sarc_sushilogtmp_json_non_array = " DROP TABLE IF EXISTS " +
ConnectDB . getUsageStatsDBSchema ( ) +
2020-10-09 20:48:52 +02:00
" .sarc_sushilogtmp_json_non_array " ;
2020-09-16 20:46:32 +02:00
stmt . executeUpdate ( drop_sarc_sushilogtmp_json_non_array ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " Dropped sarc_sushilogtmp_json_non_array table " ) ;
2020-09-16 20:46:32 +02:00
2020-10-09 20:48:52 +02:00
logger . info ( " Creating sarc_sushilogtmp_json_non_array table " ) ;
2020-09-16 20:46:32 +02:00
String create_sarc_sushilogtmp_json_non_array = " CREATE EXTERNAL TABLE IF NOT EXISTS " +
2020-10-09 20:48:52 +02:00
ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp_json_non_array ( \ n " +
2020-09-16 20:46:32 +02:00
" `ItemIdentifier` struct< \ n " +
" `Type`: STRING, \ n " +
" `Value`: STRING \ n " +
" >, \ n " +
" `ItemPerformance` struct< \ n " +
" `Period`: struct< \ n " +
" `Begin`: STRING, \ n " +
" `End`: STRING \ n " +
" >, \ n " +
" `Instance`: struct< \ n " +
" `Count`: STRING, \ n " +
" `MetricType`: STRING \ n " +
" > \ n " +
" > " +
" ) " +
" ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' \ n " +
2020-10-09 20:48:52 +02:00
" LOCATION ' " + sarcsReportPathNonArray + " /' \ n " +
2020-09-16 20:46:32 +02:00
" TBLPROPERTIES ( \" transactional \" = \" false \" ) " ;
stmt . executeUpdate ( create_sarc_sushilogtmp_json_non_array ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " Created sarc_sushilogtmp_json_non_array table " ) ;
2020-09-17 20:58:05 +02:00
2020-10-02 15:25:21 +02:00
logger . info ( " Creating sarc_sushilogtmp table " ) ;
2020-09-19 21:04:42 +02:00
String create_sarc_sushilogtmp = " CREATE TABLE IF NOT EXISTS " + ConnectDB . getUsageStatsDBSchema ( )
2020-09-17 20:58:05 +02:00
+ " .sarc_sushilogtmp(source STRING, repository STRING, " +
" rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
+
" tblproperties('transactional'='true') " ;
stmt . executeUpdate ( create_sarc_sushilogtmp ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Created sarc_sushilogtmp table " ) ;
2020-09-17 20:58:05 +02:00
2020-10-02 15:25:21 +02:00
logger . info ( " Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_array) " ) ;
2020-09-17 20:58:05 +02:00
String insert_sarc_sushilogtmp = " INSERT INTO " + ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp " +
2020-10-10 09:59:05 +02:00
" SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " +
2020-10-09 20:48:52 +02:00
" `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, " +
2020-09-17 20:58:05 +02:00
" `ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " +
2020-10-09 20:48:52 +02:00
" FROM " + ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp_json_array " +
2020-10-20 19:30:26 +02:00
" LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " +
" WHERE `ItemIdent`.`Type`='DOI' " ;
2020-09-17 20:58:05 +02:00
stmt . executeUpdate ( insert_sarc_sushilogtmp ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_array) " ) ;
2020-09-17 20:58:05 +02:00
2020-10-02 15:25:21 +02:00
logger . info ( " Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array) " ) ;
2020-09-17 20:58:05 +02:00
insert_sarc_sushilogtmp = " INSERT INTO " + ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp " +
2020-10-10 09:59:05 +02:00
" SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " +
2020-10-09 20:48:52 +02:00
" `ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, " +
2020-09-17 20:58:05 +02:00
" `ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " +
2020-10-10 09:59:05 +02:00
" FROM " + ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp_json_non_array " ;
2020-09-17 20:58:05 +02:00
stmt . executeUpdate ( insert_sarc_sushilogtmp ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array) " ) ;
2020-09-16 20:46:32 +02:00
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . close ( ) ;
2020-09-16 20:28:05 +02:00
}
2020-09-16 20:46:32 +02:00
2020-09-17 20:58:05 +02:00
public void getAndProcessSarc ( String sarcsReportPathArray , String sarcsReportPathNonArray ) throws Exception {
2020-09-27 12:19:45 +02:00
Statement stmt = ConnectDB . getHiveConnection ( ) . createStatement ( ) ;
ConnectDB . getHiveConnection ( ) . setAutoCommit ( false ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " Creating sushilog table " ) ;
String createSushilog = " CREATE TABLE IF NOT EXISTS " + ConnectDB . getUsageStatsDBSchema ( )
+ " .sushilog " +
" (`source` string, " +
" `repository` string, " +
" `rid` string, " +
" `date` string, " +
" `metric_type` string, " +
" `count` int) " ;
stmt . executeUpdate ( createSushilog ) ;
logger . info ( " Created sushilog table " ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Dropping sarc_sushilogtmp table " ) ;
2020-09-17 20:58:05 +02:00
String drop_sarc_sushilogtmp = " DROP TABLE IF EXISTS " +
ConnectDB . getUsageStatsDBSchema ( ) +
" .sarc_sushilogtmp " ;
stmt . executeUpdate ( drop_sarc_sushilogtmp ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Dropped sarc_sushilogtmp table " ) ;
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . close ( ) ;
2020-09-17 20:58:05 +02:00
List < String [ ] > issnAndUrls = new ArrayList < String [ ] > ( ) ;
2020-09-19 22:43:07 +02:00
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/motricidade/sushiLite/v1_7/ " , " 1646-107X "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/antropologicas/sushiLite/v1_7/ " , " 0873-819X "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/interaccoes/sushiLite/v1_7/ " , " 1646-2335 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/cct/sushiLite/v1_7/ " , " 2182-3030 "
} ) ;
2020-09-17 20:58:05 +02:00
issnAndUrls . add ( new String [ ] {
" https://actapediatrica.spp.pt/sushiLite/v1_7/ " , " 0873-9781 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/sociologiapp/sushiLite/v1_7/ " , " 0873-6529 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/finisterra/sushiLite/v1_7/ " , " 0430-5027 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/sisyphus/sushiLite/v1_7/ " , " 2182-8474 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/anestesiologia/sushiLite/v1_7/ " , " 0871-6099 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/rpe/sushiLite/v1_7/ " , " 0871-9187 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/psilogos/sushiLite/v1_7/ " , " 1646-091X "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/juridica/sushiLite/v1_7/ " , " 2183-5799 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/ecr/sushiLite/v1_7/ " , " 1647-2098 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/nascercrescer/sushiLite/v1_7/ " , " 0872-0754 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/cea/sushiLite/v1_7/ " , " 1645-3794 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/proelium/sushiLite/v1_7/ " , " 1645-8826 "
} ) ;
issnAndUrls . add ( new String [ ] {
" https://revistas.rcaap.pt/millenium/sushiLite/v1_7/ " , " 0873-3015 "
} ) ;
2020-10-09 20:48:52 +02:00
if ( ExecuteWorkflow . sarcNumberOfIssnToDownload > 0 & &
ExecuteWorkflow . sarcNumberOfIssnToDownload < = issnAndUrls . size ( ) ) {
logger . info ( " Trimming siteIds list to the size of: " + ExecuteWorkflow . sarcNumberOfIssnToDownload ) ;
issnAndUrls = issnAndUrls . subList ( 0 , ExecuteWorkflow . sarcNumberOfIssnToDownload ) ;
}
logger . info ( " (getAndProcessSarc) Downloading the followins opendoars: " + issnAndUrls ) ;
2020-09-17 20:58:05 +02:00
for ( String [ ] issnAndUrl : issnAndUrls ) {
2020-10-09 20:48:52 +02:00
logger . info ( " Now working on ISSN: " + issnAndUrl [ 1 ] ) ;
2020-09-17 20:58:05 +02:00
getARReport ( sarcsReportPathArray , sarcsReportPathNonArray , issnAndUrl [ 0 ] , issnAndUrl [ 1 ] ) ;
}
2020-05-07 18:00:03 +02:00
}
2020-09-19 22:43:07 +02:00
public void finalizeSarcStats ( ) throws Exception {
2020-10-10 11:06:31 +02:00
stmtHive = ConnectDB . getHiveConnection ( ) . createStatement ( ) ;
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . setAutoCommit ( false ) ;
2020-10-10 11:06:31 +02:00
stmtImpala = ConnectDB . getImpalaConnection ( ) . createStatement ( ) ;
2020-05-07 18:00:03 +02:00
2020-10-09 20:48:52 +02:00
logger . info ( " Creating downloads_stats table " ) ;
String createDownloadsStats = " CREATE TABLE IF NOT EXISTS " + ConnectDB . getUsageStatsDBSchema ( )
+ " .downloads_stats " +
" (`source` string, " +
" `repository_id` string, " +
" `result_id` string, " +
" `date` string, " +
" `count` bigint, " +
" `openaire` bigint) " ;
2020-10-10 11:06:31 +02:00
stmtHive . executeUpdate ( createDownloadsStats ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " Created downloads_stats table " ) ;
2020-10-10 11:06:31 +02:00
logger . info ( " Dropping sarc_sushilogtmp_impala table " ) ;
String drop_sarc_sushilogtmp_impala = " DROP TABLE IF EXISTS " +
ConnectDB . getUsageStatsDBSchema ( ) +
" .sarc_sushilogtmp_impala " ;
stmtHive . executeUpdate ( drop_sarc_sushilogtmp_impala ) ;
logger . info ( " Dropped sarc_sushilogtmp_impala table " ) ;
logger . info ( " Creating sarc_sushilogtmp_impala, a table readable by impala " ) ;
String createSarcSushilogtmpImpala = " CREATE TABLE IF NOT EXISTS " + ConnectDB . getUsageStatsDBSchema ( )
+ " .sarc_sushilogtmp_impala " +
" STORED AS PARQUET AS SELECT * FROM " + ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp " ;
stmtHive . executeUpdate ( createSarcSushilogtmpImpala ) ;
logger . info ( " Created sarc_sushilogtmp_impala " ) ;
logger . info ( " Making sarc_sushilogtmp visible to impala " ) ;
String invalidateMetadata = " INVALIDATE METADATA " + ConnectDB . getUsageStatsDBSchema ( )
+ " .sarc_sushilogtmp_impala; " ;
stmtImpala . executeUpdate ( invalidateMetadata ) ;
logger . info ( " Dropping downloads_stats_impala table " ) ;
String drop_downloads_stats_impala = " DROP TABLE IF EXISTS " +
ConnectDB . getUsageStatsDBSchema ( ) +
" .downloads_stats_impala " ;
stmtHive . executeUpdate ( drop_downloads_stats_impala ) ;
logger . info ( " Dropped downloads_stats_impala table " ) ;
logger . info ( " Making downloads_stats_impala deletion visible to impala " ) ;
try {
String invalidateMetadataDownloadsStatsImpala = " INVALIDATE METADATA " + ConnectDB . getUsageStatsDBSchema ( )
+ " .downloads_stats_impala; " ;
stmtImpala . executeUpdate ( invalidateMetadataDownloadsStatsImpala ) ;
} catch ( SQLException sqle ) {
}
// We run the following query in Impala because it is faster
logger . info ( " Creating downloads_stats_impala " ) ;
String createDownloadsStatsImpala = " CREATE TABLE " + ConnectDB . getUsageStatsDBSchema ( )
+ " .downloads_stats_impala AS " +
" SELECT s.source, d.id AS repository_id, " +
" ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', " +
" LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' " +
" FROM " + ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp_impala s, " +
2020-09-19 22:43:07 +02:00
ConnectDB . getStatsDBSchema ( ) + " .datasource_oids d, " +
ConnectDB . getStatsDBSchema ( ) + " .datasource_results dr, " +
2020-10-10 11:06:31 +02:00
ConnectDB . getStatsDBSchema ( ) + " .result_pids ro " +
2020-09-19 22:43:07 +02:00
" WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id AND " +
2020-10-10 13:25:20 +02:00
" s.rid=ro.pid AND ro.type='Digital Object Identifier' AND metric_type='ft_total' AND s.source='SARC-OJS' " ;
2020-10-10 11:06:31 +02:00
stmtImpala . executeUpdate ( createDownloadsStatsImpala ) ;
logger . info ( " Creating downloads_stats_impala " ) ;
// Insert into downloads_stats
logger . info ( " Inserting data from downloads_stats_impala into downloads_stats " ) ;
String insertDStats = " INSERT INTO " + ConnectDB . getUsageStatsDBSchema ( )
+ " .downloads_stats SELECT * " +
" FROM " + ConnectDB . getUsageStatsDBSchema ( ) + " .downloads_stats_impala " ;
stmtHive . executeUpdate ( insertDStats ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Inserted into downloads_stats " ) ;
2020-09-19 22:43:07 +02:00
2020-10-09 20:48:52 +02:00
logger . info ( " Creating sushilog table " ) ;
String createSushilog = " CREATE TABLE IF NOT EXISTS " + ConnectDB . getUsageStatsDBSchema ( )
+ " .sushilog " +
" (`source` string, " +
" `repository_id` string, " +
" `rid` string, " +
" `date` string, " +
" `metric_type` string, " +
" `count` int) " ;
2020-10-10 11:06:31 +02:00
stmtHive . executeUpdate ( createSushilog ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " Created sushilog table " ) ;
2020-09-19 22:43:07 +02:00
// Insert into sushilog
2020-10-02 15:25:21 +02:00
logger . info ( " Inserting into sushilog " ) ;
2020-09-19 22:43:07 +02:00
String insertSushiLog = " INSERT INTO " + ConnectDB . getUsageStatsDBSchema ( )
+ " .sushilog SELECT * " + " FROM " + ConnectDB . getUsageStatsDBSchema ( ) + " .sarc_sushilogtmp " ;
2020-10-10 11:06:31 +02:00
stmtHive . executeUpdate ( insertSushiLog ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Inserted into sushilog " ) ;
2020-05-07 18:00:03 +02:00
2020-10-10 11:06:31 +02:00
stmtHive . close ( ) ;
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . close ( ) ;
2020-05-07 18:00:03 +02:00
}
2020-09-16 20:28:05 +02:00
public void getARReport ( String sarcsReportPathArray , String sarcsReportPathNonArray ,
2020-09-16 19:30:36 +02:00
String url , String issn ) throws Exception {
2020-10-02 15:25:21 +02:00
logger . info ( " Processing SARC! issn: " + issn + " with url: " + url ) ;
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . setAutoCommit ( false ) ;
2020-05-07 18:00:03 +02:00
SimpleDateFormat simpleDateFormat = new SimpleDateFormat ( " YYYY-MM " ) ;
2020-10-09 20:48:52 +02:00
// Setting the starting period
Calendar start = ( Calendar ) ExecuteWorkflow . startingLogPeriod . clone ( ) ;
logger . info ( " (getARReport) Starting period for log download: " + simpleDateFormat . format ( start . getTime ( ) ) ) ;
2020-05-07 18:00:03 +02:00
2020-10-09 20:48:52 +02:00
// Setting the ending period (last day of the month)
Calendar end = ( Calendar ) ExecuteWorkflow . endingLogPeriod . clone ( ) ;
end . add ( Calendar . MONTH , + 1 ) ;
2020-05-07 18:00:03 +02:00
end . add ( Calendar . DAY_OF_MONTH , - 1 ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " (getARReport) Ending period for log download: " + simpleDateFormat . format ( end . getTime ( ) ) ) ;
2020-05-07 18:00:03 +02:00
SimpleDateFormat sdf = new SimpleDateFormat ( " yyyy-MM-dd " ) ;
PreparedStatement st = ConnectDB
2020-09-27 12:19:45 +02:00
. getHiveConnection ( )
2020-09-14 19:10:53 +02:00
. prepareStatement (
" SELECT max(date) FROM " + ConnectDB . getUsageStatsDBSchema ( ) + " .sushilog WHERE repository=? " ) ;
2020-05-07 18:00:03 +02:00
st . setString ( 1 , issn ) ;
ResultSet rs_date = st . executeQuery ( ) ;
while ( rs_date . next ( ) ) {
if ( rs_date . getString ( 1 ) ! = null & & ! rs_date . getString ( 1 ) . equals ( " null " )
& & ! rs_date . getString ( 1 ) . equals ( " " ) ) {
start . setTime ( sdf . parse ( rs_date . getString ( 1 ) ) ) ;
}
}
rs_date . close ( ) ;
2020-09-19 21:04:42 +02:00
// Creating the needed configuration for the correct storing of data
Configuration config = new Configuration ( ) ;
config . addResource ( new Path ( " /etc/hadoop/conf/core-site.xml " ) ) ;
config . addResource ( new Path ( " /etc/hadoop/conf/hdfs-site.xml " ) ) ;
config
. set (
" fs.hdfs.impl " ,
org . apache . hadoop . hdfs . DistributedFileSystem . class . getName ( ) ) ;
config
. set (
" fs.file.impl " ,
org . apache . hadoop . fs . LocalFileSystem . class . getName ( ) ) ;
FileSystem dfs = FileSystem . get ( config ) ;
2020-05-07 18:00:03 +02:00
while ( start . before ( end ) ) {
String reportUrl = url + " GetReport/?Report=AR1&Format=json&BeginDate= "
+ simpleDateFormat . format ( start . getTime ( ) ) + " &EndDate= " + simpleDateFormat . format ( start . getTime ( ) ) ;
start . add ( Calendar . MONTH , 1 ) ;
2020-10-09 20:48:52 +02:00
logger . info ( " (getARReport) Getting report: " + reportUrl ) ;
2020-05-07 18:00:03 +02:00
String text = getJson ( reportUrl ) ;
if ( text = = null ) {
continue ;
}
JSONParser parser = new JSONParser ( ) ;
2020-09-16 19:30:36 +02:00
JSONObject jsonObject = null ;
try {
jsonObject = ( JSONObject ) parser . parse ( text ) ;
}
// if there is a parsing error continue with the next url
catch ( ParseException pe ) {
continue ;
}
2020-05-07 18:00:03 +02:00
jsonObject = ( JSONObject ) jsonObject . get ( " sc:ReportResponse " ) ;
jsonObject = ( JSONObject ) jsonObject . get ( " sc:Report " ) ;
if ( jsonObject = = null ) {
continue ;
}
jsonObject = ( JSONObject ) jsonObject . get ( " c:Report " ) ;
jsonObject = ( JSONObject ) jsonObject . get ( " c:Customer " ) ;
Object obj = jsonObject . get ( " c:ReportItems " ) ;
JSONArray jsonArray = new JSONArray ( ) ;
if ( obj instanceof JSONObject ) {
jsonArray . add ( obj ) ;
} else {
jsonArray = ( JSONArray ) obj ;
// jsonArray = (JSONArray) jsonObject.get("c:ReportItems");
}
if ( jsonArray = = null ) {
continue ;
}
2020-09-16 19:30:36 +02:00
// Creating the file in the filesystem for the ItemIdentifier as array object
2020-10-09 20:48:52 +02:00
String filePathArray = sarcsReportPathArray + " /SarcsARReport_ " + issn + " _ " +
2020-07-25 12:17:47 +02:00
simpleDateFormat . format ( start . getTime ( ) ) + " .json " ;
2020-10-09 20:48:52 +02:00
logger . info ( " Storing to file: " + filePathArray ) ;
2020-09-19 21:04:42 +02:00
FSDataOutputStream finArray = dfs . create ( new Path ( filePathArray ) , true ) ;
2020-09-16 19:30:36 +02:00
// Creating the file in the filesystem for the ItemIdentifier as array object
2020-10-09 20:48:52 +02:00
String filePathNonArray = sarcsReportPathNonArray + " /SarcsARReport_ " + issn + " _ " +
2020-09-16 19:30:36 +02:00
simpleDateFormat . format ( start . getTime ( ) ) + " .json " ;
2020-10-09 20:48:52 +02:00
logger . info ( " Storing to file: " + filePathNonArray ) ;
2020-09-19 21:04:42 +02:00
FSDataOutputStream finNonArray = dfs . create ( new Path ( filePathNonArray ) , true ) ;
2020-07-25 12:17:47 +02:00
2020-05-07 18:00:03 +02:00
for ( Object aJsonArray : jsonArray ) {
2020-07-25 12:17:47 +02:00
2020-05-07 18:00:03 +02:00
JSONObject jsonObjectRow = ( JSONObject ) aJsonArray ;
2020-09-16 19:30:36 +02:00
renameKeysRecursively ( " : " , jsonObjectRow ) ;
if ( jsonObjectRow . get ( " ItemIdentifier " ) instanceof JSONObject ) {
finNonArray . write ( jsonObjectRow . toJSONString ( ) . getBytes ( ) ) ;
finNonArray . writeChar ( '\n' ) ;
} else {
finArray . write ( jsonObjectRow . toJSONString ( ) . getBytes ( ) ) ;
finArray . writeChar ( '\n' ) ;
2020-09-15 17:08:42 +02:00
}
2020-05-07 18:00:03 +02:00
}
2020-09-16 20:28:05 +02:00
2020-09-16 19:30:36 +02:00
finArray . close ( ) ;
2020-09-16 20:28:05 +02:00
finNonArray . close ( ) ;
2020-09-16 19:30:36 +02:00
2020-09-17 20:58:05 +02:00
// Check the file size and if it is too big, delete it
File fileArray = new File ( filePathArray ) ;
if ( fileArray . length ( ) = = 0 )
fileArray . delete ( ) ;
File fileNonArray = new File ( filePathNonArray ) ;
if ( fileNonArray . length ( ) = = 0 )
fileNonArray . delete ( ) ;
2020-05-07 18:00:03 +02:00
}
2020-09-19 21:04:42 +02:00
dfs . close ( ) ;
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . close ( ) ;
2020-05-07 18:00:03 +02:00
}
2020-09-16 20:28:05 +02:00
2020-09-16 19:30:36 +02:00
private void renameKeysRecursively ( String delimiter , JSONArray givenJsonObj ) throws Exception {
2020-09-16 20:28:05 +02:00
for ( Object jjval : givenJsonObj ) {
if ( jjval instanceof JSONArray )
renameKeysRecursively ( delimiter , ( JSONArray ) jjval ) ;
else if ( jjval instanceof JSONObject )
renameKeysRecursively ( delimiter , ( JSONObject ) jjval ) ;
2020-09-16 19:30:36 +02:00
// All other types of vals
2020-09-16 20:28:05 +02:00
else
;
2020-09-16 19:30:36 +02:00
}
}
2020-09-16 20:28:05 +02:00
2020-09-16 19:30:36 +02:00
private void renameKeysRecursively ( String delimiter , JSONObject givenJsonObj ) throws Exception {
Set < String > jkeys = new HashSet < String > ( givenJsonObj . keySet ( ) ) ;
for ( String jkey : jkeys ) {
2020-09-16 20:28:05 +02:00
2020-09-16 19:30:36 +02:00
String [ ] splitArray = jkey . split ( delimiter ) ;
String newJkey = splitArray [ splitArray . length - 1 ] ;
2020-09-16 20:28:05 +02:00
2020-09-16 19:30:36 +02:00
Object jval = givenJsonObj . get ( jkey ) ;
givenJsonObj . remove ( jkey ) ;
givenJsonObj . put ( newJkey , jval ) ;
2020-09-16 20:28:05 +02:00
if ( jval instanceof JSONObject )
renameKeysRecursively ( delimiter , ( JSONObject ) jval ) ;
if ( jval instanceof JSONArray ) {
renameKeysRecursively ( delimiter , ( JSONArray ) jval ) ;
2020-09-16 19:30:36 +02:00
}
}
}
2020-07-22 18:22:04 +02:00
private String getJson ( String url ) throws Exception {
2020-05-07 18:00:03 +02:00
// String cred=username+":"+password;
// String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes());
try {
URL website = new URL ( url ) ;
URLConnection connection = website . openConnection ( ) ;
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response ;
try ( BufferedReader in = new BufferedReader ( new InputStreamReader ( connection . getInputStream ( ) ) ) ) {
response = new StringBuilder ( ) ;
String inputLine ;
while ( ( inputLine = in . readLine ( ) ) ! = null ) {
response . append ( inputLine ) ;
response . append ( " \ n " ) ;
}
}
return response . toString ( ) ;
} catch ( Exception e ) {
2020-07-22 18:22:04 +02:00
2020-09-16 19:30:36 +02:00
// Logging error and silently continuing
2020-10-02 15:25:21 +02:00
logger . error ( " Failed to get URL: " + e ) ;
2020-07-22 18:22:04 +02:00
System . out . println ( " Failed to get URL: " + e ) ;
// return null;
2020-09-16 19:30:36 +02:00
// throw new Exception("Failed to get URL: " + e.toString(), e);
2020-05-07 18:00:03 +02:00
}
2020-09-16 19:30:36 +02:00
return " " ;
2020-05-07 18:00:03 +02:00
}
}