diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ExecuteWorkflow.java index a48f945f3..91b27636d 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ExecuteWorkflow.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/ExecuteWorkflow.java @@ -56,7 +56,7 @@ public class ExecuteWorkflow { static boolean sarcCreateTablesEmptyDirs; static boolean sarcDownloadReports; static boolean sarcProcessStats; - static int sarcNumberOfOpendoarsToDownload; + static int sarcNumberOfIssnToDownload; public static void main(String args[]) throws Exception { @@ -152,7 +152,7 @@ public class ExecuteWorkflow { sarcProcessStats = true; else sarcProcessStats = false; - sarcNumberOfOpendoarsToDownload = Integer.parseInt(parser.get("sarcNumberOfOpendoarsToDownload")); + sarcNumberOfIssnToDownload = Integer.parseInt(parser.get("sarcNumberOfIssnToDownload")); UsageStatsExporter usagestatsExport = new UsageStatsExporter(); usagestatsExport.export(); diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/SarcStats.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/SarcStats.java index d42c9ce0b..093457853 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/SarcStats.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/SarcStats.java @@ -85,8 +85,7 @@ public class SarcStats { dfs.mkdirs(new Path(ExecuteWorkflow.sarcsReportPathNonArray)); } - public void processSarc(String sarcsReportPathArray, String sarcsReportPathNonArray, - String url, String issn) throws Exception { + public void processSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception { Statement stmt = ConnectDB.getHiveConnection().createStatement(); ConnectDB.getHiveConnection().setAutoCommit(false); @@ -94,16 +93,17 @@ public class SarcStats { stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar"); logger.info("Added JSON Serde jar"); - logger.info("Dropping sarc_sushilogtmp_json_array_" + issn.replace("-", "_") + " table"); - String drop_sarc_sushilogtmp_json_array = "DROP TABLE IF EXISTS " + - ConnectDB.getUsageStatsDBSchema() + - ".sarc_sushilogtmp_json_array_" + issn.replace("-", "_"); - stmt.executeUpdate(drop_sarc_sushilogtmp_json_array); - logger.info("Dropped sarc_sushilogtmp_json_array_" + issn.replace("-", "_") + " table"); +// " + issn.replace("-", "_" - logger.info("Creating sarc_sushilogtmp_json_array_" + issn.replace("-", "_") + " table"); + logger.info("Dropping sarc_sushilogtmp_json_array table"); + String drop_sarc_sushilogtmp_json_array = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array"; + stmt.executeUpdate(drop_sarc_sushilogtmp_json_array); + logger.info("Dropped sarc_sushilogtmp_json_array table"); + + logger.info("Creating sarc_sushilogtmp_json_array table"); String create_sarc_sushilogtmp_json_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " + - ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array_" + issn.replace("-", "_") + "(\n" + + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array(\n" + " `ItemIdentifier` ARRAY<\n" + " struct<\n" + " `Type`: STRING,\n" + @@ -122,21 +122,21 @@ public class SarcStats { " >\n" + ")" + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + - "LOCATION '" + sarcsReportPathArray + "/" + issn + "'\n" + + "LOCATION '" + sarcsReportPathArray + "/'\n" + "TBLPROPERTIES (\"transactional\"=\"false\")"; stmt.executeUpdate(create_sarc_sushilogtmp_json_array); - logger.info("Created sarc_sushilogtmp_json_array_" + issn.replace("-", "_") + " table"); + logger.info("Created sarc_sushilogtmp_json_array table"); - logger.info("Dropping sarc_sushilogtmp_json_non_array_" + issn.replace("-", "_") + " table"); + logger.info("Dropping sarc_sushilogtmp_json_non_array table"); String drop_sarc_sushilogtmp_json_non_array = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + - ".sarc_sushilogtmp_json_non_array_" + issn.replace("-", "_"); + ".sarc_sushilogtmp_json_non_array"; stmt.executeUpdate(drop_sarc_sushilogtmp_json_non_array); - logger.info("Dropped sarc_sushilogtmp_json_non_array_" + issn.replace("-", "_") + " table"); + logger.info("Dropped sarc_sushilogtmp_json_non_array table"); - logger.info("Creating sarc_sushilogtmp_json_non_array_" + issn.replace("-", "_") + " table"); + logger.info("Creating sarc_sushilogtmp_json_non_array table"); String create_sarc_sushilogtmp_json_non_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " + - ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array_" + issn.replace("-", "_") + "(\n" + + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array (\n" + " `ItemIdentifier` struct<\n" + " `Type`: STRING,\n" + " `Value`: STRING\n" + @@ -153,10 +153,10 @@ public class SarcStats { " >" + ")" + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + - "LOCATION '" + sarcsReportPathNonArray + "/" + issn + "'\n" + + "LOCATION '" + sarcsReportPathNonArray + "/'\n" + "TBLPROPERTIES (\"transactional\"=\"false\")"; stmt.executeUpdate(create_sarc_sushilogtmp_json_non_array); - logger.info("Created sarc_sushilogtmp_json_non_array_" + issn.replace("-", "_") + " table"); + logger.info("Created sarc_sushilogtmp_json_non_array table"); logger.info("Creating sarc_sushilogtmp table"); String create_sarc_sushilogtmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() @@ -169,33 +169,43 @@ public class SarcStats { logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)"); String insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " + - "SELECT 'SARC-OJS', '" + issn + "' , `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + + "SELECT 'SARC-OJS', 'split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0]'," + + " `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array_" + issn.replace("-", "_") + " " - + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array " + "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent "; stmt.executeUpdate(insert_sarc_sushilogtmp); logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)"); logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)"); insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " + - "SELECT 'SARC-OJS', '" + issn + "' , `ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + + "SELECT 'SARC-OJS', 'split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0]', " + + "`ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, " + "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " + - "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array_" + issn.replace("-", "_"); + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_arra"; stmt.executeUpdate(insert_sarc_sushilogtmp); logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)"); ConnectDB.getHiveConnection().close(); - - //////////////////////////////////// - // Add everything to the sushilog table!!!! - //////////////////////////////////// } public void getAndProcessSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception { Statement stmt = ConnectDB.getHiveConnection().createStatement(); ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Creating sushilog table"); + String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sushilog " + + "(`source` string, " + + "`repository` string, " + + "`rid` string, " + + "`date` string, " + + "`metric_type` string, " + + "`count` int)"; + stmt.executeUpdate(createSushilog); + logger.info("Created sushilog table"); + logger.info("Dropping sarc_sushilogtmp table"); String drop_sarc_sushilogtmp = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + @@ -257,9 +267,17 @@ public class SarcStats { "https://revistas.rcaap.pt/millenium/sushiLite/v1_7/", "0873-3015" }); + if (ExecuteWorkflow.sarcNumberOfIssnToDownload > 0 && + ExecuteWorkflow.sarcNumberOfIssnToDownload <= issnAndUrls.size()) { + logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.sarcNumberOfIssnToDownload); + issnAndUrls = issnAndUrls.subList(0, ExecuteWorkflow.sarcNumberOfIssnToDownload); + } + + logger.info("(getAndProcessSarc) Downloading the followins opendoars: " + issnAndUrls); + for (String[] issnAndUrl : issnAndUrls) { + logger.info("Now working on ISSN: " + issnAndUrl[1]); getARReport(sarcsReportPathArray, sarcsReportPathNonArray, issnAndUrl[0], issnAndUrl[1]); - processSarc(sarcsReportPathArray, sarcsReportPathNonArray, issnAndUrl[0], issnAndUrl[1]); } } @@ -268,9 +286,20 @@ public class SarcStats { stmt = ConnectDB.getHiveConnection().createStatement(); ConnectDB.getHiveConnection().setAutoCommit(false); + logger.info("Creating downloads_stats table"); + String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".downloads_stats " + + "(`source` string, " + + "`repository_id` string, " + + "`result_id` string, " + + "`date` string, " + + "`count` bigint, " + + "`openaire` bigint)"; + stmt.executeUpdate(createDownloadsStats); + logger.info("Created downloads_stats table"); + // Insert into downloads_stats logger.info("Inserting into downloads_stats"); -// String sql = "INSERT INTO downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count, '0' FROM sushilog s, public.datasource_oids d, public.datasource_results dr, public.result_pids ro WHERE d.orid LIKE '%' || s.repository || '%' AND dr.id=d.id AND dr.result=ro.id AND s.rid=ro.pid AND ro.type='doi' AND metric_type='ft_total' AND s.source='SARC-OJS';"; String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats SELECT s.source, d.id AS repository_id, " + "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) AS date, s.count, '0' " + @@ -283,6 +312,18 @@ public class SarcStats { stmt.executeUpdate(insertDStats); logger.info("Inserted into downloads_stats"); + logger.info("Creating sushilog table"); + String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".sushilog " + + "(`source` string, " + + "`repository_id` string, " + + "`rid` string, " + + "`date` string, " + + "`metric_type` string, " + + "`count` int)"; + stmt.executeUpdate(createSushilog); + logger.info("Created sushilog table"); + // Insert into sushilog logger.info("Inserting into sushilog"); String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() @@ -300,14 +341,15 @@ public class SarcStats { ConnectDB.getHiveConnection().setAutoCommit(false); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM"); + // Setting the starting period + Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); + logger.info("(getARReport) Starting period for log download: " + simpleDateFormat.format(start.getTime())); - Calendar start = Calendar.getInstance(); - start.set(Calendar.YEAR, 2016); - start.set(Calendar.MONTH, Calendar.JANUARY); - // start.setTime(simpleDateFormat.parse("2016-01")); - - Calendar end = Calendar.getInstance(); + // Setting the ending period (last day of the month) + Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); + end.add(Calendar.MONTH, +1); end.add(Calendar.DAY_OF_MONTH, -1); + logger.info("(getARReport) Ending period for log download: " + simpleDateFormat.format(end.getTime())); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); PreparedStatement st = ConnectDB @@ -324,12 +366,6 @@ public class SarcStats { } rs_date.close(); - PreparedStatement preparedStatement = ConnectDB - .getHiveConnection() - .prepareStatement( - "INSERT INTO sushilog (source, repository, rid, date, metric_type, count) VALUES (?,?,?,?,?,?)"); - int batch_size = 0; - // Creating the needed configuration for the correct storing of data Configuration config = new Configuration(); config.addResource(new Path("/etc/hadoop/conf/core-site.xml")); @@ -344,31 +380,17 @@ public class SarcStats { org.apache.hadoop.fs.LocalFileSystem.class.getName()); FileSystem dfs = FileSystem.get(config); - // Creating the directory to save the files - dfs.mkdirs(new Path(sarcsReportPathArray + "/" + issn)); - dfs.mkdirs(new Path(sarcsReportPathNonArray + "/" + issn)); - while (start.before(end)) { - // String reportUrl = - // "http://irus.mimas.ac.uk/api/sushilite/v1_7/GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate=" - // + simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime()) + - // "&RepositoryIdentifier=opendoar%3A" + opendoar + - // "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback="; String reportUrl = url + "GetReport/?Report=AR1&Format=json&BeginDate=" + simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime()); - // System.out.println(reportUrl); start.add(Calendar.MONTH, 1); + logger.info("(getARReport) Getting report: " + reportUrl); String text = getJson(reportUrl); if (text == null) { continue; } - /* - * PrintWriter wr = new PrintWriter(new FileWriter("logs/" + simpleDateFormat.format(start.getTime()) + - * ".json")); wr.print(text); wr.close(); - */ - JSONParser parser = new JSONParser(); JSONObject jsonObject = null; try { @@ -399,22 +421,21 @@ public class SarcStats { } // Creating the file in the filesystem for the ItemIdentifier as array object - String filePathArray = sarcsReportPathArray + "/" + issn + "/" + "SarcsARReport" + + String filePathArray = sarcsReportPathArray + "/SarcsARReport_" + issn + "_" + simpleDateFormat.format(start.getTime()) + ".json"; - System.out.println("Storing to file: " + filePathArray); + logger.info("Storing to file: " + filePathArray); FSDataOutputStream finArray = dfs.create(new Path(filePathArray), true); // Creating the file in the filesystem for the ItemIdentifier as array object - String filePathNonArray = sarcsReportPathNonArray + "/" + issn + "/" + "SarcsARReport" + + String filePathNonArray = sarcsReportPathNonArray + "/SarcsARReport_" + issn + "_" + simpleDateFormat.format(start.getTime()) + ".json"; - System.out.println("Storing to file: " + filePathNonArray); + logger.info("Storing to file: " + filePathNonArray); FSDataOutputStream finNonArray = dfs.create(new Path(filePathNonArray), true); for (Object aJsonArray : jsonArray) { JSONObject jsonObjectRow = (JSONObject) aJsonArray; renameKeysRecursively(":", jsonObjectRow); - System.out.println("oooo====> " + jsonObjectRow.toJSONString()); if (jsonObjectRow.get("ItemIdentifier") instanceof JSONObject) { finNonArray.write(jsonObjectRow.toJSONString().getBytes()); diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java index 24f292df1..2c40a84d1 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java @@ -148,8 +148,6 @@ public class UsageStatsExporter { logger.info("Irus done"); } - System.exit(0); - SarcStats sarcStats = new SarcStats(); if (ExecuteWorkflow.sarcCreateTablesEmptyDirs) { sarcStats.reCreateLogDirs(); @@ -158,6 +156,7 @@ public class UsageStatsExporter { sarcStats.getAndProcessSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray); } if (ExecuteWorkflow.sarcProcessStats) { + sarcStats.processSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray); sarcStats.finalizeSarcStats(); } logger.info("Sarc done"); diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/export/usagestats_parameters.json b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/export/usagestats_parameters.json index cb31744cb..20f73caf1 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/export/usagestats_parameters.json +++ b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/export/usagestats_parameters.json @@ -193,8 +193,8 @@ }, { "paramName": "inod", - "paramLongName": "sarcNumberOfOpendoarsToDownload", - "paramDescription": "Limit the number of the downloaded Opendoars (Sarc) to the first sarcNumberOfOpendoarsToDownload", + "paramLongName": "sarcNumberOfIssnToDownload", + "paramDescription": "Limit the number of the downloaded ISSN (Sarc) to the first sarcNumberOfIssnToDownload", "paramRequired": true } ] diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/oozie_app/workflow.xml index a900d8f49..231f96892 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-usage-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestats/oozie_app/workflow.xml @@ -74,7 +74,7 @@ --sarcCreateTablesEmptyDirs${sarcCreateTablesEmptyDirs} --sarcDownloadReports${sarcDownloadReports} --sarcProcessStats${sarcProcessStats} - --sarcNumberOfOpendoarsToDownload${sarcNumberOfOpendoarsToDownload} + --sarcNumberOfIssnToDownload${sarcNumberOfIssnToDownload}