forked from D-Net/dnet-hadoop

Corrections for IRUS stats

parent 8da64d8f54
commit 2de17e7f32
@@ -117,15 +117,15 @@ public class IrusStats {
         logger.info("Added JSON Serde jar");

         logger.info("Dropping sushilogtmp_json table");
-        String drop_sushilogtmp_json = "DROP TABLE IF EXISTS " +
+        String dropSushilogtmpJson = "DROP TABLE IF EXISTS " +
             ConnectDB.getUsageStatsDBSchema() +
             ".sushilogtmp_json";
-        stmt.executeUpdate(drop_sushilogtmp_json);
+        stmt.executeUpdate(dropSushilogtmpJson);
         logger.info("Dropped sushilogtmp_json table");

-        logger.info("Creating sushilogtmp_json table");
-        String create_sushilogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
-            ConnectDB.getUsageStatsDBSchema() + ".sushilogtmp_json(\n" +
+        logger.info("Creating irus_sushilogtmp_json table");
+        String createSushilogtmpJson = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
+            ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json(\n" +
             "  `ItemIdentifier` ARRAY<\n" +
             "    struct<\n" +
             "      Type: STRING,\n" +
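For context: the table being (re)created here is an external Hive table whose columns mirror the nested JSON of the downloaded IRUS-UK reports, so the report files can be queried in place. A minimal standalone sketch of the same pattern, with a hypothetical JDBC URL, table name, and path (the real code gets its connection from ConnectDB and registers the SerDe jar first, as the "Added JSON Serde jar" log above notes):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class JsonTableSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical connection; the real code uses ConnectDB.getHiveConnection().
            Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
            Statement stmt = conn.createStatement();

            // An EXTERNAL table does not copy the report files: Hive reads the JSON
            // in place from LOCATION, one JSON object per line, via JsonSerDe
            // (provided by the hive-hcatalog-core jar).
            stmt.executeUpdate(
                "CREATE EXTERNAL TABLE IF NOT EXISTS demo_sushilog_json(" +
                    " `ItemIdentifier` ARRAY<struct<Type: STRING, Value: STRING>>" +
                    ") " +
                    "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' " +
                    "LOCATION '/tmp/demo_reports' " +
                    "TBLPROPERTIES (\"transactional\"=\"false\")");

            conn.close();
        }
    }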
@@ -148,80 +148,85 @@ public class IrusStats {
             "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
             "LOCATION '" + ExecuteWorkflow.irusUKReportPath + "'\n" +
             "TBLPROPERTIES (\"transactional\"=\"false\")";
-        stmt.executeUpdate(create_sushilogtmp_json);
-        logger.info("Created sushilogtmp_json table");
+        stmt.executeUpdate(createSushilogtmpJson);
+        logger.info("Created irus_sushilogtmp_json table");

-        logger.info("Dropping sushilogtmp table");
-        String drop_sushilogtmp = "DROP TABLE IF EXISTS " +
+        logger.info("Dropping irus_sushilogtmp table");
+        String dropSushilogtmp = "DROP TABLE IF EXISTS " +
             ConnectDB.getUsageStatsDBSchema() +
-            ".sushilogtmp";
-        stmt.executeUpdate(drop_sushilogtmp);
-        logger.info("Dropped sushilogtmp table");
+            ".irus_sushilogtmp";
+        stmt.executeUpdate(dropSushilogtmp);
+        logger.info("Dropped irus_sushilogtmp table");

-        logger.info("Creating sushilogtmp table");
-        String create_sushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
-            + ".sushilogtmp(source STRING, repository STRING, " +
+        logger.info("Creating irus_sushilogtmp table");
+        String createSushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
+            + ".irus_sushilogtmp(source STRING, repository STRING, " +
             "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
             +
             "tblproperties('transactional'='true')";
-        stmt.executeUpdate(create_sushilogtmp);
-        logger.info("Created sushilogtmp table");
+        stmt.executeUpdate(createSushilogtmp);
+        logger.info("Created irus_sushilogtmp table");

-        logger.info("Inserting to sushilogtmp table");
-        String insert_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilogtmp " +
+        logger.info("Inserting to irus_sushilogtmp table");
+        String insertSushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp " +
             "SELECT 'IRUS-UK', 'opendoar____::', `ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, " +
             "`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` " +
-            "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilogtmp_json " +
+            "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json " +
             "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " +
             "LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf " +
             "WHERE `ItemIdent`.`Type`= 'OAI'";
-        stmt.executeUpdate(insert_sushilogtmp);
-        logger.info("Inserted to sushilogtmp table");
+        stmt.executeUpdate(insertSushilogtmp);
+        logger.info("Inserted to irus_sushilogtmp table");

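The INSERT above works because LATERAL VIEW posexplode flattens each nested array (ItemIdentifier, ItemPerformance) into one row per element, so identifier values and usage counts can be selected side by side. A self-contained illustration of posexplode on an inline array, assuming only a reachable HiveServer2 (hypothetical JDBC URL); the hunk continues below with the new downloads_stats block:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class PosexplodeSketch {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
            Statement stmt = conn.createStatement();

            // posexplode() turns one row holding an array into one row per element,
            // yielding the element's position ("pos") alongside its value ("val").
            ResultSet rs = stmt.executeQuery(
                "SELECT pos, val " +
                    "FROM (SELECT ARRAY('OAI', 'URL', 'DOI') AS ids) src " +
                    "LATERAL VIEW posexplode(ids) t AS pos, val");
            while (rs.next()) {
                // Prints: 0 OAI, then 1 URL, then 2 DOI
                System.out.println(rs.getInt("pos") + " " + rs.getString("val"));
            }
            conn.close();
        }
    }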
+        logger.info("Creating downloads_stats table");
+        String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+            + ".downloads_stats " +
+            "(`source` string, " +
+            "`repository_id` string, " +
+            "`result_id` string, " +
+            "`date` string, " +
+            "`count` bigint, " +
+            "`openaire` bigint)";
+        stmt.executeUpdate(createDownloadsStats);
+        logger.info("Created downloads_stats table");
+
+        logger.info("Inserting into downloads_stats");
+        String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
+            "SELECT s.source, d.id AS repository_id, " +
+            "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' " +
+            "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp s, " +
+            ConnectDB.getStatsDBSchema() + ".datasource_oids d, " +
+            ConnectDB.getStatsDBSchema() + ".result_oids ro " +
+            "WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'";
+        stmt.executeUpdate(insertDStats);
+        logger.info("Inserted into downloads_stats");
+
+        String insertToShushilog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilog SELECT * FROM " +
+            ConnectDB.getUsageStatsDBSchema()
+            + ".irus_sushilogtmp";
+        stmt.executeUpdate(insertToShushilog);

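The date handling in the new insertDStats query above reduces each raw date to a year/month key: CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')). The same bucketing expressed in plain Java, just to make the zero-padding explicit:

    public class MonthKeySketch {
        public static void main(String[] args) {
            // Mirrors CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) in the Hive
            // query: the month is left-padded to two digits so the keys sort
            // correctly as strings.
            int year = 2020;
            int month = 3;
            String monthKey = String.format("%d/%02d", year, month);
            System.out.println(monthKey); // 2020/03
        }
    }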
         ConnectDB.getHiveConnection().close();

-        // // !!!!!!!!!!!!!!!!!!!!!
-        // // To do the following
-        // // !!!!!!!!!!!!!!!!!!!!!
-        //
-        // // String sql = "INSERT INTO sushi_result_downloads SELECT s.source, d.id AS repository, ro.id, s.date, s.count
-        // // FROM sushilog s, datasource_oids d, result_oids ro WHERE s.repository=d.orid AND s.oai=ro.orid AND
-        // // metric_type='ft_total'";
-        // // String sql = "SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date)
-        // // ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count INTO
-        // // downloads_stats FROM sushilog s, datasource_oids d, result_oids ro WHERE s.repository=d.orid AND
-        // // s.oai=ro.orid AND metric_type='ft_total'";
-        // // String sql = "INSERT INTO downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id,
-        // // extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0')
-        // // as date, s.count FROM sushilog s, datasource_oids d, result_oids ro WHERE s.repository=d.orid AND
-        // // s.oai=ro.orid AND metric_type='ft_total';";
-        // String sql = "INSERT INTO downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count, '0' FROM sushilogtmp s, public.datasource_oids d, public.result_oids ro WHERE s.repository=d.orid AND s.rid=ro.orid AND metric_type='ft_total' AND s.source='IRUS-UK';";
-        //
-        // stmt.executeUpdate(sql);
-        //
-        // sql = "Insert into sushilog select * from sushilogtmp;";
-        // stmt.executeUpdate(sql);
-        //
-        // ConnectDB.getConnection().close();
     }

     public void getIrusRRReport(String irusUKReportPath) throws Exception {
         SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM");
         // Setting the starting period
         Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
-        logger.info("Starting period for log download: " + sdf.format(start.getTime()));
+        logger.info("(getIrusRRReport) Starting period for log download: " + sdf.format(start.getTime()));

         // Setting the ending period (last day of the month)
         Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
         end.add(Calendar.MONTH, +1);
         end.add(Calendar.DAY_OF_MONTH, -1);
-        logger.info("Ending period for log download: " + sdf.format(end.getTime()));
+        logger.info("(getIrusRRReport) Ending period for log download: " + sdf.format(end.getTime()));

         String reportUrl = irusUKURL + "GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate=" +
             sdf.format(start.getTime()) + "&EndDate=" + sdf.format(end.getTime()) +
             "&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback=";

-        logger.info("(processIrusRRReport) Getting report: " + reportUrl);
+        logger.info("(getIrusRRReport) Getting report: " + reportUrl);

         String text = getJson(reportUrl, "", "");
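getIrusRRReport computes its reporting window by cloning the workflow-wide start and end periods and moving the end to the last day of its month: add one month, then subtract one day. One caveat worth noting for the context line above: the pattern "YYYY-MM" uses Java's week-year field (Y), which can disagree with the calendar year around New Year; "yyyy-MM" is the conventional choice. A runnable sketch of the window computation (a hypothetical fixed date stands in for ExecuteWorkflow.endingLogPeriod):

    import java.text.SimpleDateFormat;
    import java.util.Calendar;

    public class PeriodSketch {
        public static void main(String[] args) {
            // Hypothetical ending period: first day of February 2020.
            Calendar end = Calendar.getInstance();
            end.set(2020, Calendar.FEBRUARY, 1);

            // Same trick as the diff: jump to the first day of the next month,
            // then step back one day to land on the last day of the ending month.
            end.add(Calendar.MONTH, +1);
            end.add(Calendar.DAY_OF_MONTH, -1);

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            System.out.println(sdf.format(end.getTime())); // 2020-02-29
        }
    }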
@@ -242,14 +247,13 @@ public class IrusStats {
                 if (opendoar.get("Type").toString().equals("OpenDOAR")) {
                     i++;
                     opendoarsToVisit.add(opendoar.get("Value").toString());
-                    getIrusIRReport(opendoar.get("Value").toString(), irusUKReportPath);
                     break;
                 }
             }
             // break;
         }

-        logger.info("Found the following opendoars for download: " + opendoarsToVisit);
+        logger.info("(getIrusRRReport) Found the following opendoars for download: " + opendoarsToVisit);

         if (ExecuteWorkflow.irusNumberOfOpendoarsToDownload > 0 &&
             ExecuteWorkflow.irusNumberOfOpendoarsToDownload <= opendoarsToVisit.size()) {
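After this change the scan loop only collects OpenDOAR identifiers into opendoarsToVisit; the direct getIrusIRReport call is gone, and downloading happens in a separate loop (next hunk). A minimal sketch of the filtering step, assuming the json-simple types that the opendoar.get(...) calls suggest (hypothetical input data):

    import java.util.ArrayList;
    import java.util.List;

    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class OpendoarFilterSketch {
        public static void main(String[] args) {
            // Hypothetical identifier list, shaped like the ItemIdentifier
            // entries of an IRUS-UK RR1 report.
            JSONArray identifiers = (JSONArray) JSONValue.parse(
                "[{\"Type\":\"OpenDOAR\",\"Value\":\"123\"}," +
                    "{\"Type\":\"URL\",\"Value\":\"http://example.org\"}]");

            List<String> opendoarsToVisit = new ArrayList<>();
            for (Object o : identifiers) {
                JSONObject identifier = (JSONObject) o;
                if (identifier.get("Type").toString().equals("OpenDOAR")) {
                    opendoarsToVisit.add(identifier.get("Value").toString());
                    break; // one OpenDOAR id per item, as in the diff
                }
            }
            System.out.println(opendoarsToVisit); // [123]
        }
    }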
@@ -257,19 +261,19 @@ public class IrusStats {
             opendoarsToVisit = opendoarsToVisit.subList(0, ExecuteWorkflow.irusNumberOfOpendoarsToDownload);
         }

-        logger.info("Downloading the followins opendoars: " + opendoarsToVisit);
+        logger.info("(getIrusRRReport) Downloading the followins opendoars: " + opendoarsToVisit);

         for (String opendoar : opendoarsToVisit) {
             logger.info("Now working on piwikId: " + opendoar);
             this.getIrusIRReport(opendoar, irusUKReportPath);
         }

-        logger.info("Finished with report: " + reportUrl);
+        logger.info("(getIrusRRReport) Finished with report: " + reportUrl);
     }

     private void getIrusIRReport(String opendoar, String irusUKReportPath) throws Exception {

-        logger.info("(processIrusIRReport) Getting report(s) with opendoar: " + opendoar);
+        logger.info("(getIrusIRReport) Getting report(s) with opendoar: " + opendoar);

         ConnectDB.getHiveConnection().setAutoCommit(false);
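On the subList cap above: subList(0, n) throws IndexOutOfBoundsException when n exceeds the list size, which is what the preceding guard prevents, and it returns a view backed by the original list rather than a copy, which is harmless in this read-only use. For reference:

    import java.util.Arrays;
    import java.util.List;

    public class SubListSketch {
        public static void main(String[] args) {
            List<String> opendoars = Arrays.asList("101", "102", "103", "104");
            int cap = 2; // stands in for ExecuteWorkflow.irusNumberOfOpendoarsToDownload

            // Guard first: subList(0, cap) throws IndexOutOfBoundsException
            // when cap > size().
            if (cap > 0 && cap <= opendoars.size()) {
                opendoars = opendoars.subList(0, cap); // a view, not a copy
            }
            System.out.println(opendoars); // [101, 102]
        }
    }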
@@ -277,13 +281,13 @@ public class IrusStats {

         // Setting the starting period
         Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
-        logger.info("Starting period for log download: " + simpleDateFormat.format(start.getTime()));
+        logger.info("(getIrusIRReport) Starting period for log download: " + simpleDateFormat.format(start.getTime()));

         // Setting the ending period (last day of the month)
         Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
         end.add(Calendar.MONTH, +1);
         end.add(Calendar.DAY_OF_MONTH, -1);
-        logger.info("Ending period for log download: " + simpleDateFormat.format(end.getTime()));
+        logger.info("(getIrusIRReport) Ending period for log download: " + simpleDateFormat.format(end.getTime()));

         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
         PreparedStatement st = ConnectDB
@@ -343,7 +347,7 @@ public class IrusStats {

         ConnectDB.getHiveConnection().close();

-        logger.info("(processIrusIRReport) Finished downloading report(s) with opendoar: " + opendoar);
+        logger.info("(getIrusIRReport) Finished downloading report(s) with opendoar: " + opendoar);
     }

     private String getJson(String url) throws Exception {
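The hunk ends at the private getJson(String url) helper, whose body is outside this diff. Presumably it performs an HTTP GET and returns the response body; a minimal sketch of such a helper in plain java.net, as an assumption rather than the class's actual implementation:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class GetJsonSketch {
        // Fetches a URL with HTTP GET and returns the whole response body.
        static String getJson(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestMethod("GET");
            StringBuilder body = new StringBuilder();
            try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line).append("\n");
                }
            }
            return body.toString();
        }

        public static void main(String[] args) throws Exception {
            System.out.println(getJson("https://example.org/"));
        }
    }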