diff --git a/.gitignore b/.gitignore
index 2d7730711..f5d6c2bc0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,5 +23,4 @@
/build
spark-warehouse
/**/job-override.properties
-/**/*.log
-
+/**/*.log
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/nb-configuration.xml b/dhp-workflows/dhp-indicators/nb-configuration.xml
new file mode 100644
index 000000000..a65c4514a
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/nb-configuration.xml
@@ -0,0 +1,18 @@
+
+
+
+
+
+ JDK_1.8
+
+
diff --git a/dhp-workflows/dhp-indicators/pom.xml b/dhp-workflows/dhp-indicators/pom.xml
new file mode 100755
index 000000000..72ad153f1
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/pom.xml
@@ -0,0 +1,107 @@
+
+
+
+
+
+ dhp-workflows
+ eu.dnetlib.dhp
+ 1.1.7-SNAPSHOT
+ ../
+
+ 4.0.0
+ dhp-indicators
+
+
+
+ pl.project13.maven
+ git-commit-id-plugin
+ 2.1.15
+
+
+
+ revision
+
+
+
+
+ ${project.basedir}/../.git
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.1
+
+
+ 1.8
+
+
+
+
+
+ UTF-8
+ UTF-8
+ 0.13.1-cdh5.2.1
+ 2.5.0-cdh5.2.1
+
+
+
+
+ org.apache.spark
+ spark-core_2.11
+ 2.2.0
+
+
+ org.apache.spark
+ spark-sql_2.11
+ 2.4.5
+
+
+ com.googlecode.json-simple
+ json-simple
+ 1.1.1
+
+
+ org.json
+ json
+ 20180130
+ jar
+
+
+ org.apache.hive
+ hive-jdbc
+ ${cdh.hive.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ 2.7.4
+ jar
+
+
+ eu.dnetlib.dhp
+ dhp-common
+ 1.1.7-SNAPSHOT
+ jar
+
+
+ com.mchange
+ c3p0
+ 0.9.5.2
+
+
+ c3p0
+ c3p0
+ 0.9.1.2
+ jar
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.26
+ jar
+
+
+ dhp-indicators
+
diff --git a/dhp-workflows/dhp-indicators/runworkflow.sh b/dhp-workflows/dhp-indicators/runworkflow.sh
new file mode 100755
index 000000000..0cad5792d
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/runworkflow.sh
@@ -0,0 +1 @@
+mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/indicators
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml
new file mode 100644
index 000000000..6d255a7f4
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml
@@ -0,0 +1,34 @@
+
+
+ jobTracker
+ ${jobTracker}
+
+
+ nameNode
+ ${nameNode}
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+ hive_metastore_uris
+ thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+
+ hive_jdbc_url
+ jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000
+
+
+ oozie.wf.workflow.notification.url
+ {serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/createIndicatorsTables.sql b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/createIndicatorsTables.sql
new file mode 100644
index 000000000..fe9eaec04
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/createIndicatorsTables.sql
@@ -0,0 +1,47 @@
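+-- SOURCE and TARGET are placeholders: indicators.sh substitutes them (via sed) with the
+-- source stats schema and the target indicators schema before piping this file to impala-shell.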
+create table TARGET.indi_pub_green_oa stored as parquet as
+select distinct p.id, coalesce(green_oa, 0) as green_oa
+from SOURCE.publication p
+left outer join (
+select p.id, 1 as green_oa
+from SOURCE.publication p
+join SOURCE.result_instance ri on ri.id = p.id
+join SOURCE.datasource on datasource.id = ri.hostedby
+where SOURCE.datasource.type like '%Repository%'
+and (ri.accessright = 'Open Access'
+or ri.accessright = 'Embargo')) tmp
+on p.id= tmp.id;
+
+create table TARGET.indi_pub_grey_lit stored as parquet as
+select distinct p.id, coalesce(grey_lit, 0) as grey_lit
+from SOURCE.publication p
+left outer join (
+select p.id, 1 as grey_lit
+from SOURCE.publication p
+join SOURCE.result_classifications rt on rt.id = p.id
+where rt.type not in ('Article','Part of book or chapter of book','Book','Doctoral thesis','Master thesis','Data Paper', 'Thesis', 'Bachelor thesis', 'Conference object') and
+not exists (select 1 from SOURCE.result_classifications rc where type ='Other literature type' and rc.id=p.id)) tmp on p.id=tmp.id;
+
+create table TARGET.indi_pub_doi_from_crossref stored as parquet as
+select distinct p.id, coalesce(doi_from_crossref, 0) as doi_from_crossref
+from SOURCE.publication p
+left outer join
+(select ri.id, 1 as doi_from_crossref from SOURCE.result_instance ri
+join SOURCE.datasource d on d.id = ri.collectedfrom
+where pidtype='Digital Object Identifier' and d.name ='Crossref') tmp
+on tmp.id=p.id;
+
+create table TARGET.indi_pub_gold_oa stored as parquet as
+select distinct p.id, coalesce(gold_oa, 0) as gold_oa
+from SOURCE.publication p
+left outer join (
+select p.id, 1 as gold_oa
+from SOURCE.publication p
+join SOURCE.result_instance ri on ri.id = p.id
+join SOURCE.datasource on datasource.id = ri.hostedby
+where SOURCE.datasource.id like '%doajarticles%') tmp
+on p.id= tmp.id;
+
+compute stats TARGET.indi_pub_green_oa;
+compute stats TARGET.indi_pub_grey_lit;
+compute stats TARGET.indi_pub_doi_from_crossref;
+compute stats TARGET.indi_pub_gold_oa;
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/indicators.sh b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/indicators.sh
new file mode 100644
index 000000000..306609e8a
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/indicators.sh
@@ -0,0 +1,29 @@
+export PYTHON_EGG_CACHE=/home/$(whoami)/.python-eggs
+export link_folder=/tmp/impala-shell-python-egg-cache-$(whoami)
+if ! [ -L $link_folder ]
+then
+ rm -Rf "$link_folder"
+ ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder}
+fi
+
+export SOURCE=$1
+export TARGET=$2
+export SHADOW=$3
+export SCRIPT_PATH=$4
+
+echo "Getting file from " $4
+hdfs dfs -copyToLocal $4
+
+echo "Creating indicators database"
+impala-shell -q "drop database if exists ${TARGET} cascade"
+impala-shell -q "create database if not exists ${TARGET}"
+impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/\(.*\)/create view ${TARGET}.\1 as select * from ${SOURCE}.\1;/" | impala-shell -f -
+cat createIndicatorsTables.sql | sed "s/SOURCE/$1/g" | sed "s/TARGET/$2/g" | impala-shell -f -
+echo "Indicators Database created"
+
+
+echo "Updating Shadow indicators DB"
+impala-shell -q "create database if not exists ${SHADOW}"
+impala-shell -d ${SHADOW} -q "show tables" --delimited | sed "s/^/drop view if exists ${SHADOW}./" | sed "s/$/;/" | impala-shell -f -
+impala-shell -d ${TARGET} -q "show tables" --delimited | sed "s/\(.*\)/create view ${SHADOW}.\1 as select * from ${TARGET}.\1;/" | impala-shell -f -
+echo "Indicators Shadow DB ready!"
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml
new file mode 100644
index 000000000..ec917b9a4
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml
@@ -0,0 +1,101 @@
+
+
+
+ stats_db_name
+ the source stats database name
+
+
+ indicators_db_name
+ the target indicators database name
+
+
+ indicators_shadow_db_name
+ the name of the shadow schema
+
+
+
+ hive_metastore_uris
+ hive server metastore URIs
+
+
+ hive_jdbc_url
+ hive server jdbc url
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ hive.metastore.uris
+ ${hive_metastore_uris}
+
+
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ indicators.sh
+ ${stats_db_name}
+ ${indicators_db_name}
+ ${indicators_shadow_db_name}
+ ${wf:appPath()}/scripts/createIndicatorsTables.sql
+ scripts/indicators.sh
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml b/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml
new file mode 100644
index 000000000..a65c4514a
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml
@@ -0,0 +1,18 @@
+
+
+
+
+
+ JDK_1.8
+
+
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml b/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml
new file mode 100755
index 000000000..c623a12f0
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml
@@ -0,0 +1,121 @@
+
+
+
+
+
+
+
+
+ dhp-workflows
+ eu.dnetlib.dhp
+ 1.2.4-SNAPSHOT
+ ../
+
+ 4.0.0
+ dhp-usage-datasets-stats-update
+
+
+
+ pl.project13.maven
+ git-commit-id-plugin
+ 2.1.15
+
+
+
+ revision
+
+
+
+
+ ${project.basedir}/../.git
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.1
+
+
+ 1.8
+
+
+
+
+
+ UTF-8
+ UTF-8
+ 0.13.1-cdh5.2.1
+ 2.5.0-cdh5.2.1
+
+
+
+
+ org.apache.spark
+ spark-core_2.11
+ 2.2.0
+
+
+ org.apache.spark
+ spark-sql_2.11
+ 2.4.5
+
+
+ com.googlecode.json-simple
+ json-simple
+ 1.1.1
+
+
+ org.json
+ json
+ 20180130
+ jar
+
+
+ org.apache.hive
+ hive-jdbc
+ ${cdh.hive.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ 2.7.4
+ jar
+
+
+ eu.dnetlib.dhp
+ dhp-common
+ 1.2.4-SNAPSHOT
+ jar
+
+
+ com.mchange
+ c3p0
+ 0.9.5.2
+
+
+ c3p0
+ c3p0
+ 0.9.1.2
+ jar
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.26
+ jar
+
+
+ dhp-usage-datasets-stats-update
+
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh b/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh
new file mode 100755
index 000000000..9b4325508
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh
@@ -0,0 +1 @@
+mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/datasetsusagestats
\ No newline at end of file
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java
new file mode 100644
index 000000000..de9e44fbf
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java
@@ -0,0 +1,148 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+/**
+ * @author D. Pierrakos
+ */
+import com.mchange.v2.c3p0.ComboPooledDataSource;
+
+public abstract class ConnectDB {
+
+ public static Connection DB_HIVE_CONNECTION;
+ public static Connection DB_IMPALA_CONNECTION;
+
+ private static String dbHiveUrl;
+ private static String dbImpalaUrl;
+ private static String datasetUsageStatsDBSchema;
+ private static String datasetsUsageStatsPermanentDBSchema;
+ private static String statsDBSchema;
+ private final static Logger logger = Logger.getLogger(ConnectDB.class);
+ private Statement stmt = null;
+
+ static void init() throws ClassNotFoundException {
+
+ dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
+ dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl;
+ datasetUsageStatsDBSchema = ExecuteWorkflow.datasetUsageStatsDBSchema;
+ datasetsUsageStatsPermanentDBSchema = ExecuteWorkflow.datasetsUsageStatsPermanentDBSchema;
+ statsDBSchema = ExecuteWorkflow.statsDBSchema;
+
+ Class.forName("org.apache.hive.jdbc.HiveDriver");
+ }
+
+ public static Connection getHiveConnection() throws SQLException {
+ if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
+ return DB_HIVE_CONNECTION;
+ } else {
+ DB_HIVE_CONNECTION = connectHive();
+
+ return DB_HIVE_CONNECTION;
+ }
+ }
+
+ public static Connection getImpalaConnection() throws SQLException {
+ if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
+ return DB_IMPALA_CONNECTION;
+ } else {
+ DB_IMPALA_CONNECTION = connectImpala();
+
+ return DB_IMPALA_CONNECTION;
+ }
+ }
+
+ public static String getDataSetUsageStatsDBSchema() {
+ String datePattern = "yyyyMMdd"; // 'yyyy' = calendar year ('YYYY' would be the week-based year)
+ DateFormat df = new SimpleDateFormat(datePattern);
+// Get today's date using a Calendar object.
+ Date today = Calendar.getInstance().getTime();
+ String todayAsString = df.format(today);
+
+ return ConnectDB.datasetUsageStatsDBSchema + "_" + todayAsString;
+ }
+
+ public static String getStatsDBSchema() {
+ return ConnectDB.statsDBSchema;
+ }
+
+ public static String getDatasetsUsagestatsPermanentDBSchema() {
+ return ConnectDB.datasetsUsageStatsPermanentDBSchema;
+ }
+
+ private static Connection connectHive() throws SQLException {
+ logger.info("trying to open Hive connection...");
+
+ ComboPooledDataSource cpds = new ComboPooledDataSource();
+ cpds.setJdbcUrl(dbHiveUrl);
+ cpds.setUser("dimitris.pierrakos");
+ cpds.setAcquireIncrement(1);
+ cpds.setMaxPoolSize(100);
+ cpds.setMinPoolSize(1);
+ cpds.setInitialPoolSize(1);
+ cpds.setMaxIdleTime(300);
+ cpds.setMaxConnectionAge(36000);
+
+ cpds.setAcquireRetryAttempts(5);
+ cpds.setAcquireRetryDelay(2000);
+ cpds.setBreakAfterAcquireFailure(false);
+
+ cpds.setCheckoutTimeout(0);
+ cpds.setPreferredTestQuery("SELECT 1");
+ cpds.setIdleConnectionTestPeriod(60);
+
+ logger.info("Opened HIVE successfully");
+
+ return cpds.getConnection();
+// Connection connection = DriverManager.getConnection(dbHiveUrl);
+// logger.debug("Opened Hive successfully");
+//
+// return connection;
+
+ }
+
+ private static Connection connectImpala() throws SQLException {
+ logger.info("trying to open Impala connection...");
+ ComboPooledDataSource cpds = new ComboPooledDataSource();
+ cpds.setJdbcUrl(dbImpalaUrl);
+ cpds.setUser("dimitris.pierrakos");
+ cpds.setAcquireIncrement(1);
+ cpds.setMaxPoolSize(100);
+ cpds.setMinPoolSize(1);
+ cpds.setInitialPoolSize(1);
+ cpds.setMaxIdleTime(300);
+ cpds.setMaxConnectionAge(36000);
+
+ cpds.setAcquireRetryAttempts(5);
+ cpds.setAcquireRetryDelay(2000);
+ cpds.setBreakAfterAcquireFailure(false);
+
+ cpds.setCheckoutTimeout(0);
+ cpds.setPreferredTestQuery("SELECT 1");
+ cpds.setIdleConnectionTestPeriod(60);
+
+ logger.info("Opened Impala successfully");
+ return cpds.getConnection();
+// Connection connection = DriverManager.getConnection(dbHiveUrl);
+// logger.debug("Opened Impala successfully");
+//
+// return connection;
+
+ }
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java
new file mode 100644
index 000000000..baffa39e0
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java
@@ -0,0 +1,144 @@
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.sql.Statement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos
+ */
+public class DatasetsStatsDB {
+
+ private String logPath;
+ private String logRepoPath;
+ private String logPortalPath;
+
+ private Statement stmt = null;
+
+ private static final Logger logger = LoggerFactory.getLogger(DatasetsStatsDB.class);
+
+ public DatasetsStatsDB(String logRepoPath, String logPortalPath) throws Exception {
+ this.logRepoPath = logRepoPath;
+ this.logPortalPath = logPortalPath;
+
+ }
+
+ public void recreateDBAndTables() throws Exception {
+ this.createDatabase();
+ this.createTables();
+ }
+
+ private void createDatabase() throws Exception {
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Dropping datasets DB: " + ConnectDB.getDataSetUsageStatsDBSchema());
+ String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema() + " CASCADE";
+ stmt.executeUpdate(dropDatabase);
+ } catch (Exception e) {
+ logger.error("Failed to drop database: " + e);
+ throw new Exception("Failed to drop database: " + e.toString(), e);
+ }
+
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Creating datacite usagestats DB: " + ConnectDB.getDataSetUsageStatsDBSchema());
+ String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema();
+ stmt.executeUpdate(createDatabase);
+
+ } catch (Exception e) {
+ logger.error("Failed to create database: " + e);
+ throw new Exception("Failed to create database: " + e.toString(), e);
+ }
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger
+ .info(
+ "Creating permanent datasets usagestats DB: " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema());
+ String createPermanentDatabase = "CREATE DATABASE IF NOT EXISTS "
+ + ConnectDB.getDatasetsUsagestatsPermanentDBSchema();
+ stmt.executeUpdate(createPermanentDatabase);
+ logger
+ .info(
+ "Created permanent datasets usagestats DB: " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema());
+
+ } catch (Exception e) {
+ logger.error("Failed to create database: " + e);
+ throw new Exception("Failed to create database: " + e.toString(), e);
+ }
+ }
+
+ private void createTables() throws Exception {
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ // Create Reports table - This table should exist
+ logger.info("Creating Reports Tmp Table");
+ String sqlCreateTableDataciteReports = "CREATE TABLE IF NOT EXISTS "
+ + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacitereports_tmp(reportid STRING, \n"
+ + " name STRING, \n"
+ + " source STRING,\n"
+ + " release STRING,\n"
+ + " createdby STRING,\n"
+ + " report_start_date STRING,\n"
+ + " report_end_date STRING)\n"
+ + " CLUSTERED BY (reportid)\n"
+ + " into 100 buckets stored as orc tblproperties('transactional'='true')";
+
+ stmt.executeUpdate(sqlCreateTableDataciteReports);
+ logger.info("Reports Table Created");
+
+ // Create Datasets Performance Table
+ logger.info("Creating DataSetsPerformance Tmp Table");
+ String sqlCreateTableDataSetsPerformance = "CREATE TABLE IF NOT EXISTS "
+ + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datasetsperformance_tmp(ds_type STRING,\n"
+ + " ds_title STRING,\n"
+ + " yop STRING,\n"
+ + " dataset_type STRING, \n"
+ + " uri STRING,\n"
+ + " platform STRING,\n"
+ + " publisher STRING,\n"
+ + " publisher_id array>,\n"
+ + " dataset_contributors array>,\n"
+ + " period_end STRING,\n"
+ + " period_from STRING,\n"
+ + " access_method STRING,\n"
+ + " metric_type STRING,\n"
+ + " count INT,\n"
+ + " reportid STRING)\n"
+ + " CLUSTERED BY (ds_type)\n"
+ + " into 100 buckets stored as orc tblproperties('transactional'='true')";
+ stmt.executeUpdate(sqlCreateTableDataSetsPerformance);
+ logger.info("DataSetsPerformance Tmp Table Created");
+
+ logger.info("Creating Datacite Reports table");
+ String createDataciteReportsTable = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacitereports LIKE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacitereports_tmp STORED AS PARQUET";
+ stmt.executeUpdate(createDataciteReportsTable);
+ logger.info("Datacite Reports Table created");
+
+ logger.info("Creating Datasets Performance table");
+ String createDatasetPerformanceTable = "CREATE TABLE IF NOT EXISTS "
+ + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datasetsperformance LIKE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datasetsperformance_tmp STORED AS PARQUET";
+ stmt.executeUpdate(createDatasetPerformanceTable);
+ logger.info("DatasetsPerformance Table created");
+
+ stmt.close();
+ ConnectDB.getHiveConnection().close();
+
+ } catch (Exception e) {
+ logger.error("Failed to create tables: " + e);
+ throw new Exception("Failed to create tables: " + e.toString(), e);
+ }
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java
new file mode 100644
index 000000000..02754e173
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java
@@ -0,0 +1,100 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * @author D.Pierrakos
+ */
+public class DownloadReportsListFromDatacite {
+
+ private String dataciteBaseURL;
+ private String dataciteReportPath;
+ private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
+
+ public DownloadReportsListFromDatacite(String dataciteBaseURL, String dataciteReportPath)
+ throws MalformedURLException, Exception {
+
+ this.dataciteBaseURL = dataciteBaseURL;
+ this.dataciteReportPath = dataciteReportPath;
+ }
+
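+ /**
+ * Fetches the list of available reports from the Datacite endpoint, extracts the report ids
+ * from the "reports" JSON array and stores each individual report as a .json file under
+ * dataciteReportPath on HDFS.
+ */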
+ public void downloadReportsList() throws ParseException {
+ StringBuilder responseStrBuilder = new StringBuilder();
+
+ Gson gson = new Gson();
+
+ try {
+ BufferedInputStream in = new BufferedInputStream(new URL(dataciteBaseURL).openStream());
+ BufferedReader streamReader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ String inputStr;
+
+ while ((inputStr = streamReader.readLine()) != null) {
+ responseStrBuilder.append(inputStr);
+ }
+ } catch (IOException e) {
+ logger.info(e.getMessage());
+ }
+ JsonObject jsonObject = gson.fromJson(responseStrBuilder.toString(), JsonObject.class);
+ JsonArray dataArray = jsonObject.getAsJsonArray("reports");
+ ArrayList<String> reportsList = new ArrayList<>();
+ for (JsonElement element : dataArray) {
+ reportsList.add(element.getAsJsonObject().get("id").getAsString());
+ }
+
+ Iterator<String> it = reportsList.iterator();
+ while (it.hasNext()) {
+ String reportId = it.next();
+ String url = dataciteBaseURL + reportId;
+
+ try {
+ BufferedInputStream in = new BufferedInputStream(new URL(url).openStream());
+ BufferedReader streamReader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ String inputStr;
+ StringBuilder responseStrBuilder2 = new StringBuilder();
+ while ((inputStr = streamReader.readLine()) != null) {
+ responseStrBuilder2.append(inputStr);
+ }
+ FileSystem fs = FileSystem.get(new Configuration());
+ FSDataOutputStream fin = fs
+ .create(
+ new Path(dataciteReportPath + "/" + reportId + ".json"),
+ true);
+ byte[] jsonObjectRawBytes = responseStrBuilder2.toString().getBytes();
+ fin.write(jsonObjectRawBytes);
+ fin.writeChar('\n');
+
+ fin.close();
+ } catch (IOException e) {
+ System.out.println(e);
+ }
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java
new file mode 100644
index 000000000..ffa8b8199
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java
@@ -0,0 +1,71 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.BasicConfigurator;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class ExecuteWorkflow {
+
+ static String dataciteBaseURL;
+ static String dataciteReportPath;
+ static String dbHiveUrl;
+ static String dbImpalaUrl;
+ static String datasetUsageStatsDBSchema;
+ static String datasetsUsageStatsPermanentDBSchema;
+ static String statsDBSchema;
+ static boolean recreateDbAndTables;
+ static boolean datasetsEmptyDirs;
+ static boolean finalTablesVisibleToImpala;
+
+ public static void main(String args[]) throws Exception {
+
+ // Sending the logs to the console
+ BasicConfigurator.configure();
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ UsageStatsExporter.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json")));
+ parser.parseArgument(args);
+
+ // Setting up the initial parameters
+ dataciteBaseURL = parser.get("dataciteBaseURL");
+ dataciteReportPath = parser.get("dataciteReportPath");
+ dbHiveUrl = parser.get("dbHiveUrl");
+ dbImpalaUrl = parser.get("dbImpalaUrl");
+ datasetUsageStatsDBSchema = parser.get("datasetUsageStatsDBSchema");
+ datasetsUsageStatsPermanentDBSchema = parser.get("datasetsUsageStatsPermanentDBSchema");
+ statsDBSchema = parser.get("statsDBSchema");
+
+ if (parser.get("recreateDbAndTables").toLowerCase().equals("true"))
+ recreateDbAndTables = true;
+ else
+ recreateDbAndTables = false;
+
+ if (parser.get("datasetsEmptyDirs").toLowerCase().equals("true"))
+ datasetsEmptyDirs = true;
+ else
+ datasetsEmptyDirs = false;
+
+ if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true"))
+ finalTablesVisibleToImpala = true;
+ else
+ finalTablesVisibleToImpala = false;
+
+ UsageStatsExporter usagestatsExport = new UsageStatsExporter();
+ usagestatsExport.export();
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java
new file mode 100644
index 000000000..e89e2e5a4
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java
@@ -0,0 +1,388 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * @author D.Pierrakos
+ */
+public class ReadReportsListFromDatacite {
+
+ private String dataciteReportPath;
+ private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
+
+ public ReadReportsListFromDatacite(String dataciteReportPath) throws MalformedURLException, Exception {
+
+ this.dataciteReportPath = dataciteReportPath;
+ }
+
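+ /**
+ * Iterates over the report files found under dataciteReportPath, loads each one into the
+ * temporary tmpjsonToTable table and, if the report id is not already present in
+ * datacitereports_tmp, inserts the report header and its (plain or gzipped) dataset
+ * performance entries.
+ */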
+ public void readReports() throws Exception {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+ ArrayList<String> jsonFiles = listHdfsDir(dataciteReportPath);
+ for (String jsonFile : jsonFiles) {
+ logger.info("Reading report file " + jsonFile);
+ this.createTmpReportsTable(jsonFile);
+
+ String sqlSelectReportID = "SELECT get_json_object(json, '$.report.id') FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelectReportID);
+ ResultSet rstmpReportID = stmt.getResultSet();
+
+ String reportID = null;
+ while (rstmpReportID.next()) {
+ reportID = rstmpReportID.getString(1);
+ }
+
+ logger.info("Checking report with id " + reportID);
+ String sqlCheckIfReportExists = "SELECT source FROM " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacitereports_tmp where reportid=?";
+ PreparedStatement stGetReportID = ConnectDB.getHiveConnection().prepareStatement(sqlCheckIfReportExists);
+ stGetReportID.setString(1, reportID);
+
+ ResultSet rsCheckIfReportExist = stGetReportID.executeQuery();
+
+ if (rsCheckIfReportExist.next()) {
+ logger.info("Report found with ID " + reportID);
+ dropTmpReportsTable();
+ } else {
+ String sqlInsertReport = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + " .datacitereports_tmp "
+ + "SELECT\n"
+ + " get_json_object(json, '$.report.id') AS reportid,\n"
+ + " get_json_object(json, '$.report.report-header.report-name') AS name,\n"
+ + " get_json_object(json, '$.report.report-header.report-id') AS source,\n"
+ + " get_json_object(json, '$.report.report-header.release') AS release,\n"
+ + " get_json_object(json, '$.report.report-header.created-by\') AS createdby,\n"
+ + " get_json_object(json, '$.report.report-header.reporting-period.begin-date') AS fromdate,\n"
+ + " get_json_object(json, '$.report.report-header.reporting-period.end-date') AS todate \n"
+ + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlInsertReport);
+
+ logger.info("Report added");
+
+ logger.info("Adding datasets");
+ String sqlSelecteDatasetsArray = "SELECT get_json_object(json, '$.report.report-datasets') FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelecteDatasetsArray);
+ ResultSet rstmpReportDatasets = stmt.getResultSet();
+
+ if (rstmpReportDatasets.next() && rstmpReportDatasets.getString(1).indexOf(',') > 0) {
+ // String[] listDatasets = rstmpReportDatasets.getString(1).split(",");
+ // String listDatasets = rstmpReportDatasets.getString(1);
+ String sqlSelectReport = "SELECT * FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelectReport);
+ ResultSet rstmpReportAll = stmt.getResultSet();
+ if (rstmpReportAll.next()) {
+ String listDatasets = rstmpReportAll.getString(1);
+ logger.info("Adding uncompressed performance for " + reportID);
+ this.readDatasetsReport(listDatasets, reportID);
+ }
+
+ }
+ logger.info("Adding gziped performance for datasets");
+ String sqlSelecteReportSubsets = "SELECT get_json_object(json, '$.report.report-subsets.gzip[0]') FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelecteReportSubsets);
+ ResultSet rstmpReportSubsets = stmt.getResultSet();
+ if (rstmpReportSubsets.next()) {
+ String unCompressedReport = uncompressString(rstmpReportSubsets.getString(1));
+ this.readDatasetsReport(unCompressedReport, reportID);
+ }
+ }
+ }
+ this.dropTmpReportsTable();
+ }
+
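+ /**
+ * Normalises the hyphenated JSON keys (e.g. dataset-title -> dataset_title), writes the
+ * report to a temporary JSON file on HDFS, exposes it through a temporary JsonSerDe table
+ * and explodes the dataset/performance entries into datasetsperformance_tmp.
+ */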
+ public void readDatasetsReport(String prettyDatasetsReports, String reportId) throws Exception {
+ logger.info("Reading Datasets performance for report " + reportId);
+ logger.info("Write Performance Report To File");
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode jsonNode = objectMapper.readValue(prettyDatasetsReports, JsonNode.class);
+ String datasetsReports = jsonNode.toString();
+ String report = datasetsReports
+ .replace("report-datasets", "report_datasets")
+ .replace("dataset-title", "dataset_title")
+ .replace("dataset-id", "dataset_id")
+ .replace("data-type", "data_type")
+ .replace("publisher-id", "publisher_id")
+ .replace("dataset-contributors", "dataset_contributors")
+ .replace("begin-date", "begin_date")
+ .replace("end-date", "end_date")
+ .replace("access-method", "access_method")
+ .replace("metric-type", "metric_type")
+ .replace("doi:", "");
+ FileSystem fs = FileSystem.get(new Configuration());
+ String tmpPath = dataciteReportPath + "/tmpjson";
+ FSDataOutputStream fin = fs
+ .create(new Path(dataciteReportPath + "/tmpjson/" + reportId + "_Compressed.json"), true);
+ byte[] jsonObjectRawBytes = report.getBytes();
+
+ fin.write(jsonObjectRawBytes);
+
+ fin.writeChar('\n');
+ fin.close();
+
+ logger.info("Reading Performance Report From File...");
+
+ String sqlCreateTempTableForDatasets = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".tmpjsoncompressesed (report_datasets array>,dataset_title:string, data_type:string, "
+ + "uri:string, publisher:string, publisher_id:array>,platform:string, yop:string, "
+ + "dataset_contributors:array>,"
+ + "performance:array, "
+ + "instance:array>>>>>) "
+ + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
+ + "LOCATION '" + tmpPath + "'";
+
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Adding JSON Serde jar");
+ stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
+ logger.info("Added JSON Serde jar");
+
+ logger.info("Inserting Datasets Performance");
+ stmt.execute(sqlCreateTempTableForDatasets);
+
+ String sqlInsertToDatasetsPerformance = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datasetsperformance_tmp SELECT dataset.dataset_id[0].value ds_type, "
+ + " dataset.dataset_title ds_title, "
+ + " dataset.yop yop, "
+ + " dataset.data_type dataset_type, "
+ + " dataset.uri uri, "
+ + " dataset.platform platform, "
+ + " dataset.publisher publisher, "
+ + " dataset.publisher_id publisher_id, "
+ + " dataset.dataset_contributors dataset_contributors, "
+ + " period.end_date period_end, "
+ + " period.begin_date period_from, "
+ + " performance.access_method access_method, "
+ + " performance.metric_type metric_type, "
+ + " performance.count count, "
+ + "'" + reportId + "' report_id "
+ + " FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed "
+ + " LATERAL VIEW explode(report_datasets) exploded_table as dataset LATERAL VIEW explode(dataset.performance[0].instance) exploded_table2 as performance "
+ + " LATERAL VIEW explode (array(dataset.performance[0].period)) exploded_table3 as period";
+
+ stmt.executeUpdate(sqlInsertToDatasetsPerformance);
+
+ logger.info("Datasets Performance Inserted for Report " + reportId);
+
+ stmt.execute("Drop table " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed");
+
+ logger.info("Datasets Report Added");
+
+ }
+
+ private ArrayList<String> listHdfsDir(String dir) throws Exception {
+
+ FileSystem hdfs = FileSystem.get(new Configuration());
+ RemoteIterator<LocatedFileStatus> Files;
+ ArrayList<String> fileNames = new ArrayList<>();
+
+ try {
+ Path exportPath = new Path(hdfs.getUri() + dir);
+ Files = hdfs.listFiles(exportPath, false);
+ while (Files.hasNext()) {
+ String fileName = Files.next().getPath().toString();
+ fileNames.add(fileName);
+ }
+
+ hdfs.close();
+ } catch (Exception e) {
+ logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + dir));
+ throw new Exception("HDFS file path with exported data does not exist : " + dir, e);
+ }
+
+ return fileNames;
+ }
+
+ private String readHDFSFile(String filename) throws Exception {
+ String result;
+ try {
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ // log.info("reading file : " + filename);
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
+
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+
+ while (line != null) {
+ sb.append(line);
+ // sb.append(line);
+ line = br.readLine();
+ }
+ // uncompressedReport = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
+ result = sb.toString().trim();
+ // fs.close();
+ } catch (Exception e) {
+ throw new Exception(e);
+ }
+
+ return result;
+ }
+
+ public static String uncompressString(String zippedBase64Str)
+ throws IOException {
+ String uncompressedReport = null;
+
+ byte[] bytes = Base64.getDecoder().decode(zippedBase64Str);
+ GZIPInputStream zi = null;
+ try {
+ zi = new GZIPInputStream(new ByteArrayInputStream(bytes));
+ uncompressedReport = IOUtils.toString(zi);
+ } finally {
+ IOUtils.closeQuietly(zi);
+ }
+ logger.info("Report Succesfully Uncompressed...");
+ return uncompressedReport;
+ }
+
+ private void createTmpReportsTable(String jsonFile) throws SQLException {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ dropTmpReportsTable();
+ String createTmpTable = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".tmpjsonToTable (json STRING)";
+ stmt.executeUpdate(createTmpTable);
+ logger.info("Temporary Table for Json Report Created");
+
+ String insertJsonReport = "LOAD DATA INPATH '" + jsonFile + "' INTO TABLE "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(insertJsonReport);
+ logger.info("JSON Report File inserted to tmpjsonToTable Table");
+ }
+
+ private void dropTmpReportsTable() throws SQLException {
+ logger.info("Dropping tmpjson Table");
+ String dropTmpTable = "DROP TABLE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ stmt.executeUpdate(dropTmpTable);
+ logger.info("Dropped Table for Json Report Table");
+
+ }
+
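+ /**
+ * Copies the *_tmp tables into the final datacitereports/datasetsperformance tables, derives
+ * the datacite_downloads and datacite_views tables from the performance data, and (re)creates
+ * the corresponding views in the permanent datasets usage stats schema.
+ */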
+ public void createUsageStatisticsTable() throws SQLException {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Updating Datacite Reports table");
+ String createDataciteReportsTable = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacitereports "
+ + "SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports_tmp";
+ stmt.executeUpdate(createDataciteReportsTable);
+ logger.info("Datacite Reports Table updated");
+
+ logger.info("Updating Datasets Performance table");
+ String createDatasetPerformanceTable = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datasetsperformance "
+ + "SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance_tmp";
+ stmt.executeUpdate(createDatasetPerformanceTable);
+ logger.info("DatasetsPerformance Table updated");
+
+ logger.info("Creating Downloads Stats table");
+ String createDownloadsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacite_downloads STORED AS PARQUET as "
+ + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire "
+ + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid "
+ + "where metric_type='total-dataset-requests' ";
+ stmt.executeUpdate(createDownloadsTable);
+ logger.info("Downloads Stats table created");
+
+ logger.info("Creating Views Stats table");
+ String createViewsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacite_views STORED AS PARQUET as "
+ + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire "
+ + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid "
+ + "where metric_type='total-dataset-investigations' ";
+ stmt.executeUpdate(createViewsTable);
+ logger.info("Views Stats table created");
+
+ logger.info("Building Permanent Datasets Usage Stats DB");
+
+ logger.info("Dropping view datacitereports on permanent datacite usagestats DB");
+ String sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped view datacitereports on permanent datacite usagestats DB");
+
+ logger.info("Create view datacitereports on permanent datacite usagestats DB");
+ sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports"
+ + " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports";
+ stmt.executeUpdate(sql);
+ logger.info("Created view datacitereports on permanent datasets usagestats DB");
+
+ logger.info("Dropping view datasetsperformance on permanent datacite usagestats DB");
+ sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped view datasetsperformance on permanent datacite usagestats DB");
+
+ logger.info("Create view datasetsperformance on permanent datacite usagestats DB");
+ sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance"
+ + " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance";
+ stmt.executeUpdate(sql);
+ logger.info("Created view datasetsperformance on permanent datasets usagestats DB");
+
+ logger.info("Dropping view datacite_views on permanent datacite usagestats DB");
+ sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped view datacite_views on permanent datacite usagestats DB");
+
+ logger.info("Create view datacite_views on permanent datacite usagestats DB");
+ sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views"
+ + " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views";
+ stmt.executeUpdate(sql);
+ logger.info("Created view datacite_views on permanent datasets usagestats DB");
+
+ logger.info("Dropping view datacite_downloads on permanent datacite usagestats DB");
+ sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped view datacite_downloads on permanent datacite usagestats DB");
+
+ logger.info("Create view datacite_downloads on permanent datacite usagestats DB");
+ sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads"
+ + " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_downloads";
+ stmt.executeUpdate(sql);
+ logger.info("Created view datacite_downloads on permanent datasets usagestats DB");
+
+ stmt.close();
+ ConnectDB.getHiveConnection().close();
+ logger.info("Completed Building Permanent Datasets Usage Stats DB");
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java
new file mode 100755
index 000000000..8d6e24333
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java
@@ -0,0 +1,117 @@
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main class for downloading and processing Usage statistics
+ *
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class UsageStatsExporter {
+
+ private Statement stmt = null;
+
+ public UsageStatsExporter() {
+
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
+
+ private void reCreateLogDirs() throws IllegalArgumentException, IOException {
+ FileSystem dfs = FileSystem.get(new Configuration());
+
+ logger.info("Deleting Log directory: " + ExecuteWorkflow.dataciteReportPath);
+ dfs.delete(new Path(ExecuteWorkflow.dataciteReportPath), true);
+
+ logger.info("Creating Log directory: " + ExecuteWorkflow.dataciteReportPath);
+ dfs.mkdirs(new Path(ExecuteWorkflow.dataciteReportPath));
+
+ logger.info("Creating tmp directory: " + ExecuteWorkflow.dataciteReportPath + " " + "/tmpjson/");
+ dfs.mkdirs(new Path(ExecuteWorkflow.dataciteReportPath + "/tmpjson/"));
+
+ }
+
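+ /**
+ * Orchestrates the run: optionally recreates the DB and tables, optionally re-downloads the
+ * Datacite reports, loads them into the usage stats tables and, if enabled, invalidates the
+ * Impala metadata so the final tables become visible.
+ */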
+ public void export() throws Exception {
+
+ logger.info("Initialising DB properties");
+ ConnectDB.init();
+ ConnectDB.getHiveConnection();
+
+ if (ExecuteWorkflow.recreateDbAndTables) {
+ DatasetsStatsDB datasetsDB = new DatasetsStatsDB("", "");
+ datasetsDB.recreateDBAndTables();
+ }
+ logger.info("Initializing the download logs module");
+ DownloadReportsListFromDatacite downloadReportsListFromDatacite = new DownloadReportsListFromDatacite(
+ ExecuteWorkflow.dataciteBaseURL,
+ ExecuteWorkflow.dataciteReportPath);
+
+ if (ExecuteWorkflow.datasetsEmptyDirs) {
+ logger.info("Downloading Reports List From Datacite");
+ this.reCreateLogDirs();
+ downloadReportsListFromDatacite.downloadReportsList();
+ logger.info("Reports List has been downloaded");
+ }
+
+ ReadReportsListFromDatacite readReportsListFromDatacite = new ReadReportsListFromDatacite(
+ ExecuteWorkflow.dataciteReportPath);
+ logger.info("Store Reports To DB");
+ readReportsListFromDatacite.readReports();
+ logger.info("Reports Stored To DB");
+ readReportsListFromDatacite.createUsageStatisticsTable();
+
+ // Make the tables available to Impala
+ if (ExecuteWorkflow.finalTablesVisibleToImpala) {
+ logger.info("Making tables visible to Impala");
+ invalidateMetadata();
+ }
+
+ logger.info("End");
+ }
+
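+ /**
+ * Runs INVALIDATE METADATA on the freshly created tables and on the permanent-schema views so
+ * that Impala picks up the new Hive objects.
+ */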
+ private void invalidateMetadata() throws SQLException {
+ Statement stmt = null;
+
+ stmt = ConnectDB.getImpalaConnection().createStatement();
+
+ String sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_downloads";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports";
+ stmt.executeUpdate(sql);
+
+ sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance";
+ stmt.executeUpdate(sql);
+
+ stmt.close();
+ try {
+ ConnectDB.getHiveConnection().close();
+ } catch (Exception e) {
+ logger.info("Message at the end :" + e.getMessage());
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json
new file mode 100644
index 000000000..f67651627
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json
@@ -0,0 +1,62 @@
+[
+ {
+ "paramName": "dbu",
+ "paramLongName": "dataciteBaseURL",
+ "paramDescription": "URL of Datacite Reports Endpoint",
+ "paramRequired": true
+ },
+ {
+ "paramName": "drp",
+ "paramLongName": "dataciteReportPath",
+ "paramDescription": "Path for Datacite Reports",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dbhu",
+ "paramLongName": "dbHiveUrl",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dbiu",
+ "paramLongName": "dbImpalaUrl",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dusdbs",
+ "paramLongName": "datasetUsageStatsDBSchema",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "uspdbs",
+ "paramLongName": "datasetsUsageStatsPermanentDBSchema",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "sdbs",
+ "paramLongName": "statsDBSchema",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rdbt",
+ "paramLongName": "recreateDbAndTables",
+ "paramDescription": "Re-create database and initial tables?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pwed",
+ "paramLongName": "datasetsEmptyDirs",
+ "paramDescription": "Empty piwik directories?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ftvi",
+ "paramLongName": "finalTablesVisibleToImpala",
+ "paramDescription": "Make the dataset_usage_stats, visible to Impala",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml
new file mode 100644
index 000000000..b5c807378
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml
@@ -0,0 +1,38 @@
+
+
+ jobTracker
+ ${jobTracker}
+
+
+ nameNode
+ ${nameNode}
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+ hiveMetastoreUris
+ thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+
+ hiveJdbcUrl
+ jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1
+
+
+ impalaJdbcUrl
+ jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;
+
+
+ oozie.wf.workflow.notification.url
+ {serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status
+
+
+ oozie.use.system.libpath
+ true
+
+
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml
new file mode 100644
index 000000000..22bf22c01
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml
@@ -0,0 +1,72 @@
+
+
+
+ hiveMetastoreUris
+ Hive server metastore URIs
+
+
+ hiveJdbcUrl
+ Hive server jdbc url
+
+
+ impalaJdbcUrl
+ Impala server jdbc url
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ hive.metastore.uris
+ ${hiveMetastoreUris}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ eu.dnetlib.oa.graph.datasetsusagestats.export.ExecuteWorkflow
+ --dataciteBaseURL
+ ${dataciteBaseURL}
+ --dataciteReportPath
+ ${dataciteReportPath}
+ --dbHiveUrl
+ ${hiveJdbcUrl}
+ --dbImpalaUrl
+ ${impalaJdbcUrl}
+ --datasetUsageStatsDBSchema
+ ${datasetUsageStatsDBSchema}
+ --datasetsUsageStatsPermanentDBSchema
+ ${datasetsUsageStatsPermanentDBSchema}
+ --statsDBSchema
+ ${statsDBSchema}
+ --recreateDbAndTables
+ ${recreateDbAndTables}
+ --datasetsEmptyDirs
+ ${datasetsEmptyDirs}
+ --finalTablesVisibleToImpala
+ ${finalTablesVisibleToImpala}
+
+
+
+
+
+
+
+
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java
index e0e0d3687..d2884a4bb 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java
@@ -65,6 +65,8 @@ public class ExecuteWorkflow {
static int numberOfDownloadThreads;
+ static int b2SSHAREID;
+
public static void main(String args[]) throws Exception {
// Sending the logs to the console
@@ -196,6 +198,8 @@ public class ExecuteWorkflow {
numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads"));
+ b2SSHAREID = Integer.parseInt(parser.get("b2shareID"));
+
UsageStatsExporter usagestatsExport = new UsageStatsExporter();
usagestatsExport.export();
// usagestatsExport.createdDBWithTablesOnly();
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java
index a84d6743f..76412cd54 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java
@@ -191,7 +191,7 @@ public class PiwikDownloadLogs {
ResultSet rs = statement
.executeQuery(
"SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema()
- + ".datasource where piwik_id is not null and piwik_id <> 0 order by piwik_id");
+ + ".datasource where piwik_id is not null and piwik_id <> 0 and piwik_id <> 196 order by piwik_id");
// Getting all the piwikids in a list for logging reasons & limitting the list
// to the max number of piwikids
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java
new file mode 100644
index 000000000..9ec6fb72e
--- /dev/null
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java
@@ -0,0 +1,204 @@
+
+package eu.dnetlib.oa.graph.usagerawdata.export;
+
+import java.io.*;
+import java.net.URL;
+import java.net.URLConnection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos
+ */
+public class PiwikDownloadLogs_B2SHARE {
+
+ private final String piwikUrl;
+ private Date startDate;
+ private final String tokenAuth;
+
+ /*
+ * The Piwik's API method
+ */
+ private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
+ private final String format = "&format=json";
+
+ private static final Logger logger = LoggerFactory.getLogger(PiwikDownloadLogs_B2SHARE.class);
+
+ public PiwikDownloadLogs_B2SHARE(String piwikUrl, String tokenAuth) {
+ this.piwikUrl = piwikUrl;
+ this.tokenAuth = tokenAuth;
+
+ }
+
+ private String getPiwikLogUrl() {
+ return "https://" + piwikUrl + "/";
+ }
+
+ private String getJson(String url) throws Exception {
+ try {
+ logger.debug("Connecting to download the JSON: " + url);
+ URL website = new URL(url);
+ URLConnection connection = website.openConnection();
+
+ StringBuilder response;
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
+ response = new StringBuilder();
+ String inputLine;
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ }
+ return response.toString();
+ } catch (Exception e) {
+ logger.error("Failed to get URL: " + url + " Exception: " + e);
+ throw new Exception("Failed to get URL: " + url + " Exception: " + e.toString(), e);
+ }
+ }
+
+ public void GetOpenAIREB2SHARELogs(String repoLogsPath) throws Exception {
+
+ Statement statement = ConnectDB.getHiveConnection().createStatement();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+
+ List<Integer> piwikIdToVisit = new ArrayList<>();
+ piwikIdToVisit.add(ExecuteWorkflow.b2SSHAREID);
+ logger.info("B2SHARE piwikId for download: " + piwikIdToVisit);
+
+ if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0
+ && ExecuteWorkflow.numberOfPiwikIdsToDownload <= piwikIdToVisit.size()) {
+ logger.info("Trimming piwikIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
+ piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
+ }
+
+ logger.info("Downloading for the followins piwikIds: " + piwikIdToVisit);
+
+ // ExecutorService executor = Executors.newFixedThreadPool(ExecuteWorkflow.numberOfDownloadThreads);
+ for (int siteId : piwikIdToVisit) {
+ // Setting the starting period
+ Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
+ logger.info("Starting period for log download: " + sdf.format(start.getTime()));
+
+ // Setting the ending period (yesterday)
+ // Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
+ Calendar end = Calendar.getInstance();
+ end.add(Calendar.DAY_OF_MONTH, -1);
+ // end.add(Calendar.MONTH, +1);
+// end.add(Calendar.DAY_OF_MONTH, -1);
+ logger.info("Ending period for log download: " + sdf.format(end.getTime()));
+
+ logger.info("Now working on piwikId: " + siteId);
+
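+ // Resume incrementally: find the latest timestamp already stored in piwiklog for this site and start downloading from there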
+ PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION
+ .prepareStatement(
+ "SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
+ + ".piwiklog WHERE source=?");
+ st.setInt(1, siteId);
+ Date dateMax = null;
+ ResultSet rs_date = st.executeQuery();
+ while (rs_date.next()) {
+ logger.info("Found max date: " + rs_date.getString(1) + " for repository " + siteId);
+
+ if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
+ && !rs_date.getString(1).equals("")) {
+ start.setTime(sdf.parse(rs_date.getString(1)));
+ dateMax = sdf.parse(rs_date.getString(1));
+ }
+ }
+ rs_date.close();
+
+ for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
+ // logger.info("Date used " + currDay.toString());
+ // Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
+ // executor.execute(worker);// calling execute method of ExecutorService
+ logger.info("Date used " + currDay.getTime().toString());
+
+ if (dateMax != null && currDay.getTime().compareTo(dateMax) <= 0) {
+ logger.info("Date found in logs " + dateMax + " and not downloanding Matomo logs for " + siteId);
+ } else {
+ GetOpenAIRELogsB2SHAREForDate(currDay, siteId, repoLogsPath);
+ }
+
+ }
+ }
+ // executor.shutdown();
+ // while (!executor.isTerminated()) {
+ // }
+ // System.out.println("Finished all threads");
+ }
+
+ public void GetOpenAIRELogsB2SHAREForDate(Calendar currDay, int siteId, String repoLogsPath) throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+
+ Date date = currDay.getTime();
+ logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));
+
+ String period = "&period=day&date=" + sdf.format(date);
+ String outFolder = repoLogsPath;
+
+ String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
+ + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
+ String content = "";
+
+ int i = 0;
+
+ JSONParser parser = new JSONParser();
+ StringBuffer totalContent = new StringBuffer();
+ FileSystem fs = FileSystem.get(new Configuration());
+
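+ // Page through the Matomo Live API in batches of 1000 visits (filter_offset), writing each non-empty page to its own JSON file on HDFS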
+ do {
+ int writtenBytes = 0;
+ String apiUrl = baseApiUrl;
+
+ if (i > 0) {
+ apiUrl += "&filter_offset=" + (i * 1000);
+ }
+
+ content = getJson(apiUrl);
+ if (content.length() == 0 || content.equals("[]")) {
+ break;
+ }
+
+ FSDataOutputStream fin = fs
+ .create(
+ new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ + ".json"),
+ true);
+ JSONArray jsonArray = (JSONArray) parser.parse(content);
+ for (Object aJsonArray : jsonArray) {
+ JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
+ byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes();
+ fin.write(jsonObjectRawBytes);
+ fin.writeChar('\n');
+
+ writtenBytes += jsonObjectRawBytes.length + 1;
+ }
+
+ fin.close();
+ System.out
+ .println(
+ Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes
+ + " bytes. Filename: " + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ + ".json");
+
+ i++;
+ } while (true);
+
+ fs.close();
+ }
+}
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java
index 9144620b7..00378ca1f 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java
@@ -179,6 +179,10 @@ public class PiwikStatsDB {
createPedocsOldUsageData();
logger.info("Pedocs Tables Created");
+ logger.info("Create Datacite Tables");
+ createDatasetsUsageData();
+ logger.info("Datacite Tables Created");
+
} catch (Exception e) {
logger.error("Failed to process logs: " + e);
throw new Exception("Failed to process logs: " + e.toString(), e);
@@ -281,6 +285,7 @@ public class PiwikStatsDB {
// clean view double clicks
logger.info("Cleaning action double clicks");
+ ConnectDB.getHiveConnection().setAutoCommit(false);
sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp "
+ "WHERE EXISTS (\n"
+ "SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n"
@@ -750,6 +755,16 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
logger.info("Dropped sarc_sushilogtmp_json_non_array");
+ logger.info("Dropping piwiklogb2sharetmp");
+ sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped piwiklogb2sharetmp");
+
+ logger.info("Dropping piwiklog_b2share_tmp_json");
+ sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog_b2share_tmp_json";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped piwiklog_b2share_tmp_json");
+
stmt.close();
ConnectDB.getHiveConnection().close();
@@ -832,4 +847,32 @@ public class PiwikStatsDB {
logger.info("PeDocs Old Downloads Table created");
}
+
+ public void createDatasetsUsageData() throws SQLException {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
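+ // Recreate the Datacite usage tables by copying them from the openaire_prod_datacite_usage_stats schema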
+ logger.info("Dropping datacite_views");
+ String sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".datacite_views";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped datacite_views");
+
+ logger.info("Dropping datacite_downloads");
+ sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".datacite_downloads";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped datacite_downloads");
+
+ logger.info("Creating Datasets Views Table");
+ sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".datacite_views as select * from openaire_prod_datacite_usage_stats.datacite_views";
+ stmt.executeUpdate(sql);
+ logger.info("Datasets Views Table created");
+
+ logger.info("Creating Datasets Downloads Table");
+ sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".datacite_downloads as select * from openaire_prod_datacite_usage_stats.datacite_downloads";
+ stmt.executeUpdate(sql);
+ logger.info("Datasets Downloads Table created");
+
+ }
}
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java
new file mode 100644
index 000000000..886079a23
--- /dev/null
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java
@@ -0,0 +1,304 @@
+
+package eu.dnetlib.oa.graph.usagerawdata.export;
+
+import java.io.*;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class PiwikStatsDB_B2SHARE {
+
+ private String logPath;
+ private String logRepoPath;
+ private String logPortalPath;
+
+ private Statement stmt = null;
+
+ private static final Logger logger = LoggerFactory.getLogger(PiwikStatsDB_B2SHARE.class);
+
+ private String CounterRobotsURL;
+ private ArrayList<String> robotsList;
+
+ public PiwikStatsDB_B2SHARE(String logRepoPath, String logPortalPath) throws Exception {
+ this.logRepoPath = logRepoPath;
+ this.logPortalPath = logPortalPath;
+
+ }
+
+ public ArrayList<String> getRobotsList() {
+ return robotsList;
+ }
+
+ public void setRobotsList(ArrayList<String> robotsList) {
+ this.robotsList = robotsList;
+ }
+
+ public String getCounterRobotsURL() {
+ return CounterRobotsURL;
+ }
+
+ public void setCounterRobotsURL(String CounterRobotsURL) {
+ this.CounterRobotsURL = CounterRobotsURL;
+ }
+
+ public void processB2SHARELogs() throws Exception {
+ try {
+
+ logger.info("Processing B2SHARE logs");
+ processLog();
+ logger.info("B2SHARE logs process done");
+
+ logger.info("Removing double clicks from B2SHARE logs");
+ removeDoubleClicks();
+ logger.info("Removing double clicks from B2SHARE logs done");
+
+ logger.info("Updating Production Tables");
+ updateProdTables();
+ logger.info("Updated Production Tables");
+
+ } catch (Exception e) {
+ logger.error("Failed to process logs: " + e);
+ throw new Exception("Failed to process logs: " + e.toString(), e);
+ }
+ }
+
+ public void processLog() throws Exception {
+
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Adding JSON Serde jar");
+ stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
+ logger.info("Added JSON Serde jar");
+
+ logger.info("Dropping piwiklog_b2share_tmp_json table");
+ String drop_piwiklogtmp_json = "DROP TABLE IF EXISTS "
+ + ConnectDB.getUsageStatsDBSchema()
+ + ".piwiklog_b2share_tmp_json";
+ stmt.executeUpdate(drop_piwiklogtmp_json);
+ logger.info("Dropped piwiklog_b2share_tmp_json table");
+
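+ // Expose the downloaded Matomo JSON files on HDFS as an external Hive table (JsonSerDe over repoLogPath) so they can be queried directly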
+ logger.info("Creating piwiklog_b2share_tmp_json");
+ String create_piwiklogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS "
+ + ConnectDB.getUsageStatsDBSchema()
+ + ".piwiklog_b2share_tmp_json(\n"
+ + " `idSite` STRING,\n"
+ + " `idVisit` STRING,\n"
+ + " `country` STRING,\n"
+ + " `referrerName` STRING,\n"
+ + " `browser` STRING,\n"
+ + " `actionDetails` ARRAY<\n"
+ + " struct<\n"
+ + " type: STRING,\n"
+ + " url: STRING,\n"
+ + " eventAction: STRING,\n"
+ + " eventName: STRING,\n"
+ + " timestamp: String\n"
+ + " >\n"
+ + " >\n"
+ + ")\n"
+ + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
+ + "LOCATION '" + ExecuteWorkflow.repoLogPath + "'\n"
+ + "TBLPROPERTIES (\"transactional\"=\"false\")";
+ stmt.executeUpdate(create_piwiklogtmp_json);
+ logger.info("Created piwiklog_b2share_tmp_json");
+
+ logger.info("Dropping piwiklogtmp table");
+ String drop_piwiklogtmp = "DROP TABLE IF EXISTS "
+ + ConnectDB.getUsageStatsDBSchema()
+ + ".piwiklogtmp";
+ stmt.executeUpdate(drop_piwiklogtmp);
+ logger.info("Dropped piwiklogtmp");
+
+ logger.info("Creating piwiklogb2sharetmp");
+ String create_piwiklogtmp = "CREATE TABLE "
+ + ConnectDB.getUsageStatsDBSchema()
+ + ".piwiklogb2sharetmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, "
+ + "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
+ + "clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true')";
+ stmt.executeUpdate(create_piwiklogtmp);
+ logger.info("Created piwiklogb2sharetmp");
+
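+ // Flatten each visit's actionDetails array into one row per action using LATERAL VIEW explode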
+ logger.info("Inserting into piwiklogb2sharetmp");
+ String insert_piwiklogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp "
+ + "SELECT DISTINCT cast(idSite as BIGINT) as source, idVisit as id_Visit, country, "
+ + "actiondetail.eventAction as action, actiondetail.url as url, "
+ + "actiondetail.eventName as entity_id, "
+ + "'repItem' as source_item_type, from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, "
+ + "referrerName as referrer_name, browser as agent\n"
+ + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog_b2share_tmp_json\n"
+ + "LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail";
+ stmt.executeUpdate(insert_piwiklogtmp);
+ logger.info("Inserted into piwiklogb2sharetmp");
+
+ stmt.close();
+ }
+
+ public void removeDoubleClicks() throws Exception {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Cleaning download double clicks");
+ // clean download double clicks
+ String sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp "
+ + "WHERE EXISTS (\n"
+ + "SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n"
+ + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp p1, "
+ + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp p2\n"
+ + "WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id \n"
+ + "AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp \n"
+ + "AND p1.timestamp listHdfsDir(String dir) throws Exception {
+
+ FileSystem hdfs = FileSystem.get(new Configuration());
+ RemoteIterator<LocatedFileStatus> Files;
+ ArrayList<String> fileNames = new ArrayList<>();
+
+ try {
+ Path exportPath = new Path(hdfs.getUri() + dir);
+ Files = hdfs.listFiles(exportPath, false);
+ while (Files.hasNext()) {
+ String fileName = Files.next().getPath().toString();
+ fileNames.add(fileName);
+ }
+
+ hdfs.close();
+ } catch (Exception e) {
+ logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
+ throw new Exception("HDFS file path with exported data does not exist : " + logPath, e);
+ }
+
+ return fileNames;
+ }
+
+ private String readHDFSFile(String filename) throws Exception {
+ String result;
+ try {
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ // log.info("reading file : " + filename);
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
+
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+
+ while (line != null) {
+ if (!line.equals("[]")) {
+ sb.append(line);
+ }
+ // sb.append(line);
+ line = br.readLine();
+ }
+ result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
+ if (result.equals("")) {
+ result = "[]";
+ }
+
+ // fs.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ throw new Exception(e);
+ }
+
+ return result;
+ }
+
+ private Connection getConnection() throws SQLException {
+ return ConnectDB.getHiveConnection();
+ }
+
+ public void createPedocsOldUsageData() throws SQLException {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Creating PeDocs Old Views Table");
+ String sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".pedocsoldviews as select * from default.pedocsviews";
+ stmt.executeUpdate(sql);
+ logger.info("PeDocs Old Views Table created");
+
+ logger.info("Creating PeDocs Old Downloads Table");
+ sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".pedocsolddownloads as select * from default.pedocsdownloads";
+ stmt.executeUpdate(sql);
+ logger.info("PeDocs Old Downloads Table created");
+
+ }
+
+ public void createDatasetsUsageData() throws SQLException {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Creating Datasets Views Table");
+ String sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".datacite_views as select * from datasetsusagestats_20210301.datacite_views";
+ stmt.executeUpdate(sql);
+ logger.info("Datasets Views Table created");
+
+ logger.info("Creating Datasets Downloads Table");
+ sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ + ".datacite_downloads as select * from datasetsusagestats_20210301.datacite_downloads";
+ stmt.executeUpdate(sql);
+ logger.info("Datasets Downloads Table created");
+
+ }
+}
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java
index 07e15605f..2f10e4d2b 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java
@@ -142,8 +142,20 @@ public class UsageStatsExporter {
sarcStats.updateSarcLogs();
}
logger.info("Sarc done");
- // finalize usagestats
+ PiwikDownloadLogs_B2SHARE b2sharePiwikID = new PiwikDownloadLogs_B2SHARE(ExecuteWorkflow.matomoBaseURL,
+ ExecuteWorkflow.matomoAuthToken);
+ b2sharePiwikID.GetOpenAIREB2SHARELogs(ExecuteWorkflow.repoLogPath);
+ logger.info("B2SHARE done");
+
+ PiwikStatsDB_B2SHARE piwikstatsB2SHAREdb = new PiwikStatsDB_B2SHARE(ExecuteWorkflow.repoLogPath,
+ ExecuteWorkflow.portalLogPath);
+ piwikstatsB2SHAREdb.setCounterRobotsURL(cRobotsUrl);
+
+ logger.info("Processing B2SHARE logs");
+ piwikstatsB2SHAREdb.processB2SHARELogs();
+
+ // finalize usagestats
logger.info("Dropping tmp tables");
if (ExecuteWorkflow.finalizeStats) {
piwikstatsdb.finalizeStats();
@@ -161,6 +173,7 @@ public class UsageStatsExporter {
piwikstatsdb.recreateDBAndTables();
piwikstatsdb.createPedocsOldUsageData();
+
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating LaReferencia tables");
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json
index 1aa5ad6f8..8c733c55b 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json
@@ -215,5 +215,11 @@
"paramLongName": "numberOfDownloadThreads",
"paramDescription": "Number of download threads",
"paramRequired": true
+ },
+ {
+ "paramName": "b2shareID",
+ "paramLongName": "b2shareID",
+ "paramDescription": "B2SHARE Matomo ID",
+ "paramRequired": true
}
]
diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml
index 022a107ab..80e1da478 100644
--- a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
<name>hiveMetastoreUris</name>
@@ -78,6 +78,7 @@
<arg>--sarcNumberOfIssnToDownload</arg><arg>${sarcNumberOfIssnToDownload}</arg>
<arg>--finalizeStats</arg><arg>${finalizeStats}</arg>
<arg>--numberOfDownloadThreads</arg><arg>${numberOfDownloadThreads}</arg>
+ <arg>--b2shareID</arg><arg>${b2shareID}</arg>
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java
index e53709f1a..ea07ed732 100644
--- a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java
@@ -82,7 +82,7 @@ public abstract class ConnectDB {
Date today = Calendar.getInstance().getTime();
String todayAsString = df.format(today);
- return ConnectDB.usageStatsDBSchema + "_" + todayAsString;
+ return ConnectDB.usageStatsDBSchema + todayAsString;
}
public static String getStatsDBSchema() {
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
index 253dc03b5..7c6f28023 100644
--- a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
@@ -35,20 +35,20 @@ public class PiwikStatsDB {
private void createDatabase() throws Exception {
-// try {
-//
-// stmt = ConnectDB.getHiveConnection().createStatement();
-//
-// logger.info("Dropping usagestats DB: " + ConnectDB.getUsageStatsDBSchema());
-// String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + " CASCADE";
-// stmt.executeUpdate(dropDatabase);
-// } catch (Exception e) {
-// logger.error("Failed to drop database: " + e);
-// throw new Exception("Failed to drop database: " + e.toString(), e);
-// }
-//
try {
+
stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Dropping usagestats DB: " + ConnectDB.getUsageStatsDBSchema());
+ String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + " CASCADE";
+ stmt.executeUpdate(dropDatabase);
+ } catch (Exception e) {
+ logger.error("Failed to drop database: " + e);
+ throw new Exception("Failed to drop database: " + e.toString(), e);
+ }
+
+ try {
+
logger.info("Creating usagestats DB: " + ConnectDB.getUsageStatsDBSchema());
String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema();
stmt.executeUpdate(createDatabase);
@@ -132,7 +132,7 @@ public class PiwikStatsDB {
+ "max(views) AS count, max(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
- + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' "
+ + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month "
+ "ORDER BY d.id, ro.id, month ";
stmt.executeUpdate(create_views_stats);
@@ -145,7 +145,7 @@ public class PiwikStatsDB {
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE p.source=" + ExecuteWorkflow.portalMatomoID
- + " AND p.source=d.piwik_id and p.id=ro.id AND ro.oid!='200' "
+ + " AND p.source=d.piwik_id and p.id=ro.id AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month "
+ "ORDER BY d.id, ro.id, month ";
stmt.executeUpdate(create_pageviews_stats);
@@ -194,7 +194,7 @@ public class PiwikStatsDB {
+ "max(downloads) AS count, max(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
- + "WHERE p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' "
+ + "WHERE p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month "
+ "ORDER BY d.id, ro.id, month ";
stmt.executeUpdate(sql);
@@ -337,6 +337,96 @@ public class PiwikStatsDB {
}
+ public void uploadB2SHAREStats() throws Exception {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
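+ // Build monthly B2SHARE view/download counts from the raw piwiklog rows (Matomo site 412) and join them to the B2SHARE datasource and result identifiers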
+ // Dropping B2SHARE b2share_result_views_monthly_tmp view
+ logger.info("Dropping B2SHARE b2share_result_views_monthly_tmp view");
+ String sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp";
+ logger.info("Dropped b2share_result_views_monthly_tmp view ");
+ stmt.executeUpdate(sql);
+
+ // Dropping B2SHARE b2share_result_downloads_monthly_tmp view
+ logger.info("Dropping B2SHARE b2share_result_downloads_monthly_tmp view");
+ sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped b2share_result_downloads_monthly_tmp view");
+
+ // Dropping B2SHARE b2share_views_stats_tmp table
+ logger.info("Dropping B2SHARE b2share_views_stats_tmp table");
+ sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp";
+ logger.info("Dropped b2share_views_stats_tmp table ");
+ stmt.executeUpdate(sql);
+
+ // Dropping B2SHARE b2share_downloads_stats_tmp table
+ logger.info("Dropping B2SHARE b2share_downloads_stats_tmp table");
+ sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp";
+ logger.info("Dropped b2share_downloads_stats_tmp table ");
+ stmt.executeUpdate(sql);
+
+ // Creating B2SHARE b2share_result_views_monthly_tmp view
+ logger.info("Creating B2SHARE b2share_result_views_monthly_tmp view");
+ sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp "
+ + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, "
+ + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, "
+ + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source "
+ + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".piwiklog "
+ + "WHERE action='action' and (source_item_type='oaItem' or source_item_type='repItem') and source=412 "
+ + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id";
+ stmt.executeUpdate(sql);
+ logger.info("Created b2share_result_views_monthly_tmp view ");
+
+ // Creating B2SHARE b2share_views_stats_tmp table
+ logger.info("Creating B2SHARE b2share_views_stats_tmp table");
+ sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp AS "
+ + "SELECT 'B2SHARE' as source, d.id as repository_id, ro.id as result_id, month as date, "
+ + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema()
+ + ".b2share_result_views_monthly_tmp p, "
+ + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ + "WHERE p.id=ro.oid and d.id='re3data_____::ad3609c351bd520edf6f10f5e0d9b877' "
+ + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id";
+ stmt.executeUpdate(sql);
+ logger.info("Created B2SHARE b2share_views_stats_tmp table");
+
+ // Creating B2SHARE b2share_result_downloads_monthly_tmp view
+ logger.info("Creating B2SHARE b2share_result_downloads_monthly_tmp view");
+ sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp "
+ + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, "
+ + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, "
+ + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source "
+ + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".piwiklog "
+ + "WHERE action='download' and (source_item_type='oaItem' or source_item_type='repItem') and source=412 "
+ + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id";
+ stmt.executeUpdate(sql);
+ logger.info("Created b2share_result_downloads_monthly_tmp view ");
+
+ // Creating B2SHARE b2share_downloads_stats_tmp table
+ logger.info("Creating B2SHARE b2share_downloads_stats_tmp table");
+ sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp AS "
+ + "SELECT 'B2SHARE' as source, d.id as repository_id, ro.id as result_id, month as date, "
+ + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema()
+ + ".b2share_result_downloads_monthly_tmp p, "
+ + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ + "WHERE p.id=ro.oid and d.id='re3data_____::ad3609c351bd520edf6f10f5e0d9b877' "
+ + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id";
+ stmt.executeUpdate(sql);
+ logger.info("Created B2SHARE b2share_downloads_stats_tmp table");
+
+ // Dropping B2SHARE b2share_result_views_monthly_tmp view
+ logger.info("Dropping B2SHARE b2share_result_views_monthly_tmp view");
+ sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp";
+ logger.info("Dropped b2share_result_views_monthly_tmp view ");
+ stmt.executeUpdate(sql);
+
+ // Dropping B2SHARE b2share_result_downloads_monthly_tmp view
+ logger.info("Dropping B2SHARE b2share_result_downloads_monthly_tmp view");
+ sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("Dropped b2share_result_downloads_monthly_tmp view");
+
+ }
+
public void finalizeStats() throws Exception {
stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
@@ -402,6 +492,13 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
logger.info("LaReferencia views updated to views_stats");
+ // Inserting B2SHARE views stats
+ logger.info("Inserting B2SHARE data to views_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats "
+ + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("B2SHARE views updated to views_stats");
+
logger.info("Creating downloads_stats table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS "
+ ConnectDB.getUsageStatsDBSchema()
@@ -425,12 +522,18 @@ public class PiwikStatsDB {
logger.info("Inserted Pedocs data to downloads_stats");
// Inserting TUDELFT downloads stats
- logger.info("Inserting TUDELFT old data to downloads_stats");
+ logger.info("Inserting TUDELFT data to downloads_stats");
sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats "
+ "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_downloads_stats_tmp";
stmt.executeUpdate(sql);
logger.info("Inserted TUDELFT data to downloads_stats");
+ // Inserting B2SHARE downloads stats
+ logger.info("Inserting B2SHARE data to downloads_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats "
+ + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp";
+ stmt.executeUpdate(sql);
+ logger.info("Inserted B2SHARE data to downloads_stats");
// Inserting Lareferencia downloads stats
logger.info("Inserting LaReferencia data to downloads_stats");
sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats "
@@ -452,6 +555,20 @@ public class PiwikStatsDB {
stmt.executeUpdate(sql);
logger.info("SARC-OJS downloads updated to downloads_stats");
+ // Inserting Datacite views stats
+ logger.info("Inserting Datacite views to views_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats "
+ + "SELECT * FROM " + ConnectDB.getUsageRawDataDBSchema() + ".datacite_views";
+ stmt.executeUpdate(sql);
+ logger.info("Datacite views updated to views_stats");
+
+ // Inserting Datacite downloads stats
+ logger.info("Inserting Datacite downloads to downloads_stats");
+ sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats "
+ + "SELECT * FROM " + ConnectDB.getUsageRawDataDBSchema() + ".datacite_downloads";
+ stmt.executeUpdate(sql);
+ logger.info("Datacite downloads updated to downloads_stats");
+
logger.info("Creating pageviews_stats table");
String create_pageviews_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".pageviews_stats "
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java
index 47986f52a..0df6c8b2d 100644
--- a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java
@@ -51,6 +51,9 @@ public class UsageStatsExporter {
logger.info("Processing TUDELFT Stats");
piwikstatsdb.uploadTUDELFTStats();
logger.info("Processing TUDELFT Stats Done");
+ logger.info("Processing B2SHARE Stats");
+ piwikstatsdb.uploadB2SHAREStats();
+ logger.info("Processing B2SHARE Stats Done");
}
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml
index 71e8a50d6..45a6abf3d 100644
--- a/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
<name>hiveMetastoreUris</name>
diff --git a/nbactions.xml b/nbactions.xml
new file mode 100644
index 000000000..4b6f7519d
--- /dev/null
+++ b/nbactions.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<actions>
+ <action>
+ <actionName>test</actionName>
+ <packagings>
+ <packaging>*</packaging>
+ </packagings>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <properties>
+ <skip.tests>true</skip.tests>
+ </properties>
+ </action>
+</actions>