diff --git a/dhp-workflows/dhp-indicators/pom.xml b/dhp-workflows/dhp-indicators/pom.xml
new file mode 100644
index 000000000..937795791
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.1.7-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>dhp-indicators</artifactId>
+
+    <build>
+        <finalName>dhp-indicators</finalName>
+        <plugins>
+            <plugin>
+                <groupId>pl.project13.maven</groupId>
+                <artifactId>git-commit-id-plugin</artifactId>
+                <version>2.1.15</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>revision</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
+        <cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>2.4.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>20180130</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${cdh.hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${cdh.hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>c3p0</groupId>
+            <artifactId>c3p0</artifactId>
+            <version>0.9.1.2</version>
+            <type>jar</type>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dhp-workflows/dhp-indicators/runworkflow.sh b/dhp-workflows/dhp-indicators/runworkflow.sh
new file mode 100755
index 000000000..0cad5792d
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/runworkflow.sh
@@ -0,0 +1 @@
+mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/indicators
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/src/main/java/eu/dnetlib/oa/graph/indicators/export/ExecuteWorkflow.java b/dhp-workflows/dhp-indicators/src/main/java/eu/dnetlib/oa/graph/indicators/export/ExecuteWorkflow.java
new file mode 100644
index 000000000..61e6ef72c
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/java/eu/dnetlib/oa/graph/indicators/export/ExecuteWorkflow.java
@@ -0,0 +1,35 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.indicators.export;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.BasicConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+/**
+ * @author D. Pierrakos
+ */
+public class ExecuteWorkflow {
+
+ private static final Logger logger = LoggerFactory.getLogger(ExecuteWorkflow.class);
+
+ public static void main(String args[]) throws Exception {
+
+ // Sending the logs to the console
+ BasicConfigurator.configure();
+
+ logger.info("Workflow Executed");
+ }
+
+}
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml
new file mode 100644
index 000000000..b5c807378
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml
@@ -0,0 +1,38 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>${jobTracker}</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>${nameNode}</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1</value>
+    </property>
+    <property>
+        <name>impalaJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;</value>
+    </property>
+    <property>
+        <name>oozie.wf.workflow.notification.url</name>
+        <value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+</configuration>
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/python/testpython.py b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/python/testpython.py
new file mode 100644
index 000000000..e913df6ae
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/python/testpython.py
@@ -0,0 +1,5 @@
+#! /usr/bin/env python
+import sys
+
+print "this is a Python script"
+print "Python Interpreter Version: " + sys.version
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/python/testscript.sh b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/python/testscript.sh
new file mode 100644
index 000000000..78938c85a
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/python/testscript.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+echo "`date` hi"
\ No newline at end of file
diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml
new file mode 100644
index 000000000..2b8ed7d99
--- /dev/null
+++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml
@@ -0,0 +1,58 @@
+<workflow-app name="Indicators_Test_WF" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>hiveMetastoreUris</name>
+            <description>Hive server metastore URIs</description>
+        </property>
+        <property>
+            <name>hiveJdbcUrl</name>
+            <description>Hive server jdbc url</description>
+        </property>
+        <property>
+            <name>impalaJdbcUrl</name>
+            <description>Impala server jdbc url</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>hive.metastore.uris</name>
+                <value>${hiveMetastoreUris}</value>
+            </property>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="PythonTest"/>
+
+    <action name="PythonTest">
+        <shell xmlns="uri:oozie:shell-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+            </configuration>
+            <exec>testpython.py</exec>
+            <file>python/testpython.py</file>
+        </shell>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <kill name="Kill">
+        <message>Python action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml b/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml
new file mode 100644
index 000000000..a65c4514a
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project-shared-configuration>
+    <!--
+        This file contains additional configuration written by modules in the NetBeans IDE.
+        The configuration is intended to be shared among all the users of the project and
+        therefore it is assumed to be part of the version control checkout.
+    -->
+    <properties xmlns="http://www.netbeans.org/ns/maven-properties-data/1">
+        <netbeans.hint.jdkPlatform>JDK_1.8</netbeans.hint.jdkPlatform>
+    </properties>
+</project-shared-configuration>
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml b/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml
new file mode 100755
index 000000000..b39c3ff9b
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.1.7-SNAPSHOT</version>
+        <relativePath>../</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>dhp-usage-datasets-stats-update</artifactId>
+
+    <build>
+        <finalName>dhp-usage-datasets-stats-update</finalName>
+        <plugins>
+            <plugin>
+                <groupId>pl.project13.maven</groupId>
+                <artifactId>git-commit-id-plugin</artifactId>
+                <version>2.1.15</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>revision</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
+        <cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>2.4.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>20180130</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${cdh.hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.4</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>1.1.7-SNAPSHOT</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.mchange</groupId>
+            <artifactId>c3p0</artifactId>
+            <version>0.9.5.2</version>
+        </dependency>
+        <dependency>
+            <groupId>c3p0</groupId>
+            <artifactId>c3p0</artifactId>
+            <version>0.9.1.2</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.26</version>
+            <type>jar</type>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh b/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh
new file mode 100755
index 000000000..9b4325508
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh
@@ -0,0 +1 @@
+mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/datasetsusagestats
\ No newline at end of file
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java
new file mode 100644
index 000000000..cab0bc83f
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java
@@ -0,0 +1,123 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.log4j.Logger;
+
+import com.mchange.v2.c3p0.ComboPooledDataSource;
+
+/**
+ * @author D. Pierrakos
+ */
+public abstract class ConnectDB {
+
+ public static Connection DB_HIVE_CONNECTION;
+ public static Connection DB_IMPALA_CONNECTION;
+
+ private static String dbHiveUrl;
+ private static String dbImpalaUrl;
+ private static String datasetUsageStatsDBSchema;
+ private static String statsDBSchema;
+ private final static Logger logger = Logger.getLogger(ConnectDB.class);
+ private Statement stmt = null;
+
+ static void init() throws ClassNotFoundException {
+
+ dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
+ dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl;
+ datasetUsageStatsDBSchema = ExecuteWorkflow.datasetUsageStatsDBSchema;
+ statsDBSchema = ExecuteWorkflow.statsDBSchema;
+
+ Class.forName("org.apache.hive.jdbc.HiveDriver");
+ }
+
+ public static Connection getHiveConnection() throws SQLException {
+ if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
+ return DB_HIVE_CONNECTION;
+ } else {
+ DB_HIVE_CONNECTION = connectHive();
+
+ return DB_HIVE_CONNECTION;
+ }
+ }
+
+ public static Connection getImpalaConnection() throws SQLException {
+ if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
+ return DB_IMPALA_CONNECTION;
+ } else {
+ DB_IMPALA_CONNECTION = connectImpala();
+
+ return DB_IMPALA_CONNECTION;
+ }
+ }
+
+ public static String getDataSetUsageStatsDBSchema() {
+ return ConnectDB.datasetUsageStatsDBSchema;
+ }
+
+ public static String getStatsDBSchema() {
+ return ConnectDB.statsDBSchema;
+ }
+
+ private static Connection connectHive() throws SQLException {
+
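+ // Pooled c3p0 data source for the Hive JDBC endpoint; acquisitions are retried on failure and idle connections are health-checked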
+ ComboPooledDataSource cpds = new ComboPooledDataSource();
+ cpds.setJdbcUrl(dbHiveUrl);
+ cpds.setUser("dimitris.pierrakos");
+ cpds.setAcquireIncrement(1);
+ cpds.setMaxPoolSize(100);
+ cpds.setMinPoolSize(1);
+ cpds.setInitialPoolSize(1);
+ cpds.setMaxIdleTime(300);
+ cpds.setMaxConnectionAge(36000);
+
+ cpds.setAcquireRetryAttempts(5);
+ cpds.setAcquireRetryDelay(2000);
+ cpds.setBreakAfterAcquireFailure(false);
+
+ cpds.setCheckoutTimeout(0);
+ cpds.setPreferredTestQuery("SELECT 1");
+ cpds.setIdleConnectionTestPeriod(60);
+
+ logger.info("Opened database successfully");
+
+ return cpds.getConnection();
+
+ }
+
+ private static Connection connectImpala() throws SQLException {
+
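+ // Same c3p0 pool settings as connectHive(), pointed at the Impala JDBC endpoint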
+ ComboPooledDataSource cpds = new ComboPooledDataSource();
+ cpds.setJdbcUrl(dbImpalaUrl);
+ cpds.setUser("dimitris.pierrakos");
+ cpds.setAcquireIncrement(1);
+ cpds.setMaxPoolSize(100);
+ cpds.setMinPoolSize(1);
+ cpds.setInitialPoolSize(1);
+ cpds.setMaxIdleTime(300);
+ cpds.setMaxConnectionAge(36000);
+
+ cpds.setAcquireRetryAttempts(5);
+ cpds.setAcquireRetryDelay(2000);
+ cpds.setBreakAfterAcquireFailure(false);
+
+ cpds.setCheckoutTimeout(0);
+ cpds.setPreferredTestQuery("SELECT 1");
+ cpds.setIdleConnectionTestPeriod(60);
+
+ logger.info("Opened database successfully");
+ return cpds.getConnection();
+
+ }
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java
new file mode 100644
index 000000000..17661b99e
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java
@@ -0,0 +1,114 @@
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author D. Pierrakos
+ */
+public class DatasetsStatsDB {
+
+ private String logPath;
+ private String logRepoPath;
+ private String logPortalPath;
+
+ private Statement stmt = null;
+
+ private static final Logger logger = LoggerFactory.getLogger(DatasetsStatsDB.class);
+
+ public DatasetsStatsDB(String logRepoPath, String logPortalPath) throws Exception {
+ this.logRepoPath = logRepoPath;
+ this.logPortalPath = logPortalPath;
+
+ }
+
+ public void recreateDBAndTables() throws Exception {
+ this.createDatabase();
+ this.createTables();
+ }
+
+ private void createDatabase() throws Exception {
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Dropping datasets DB: " + ConnectDB.getDataSetUsageStatsDBSchema());
+ String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema() + " CASCADE";
+ stmt.executeUpdate(dropDatabase);
+ } catch (Exception e) {
+ logger.error("Failed to drop database: " + e);
+ throw new Exception("Failed to drop database: " + e.toString(), e);
+ }
+
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ logger.info("Creating usagestats DB: " + ConnectDB.getDataSetUsageStatsDBSchema());
+ String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema();
+ stmt.executeUpdate(createDatabase);
+
+ } catch (Exception e) {
+ logger.error("Failed to create database: " + e);
+ throw new Exception("Failed to create database: " + e.toString(), e);
+ }
+ }
+
+ private void createTables() throws Exception {
+ try {
+ stmt = ConnectDB.getHiveConnection().createStatement();
+
+ // Create Reports table - This table should exist
+ logger.info("Creating Reports Table");
+ String sqlCreateTableDataciteReports = "CREATE TABLE IF NOT EXISTS "
+ + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacitereports(reportid STRING, \n"
+ + " name STRING, \n"
+ + " source STRING,\n"
+ + " release STRING,\n"
+ + " createdby STRING,\n"
+ + " report_start_date STRING,\n"
+ + " report_end_date STRING)\n"
+ + " CLUSTERED BY (reportid)\n"
+ + " into 100 buckets stored as orc tblproperties('transactional'='true')";
+
+ stmt.executeUpdate(sqlCreateTableDataciteReports);
+ logger.info("Reports Table Created");
+
+ // Create Datasets Performance Table
+ logger.info("Creating DataSetsPerformance Table");
+ String sqlCreateTableDataSetsPerformance = "CREATE TABLE IF NOT EXISTS "
+ + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datasetsperformance(ds_type STRING,\n"
+ + " ds_title STRING,\n"
+ + " yop STRING,\n"
+ + " dataset_type STRING, \n"
+ + " uri STRING,\n"
+ + " platform STRING,\n"
+ + " publisher STRING,\n"
+ + " publisher_id array>,\n"
+ + " dataset_contributors array>,\n"
+ + " period_end STRING,\n"
+ + " period_from STRING,\n"
+ + " access_method STRING,\n"
+ + " metric_type STRING,\n"
+ + " count INT,\n"
+ + " reportid STRING)\n"
+ + " CLUSTERED BY (ds_type)\n"
+ + " into 100 buckets stored as orc tblproperties('transactional'='true')";
+ stmt.executeUpdate(sqlCreateTableDataSetsPerformance);
+ logger.info("DataSetsPerformance Table Created");
+
+ stmt.close();
+ ConnectDB.getHiveConnection().close();
+
+ } catch (Exception e) {
+ logger.error("Failed to create tables: " + e);
+ throw new Exception("Failed to create tables: " + e.toString(), e);
+ }
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java
new file mode 100644
index 000000000..02754e173
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java
@@ -0,0 +1,100 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * @author D.Pierrakos
+ */
+public class DownloadReportsListFromDatacite {
+
+ private String dataciteBaseURL;
+ private String dataciteReportPath;
+ private static final Logger logger = LoggerFactory.getLogger(DownloadReportsListFromDatacite.class);
+
+ public DownloadReportsListFromDatacite(String dataciteBaseURL, String dataciteReportPath)
+ throws MalformedURLException, Exception {
+
+ this.dataciteBaseURL = dataciteBaseURL;
+ this.dataciteReportPath = dataciteReportPath;
+ }
+
+ public void downloadReportsList() throws ParseException {
+ StringBuilder responseStrBuilder = new StringBuilder();
+
+ Gson gson = new Gson();
+
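+ // Fetch the JSON list of available reports from the DataCite reports endpoint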
+ try {
+ BufferedInputStream in = new BufferedInputStream(new URL(dataciteBaseURL).openStream());
+ BufferedReader streamReader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ String inputStr;
+
+ while ((inputStr = streamReader.readLine()) != null) {
+ responseStrBuilder.append(inputStr);
+ }
+ } catch (IOException e) {
+ logger.info(e.getMessage());
+ }
+ JsonObject jsonObject = gson.fromJson(responseStrBuilder.toString(), JsonObject.class);
+ JsonArray dataArray = jsonObject.getAsJsonArray("reports");
+ ArrayList<String> reportsList = new ArrayList<String>();
+ for (JsonElement element : dataArray) {
+ reportsList.add(element.getAsJsonObject().get("id").getAsString());
+ }
+
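+ // Download each report by id and store it on HDFS as <dataciteReportPath>/<reportId>.json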
+ Iterator<String> it = reportsList.iterator();
+ while (it.hasNext()) {
+ String reportId = it.next().toString();
+ String url = dataciteBaseURL + reportId;
+
+ try {
+ BufferedInputStream in = new BufferedInputStream(new URL(url).openStream());
+ BufferedReader streamReader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ String inputStr;
+ StringBuilder responseStrBuilder2 = new StringBuilder();
+ while ((inputStr = streamReader.readLine()) != null) {
+ responseStrBuilder2.append(inputStr);
+ }
+ FileSystem fs = FileSystem.get(new Configuration());
+ FSDataOutputStream fin = fs
+ .create(
+ new Path(dataciteReportPath + "/" + reportId + ".json"),
+ true);
+ byte[] jsonObjectRawBytes = responseStrBuilder2.toString().getBytes();
+ fin.write(jsonObjectRawBytes);
+ fin.writeChar('\n');
+
+ fin.close();
+ } catch (IOException e) {
+ System.out.println(e);
+ }
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java
new file mode 100644
index 000000000..b28578e4b
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java
@@ -0,0 +1,69 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.BasicConfigurator;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+/**
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class ExecuteWorkflow {
+
+ static String dataciteBaseURL;
+ static String dataciteReportPath;
+ static String dbHiveUrl;
+ static String dbImpalaUrl;
+ static String datasetUsageStatsDBSchema;
+ static String statsDBSchema;
+ static boolean recreateDbAndTables;
+ static boolean datasetsEmptyDirs;
+ static boolean finalTablesVisibleToImpala;
+
+ public static void main(String args[]) throws Exception {
+
+ // Sending the logs to the console
+ BasicConfigurator.configure();
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ UsageStatsExporter.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json")));
+ parser.parseArgument(args);
+
+ // Setting up the initial parameters
+ dataciteBaseURL = parser.get("dataciteBaseURL");
+ dataciteReportPath = parser.get("dataciteReportPath");
+ dbHiveUrl = parser.get("dbHiveUrl");
+ dbImpalaUrl = parser.get("dbImpalaUrl");
+ datasetUsageStatsDBSchema = parser.get("datasetUsageStatsDBSchema");
+ statsDBSchema = parser.get("statsDBSchema");
+
+ if (parser.get("recreateDbAndTables").toLowerCase().equals("true"))
+ recreateDbAndTables = true;
+ else
+ recreateDbAndTables = false;
+
+ if (parser.get("datasetsEmptyDirs").toLowerCase().equals("true"))
+ datasetsEmptyDirs = true;
+ else
+ datasetsEmptyDirs = false;
+
+// if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true"))
+// finalTablesVisibleToImpala = true;
+// else
+// finalTablesVisibleToImpala = false;
+//
+ UsageStatsExporter usagestatsExport = new UsageStatsExporter();
+ usagestatsExport.export();
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java
new file mode 100644
index 000000000..6e8c0e397
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java
@@ -0,0 +1,325 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.io.*;
+import java.net.MalformedURLException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * @author D.Pierrakos
+ */
+public class ReadReportsListFromDatacite {
+
+ private String dataciteReportPath;
+ private static final Logger logger = LoggerFactory.getLogger(ReadReportsListFromDatacite.class);
+
+ public ReadReportsListFromDatacite(String dataciteReportPath) throws MalformedURLException, Exception {
+
+ this.dataciteReportPath = dataciteReportPath;
+ }
+
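+ // Loads every downloaded report from HDFS into a temporary Hive table, registers reports that are not already present and stores their dataset performance entries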
+ public void readReports() throws Exception {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+ ArrayList<String> jsonFiles = listHdfsDir(dataciteReportPath);
+ for (String jsonFile : jsonFiles) {
+ logger.info("Reading report file " + jsonFile);
+ this.createTmpReportsTable(jsonFile);
+
+ String sqlSelectReportID = "SELECT get_json_object(json, '$.report.id') FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelectReportID);
+ ResultSet rstmpReportID = stmt.getResultSet();
+
+ String reportID = null;
+ while (rstmpReportID.next()) {
+ reportID = rstmpReportID.getString(1);
+ }
+
+ logger.info("Checking report with id " + reportID);
+ String sqlCheckIfReportExists = "SELECT source FROM " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacitereports where reportid=?";
+ PreparedStatement stGetReportID = ConnectDB.getHiveConnection().prepareStatement(sqlCheckIfReportExists);
+ stGetReportID.setString(1, reportID);
+
+ ResultSet rsCheckIfReportExist = stGetReportID.executeQuery();
+
+ if (rsCheckIfReportExist.next()) {
+ logger.info("Report found with ID " + reportID);
+ dropTmpReportsTable();
+ } else {
+ String sqlInsertReport = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + " .datacitereports "
+ + "SELECT\n"
+ + " get_json_object(json, '$.report.id') AS reportid,\n"
+ + " get_json_object(json, '$.report.report-header.report-name') AS name,\n"
+ + " get_json_object(json, '$.report.report-header.report-id') AS source,\n"
+ + " get_json_object(json, '$.report.report-header.release') AS release,\n"
+ + " get_json_object(json, '$.report.report-header.created-by\') AS createdby,\n"
+ + " get_json_object(json, '$.report.report-header.reporting-period.begin-date') AS fromdate,\n"
+ + " get_json_object(json, '$.report.report-header.reporting-period.end-date') AS todate \n"
+ + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlInsertReport);
+
+ logger.info("Report added");
+
+ logger.info("Adding datasets");
+ String sqlSelecteDatasetsArray = "SELECT get_json_object(json, '$.report.report-datasets') FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelecteDatasetsArray);
+ ResultSet rstmpReportDatasets = stmt.getResultSet();
+
+ if (rstmpReportDatasets.next() && rstmpReportDatasets.getString(1).indexOf(',') > 0) {
+ // String[] listDatasets = rstmpReportDatasets.getString(1).split(",");
+ // String listDatasets = rstmpReportDatasets.getString(1);
+ String sqlSelectReport = "SELECT * FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelectReport);
+ ResultSet rstmpReportAll = stmt.getResultSet();
+ if (rstmpReportAll.next()) {
+ String listDatasets = rstmpReportAll.getString(1);
+ logger.info("No compressed performance found");
+ this.readDatasetsReport(listDatasets, reportID);
+ }
+
+ }
+ logger.info("Adding gziped performance for datasets");
+ String sqlSelecteReportSubsets = "SELECT get_json_object(json, '$.report.report-subsets.gzip[0]') FROM "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(sqlSelecteReportSubsets);
+ ResultSet rstmpReportSubsets = stmt.getResultSet();
+ if (rstmpReportSubsets.next()) {
+ String unCompressedReport = uncompressString(rstmpReportSubsets.getString(1));
+ this.readDatasetsReport(unCompressedReport, reportID);
+ }
+ }
+ }
+ this.dropTmpReportsTable();
+ }
+
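+ // Normalises the hyphenated COUNTER field names to the underscore column names used in Hive, writes the JSON to a tmp HDFS dir and loads it into datasetsperformance through a temporary JsonSerDe table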
+ public void readDatasetsReport(String prettyDatasetsReports, String reportId) throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode jsonNode = objectMapper.readValue(prettyDatasetsReports, JsonNode.class);
+ String datasetsReports = jsonNode.toString();
+ String report = datasetsReports
+ .replace("report-datasets", "report_datasets")
+ .replace("dataset-title", "dataset_title")
+ .replace("dataset-id", "dataset_id")
+ .replace("data-type", "data_type")
+ .replace("publisher-id", "publisher_id")
+ .replace("dataset-contributors", "dataset_contributors")
+ .replace("begin-date", "begin_date")
+ .replace("end-date", "end_date")
+ .replace("access-method", "access_method")
+ .replace("metric-type", "metric_type")
+ .replace("doi:", "");
+ FileSystem fs = FileSystem.get(new Configuration());
+ String tmpPath = dataciteReportPath + "/tmpjson";
+ FSDataOutputStream fin = fs
+ .create(new Path(dataciteReportPath + "/tmpjson/" + reportId + "_Compressed.json"), true);
+ byte[] jsonObjectRawBytes = report.getBytes();
+
+ fin.write(jsonObjectRawBytes);
+
+ fin.writeChar('\n');
+ fin.close();
+
+ logger.info("Write Compress Report To File");
+ logger.info("Reading Compress Report From File...");
+
+ String sqlCreateTempTableForDatasets = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".tmpjsoncompressesed (report_datasets array>,dataset_title:string, data_type:string, "
+ + "uri:string, publisher:string, publisher_id:array>,platform:string, yop:string, "
+ + "dataset_contributors:array>,"
+ + "performance:array, "
+ + "instance:array>>>>>) "
+ + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
+ + "LOCATION '" + tmpPath + "'";
+
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+
+ ConnectDB.getHiveConnection().setAutoCommit(false);
+
+ logger.info("Adding JSON Serde jar");
+ stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
+ logger.info("Added JSON Serde jar");
+
+ logger.info("Inserting Datasets Performance");
+ stmt.execute(sqlCreateTempTableForDatasets);
+
+ String sqlInsertToDatasetsPerformance = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datasetsperformance SELECT dataset.dataset_id[0].value ds_type, "
+ + " dataset.dataset_title ds_title, "
+ + " dataset.yop yop, "
+ + " dataset.data_type dataset_type, "
+ + " dataset.uri uri, "
+ + " dataset.platform platform, "
+ + " dataset.publisher publisher, "
+ + " dataset.publisher_id publisher_id, "
+ + " dataset.dataset_contributors dataset_contributors, "
+ + " period.end_date period_end, "
+ + " period.begin_date period_from, "
+ + " performance.access_method access_method, "
+ + " performance.metric_type metric_type, "
+ + " performance.count count, "
+ + "'" + reportId + "' report_id "
+ + " FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed "
+ + " LATERAL VIEW explode(report_datasets) exploded_table as dataset LATERAL VIEW explode(dataset.performance[0].instance) exploded_table2 as performance "
+ + " LATERAL VIEW explode (array(dataset.performance[0].period)) exploded_table3 as period";
+
+ stmt.executeUpdate(sqlInsertToDatasetsPerformance);
+
+ logger.info("Datasets Performance Inserted ");
+
+ stmt.execute("Drop table " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed");
+
+ logger.info("Datasets Report Added");
+
+ }
+
+ private ArrayList<String> listHdfsDir(String dir) throws Exception {
+
+ FileSystem hdfs = FileSystem.get(new Configuration());
+ RemoteIterator<LocatedFileStatus> Files;
+ ArrayList<String> fileNames = new ArrayList<>();
+
+ try {
+ Path exportPath = new Path(hdfs.getUri() + dir);
+ Files = hdfs.listFiles(exportPath, false);
+ while (Files.hasNext()) {
+ String fileName = Files.next().getPath().toString();
+ fileNames.add(fileName);
+ }
+
+ hdfs.close();
+ } catch (Exception e) {
+ logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + dir));
+ throw new Exception("HDFS file path with exported data does not exist : " + dir, e);
+ }
+
+ return fileNames;
+ }
+
+ private String readHDFSFile(String filename) throws Exception {
+ String result;
+ try {
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ // log.info("reading file : " + filename);
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
+
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+
+ while (line != null) {
+ sb.append(line);
+ // sb.append(line);
+ line = br.readLine();
+ }
+ // uncompressedReport = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
+ result = sb.toString().trim();
+ // fs.close();
+ } catch (Exception e) {
+ throw new Exception(e);
+ }
+
+ return result;
+ }
+
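+ // report-subsets.gzip payloads are Base64-encoded gzip; decode and decompress them back to plain JSON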
+ public static String uncompressString(String zippedBase64Str)
+ throws IOException {
+ String uncompressedReport = null;
+
+ byte[] bytes = Base64.getDecoder().decode(zippedBase64Str);
+ GZIPInputStream zi = null;
+ try {
+ zi = new GZIPInputStream(new ByteArrayInputStream(bytes));
+ uncompressedReport = IOUtils.toString(zi);
+ } finally {
+ IOUtils.closeQuietly(zi);
+ }
+ logger.info("Report Succesfully Uncompressed...");
+ return uncompressedReport;
+ }
+
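+ // Loads a single report file into a one-column temporary table so it can be queried with get_json_object()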
+ private void createTmpReportsTable(String jsonFile) throws SQLException {
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ dropTmpReportsTable();
+ String createTmpTable = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".tmpjsonToTable (json STRING)";
+ stmt.executeUpdate(createTmpTable);
+ logger.info("Temporary Table for Json Report Created");
+
+ String insertJsonReport = "LOAD DATA INPATH '" + jsonFile + "' INTO TABLE "
+ + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ stmt.execute(insertJsonReport);
+ logger.info("JSON Report File inserted to tmpjsonToTable Table");
+ }
+
+ private void dropTmpReportsTable() throws SQLException {
+ logger.info("Dropping tmpjson Table");
+ String dropTmpTable = "DROP TABLE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable";
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ stmt.executeUpdate(dropTmpTable);
+ logger.info("Dropped Table for Json Report Table");
+
+ }
+
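+ // Builds the datacite_downloads and datacite_views tables by joining datasetsperformance with the datasource and result_oids tables of the stats DB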
+ public void createUsageStatisticsTable() throws SQLException {
+ logger.info("Dropping Downloads Stats table");
+ Statement stmt = ConnectDB.getHiveConnection().createStatement();
+ String dropDownloadsTable = "DROP TABLE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacite_downloads";
+ stmt.executeUpdate(dropDownloadsTable);
+
+ logger.info("Creating Downloads Stats table");
+ String createDownloadsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ + ".datacite_downloads as "
+ + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire "
+ + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid "
+ + "where metric_type='total-dataset-requests'";
+ stmt.executeUpdate(createDownloadsTable);
+ logger.info("Downloads Stats table created");
+
+ logger.info("Creating Views Stats table");
+ String createViewsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views as "
+ + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire "
+ + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform "
+ + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid "
+ + "where metric_type='total-dataset-investigations'";
+ stmt.executeUpdate(createViewsTable);
+ logger.info("Views Stats table created");
+ }
+
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java
new file mode 100644
index 000000000..d96d7e875
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java
@@ -0,0 +1,71 @@
+
+package eu.dnetlib.oa.graph.datasetsusagestats.export;
+
+import java.io.IOException;
+import java.sql.Statement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main class for downloading and processing Usage statistics
+ *
+ * @author D. Pierrakos, S. Zoupanos
+ */
+public class UsageStatsExporter {
+
+ private Statement stmt = null;
+
+ public UsageStatsExporter() {
+
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
+
+ private void reCreateLogDirs() throws IllegalArgumentException, IOException {
+ FileSystem dfs = FileSystem.get(new Configuration());
+
+ logger.info("Deleting Log directory: " + ExecuteWorkflow.dataciteReportPath);
+ dfs.delete(new Path(ExecuteWorkflow.dataciteReportPath), true);
+
+ logger.info("Creating Log directory: " + ExecuteWorkflow.dataciteReportPath);
+ dfs.mkdirs(new Path(ExecuteWorkflow.dataciteReportPath));
+
+ logger.info("Creating tmp directory: " + ExecuteWorkflow.dataciteReportPath + " " + "/tmpjson/");
+ dfs.mkdirs(new Path(ExecuteWorkflow.dataciteReportPath + "/tmpjson/"));
+
+ }
+
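+ // Entry point: optionally recreates the DB and tables, downloads the DataCite reports and loads them into Hive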
+ public void export() throws Exception {
+
+ logger.info("Initialising DB properties");
+ ConnectDB.init();
+ ConnectDB.getHiveConnection();
+
+ if (ExecuteWorkflow.recreateDbAndTables) {
+ DatasetsStatsDB datasetsDB = new DatasetsStatsDB("", "");
+ datasetsDB.recreateDBAndTables();
+ }
+ logger.info("Initializing the download logs module");
+ DownloadReportsListFromDatacite downloadReportsListFromDatacite = new DownloadReportsListFromDatacite(
+ ExecuteWorkflow.dataciteBaseURL,
+ ExecuteWorkflow.dataciteReportPath);
+
+ if (ExecuteWorkflow.datasetsEmptyDirs) {
+ logger.info("Downloading Reports List From Datacite");
+ this.reCreateLogDirs();
+ downloadReportsListFromDatacite.downloadReportsList();
+ logger.info("Reports List has been downloaded");
+ }
+
+ ReadReportsListFromDatacite readReportsListFromDatacite = new ReadReportsListFromDatacite(
+ ExecuteWorkflow.dataciteReportPath);
+ logger.info("Store Reports To DB");
+ readReportsListFromDatacite.readReports();
+ logger.info("Reports Stored To DB");
+ readReportsListFromDatacite.createUsageStatisticsTable();
+ }
+}
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json
new file mode 100644
index 000000000..f8d51a882
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json
@@ -0,0 +1,56 @@
+[
+ {
+ "paramName": "dbu",
+ "paramLongName": "dataciteBaseURL",
+ "paramDescription": "URL of Datacite Reports Endpoint",
+ "paramRequired": true
+ },
+ {
+ "paramName": "drp",
+ "paramLongName": "dataciteReportPath",
+ "paramDescription": "Path for Datacite Reports",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dbhu",
+ "paramLongName": "dbHiveUrl",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dbiu",
+ "paramLongName": "dbImpalaUrl",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dusdbs",
+ "paramLongName": "datasetUsageStatsDBSchema",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "sdbs",
+ "paramLongName": "statsDBSchema",
+ "paramDescription": "activate tranform-only mode. Only apply transformation step",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rdbt",
+ "paramLongName": "recreateDbAndTables",
+ "paramDescription": "Re-create database and initial tables?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pwed",
+ "paramLongName": "datasetsEmptyDirs",
+ "paramDescription": "Empty piwik directories?",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ftvi",
+ "paramLongName": "finalTablesVisibleToImpala",
+ "paramDescription": "Make the dataset_usage_stats, visible to Impala",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml
new file mode 100644
index 000000000..b5c807378
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml
@@ -0,0 +1,38 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>${jobTracker}</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>${nameNode}</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1</value>
+    </property>
+    <property>
+        <name>impalaJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;</value>
+    </property>
+    <property>
+        <name>oozie.wf.workflow.notification.url</name>
+        <value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+</configuration>
diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml
new file mode 100644
index 000000000..36c1ccea5
--- /dev/null
+++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml
@@ -0,0 +1,70 @@
+<workflow-app name="Datasets_Usage_Stats" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>hiveMetastoreUris</name>
+            <description>Hive server metastore URIs</description>
+        </property>
+        <property>
+            <name>hiveJdbcUrl</name>
+            <description>Hive server jdbc url</description>
+        </property>
+        <property>
+            <name>impalaJdbcUrl</name>
+            <description>Impala server jdbc url</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>hive.metastore.uris</name>
+                <value>${hiveMetastoreUris}</value>
+            </property>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="Step1"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="Step1">
+        <java>
+            <main-class>eu.dnetlib.oa.graph.datasetsusagestats.export.ExecuteWorkflow</main-class>
+            <arg>--dataciteBaseURL</arg>
+            <arg>${dataciteBaseURL}</arg>
+            <arg>--dataciteReportPath</arg>
+            <arg>${dataciteReportPath}</arg>
+            <arg>--dbHiveUrl</arg>
+            <arg>${hiveJdbcUrl}</arg>
+            <arg>--dbImpalaUrl</arg>
+            <arg>${impalaJdbcUrl}</arg>
+            <arg>--datasetUsageStatsDBSchema</arg>
+            <arg>${datasetUsageStatsDBSchema}</arg>
+            <arg>--statsDBSchema</arg>
+            <arg>${statsDBSchema}</arg>
+            <arg>--recreateDbAndTables</arg>
+            <arg>${recreateDbAndTables}</arg>
+            <arg>--datasetsEmptyDirs</arg>
+            <arg>${datasetsEmptyDirs}</arg>
+            <arg>--finalTablesVisibleToImpala</arg>
+            <arg>${finalTablesVisibleToImpala}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
index 253dc03b5..5a6953f4c 100644
--- a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
+++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java
@@ -132,7 +132,7 @@ public class PiwikStatsDB {
+ "max(views) AS count, max(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
- + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' "
+ + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month "
+ "ORDER BY d.id, ro.id, month ";
stmt.executeUpdate(create_views_stats);
@@ -145,7 +145,7 @@ public class PiwikStatsDB {
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE p.source=" + ExecuteWorkflow.portalMatomoID
- + " AND p.source=d.piwik_id and p.id=ro.id AND ro.oid!='200' "
+ + " AND p.source=d.piwik_id and p.id=ro.id AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month "
+ "ORDER BY d.id, ro.id, month ";
stmt.executeUpdate(create_pageviews_stats);
@@ -194,7 +194,7 @@ public class PiwikStatsDB {
+ "max(downloads) AS count, max(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro "
- + "WHERE p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' "
+ + "WHERE p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month "
+ "ORDER BY d.id, ro.id, month ";
stmt.executeUpdate(sql);
diff --git a/nbactions.xml b/nbactions.xml
new file mode 100644
index 000000000..4b6f7519d
--- /dev/null
+++ b/nbactions.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<actions>
+    <action>
+        <actionName>test</actionName>
+        <packagings>
+            <packaging>*</packaging>
+        </packagings>
+        <goals>
+            <goal>test</goal>
+        </goals>
+        <properties>
+            <skipTests>true</skipTests>
+        </properties>
+    </action>
+</actions>