diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java
index 774dcf0b7..81e34b3e7 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java
@@ -112,13 +112,13 @@ public class ExecuteWorkflow {
downloadPiwikLogs = true;
else
downloadPiwikLogs = false;
-/*
+
if (parser.get("processPiwikLogs").toLowerCase().equals("true"))
processPiwikLogs = true;
else
processPiwikLogs = false;
-*/
- String startingLogPeriodStr = parser.get("startingLogPeriod");
+
+ String startingLogPeriodStr = parser.get("startingLogPeriod");
Date startingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(startingLogPeriodStr);
startingLogPeriod = startingLogPeriodStr(startingLogPeriodDate);
@@ -138,12 +138,12 @@ public class ExecuteWorkflow {
downloadLaReferenciaLogs = true;
else
downloadLaReferenciaLogs = false;
-/*
+
if (parser.get("processLaReferenciaLogs").toLowerCase().equals("true"))
processLaReferenciaLogs = true;
else
processLaReferenciaLogs = false;
-*/
+
if (parser.get("irusCreateTablesEmptyDirs").toLowerCase().equals("true"))
irusCreateTablesEmptyDirs = true;
else
@@ -153,13 +153,13 @@ public class ExecuteWorkflow {
irusDownloadReports = true;
else
irusDownloadReports = false;
-/*
+
if (parser.get("irusProcessStats").toLowerCase().equals("true"))
irusProcessStats = true;
else
irusProcessStats = false;
irusNumberOfOpendoarsToDownload = Integer.parseInt(parser.get("irusNumberOfOpendoarsToDownload"));
-*/
+
if (parser.get("sarcCreateTablesEmptyDirs").toLowerCase().equals("true"))
sarcCreateTablesEmptyDirs = true;
else
@@ -169,13 +169,13 @@ public class ExecuteWorkflow {
sarcDownloadReports = true;
else
sarcDownloadReports = false;
-/*
+
if (parser.get("sarcProcessStats").toLowerCase().equals("true"))
sarcProcessStats = true;
else
sarcProcessStats = false;
sarcNumberOfIssnToDownload = Integer.parseInt(parser.get("sarcNumberOfIssnToDownload"));
-*/
+
/*
if (parser.get("finalizeStats").toLowerCase().equals("true"))
finalizeStats = true;
@@ -184,8 +184,8 @@ public class ExecuteWorkflow {
if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true"))
finalTablesVisibleToImpala = true;
else
-*/ finalTablesVisibleToImpala = false;
-
+ finalTablesVisibleToImpala = false;
+*/
numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads"));
UsageStatsExporter usagestatsExport = new UsageStatsExporter();
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/IrusStats.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/IrusStats.java
index 090f76ff5..bb8d8565e 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/IrusStats.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/IrusStats.java
@@ -174,7 +174,7 @@ public class IrusStats {
+ "WHERE `ItemIdent`.`Type`= 'OAI'";
stmt.executeUpdate(insertSushilogtmp);
logger.info("Inserted to irus_sushilogtmp table");
-
+/*
logger.info("Creating downloads_stats table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats "
@@ -209,7 +209,7 @@ public class IrusStats {
+ "`count` int)";
stmt.executeUpdate(createSushilog);
logger.info("Created sushilog table");
-
+*/
logger.info("Inserting to sushilog table");
String insertToShushilog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilog SELECT * FROM "
+ ConnectDB.getUsageStatsDBSchema()
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/LaReferenciaStats.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/LaReferenciaStats.java
index ef7636099..c20781767 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/LaReferenciaStats.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/LaReferenciaStats.java
@@ -116,13 +116,16 @@ public class LaReferenciaStats {
removeDoubleClicks();
logger.info("LaReferencia removed double clicks");
+/********
logger.info("LaReferencia creating viewsStats");
viewsStats();
logger.info("LaReferencia created viewsStats");
logger.info("LaReferencia creating downloadsStats");
downloadsStats();
logger.info("LaReferencia created downloadsStats");
- logger.info("LaReferencia updating Production Tables");
+
+************/
+ logger.info("LaReferencia updating Production Tables");
updateProdTables();
logger.info("LaReferencia updated Production Tables");
@@ -343,7 +346,7 @@ public class LaReferenciaStats {
String sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp";
stmt.executeUpdate(sql);
-
+/*****
logger.info("Updating views_stats");
sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp";
@@ -372,6 +375,11 @@ public class LaReferenciaStats {
logger.info("Inserted data to usage_stats from lareferencia");
// sql = "insert into public.downloads_stats select * from la_downloads_stats_tmp;";
// stmt.executeUpdate(sql);
+****/
+ logger.info("Dropping lareferencialogtmp");
+ sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp";
+ logger.info("Dropped lareferencialogtmp");
+ stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getHiveConnection().close();
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java
index 681105de4..5cc9ec563 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java
@@ -196,6 +196,7 @@ public class PiwikDownloadLogs {
//while (rs.next())
//piwikIdToVisit.add(rs.getInt(1));
piwikIdToVisit.add(13);
+ piwikIdToVisit.add(109);
logger.info("Found the following piwikIds for download: " + piwikIdToVisit);
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java
index e0225d49a..4903d9599 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java
@@ -60,7 +60,7 @@ public class PiwikStatsDB {
this.createTables();
// The piwiklog table is not needed since it is built
// on top of JSON files
- this.createTmpTables();
+ ////////////this.createTmpTables();
}
public ArrayList getRobotsList() {
@@ -141,7 +141,7 @@ public class PiwikStatsDB {
}
}
- private void createTmpTables() throws Exception {
+/***** public void createTmpTables() throws Exception {
try {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS "
@@ -181,7 +181,7 @@ public class PiwikStatsDB {
// System.exit(0);
}
}
-
+******/
public void processLogs() throws Exception {
try {
ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
@@ -203,9 +203,10 @@ public class PiwikStatsDB {
processPortalLog();
logger.info("Portal logs process done");
- logger.info("Processing portal usagestats");
+ logger.info("Processing portal usagestats");
portalStats();
logger.info("Portal usagestats process done");
+/*****
logger.info("ViewsStats processing starts");
viewsStats();
@@ -214,11 +215,12 @@ public class PiwikStatsDB {
logger.info("DownloadsStats processing starts");
downloadsStats();
logger.info("DownloadsStats processing starts");
-
+*****/
logger.info("Updating Production Tables");
updateProdTables();
logger.info("Updated Production Tables");
+
} catch (Exception e) {
logger.error("Failed to process logs: " + e);
throw new Exception("Failed to process logs: " + e.toString(), e);
@@ -338,369 +340,6 @@ public class PiwikStatsDB {
stmt.close();
}
- public void viewsStats() throws Exception {
- Statement stmt = ConnectDB.getHiveConnection().createStatement();
- ConnectDB.getHiveConnection().setAutoCommit(false);
-
- logger.info("Dropping result_views_monthly_tmp table");
- String drop_result_views_monthly_tmp = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".result_views_monthly_tmp";
- stmt.executeUpdate(drop_result_views_monthly_tmp);
- logger.info("Dropped result_views_monthly_tmp table");
-
- logger.info("Creating result_views_monthly_tmp table");
- String create_result_views_monthly_tmp = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
- + ".result_views_monthly_tmp " +
- "AS SELECT entity_id AS id, " +
- "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) " +
- "AS openaire_referrer, " +
- "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
- "FROM " + ConnectDB.getUsageStatsDBSchema()
- + ".piwiklogtmp where action='action' and (source_item_type='oaItem' or " +
- "source_item_type='repItem') " +
- "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
- "source ORDER BY source, entity_id";
- stmt.executeUpdate(create_result_views_monthly_tmp);
- logger.info("Created result_views_monthly_tmp table");
-
- logger.info("Dropping views_stats_tmp table");
- String drop_views_stats_tmp = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".views_stats_tmp";
- stmt.executeUpdate(drop_views_stats_tmp);
- logger.info("Dropped views_stats_tmp table");
-
- logger.info("Creating views_stats_tmp table");
- String create_views_stats_tmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
- + ".views_stats_tmp " +
- "AS SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " +
- "max(views) AS count, max(openaire_referrer) AS openaire " +
- "FROM " + ConnectDB.getUsageStatsDBSchema() + ".result_views_monthly_tmp p, " +
- ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
- "WHERE p.source=d.piwik_id AND p.id=ro.oid " +
- "GROUP BY d.id, ro.id, month " +
- "ORDER BY d.id, ro.id, month";
- stmt.executeUpdate(create_views_stats_tmp);
- logger.info("Created views_stats_tmp table");
-/*
- logger.info("Dropping views_stats table");
- String drop_views_stats = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".views_stats";
- stmt.executeUpdate(drop_views_stats);
- logger.info("Dropped views_stats table");
-*/
- logger.info("Creating views_stats table");
- String create_view_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
- "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".views_stats_tmp STORED AS PARQUET";
- stmt.executeUpdate(create_view_stats);
- logger.info("Created views_stats table");
-
- logger.info("Dropping pageviews_stats_tmp table");
- String drop_pageviews_stats_tmp = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".pageviews_stats_tmp";
- stmt.executeUpdate(drop_pageviews_stats_tmp);
- logger.info("Dropped pageviews_stats_tmp table");
-
- logger.info("Creating pageviews_stats_tmp table");
- String create_pageviews_stats_tmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
- + ".pageviews_stats_tmp AS SELECT " +
- "'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count " +
- "FROM " + ConnectDB.getUsageStatsDBSchema() + ".result_views_monthly_tmp p, " +
- ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
- "WHERE p.source=" + ExecuteWorkflow.portalMatomoID + " AND p.source=d.piwik_id and p.id=ro.id \n" +
- "GROUP BY d.id, ro.id, month " +
- "ORDER BY d.id, ro.id, month";
- stmt.executeUpdate(create_pageviews_stats_tmp);
- logger.info("Created pageviews_stats_tmp table");
-
-/* logger.info("Droping pageviews_stats table");
- String drop_pageviews_stats = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".pageviews_stats";
- stmt.executeUpdate(drop_pageviews_stats);
- logger.info("Dropped pageviews_stats table");
-*/
- logger.info("Creating pageviews_stats table");
- String create_pageviews_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
- + ".pageviews_stats " +
- "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats_tmp STORED AS PARQUET";
- stmt.executeUpdate(create_pageviews_stats);
- logger.info("Created pageviews_stats table");
-
- stmt.close();
- ConnectDB.getHiveConnection().close();
- }
-
- private void downloadsStats() throws Exception {
- Statement stmt = ConnectDB.getHiveConnection().createStatement();
- ConnectDB.getHiveConnection().setAutoCommit(false);
-
- logger.info("Dropping result_downloads_monthly_tmp view");
- String drop_result_downloads_monthly_tmp = "DROP VIEW IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".result_downloads_monthly_tmp";
- stmt.executeUpdate(drop_result_downloads_monthly_tmp);
- logger.info("Dropped result_downloads_monthly_tmp view");
-
- logger.info("Creating result_downloads_monthly_tmp view");
- String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".result_downloads_monthly_tmp " +
- "AS SELECT entity_id AS id, COUNT(entity_id) as downloads, " +
- "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " +
- "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
- "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp where action='download' " +
- "AND (source_item_type='oaItem' OR source_item_type='repItem') " +
- "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) , source " +
- "ORDER BY source, entity_id, month";
- stmt.executeUpdate(sql);
- logger.info("Created result_downloads_monthly_tmp view");
-
- logger.info("Dropping downloads_stats_tmp table");
- String drop_views_stats = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".downloads_stats_tmp";
- stmt.executeUpdate(drop_views_stats);
- logger.info("Dropped downloads_stats_tmp table");
-
- logger.info("Creating downloads_stats_tmp table");
- sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_tmp AS " +
- "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " +
- "max(downloads) AS count, max(openaire_referrer) AS openaire " +
- "FROM " + ConnectDB.getUsageStatsDBSchema() + ".result_downloads_monthly_tmp p, " +
- ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
- "WHERE p.source=d.piwik_id and p.id=ro.oid " +
- "GROUP BY d.id, ro.id, month " +
- "ORDER BY d.id, ro.id, month";
- stmt.executeUpdate(sql);
- logger.info("Created downloads_stats_tmp table");
-/*
- logger.info("Dropping downloads_stats table");
- String drop_downloads_stats = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".downloads_stats";
- stmt.executeUpdate(drop_downloads_stats);
- logger.info("Dropped downloads_stats table");
-*/
- logger.info("Creating downloads_stats table");
- String create_downloads_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
- + ".downloads_stats " +
- "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_tmp STORED AS PARQUET ";
- stmt.executeUpdate(create_downloads_stats);
- logger.info("Created downloads_stats table");
-
- logger.info("Dropping result_downloads_monthly_tmp view");
- sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp";
- logger.info("Dropped result_downloads_monthly_tmp view");
- stmt.executeUpdate(sql);
-
- stmt.close();
- ConnectDB.getHiveConnection().close();
- }
-
- public void finalizeStats() throws Exception {
- stmt = ConnectDB.getHiveConnection().createStatement();
- ConnectDB.getHiveConnection().setAutoCommit(false);
-
- logger.info("Dropping full_dates table");
- String dropFullDates = "DROP TABLE IF EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".full_dates";
- stmt.executeUpdate(dropFullDates);
- logger.info("Dropped full_dates table");
-
- Calendar startCalendar = Calendar.getInstance();
- startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
- Calendar endCalendar = Calendar.getInstance();
- int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
- int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
-
- logger.info("Creating full_dates table");
- String sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".full_dates AS " +
- "SELECT from_unixtime(unix_timestamp(cast(add_months(from_date,i) AS DATE)), 'yyyy/MM') AS txn_date " +
- "FROM (SELECT DATE '2016-01-01' AS from_date) p " +
- "LATERAL VIEW " +
- "posexplode(split(space(" + diffMonth + "),' ')) pe AS i,x";
- stmt.executeUpdate(sql);
- logger.info("Created full_dates table");
-
- logger.info("Creating downloads_stats table");
- String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".downloads_stats " +
- "(`source` string, " +
- "`repository_id` string, " +
- "`result_id` string, " +
- "`date` string, " +
- "`count` bigint, " +
- "`openaire` bigint)";
- stmt.executeUpdate(createDownloadsStats);
- logger.info("Created downloads_stats table");
-
- logger.info("Creating views_stats table");
- String createViewsStats = "CREATE TABLE IF NOT EXISTS " +
- ConnectDB.getUsageStatsDBSchema() +
- ".views_stats " +
- "(`source` string, " +
- "`repository_id` string, " +
- "`result_id` string, " +
- "`date` string, " +
- "`count` bigint, " +
- "`openaire` bigint)";
- stmt.executeUpdate(createViewsStats);
- logger.info("Created views_stats table");
-
- logger.info("Inserting data to usage_stats");
- sql = "INSERT INTO "+ConnectDB.getUsageStatsDBSchema() + ".usage_stats " +
- "SELECT coalesce(ds.source, vs.source) as source, " +
- "coalesce(ds.repository_id, vs.repository_id) as repository_id, " +
- "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " +
- "coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, " +
- "coalesce(ds.openaire, 0) as openaire_downloads, " +
- "coalesce(vs.openaire, 0) as openaire_views " +
- "FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_tmp AS ds FULL OUTER JOIN " +
- ConnectDB.getUsageStatsDBSchema() + ".views_stats_tmp AS vs ON ds.source=vs.source " +
- "AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date";
- stmt.executeUpdate(sql);
- logger.info("Inserted data to usage_stats");
-
-
- stmt.close();
- ConnectDB.getHiveConnection().close();
- }
-
- // Create repository Views statistics
- private void repositoryViewsStats() throws Exception {
- stmt = ConnectDB.getHiveConnection().createStatement();
- ConnectDB.getHiveConnection().setAutoCommit(false);
-
-// String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
- String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
- stmt.executeUpdate(sql);
-
-// sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
- sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
- stmt.executeUpdate(sql);
-
- Calendar startCalendar = Calendar.getInstance();
- startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
- Calendar endCalendar = Calendar.getInstance();
- int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
- int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
-
- // sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is
- // null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month',
- // ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0,
- // " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source
- // from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order
- // by d.id, d.new_date";
-// sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
- sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, "
- + diffMonth
- + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
- stmt.executeUpdate(sql);
-
- stmt.close();
- ConnectDB.getHiveConnection().commit();
- ConnectDB.getHiveConnection().close();
- }
-
- // Create repository downloads statistics
- private void repositoryDownloadsStats() throws Exception {
- stmt = ConnectDB.getHiveConnection().createStatement();
- ConnectDB.getHiveConnection().setAutoCommit(false);
-
-// String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
- String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
- stmt.executeUpdate(sql);
-
-// sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
- sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
- stmt.executeUpdate(sql);
-
- Calendar startCalendar = Calendar.getInstance();
- startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
- Calendar endCalendar = Calendar.getInstance();
- int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
- int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
-
- // sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when
- // rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month',
- // ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0,
- // " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum,
- // source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and
- // d.source=rdm.source order by d.id, d.new_date";
- // sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO
- // repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date +
- // interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1)
- // AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from
- // repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order
- // by d.id, d.new_date";
- sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, "
- + diffMonth
- + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
- stmt.executeUpdate(sql);
-
- sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
- stmt.executeUpdate(sql);
-
- stmt.close();
- ConnectDB.getHiveConnection().commit();
- ConnectDB.getHiveConnection().close();
- }
-
public void processPortalLog() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
@@ -1154,47 +793,17 @@ public class PiwikStatsDB {
String sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog " +
"SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp";
stmt.executeUpdate(sql);
-
- logger.info("Inserting data to views_stats");
- sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
- "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".views_stats_tmp";
- stmt.executeUpdate(sql);
-
- logger.info("Inserting data to downloads_stats");
- sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
- "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_tmp";
- stmt.executeUpdate(sql);
-
- logger.info("Inserting data to pageviews_stats");
- sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats " +
- "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats_tmp";
- stmt.executeUpdate(sql);
-
- logger.info("Creating usage_stats table");
- String createUsageStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats " +
- "AS SELECT coalesce(ds.source, vs.source) as source, " +
- "coalesce(ds.repository_id, vs.repository_id) as repository_id, " +
- "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " +
- "coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, " +
- "coalesce(ds.openaire, 0) as openaire_downloads, " +
- "coalesce(vs.openaire, 0) as openaire_views " +
- "FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats AS ds FULL OUTER JOIN " +
- ConnectDB.getUsageStatsDBSchema() + ".views_stats AS vs ON ds.source=vs.source " +
- "AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date";
- stmt.executeUpdate(createUsageStats);
- logger.info("Created usage_stats table");
-
- /*
- * logger.info("Dropping table views_stats_tmp"); sql = "DROP TABLE IF EXISTS " +
- * ConnectDB.getUsageStatsDBSchema() + ".views_stats_tmp"; stmt.executeUpdate(sql);
- * logger.info("Dropping table downloads_stats_tmp"); sql = "DROP TABLE IF EXISTS " +
- * ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_tmp"; stmt.executeUpdate(sql);
- * logger.info("Dropping table pageviews_stats_tmp"); sql = "DROP TABLE IF EXISTS " +
- * ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats_tmp"; stmt.executeUpdate(sql);
- * logger.info("Dropping table process_portal_log_tmp"); sql = "DROP TABLE IF EXISTS " +
- * ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp"; stmt.executeUpdate(sql);
- */
+ logger.info("Dropping piwiklogtmp");
+ sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped piwiklogtmp");
+
+ logger.info("Dropping process_portal_log_tmp");
+ sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".process_portal_log_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped process_portal_log_tmp");
+
stmt.close();
ConnectDB.getHiveConnection().close();
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/SarcStats.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/SarcStats.java
index 54ed286cb..4ca20c52e 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/SarcStats.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/SarcStats.java
@@ -285,7 +285,7 @@ public class SarcStats {
stmtHive = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
stmtImpala = ConnectDB.getImpalaConnection().createStatement();
-
+/*
logger.info("Creating downloads_stats table_tmp");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats_tmp "
@@ -366,7 +366,7 @@ public class SarcStats {
+ "`count` int)";
stmtHive.executeUpdate(createSushilog);
logger.info("Created sushilog table");
-
+*/
// Insert into sushilog
logger.info("Inserting into sushilog");
String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java
index c4ee7d63c..bf2187569 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java
@@ -56,9 +56,14 @@ public class UsageStatsExporter {
PiwikStatsDB piwikstatsdb = new PiwikStatsDB(ExecuteWorkflow.repoLogPath, ExecuteWorkflow.portalLogPath);
logger.info("Re-creating database and tables");
- if (ExecuteWorkflow.recreateDbAndTables)
+ if (ExecuteWorkflow.recreateDbAndTables){
piwikstatsdb.recreateDBAndTables();
- ;
+ logger.info("DB-Tables-TmpTables are created ");
+ }
+// else {
+// piwikstatsdb.createTmpTables();
+// logger.info("TmpTables are created ");
+// }
logger.info("Initializing the download logs module");
PiwikDownloadLogs piwd = new PiwikDownloadLogs(ExecuteWorkflow.matomoBaseURL, ExecuteWorkflow.matomoAuthToken);
@@ -77,7 +82,7 @@ public class UsageStatsExporter {
ExecuteWorkflow.portalLogPath, ExecuteWorkflow.portalMatomoID);
}
logger.info("Downloaded piwik logs");
-/*
+
// Create DB tables, insert/update statistics
String cRobotsUrl = "https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json";
piwikstatsdb.setCounterRobotsURL(cRobotsUrl);
@@ -86,7 +91,7 @@ public class UsageStatsExporter {
logger.info("Processing logs");
piwikstatsdb.processLogs();
}
-*/
+
logger.info("Creating LaReferencia tables");
LaReferenciaDownloadLogs lrf = new LaReferenciaDownloadLogs(ExecuteWorkflow.lareferenciaBaseURL,
ExecuteWorkflow.lareferenciaAuthToken);
@@ -101,7 +106,8 @@ public class UsageStatsExporter {
lrf.GetLaReferenciaRepos(ExecuteWorkflow.lareferenciaLogPath);
logger.info("Downloaded LaReferencia logs");
}
-/*
+
+
LaReferenciaStats lastats = new LaReferenciaStats(ExecuteWorkflow.lareferenciaLogPath);
if (ExecuteWorkflow.processLaReferenciaLogs) {
@@ -109,7 +115,8 @@ public class UsageStatsExporter {
lastats.processLogs();
logger.info("LaReferencia logs done");
}
-*/
+
+
IrusStats irusstats = new IrusStats(ExecuteWorkflow.irusUKBaseURL);
if (ExecuteWorkflow.irusCreateTablesEmptyDirs) {
logger.info("Creating Irus Stats tables");
@@ -124,12 +131,15 @@ public class UsageStatsExporter {
if (ExecuteWorkflow.irusDownloadReports) {
irusstats.getIrusRRReport(ExecuteWorkflow.irusUKReportPath);
}
-/*
+
+
if (ExecuteWorkflow.irusProcessStats) {
irusstats.processIrusStats();
logger.info("Irus done");
}
-*/
+
+
+
SarcStats sarcStats = new SarcStats();
if (ExecuteWorkflow.sarcCreateTablesEmptyDirs) {
sarcStats.reCreateLogDirs();
@@ -137,13 +147,14 @@ public class UsageStatsExporter {
if (ExecuteWorkflow.sarcDownloadReports) {
sarcStats.getAndProcessSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray);
}
-/*
+
+
if (ExecuteWorkflow.sarcProcessStats) {
sarcStats.processSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray);
sarcStats.finalizeSarcStats();
}
logger.info("Sarc done");
-*/
+
/*
// finalize usagestats
@@ -160,6 +171,7 @@ public class UsageStatsExporter {
invalidateMetadata();
}
*/
+
logger.info("End");
}
diff --git a/dhp-workflows/dhp-usage-stats-build/pom.xml b/dhp-workflows/dhp-usage-stats-build/pom.xml
new file mode 100644
index 000000000..f400239f5
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.1.7-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>dhp-usage-stats-build</artifactId>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
+        <cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>2.4.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>20180130</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${cdh.hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${cdh.hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>c3p0</groupId>
+            <artifactId>c3p0</artifactId>
+            <version>0.9.1.2</version>
+            <type>jar</type>
+        </dependency>
+    </dependencies>
+    <build>
+        <finalName>dhp-usage-stats-build</finalName>
+    </build>
+</project>
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java
new file mode 100644
index 000000000..8f0f8eae7
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java
@@ -0,0 +1,130 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package eu.dnetlib.oa.graph.usagestatsbuild.export;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+import com.mchange.v2.c3p0.ComboPooledDataSource;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+
+public abstract class ConnectDB {
+
+ public static Connection DB_HIVE_CONNECTION;
+ public static Connection DB_IMPALA_CONNECTION;
+
+ private static String dbHiveUrl;
+ private static String dbImpalaUrl;
+ private static String usageRawDataDBSchema;
+ private static String usageStatsDBSchema;
+ private static String statsDBSchema;
+ private final static Logger log = Logger.getLogger(ConnectDB.class);
+
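+	// Copies connection URLs and schema names from the parsed workflow
+	// arguments into this class, then registers the Hive JDBC driver.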
+ static void init() throws ClassNotFoundException {
+
+ dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
+ dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl;
+ usageStatsDBSchema = ExecuteWorkflow.usageStatsDBSchema;
+ statsDBSchema = ExecuteWorkflow.statsDBSchema;
+ usageRawDataDBSchema = ExecuteWorkflow.usageRawDataDBSchema;
+
+ Class.forName("org.apache.hive.jdbc.HiveDriver");
+ }
+
+ public static Connection getHiveConnection() throws SQLException {
+ if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
+ return DB_HIVE_CONNECTION;
+ } else {
+ DB_HIVE_CONNECTION = connectHive();
+
+ return DB_HIVE_CONNECTION;
+ }
+ }
+
+ public static Connection getImpalaConnection() throws SQLException {
+ if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
+ return DB_IMPALA_CONNECTION;
+ } else {
+ DB_IMPALA_CONNECTION = connectImpala();
+
+ return DB_IMPALA_CONNECTION;
+ }
+ }
+
+ public static String getUsageRawDataDBSchema() {
+ return usageRawDataDBSchema;
+ }
+
+ public static String getUsageStatsDBSchema() {
+ return ConnectDB.usageStatsDBSchema;
+ }
+
+ public static String getStatsDBSchema() {
+ return ConnectDB.statsDBSchema;
+ }
+
+ private static Connection connectHive() throws SQLException {
+ /*
+ * Connection connection = DriverManager.getConnection(dbHiveUrl); Statement stmt =
+ * connection.createStatement(); log.debug("Opened database successfully"); return connection;
+ */
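+		// c3p0 pool tuned for long-running Hive sessions: retry acquisition up
+		// to 30 times (2s apart), never give up after a failed acquire, and
+		// test idle connections every 60s with "SELECT 1" so stale sessions
+		// are replaced transparently.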
+ ComboPooledDataSource cpds = new ComboPooledDataSource();
+ cpds.setJdbcUrl(dbHiveUrl);
+ cpds.setAcquireIncrement(1);
+ cpds.setMaxPoolSize(100);
+ cpds.setMinPoolSize(1);
+ cpds.setInitialPoolSize(1);
+ cpds.setMaxIdleTime(300);
+ cpds.setMaxConnectionAge(36000);
+
+ cpds.setAcquireRetryAttempts(30);
+ cpds.setAcquireRetryDelay(2000);
+ cpds.setBreakAfterAcquireFailure(false);
+
+ cpds.setCheckoutTimeout(0);
+ cpds.setPreferredTestQuery("SELECT 1");
+ cpds.setIdleConnectionTestPeriod(60);
+ return cpds.getConnection();
+
+ }
+
+ private static Connection connectImpala() throws SQLException {
+ /*
+ * Connection connection = DriverManager.getConnection(dbImpalaUrl); Statement stmt =
+ * connection.createStatement(); log.debug("Opened database successfully"); return connection;
+ */
+ ComboPooledDataSource cpds = new ComboPooledDataSource();
+ cpds.setJdbcUrl(dbImpalaUrl);
+ cpds.setAcquireIncrement(1);
+ cpds.setMaxPoolSize(100);
+ cpds.setMinPoolSize(1);
+ cpds.setInitialPoolSize(1);
+ cpds.setMaxIdleTime(300);
+ cpds.setMaxConnectionAge(36000);
+
+ cpds.setAcquireRetryAttempts(30);
+ cpds.setAcquireRetryDelay(2000);
+ cpds.setBreakAfterAcquireFailure(false);
+
+ cpds.setCheckoutTimeout(0);
+ cpds.setPreferredTestQuery("SELECT 1");
+ cpds.setIdleConnectionTestPeriod(60);
+
+ return cpds.getConnection();
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ExecuteWorkflow.java
new file mode 100644
index 000000000..3f958abba
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ExecuteWorkflow.java
@@ -0,0 +1,172 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package eu.dnetlib.oa.graph.usagestatsbuild.export;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.BasicConfigurator;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class ExecuteWorkflow {
+
+ static String matomoAuthToken;
+ static String matomoBaseURL;
+ static String repoLogPath;
+ static String portalLogPath;
+ static String portalMatomoID;
+ static String irusUKBaseURL;
+ static String irusUKReportPath;
+ static String sarcsReportPathArray;
+ static String sarcsReportPathNonArray;
+ static String lareferenciaLogPath;
+ static String lareferenciaBaseURL;
+ static String lareferenciaAuthToken;
+ static String dbHiveUrl;
+ static String dbImpalaUrl;
+ static String usageRawDataDBSchema;
+ static String usageStatsDBSchema;
+ static String statsDBSchema;
+ static boolean recreateDbAndTables;
+
+ static boolean piwikEmptyDirs;
+ static boolean downloadPiwikLogs;
+ static boolean processPiwikLogs;
+
+ static Calendar startingLogPeriod;
+ static Calendar endingLogPeriod;
+ static int numberOfPiwikIdsToDownload;
+ static int numberOfSiteIdsToDownload;
+
+ static boolean laReferenciaEmptyDirs;
+ static boolean downloadLaReferenciaLogs;
+ static boolean processLaReferenciaLogs;
+
+ static boolean irusCreateTablesEmptyDirs;
+ static boolean irusDownloadReports;
+ static boolean irusProcessStats;
+ static int irusNumberOfOpendoarsToDownload;
+
+ static boolean sarcCreateTablesEmptyDirs;
+ static boolean sarcDownloadReports;
+ static boolean sarcProcessStats;
+ static int sarcNumberOfIssnToDownload;
+
+ static boolean finalizeStats;
+ static boolean finalTablesVisibleToImpala;
+
+ static int numberOfDownloadThreads;
+
+	private static final Logger logger = LoggerFactory.getLogger(ExecuteWorkflow.class);
+
+ public static void main(String args[]) throws Exception {
+
+ // Sending the logs to the console
+ BasicConfigurator.configure();
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ UsageStatsExporter.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json")));
+ parser.parseArgument(args);
+
+ // Setting up the initial parameters
+ matomoAuthToken = parser.get("matomoAuthToken");
+ matomoBaseURL = parser.get("matomoBaseURL");
+ repoLogPath = parser.get("repoLogPath");
+ portalLogPath = parser.get("portalLogPath");
+ portalMatomoID = parser.get("portalMatomoID");
+ irusUKBaseURL = parser.get("irusUKBaseURL");
+ irusUKReportPath = parser.get("irusUKReportPath");
+ sarcsReportPathArray = parser.get("sarcsReportPathArray");
+ sarcsReportPathNonArray = parser.get("sarcsReportPathNonArray");
+ lareferenciaLogPath = parser.get("lareferenciaLogPath");
+ lareferenciaBaseURL = parser.get("lareferenciaBaseURL");
+ lareferenciaAuthToken = parser.get("lareferenciaAuthToken");
+
+ dbHiveUrl = parser.get("dbHiveUrl");
+ dbImpalaUrl = parser.get("dbImpalaUrl");
+ usageRawDataDBSchema = parser.get("usageRawDataDBSchema");
+ usageStatsDBSchema = parser.get("usageStatsDBSchema");
+ statsDBSchema = parser.get("statsDBSchema");
+
+ if (parser.get("processPiwikLogs").toLowerCase().equals("true")) {
+ processPiwikLogs = true;
+ } else {
+ processPiwikLogs = false;
+ }
+
+ String startingLogPeriodStr = parser.get("startingLogPeriod");
+ Date startingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(startingLogPeriodStr);
+ startingLogPeriod = startingLogPeriodStr(startingLogPeriodDate);
+
+ String endingLogPeriodStr = parser.get("endingLogPeriod");
+ Date endingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(endingLogPeriodStr);
+ endingLogPeriod = startingLogPeriodStr(endingLogPeriodDate);
+
+ numberOfPiwikIdsToDownload = Integer.parseInt(parser.get("numberOfPiwikIdsToDownload"));
+ numberOfSiteIdsToDownload = Integer.parseInt(parser.get("numberOfSiteIdsToDownload"));
+
+ if (parser.get("recreateDbAndTables").toLowerCase().equals("true")) {
+ recreateDbAndTables = true;
+ } else {
+ recreateDbAndTables = false;
+ }
+
+ if (parser.get("processLaReferenciaLogs").toLowerCase().equals("true")) {
+ processLaReferenciaLogs = true;
+ } else {
+ processLaReferenciaLogs = false;
+ }
+
+ if (parser.get("irusProcessStats").toLowerCase().equals("true")) {
+ irusProcessStats = true;
+ } else {
+ irusProcessStats = false;
+ }
+
+ irusNumberOfOpendoarsToDownload = Integer.parseInt(parser.get("irusNumberOfOpendoarsToDownload"));
+
+ if (parser.get("sarcProcessStats").toLowerCase().equals("true")) {
+ sarcProcessStats = true;
+ } else {
+ sarcProcessStats = false;
+ }
+ sarcNumberOfIssnToDownload = Integer.parseInt(parser.get("sarcNumberOfIssnToDownload"));
+
+ if (parser.get("finalizeStats").toLowerCase().equals("true")) {
+ finalizeStats = true;
+ } else {
+ finalizeStats = false;
+ }
+ if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true")) {
+ finalTablesVisibleToImpala = true;
+ } else {
+ numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads"));
+ }
+
+ UsageStatsExporter usagestatsExport = new UsageStatsExporter();
+ usagestatsExport.export();
+ }
+
+ private static Calendar startingLogPeriodStr(Date date) {
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(date);
+ return calendar;
+
+ }
+}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/IrusStats.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/IrusStats.java
new file mode 100644
index 000000000..4f34adc04
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/IrusStats.java
@@ -0,0 +1,71 @@
+package eu.dnetlib.oa.graph.usagestatsbuild.export;
+
+import java.io.*;
+import java.net.URL;
+import java.net.URLConnection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class IrusStats {
+
+ private String irusUKURL;
+
+ private static final Logger logger = LoggerFactory.getLogger(IrusStats.class);
+
+ public IrusStats() throws Exception {
+ }
+
+
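+	// Builds irus_downloads_stats_tmp in the usage-stats schema and fills it
+	// from the raw sushilog table, joining repository/result OIDs from the
+	// stats schema and keeping only 'ft_total' IRUS-UK metrics.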
+ public void processIrusStats() throws Exception {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+
+ logger.info("Creating irus_downloads_stats_tmp table");
+ String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".irus_downloads_stats_tmp "
+ + "(`source` string, "
+ + "`repository_id` string, "
+ + "`result_id` string, "
+ + "`date` string, "
+ + "`count` bigint, "
+ + "`openaire` bigint)";
+ stmt.executeUpdate(createDownloadsStats);
+ logger.info("Created irus_downloads_stats_tmp table");
+
+ logger.info("Inserting into irus_downloads_stats_tmp");
+ String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_downloads_stats_tmp "
+ + "SELECT s.source, d.id AS repository_id, "
+ + "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' "
+ + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog s, "
+ + ConnectDB.getStatsDBSchema() + ".datasource_oids d, "
+ + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ + "WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'";
+ stmt.executeUpdate(insertDStats);
+ logger.info("Inserted into irus_downloads_stats_tmp");
+
+ stmt.close();
+ //ConnectDB.getHiveConnection().close();
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/LaReferenciaStats.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/LaReferenciaStats.java
new file mode 100644
index 000000000..ea3ac5948
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/LaReferenciaStats.java
@@ -0,0 +1,149 @@
+
+package eu.dnetlib.oa.graph.usagestatsbuild.export;
+
+import java.io.*;
+import java.net.URLDecoder;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class LaReferenciaStats {
+
+ private static final Logger logger = LoggerFactory.getLogger(LaReferenciaStats.class);
+
+ private String logRepoPath;
+
+ private Statement stmt = null;
+
+ private String CounterRobotsURL;
+ private ArrayList robotsList;
+
+ public LaReferenciaStats() throws Exception {
+ }
+
+
+
+ public void processLogs() throws Exception {
+ try {
+ logger.info("LaReferencia creating viewsStats");
+ viewsStats();
+ logger.info("LaReferencia created viewsStats");
+ logger.info("LaReferencia creating downloadsStats");
+ downloadsStats();
+ logger.info("LaReferencia created downloadsStats");
+
+// logger.info("LaReferencia updating Production Tables");
+// updateProdTables();
+// logger.info("LaReferencia updated Production Tables");
+
+ } catch (Exception e) {
+ logger.error("Failed to process logs: " + e);
+ throw new Exception("Failed to process logs: " + e.toString(), e);
+ }
+ }
+
+
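+	// Aggregates raw lareferencialog 'action' events into monthly view counts
+	// per result and repository, materialized as la_views_stats_tmp.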
+ public void viewsStats() throws Exception {
+
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Creating la_result_views_monthly_tmp view");
+ String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS "
+ +
+ "SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
+ "THEN 1 ELSE 0 END) AS openaire_referrer, " +
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog where action='action' and " +
+ "(source_item_type='oaItem' or source_item_type='repItem') " +
+ "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
+ "source ORDER BY source, entity_id";
+ stmt.executeUpdate(sql);
+ logger.info("Created la_result_views_monthly_tmp view");
+
+ logger.info("Dropping la_views_stats_tmp table");
+ sql = "DROP TABLE IF EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".la_views_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped la_views_stats_tmp table");
+
+ logger.info("Creating la_views_stats_tmp table");
+ sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp " +
+ "AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
+ "max(views) AS count, max(openaire_referrer) AS openaire " +
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp p, " +
+ ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
+ "WHERE p.source=d.oid AND p.id=ro.oid " +
+ "GROUP BY d.id, ro.id, month " +
+ "ORDER BY d.id, ro.id, month";
+ stmt.executeUpdate(sql);
+ logger.info("Created la_views_stats_tmp table");
+
+ stmt.close();
+ ConnectDB.getHiveConnection().close();
+ }
+
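+	// Same monthly aggregation as viewsStats(), but over 'download' events,
+	// materialized as la_downloads_stats_tmp.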
+ private void downloadsStats() throws Exception {
+
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Creating la_result_downloads_monthly_tmp view");
+ String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
+ + ".la_result_downloads_monthly_tmp AS " +
+ "SELECT entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
+ "THEN 1 ELSE 0 END) AS openaire_referrer, " +
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog where action='download' and " +
+ "(source_item_type='oaItem' or source_item_type='repItem') " +
+ "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
+ "source ORDER BY source, entity_id";
+ stmt.executeUpdate(sql);
+ logger.info("Created la_result_downloads_monthly_tmp view");
+
+ logger.info("Dropping la_downloads_stats_tmp table");
+ sql = "DROP TABLE IF EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".la_downloads_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped la_downloads_stats_tmp table");
+
+ logger.info("Creating la_downloads_stats_tmp table");
+ sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp " +
+ "AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
+ "max(downloads) AS count, max(openaire_referrer) AS openaire " +
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_downloads_monthly_tmp p, " +
+ ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
+ "WHERE p.source=d.oid AND p.id=ro.oid " +
+ "GROUP BY d.id, ro.id, month " +
+ "ORDER BY d.id, ro.id, month";
+ stmt.executeUpdate(sql);
+ logger.info("Created la_downloads_stats_tmp table");
+
+ stmt.close();
+ //ConnectDB.getHiveConnection().close();
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
new file mode 100644
index 000000000..a165c6eab
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
@@ -0,0 +1,345 @@
+
+package eu.dnetlib.oa.graph.usagestatsbuild.export;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class PiwikStatsDB {
+
+ private String logPath;
+
+ private Statement stmt = null;
+
+ private static final Logger logger = LoggerFactory.getLogger(PiwikStatsDB.class);
+
+
+ public PiwikStatsDB() throws Exception {
+
+ }
+
+
+ public void recreateDBAndTables() throws Exception {
+ this.createDatabase();
+ // The piwiklog table is not needed since it is built
+ // on top of JSON files
+ ////////////this.createTmpTables();
+ }
+
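+ // Drops and re-creates the usagestats schema. Note the CASCADE: every table
+ // previously built in this schema is removed, so each run rebuilds the stats
+ // from the raw-data schema from scratch.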
+ private void createDatabase() throws Exception {
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Dropping usagestats DB: " + ConnectDB.getUsageStatsDBSchema());
+ String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + " CASCADE";
+ stmt.executeUpdate(dropDatabase);
+ } catch (Exception e) {
+ logger.error("Failed to drop database: " + e);
+ throw new Exception("Failed to drop database: " + e.toString(), e);
+ }
+
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Creating usagestats DB: " + ConnectDB.getUsageStatsDBSchema());
+ String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema();
+ stmt.executeUpdate(createDatabase);
+
+ } catch (Exception e) {
+ logger.error("Failed to create database: " + e);
+ throw new Exception("Failed to create database: " + e.toString(), e);
+ }
+ }
+
+
+ public void processLogs() throws Exception {
+ try {
+
+ logger.info("ViewsStats processing starts at: "+new Timestamp(System.currentTimeMillis()));
+ viewsStats();
+ logger.info("ViewsStats processing ends at: "+new Timestamp(System.currentTimeMillis()));
+
+ logger.info("DownloadsStats processing starts at: "+new Timestamp(System.currentTimeMillis()));
+ downloadsStats();
+ logger.info("DownloadsStats processing ends at: "+new Timestamp(System.currentTimeMillis()));
+
+ } catch (Exception e) {
+ logger.error("Failed to process logs: " + e);
+ throw new Exception("Failed to process logs: " + e.toString(), e);
+ }
+ }
+
+
+
+ public void viewsStats() throws Exception {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Dropping openaire_result_views_monthly_tmp view");
+ String drop_result_views_monthly = "DROP VIEW IF EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".openaire_piwikresult_views_monthly_tmp";
+ stmt.executeUpdate(drop_result_views_monthly);
+ logger.info("Dropped openaire_result_views_monthly_tmp view");
+
+ logger.info("Creating openaire_result_views_monthly_tmp view");
+ String create_result_views_monthly = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
+ + ".openaire_result_views_monthly_tmp " +
+ "AS SELECT entity_id AS id, " +
+ "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) " +
+ "AS openaire_referrer, " +
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
+ "FROM " + ConnectDB.getUsageRawDataDBSchema()
+ + ".piwiklog where action='action' and (source_item_type='oaItem' or " +
+ "source_item_type='repItem') " +
+ "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
+ "source ORDER BY source, entity_id";
+ stmt.executeUpdate(create_result_views_monthly);
+ logger.info("Created openaire_result_views_monthly_tmp table");
+
+ logger.info("Dropping openaire_views_stats_tmp table");
+ String drop_views_stats = "DROP TABLE IF EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".openaire_views_stats_tmp";
+ stmt.executeUpdate(drop_views_stats);
+ logger.info("Dropped openaire_views_stats_tmp table");
+
+ logger.info("Creating openaire_views_stats_tmp table");
+ String create_views_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".openaire_views_stats_tmp " +
+ "AS SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " +
+ "max(views) AS count, max(openaire_referrer) AS openaire " +
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, " +
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
+ "WHERE p.source=d.piwik_id AND p.id=ro.oid " +
+ "GROUP BY d.id, ro.id, month " +
+ "ORDER BY d.id, ro.id, month ";
+ stmt.executeUpdate(create_views_stats);
+ logger.info("Created openaire_views_stats_tmp table");
+
+ logger.info("Creating openaire_pageviews_stats_tmp table");
+ String create_pageviews_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".openaire_pageviews_stats_tmp AS SELECT " +
+ "'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count " +
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, " +
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
+ "WHERE p.source=" + ExecuteWorkflow.portalMatomoID + " AND p.source=d.piwik_id and p.id=ro.id \n" +
+ "GROUP BY d.id, ro.id, month " +
+ "ORDER BY d.id, ro.id, month ";
+ stmt.executeUpdate(create_pageviews_stats);
+ logger.info("Created pageviews_stats table");
+
+ stmt.close();
+ //ConnectDB.getHiveConnection().close();
+ }
+
+ private void downloadsStats() throws Exception {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Dropping openaire_result_downloads_monthly_tmp view");
+ String drop_result_downloads_monthly = "DROP VIEW IF EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".openaire_result_downloads_monthly_tmp";
+ stmt.executeUpdate(drop_result_downloads_monthly);
+ logger.info("Dropped openaire_result_downloads_monthly_tmp view");
+
+ logger.info("Creating openaire_result_downloads_monthly_tmp view");
+ String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp " +
+ "AS SELECT entity_id AS id, COUNT(entity_id) as downloads, " +
+ "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " +
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
+ "FROM " + ConnectDB.getUsageRawDataDBSchema()+ ".piwiklog where action='download' " +
+ "AND (source_item_type='oaItem' OR source_item_type='repItem') " +
+ "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) , source " +
+ "ORDER BY source, entity_id, month";
+ stmt.executeUpdate(sql);
+ logger.info("Created openaire_result_downloads_monthly_tmp view");
+
+ logger.info("Dropping openaire_downloads_stats_tmp table");
+ String drop_views_stats = "DROP TABLE IF EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".openaire_downloads_stats_tmp";
+ stmt.executeUpdate(drop_views_stats);
+ logger.info("Dropped openaire_downloads_stats_tmp table");
+
+ logger.info("Creating openaire_downloads_stats_tmp table");
+ sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".openaire_downloads_stats_tmp AS " +
+ "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, " +
+ "max(downloads) AS count, max(openaire_referrer) AS openaire " +
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp p, " +
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
+ "WHERE p.source=d.piwik_id and p.id=ro.oid " +
+ "GROUP BY d.id, ro.id, month " +
+ "ORDER BY d.id, ro.id, month ";
+ stmt.executeUpdate(sql);
+ logger.info("Created downloads_stats table");
+
+
+ logger.info("Dropping openaire_result_downloads_monthly_tmp view");
+ sql = "DROP VIEW IF EXISTS "+ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp";
+ logger.info("Dropped openaire_result_downloads_monthly_tmp view ");
+ stmt.executeUpdate(sql);
+
+ stmt.close();
+ //ConnectDB.getHiveConnection().close();
+ }
+
+ public void finalizeStats() throws Exception {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ //Dropping views_stats table
+ logger.info("Dropping views_stats table");
+ String sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".views_stats";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped views_stats table");
+
+ //Dropping downloads_stats table
+ logger.info("Dropping downloads_stats table");
+ sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped downloads_stats table");
+
+ //Dropping pageviews_stats table
+ logger.info("Dropping pageviews_stats table");
+ sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped pageviews_stats table");
+
+ //Creating views_stats table
+ logger.info("Creating views_stats table");
+ String createViewsStats = "CREATE TABLE IF NOT EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".views_stats " +
+ "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".openaire_views_stats_tmp STORED AS PARQUET";
+ stmt.executeUpdate(createViewsStats);
+ logger.info("Created views_stats table");
+
+ //Inserting OpenAIRE views stats
+ logger.info("Inserting OpenAIRE data to views_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_views_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("OpenAIRE views updated to views_stats");
+
+ //Inserting LaReferencia views stats
+ logger.info("Inserting LaReferencia data to views_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("LaReferencia views updated to views_stats");
+
+
+ logger.info("Creating downloads_stats table");
+ String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".downloads_stats " +
+ "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".openaire_downloads_stats_tmp STORED AS PARQUET";
+ stmt.executeUpdate(createDownloadsStats);
+ logger.info("Created downloads_stats table");
+
+ //Inserting OpenAIRE downloads stats
+ logger.info("Inserting OpenAIRE data to downloads_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_downloads_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("OpenAIRE downloads updated to downloads_stats");
+
+ //Inserting LaReferencia downloads stats
+ logger.info("Inserting LaReferencia data to downloads_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("LaReferencia downloads updated to downloads_stats");
+
+ //Inserting IRUS downloads stats
+ logger.info("Inserting IRUS data to downloads_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_downloads_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("IRUS downloads updated to downloads_stats");
+
+ //Inserting SARC-OJS downloads stats
+ logger.info("Inserting SARC data to downloads_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_downloads_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("SARC-OJS downloads updated to downloads_stats");
+
+
+ logger.info("Creating pageviews_stats table");
+ String create_pageviews_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".pageviews_stats " +
+ "LIKE " + ConnectDB.getUsageStatsDBSchema() + ".openaire_pageviews_stats_tmp STORED AS PARQUET";
+ stmt.executeUpdate(create_pageviews_stats);
+ logger.info("Created pageviews_stats table");
+
+ //Inserting OpenAIRE views stats from Portal
+ logger.info("Inserting data to pageviews_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats " +
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_pageviews_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("Portal pageviews updated to pageviews_stats");
+
+ logger.info("Dropping full_dates table");
+ String dropFullDates = "DROP TABLE IF EXISTS " +
+ ConnectDB.getUsageStatsDBSchema() +
+ ".full_dates";
+ stmt.executeUpdate(dropFullDates);
+ logger.info("Dropped full_dates table");
+
+ Calendar startCalendar = Calendar.getInstance();
+ startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
+ Calendar endCalendar = Calendar.getInstance();
+ int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
+ int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
+
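+ // A sketch of the trick used below to enumerate every month between
+ // 2016-01 and now without a calendar table: space(n) yields a string of n
+ // spaces, split(..., ' ') turns it into n+1 empty tokens, and posexplode
+ // emits one row per token together with its index i (0..n), so
+ // add_months(from_date, i) walks month by month. E.g. with diffMonth=2 the
+ // query returns 2016/01, 2016/02 and 2016/03.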
+ logger.info("Creating full_dates table");
+ sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".full_dates AS " +
+ "SELECT from_unixtime(unix_timestamp(cast(add_months(from_date,i) AS DATE)), 'yyyy/MM') AS txn_date " +
+ "FROM (SELECT DATE '2016-01-01' AS from_date) p " +
+ "LATERAL VIEW " +
+ "posexplode(split(space(" + diffMonth + "),' ')) pe AS i,x";
+ stmt.executeUpdate(sql);
+ logger.info("Created full_dates table");
+
+
+ logger.info("Inserting data to usage_stats");
+ sql = "CREATE TABLE IF NOT EXISTS "+ConnectDB.getUsageStatsDBSchema() + ".usage_stats AS " +
+ "SELECT coalesce(ds.source, vs.source) as source, " +
+ "coalesce(ds.repository_id, vs.repository_id) as repository_id, " +
+ "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " +
+ "coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, " +
+ "coalesce(ds.openaire, 0) as openaire_downloads, " +
+ "coalesce(vs.openaire, 0) as openaire_views " +
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats AS ds FULL OUTER JOIN " +
+ ConnectDB.getUsageStatsDBSchema() + ".views_stats AS vs ON ds.source=vs.source " +
+ "AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date";
+ stmt.executeUpdate(sql);
+ logger.info("Inserted data to usage_stats");
+
+
+ stmt.close();
+ ConnectDB.getHiveConnection().close();
+ }
+
+
+ private Connection getConnection() throws SQLException {
+ return ConnectDB.getHiveConnection();
+ }
+}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/SarcStats.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/SarcStats.java
new file mode 100644
index 000000000..2d224075f
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/SarcStats.java
@@ -0,0 +1,106 @@
+package eu.dnetlib.oa.graph.usagestatsbuild.export;
+
+import java.sql.Statement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class SarcStats {
+
+ private Statement stmtHive = null;
+ private Statement stmtImpala = null;
+
+ private static final Logger logger = LoggerFactory.getLogger(SarcStats.class);
+
+ public SarcStats() throws Exception {
+// createTables();
+ }
+
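+ // Legacy DDL carried over from the old PostgreSQL-based usage-stats code:
+ // CREATE RULE and CREATE INDEX as written here are not valid HiveQL, which
+ // is presumably why the call above is commented out. Kept for reference.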
+ private void createTables() throws Exception {
+ try {
+
+ stmtHive = ConnectDB.getHiveConnection().createStatement();
+ String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
+ stmtHive.executeUpdate(sqlCreateTableSushiLog);
+
+ // String sqlCopyPublicSushiLog="INSERT INTO sushilog SELECT * FROM public.sushilog;";
+ // stmt.executeUpdate(sqlCopyPublicSushiLog);
+ String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ + " ON INSERT TO sushilog "
+ + " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
+ + "sushilog.rid, sushilog.date "
+ + "FROM sushilog "
+ + "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
+ stmtHive.executeUpdate(sqlcreateRuleSushiLog);
+ String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
+ stmtHive.executeUpdate(createSushiIndex);
+
+ stmtHive.close();
+ ConnectDB.getHiveConnection().close();
+ logger.info("Sushi Tables Created");
+ } catch (Exception e) {
+ logger.error("Failed to create tables: " + e);
+ throw new Exception("Failed to create tables: " + e.toString(), e);
+ }
+ }
+
+ public void processSarc() throws Exception {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Creating sarc_downloads_stats_tmp table");
+ String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".sarc_downloads_stats_tmp "
+ + "(`source` string, "
+ + "`repository_id` string, "
+ + "`result_id` string, "
+ + "`date` string, "
+ + "`count` bigint, "
+ + "`openaire` bigint)";
+ stmt.executeUpdate(createDownloadsStats);
+ logger.info("Created sarc_downloads_stats_tmp table");
+
+ logger.info("Inserting into sarc_downloads_stats_tmp");
+ String insertSarcStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_downloads_stats_tmp "
+ + "SELECT s.source, d.id AS repository_id, "
+ + "ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', "
+ + "LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' "
+ + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog s, "
+ + ConnectDB.getStatsDBSchema() + ".datasource_oids d, "
+ + ConnectDB.getStatsDBSchema() + ".result_pids ro "
+ + "WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND d.id like CONCAT('%', 'sarcservicod', '%') "
+ + "AND s.rid=ro.pid AND ro.type='Digital Object Identifier' AND s.metric_type='ft_total' AND s.source='SARC-OJS'";
+ stmt.executeUpdate(insertSarcStats);
+ logger.info("Inserted into sarc_downloads_stats_tmp");
+
+ stmt.close();
+ //ConnectDB.getHiveConnection().close();
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java
new file mode 100644
index 000000000..43abb1681
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java
@@ -0,0 +1,106 @@
+package eu.dnetlib.oa.graph.usagestatsbuild.export;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main class for downloading and processing Usage statistics
+ *
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class UsageStatsExporter {
+
+ public UsageStatsExporter() {
+
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
+
+ public void export() throws Exception {
+
+ logger.info("Initialising DB properties");
+ ConnectDB.init();
+
+// runImpalaQuery();
+ PiwikStatsDB piwikstatsdb = new PiwikStatsDB();
+
+ logger.info("Re-creating database and tables");
+ if (ExecuteWorkflow.recreateDbAndTables) {
+ piwikstatsdb.recreateDBAndTables();
+ logger.info("DB-Tables are created ");
+ }
+// else {
+// piwikstatsdb.createTmpTables();
+// logger.info("TmpTables are created ");
+// }
+ if (ExecuteWorkflow.processPiwikLogs) {
+ logger.info("Processing logs");
+ piwikstatsdb.processLogs();
+ }
+
+ LaReferenciaStats lastats = new LaReferenciaStats();
+
+ if (ExecuteWorkflow.processLaReferenciaLogs) {
+ logger.info("Processing LaReferencia logs");
+ lastats.processLogs();
+ logger.info("LaReferencia logs done");
+ }
+
+ IrusStats irusstats = new IrusStats();
+
+ if (ExecuteWorkflow.irusProcessStats) {
+ logger.info("Processing IRUS");
+ irusstats.processIrusStats();
+ logger.info("Irus done");
+ }
+
+ SarcStats sarcStats = new SarcStats();
+
+ if (ExecuteWorkflow.sarcProcessStats) {
+ sarcStats.processSarc();
+ logger.info("SARC done");
+ }
+
+ // finalize usagestats
+ if (ExecuteWorkflow.finalizeStats) {
+ piwikstatsdb.finalizeStats();
+ logger.info("Finalized stats");
+ }
+
+ // Make the tables available to Impala
+ if (ExecuteWorkflow.finalTablesVisibleToImpala) {
+ logger.info("Making tables visible to Impala");
+ invalidateMetadata();
+ }
+
+ logger.info("End");
+ }
+
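+ // Impala caches Hive metastore metadata, so tables created or replaced
+ // through the Hive JDBC connection above are not visible to Impala until an
+ // INVALIDATE METADATA is issued for them on an Impala connection.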
+ private void invalidateMetadata() throws SQLException {
+ Statement stmt = null;
+
+ stmt = ConnectDB.getImpalaConnection().createStatement();
+
+ String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".views_stats";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats";
+ stmt.executeUpdate(sql);
+
+ stmt.close();
+ ConnectDB.getImpalaConnection().close();
+ }
+}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json
new file mode 100644
index 000000000..3f121288e
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json
@@ -0,0 +1,237 @@
+[
+ {
+ "paramName": "mat",
+ "paramLongName": "matomoAuthToken",
+ "paramDescription": "Auth token for the Matomo API",
+ "paramRequired": false
+ },
+ {
+ "paramName": "mbu",
+ "paramLongName": "matomoBaseURL",
+ "paramDescription": "Base URL of the Matomo instance",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rlp",
+ "paramLongName": "repoLogPath",
+ "paramDescription": "Path of the downloaded repository (Piwik) logs",
+ "paramRequired": true
+ },
+ {
+ "paramName": "plp",
+ "paramLongName": "portalLogPath",
+ "paramDescription": "Path of the downloaded portal (Piwik) logs",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pmi",
+ "paramLongName": "portalMatomoID",
+ "paramDescription": "Matomo site ID of the OpenAIRE portal",
+ "paramRequired": true
+ },
+ {
+ "paramName": "iukbuw",
+ "paramLongName": "irusUKBaseURL",
+ "paramDescription": "Base URL of the IRUS-UK service",
+ "paramRequired": true
+ },
+ {
+ "paramName": "iukrp",
+ "paramLongName": "irusUKReportPath",
+ "paramDescription": "Path of the downloaded IRUS-UK reports",
+ "paramRequired": true
+ },
+ {
+ "paramName": "srpa",
+ "paramLongName": "sarcsReportPathArray",
+ "paramDescription": "Path of the downloaded SARC reports (array format)",
+ "paramRequired": true
+ },
+ {
+ "paramName": "srpna",
+ "paramLongName": "sarcsReportPathNonArray",
+ "paramDescription": "Path of the downloaded SARC reports (non-array format)",
+ "paramRequired": true
+ },
+ {
+ "paramName": "llp",
+ "paramLongName": "lareferenciaLogPath",
+ "paramDescription": "Path of the downloaded LaReferencia logs",
+ "paramRequired": true
+ },
+ {
+ "paramName": "lbu",
+ "paramLongName": "lareferenciaBaseURL",
+ "paramDescription": "Base URL of the LaReferencia Matomo instance",
+ "paramRequired": true
+ },
+ {
+ "paramName": "lat",
+ "paramLongName": "lareferenciaAuthToken",
+ "paramDescription": "Auth token for the LaReferencia Matomo API",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dbhu",
+ "paramLongName": "dbHiveUrl",
+ "paramDescription": "JDBC URL of the Hive server",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dbiu",
+ "paramLongName": "dbImpalaUrl",
+ "paramDescription": "JDBC URL of the Impala server",
+ "paramRequired": true
+ },
+ {
+ "paramName": "urdbs",
+ "paramLongName": "usageRawDataDBSchema",
+ "paramDescription": "Schema (database) holding the raw usage data",
+ "paramRequired": true
+ },
+ {
+ "paramName": "usdbs",
+ "paramLongName": "usageStatsDBSchema",
+ "paramDescription": "Schema (database) where the usage stats are built",
+ "paramRequired": true
+ },
+ {
+ "paramName": "sdbs",
+ "paramLongName": "statsDBSchema",
+ "paramDescription": "Schema (database) of the OpenAIRE stats DB",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rdbt",
+ "paramLongName": "recreateDbAndTables",
+ "paramDescription": "Re-create database and initial tables?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pwed",
+ "paramLongName": "piwikEmptyDirs",
+ "paramDescription": "Empty piwik directories?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ppwl",
+ "paramLongName": "processPiwikLogs",
+ "paramDescription": "Process the piwiklogs (create & fill in the needed tables and process the data) based on the downloaded data",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dpwl",
+ "paramLongName": "downloadPiwikLogs",
+ "paramDescription": "download piwik logs?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "slp",
+ "paramLongName": "startingLogPeriod",
+ "paramDescription": "Starting log period",
+ "paramRequired": true
+ },
+ {
+ "paramName": "elp",
+ "paramLongName": "endingLogPeriod",
+ "paramDescription": "Ending log period",
+ "paramRequired": true
+ },
+ {
+ "paramName": "npidd",
+ "paramLongName": "numberOfPiwikIdsToDownload",
+ "paramDescription": "Limit the number of the downloaded piwikids to the first numberOfPiwikIdsToDownload",
+ "paramRequired": true
+ },
+ {
+ "paramName": "nsidd",
+ "paramLongName": "numberOfSiteIdsToDownload",
+ "paramDescription": "Limit the number of the downloaded siteids (La Referencia logs) to the first numberOfSiteIdsToDownload",
+ "paramRequired": true
+ },
+ {
+ "paramName": "lerd",
+ "paramLongName": "laReferenciaEmptyDirs",
+ "paramDescription": "Empty LaReferencia directories?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "plrl",
+ "paramLongName": "processLaReferenciaLogs",
+ "paramDescription": "Process the La Referencia logs (create & fill in the needed tables and process the data) based on the downloaded data",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dlrl",
+ "paramLongName": "downloadLaReferenciaLogs",
+ "paramDescription": "download La Referencia logs?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "icted",
+ "paramLongName": "irusCreateTablesEmptyDirs",
+ "paramDescription": "Irus section: Create tables and empty JSON directories?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "idr",
+ "paramLongName": "irusDownloadReports",
+ "paramDescription": "Irus section: Download reports?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ipr",
+ "paramLongName": "irusProcessStats",
+ "paramDescription": "Irus section: Process stats?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "inod",
+ "paramLongName": "irusNumberOfOpendoarsToDownload",
+ "paramDescription": "Limit the number of the downloaded Opendoars (Irus) to the first irusNumberOfOpendoarsToDownload",
+ "paramRequired": true
+ },
+ {
+ "paramName": "scted",
+ "paramLongName": "sarcCreateTablesEmptyDirs",
+ "paramDescription": "Sarc section: Create tables and empty JSON directories?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "sdr",
+ "paramLongName": "sarcDownloadReports",
+ "paramDescription": "Sarc section: Download reports?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "sps",
+ "paramLongName": "sarcProcessStats",
+ "paramDescription": "Sarc section: Process stats?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "snid",
+ "paramLongName": "sarcNumberOfIssnToDownload",
+ "paramDescription": "Limit the number of the downloaded ISSNs (Sarc) to the first sarcNumberOfIssnToDownload",
+ "paramRequired": true
+ },
+ {
+ "paramName": "fs",
+ "paramLongName": "finalizeStats",
+ "paramDescription": "Create the usage_stats table?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ftvi",
+ "paramLongName": "finalTablesVisibleToImpala",
+ "paramDescription": "Make the usage_stats, views_stats and downloads_stats tables visible to Impala",
+ "paramRequired": true
+ },
+ {
+ "paramName": "nodt",
+ "paramLongName": "numberOfDownloadThreads",
+ "paramDescription": "Number of download threads",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/config-default.xml b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/config-default.xml
new file mode 100644
index 000000000..b5c807378
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/config-default.xml
@@ -0,0 +1,38 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>${jobTracker}</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>${nameNode}</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1</value>
+    </property>
+    <property>
+        <name>impalaJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;</value>
+    </property>
+    <property>
+        <name>oozie.wf.workflow.notification.url</name>
+        <value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
+    </property>
+</configuration>
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml
new file mode 100644
index 000000000..37700539b
--- /dev/null
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml
@@ -0,0 +1,91 @@
+<workflow-app name="Usage Stats Build" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>hiveMetastoreUris</name>
+            <description>Hive server metastore URIs</description>
+        </property>
+        <property>
+            <name>hiveJdbcUrl</name>
+            <description>Hive server jdbc url</description>
+        </property>
+        <property>
+            <name>impalaJdbcUrl</name>
+            <description>Impala server jdbc url</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>hive.metastore.uris</name>
+                <value>${hiveMetastoreUris}</value>
+            </property>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="Step1"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="Step1">
+        <java>
+            <main-class>eu.dnetlib.oa.graph.usagestatsbuild.export.ExecuteWorkflow</main-class>
+            <arg>--matomoAuthToken</arg><arg>${matomoAuthToken}</arg>
+            <arg>--matomoBaseURL</arg><arg>${matomoBaseURL}</arg>
+            <arg>--repoLogPath</arg><arg>${repoLogPath}</arg>
+            <arg>--portalLogPath</arg><arg>${portalLogPath}</arg>
+            <arg>--portalMatomoID</arg><arg>${portalMatomoID}</arg>
+            <arg>--irusUKBaseURL</arg><arg>${irusUKBaseURL}</arg>
+            <arg>--irusUKReportPath</arg><arg>${irusUKReportPath}</arg>
+            <arg>--sarcsReportPathArray</arg><arg>${sarcsReportPathArray}</arg>
+            <arg>--sarcsReportPathNonArray</arg><arg>${sarcsReportPathNonArray}</arg>
+            <arg>--lareferenciaLogPath</arg><arg>${lareferenciaLogPath}</arg>
+            <arg>--lareferenciaBaseURL</arg><arg>${lareferenciaBaseURL}</arg>
+            <arg>--lareferenciaAuthToken</arg><arg>${lareferenciaAuthToken}</arg>
+            <arg>--dbHiveUrl</arg><arg>${hiveJdbcUrl}</arg>
+            <arg>--dbImpalaUrl</arg><arg>${impalaJdbcUrl}</arg>
+            <arg>--usageRawDataDBSchema</arg><arg>${usageRawDataDBSchema}</arg>
+            <arg>--usageStatsDBSchema</arg><arg>${usageStatsDBSchema}</arg>
+            <arg>--statsDBSchema</arg><arg>${statsDBSchema}</arg>
+            <arg>--recreateDbAndTables</arg><arg>${recreateDbAndTables}</arg>
+            <arg>--piwikEmptyDirs</arg><arg>${piwikEmptyDirs}</arg>
+            <arg>--downloadPiwikLogs</arg><arg>${downloadPiwikLogs}</arg>
+            <arg>--processPiwikLogs</arg><arg>${processPiwikLogs}</arg>
+            <arg>--startingLogPeriod</arg><arg>${startingLogPeriod}</arg>
+            <arg>--endingLogPeriod</arg><arg>${endingLogPeriod}</arg>
+            <arg>--numberOfPiwikIdsToDownload</arg><arg>${numberOfPiwikIdsToDownload}</arg>
+            <arg>--numberOfSiteIdsToDownload</arg><arg>${numberOfSiteIdsToDownload}</arg>
+            <arg>--laReferenciaEmptyDirs</arg><arg>${laReferenciaEmptyDirs}</arg>
+            <arg>--downloadLaReferenciaLogs</arg><arg>${downloadLaReferenciaLogs}</arg>
+            <arg>--processLaReferenciaLogs</arg><arg>${processLaReferenciaLogs}</arg>
+            <arg>--irusCreateTablesEmptyDirs</arg><arg>${irusCreateTablesEmptyDirs}</arg>
+            <arg>--irusDownloadReports</arg><arg>${irusDownloadReports}</arg>
+            <arg>--irusProcessStats</arg><arg>${irusProcessStats}</arg>
+            <arg>--irusNumberOfOpendoarsToDownload</arg><arg>${irusNumberOfOpendoarsToDownload}</arg>
+            <arg>--sarcCreateTablesEmptyDirs</arg><arg>${sarcCreateTablesEmptyDirs}</arg>
+            <arg>--sarcDownloadReports</arg><arg>${sarcDownloadReports}</arg>
+            <arg>--sarcProcessStats</arg><arg>${sarcProcessStats}</arg>
+            <arg>--sarcNumberOfIssnToDownload</arg><arg>${sarcNumberOfIssnToDownload}</arg>
+            <arg>--finalizeStats</arg><arg>${finalizeStats}</arg>
+            <arg>--finalTablesVisibleToImpala</arg><arg>${finalTablesVisibleToImpala}</arg>
+            <arg>--numberOfDownloadThreads</arg><arg>${numberOfDownloadThreads}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>