diff --git a/.gitignore b/.gitignore index 2d7730711..f5d6c2bc0 100644 --- a/.gitignore +++ b/.gitignore @@ -23,5 +23,4 @@ /build spark-warehouse /**/job-override.properties -/**/*.log - +/**/*.log \ No newline at end of file diff --git a/dhp-workflows/dhp-indicators/nb-configuration.xml b/dhp-workflows/dhp-indicators/nb-configuration.xml new file mode 100644 index 000000000..a65c4514a --- /dev/null +++ b/dhp-workflows/dhp-indicators/nb-configuration.xml @@ -0,0 +1,18 @@ + + + + + + JDK_1.8 + + diff --git a/dhp-workflows/dhp-indicators/pom.xml b/dhp-workflows/dhp-indicators/pom.xml new file mode 100755 index 000000000..72ad153f1 --- /dev/null +++ b/dhp-workflows/dhp-indicators/pom.xml @@ -0,0 +1,107 @@ + + + + + + dhp-workflows + eu.dnetlib.dhp + 1.1.7-SNAPSHOT + ../ + + 4.0.0 + dhp-indicators + + + + pl.project13.maven + git-commit-id-plugin + 2.1.15 + + + + revision + + + + + ${project.basedir}/../.git + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.8 + 1.8 + + + + + + UTF-8 + UTF-8 + 0.13.1-cdh5.2.1 + 2.5.0-cdh5.2.1 + + + + + org.apache.spark + spark-core_2.11 + 2.2.0 + + + org.apache.spark + spark-sql_2.11 + 2.4.5 + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + org.json + json + 20180130 + jar + + + org.apache.hive + hive-jdbc + ${cdh.hive.version} + + + org.apache.hadoop + hadoop-common + 2.7.4 + jar + + + eu.dnetlib.dhp + dhp-common + 1.1.7-SNAPSHOT + jar + + + com.mchange + c3p0 + 0.9.5.2 + + + c3p0 + c3p0 + 0.9.1.2 + jar + + + org.slf4j + slf4j-api + 1.7.26 + jar + + + dhp-indicators + diff --git a/dhp-workflows/dhp-indicators/runworkflow.sh b/dhp-workflows/dhp-indicators/runworkflow.sh new file mode 100755 index 000000000..0cad5792d --- /dev/null +++ b/dhp-workflows/dhp-indicators/runworkflow.sh @@ -0,0 +1 @@ +mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/indicators \ No newline at end of file diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml new file mode 100644 index 000000000..6d255a7f4 --- /dev/null +++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/config-default.xml @@ -0,0 +1,34 @@ + + + jobTracker + ${jobTracker} + + + nameNode + ${nameNode} + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hive_jdbc_url + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + oozie.wf.workflow.notification.url + {serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/createIndicatorsTables.sql b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/createIndicatorsTables.sql new file mode 100644 index 000000000..fe9eaec04 --- /dev/null +++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/createIndicatorsTables.sql @@ -0,0 +1,47 @@ +create table TARGET.indi_pub_green_oa stored as parquet as +select distinct p.id, coalesce(green_oa, 0) as green_oa +from SOURCE.publication p +left outer join ( +select p.id, 1 as green_oa +from SOURCE.publication p +join SOURCE.result_instance ri on ri.id = p.id 
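+-- A publication is flagged green when at least one of its instances is hosted by a repository-type
+-- datasource with an 'Open Access' or 'Embargo' access right; the outer COALESCE(green_oa, 0) turns
+-- publications without such an instance into 0. The other indicator tables below follow the same
+-- 1-flag-subquery / LEFT OUTER JOIN / COALESCE pattern.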
+join SOURCE.datasource on datasource.id = ri.hostedby +where SOURCE.datasource.type like '%Repository%' +and (ri.accessright = 'Open Access' +or ri.accessright = 'Embargo')) tmp +on p.id= tmp.id; + +create table TARGET.indi_pub_grey_lit stored as parquet as +select distinct p.id, coalesce(grey_lit, 0) as grey_lit +from SOURCE.publication p +left outer join ( +select p.id, 1 as grey_lit +from SOURCE.publication p +join SOURCE.result_classifications rt on rt.id = p.id +where rt.type not in ('Article','Part of book or chapter of book','Book','Doctoral thesis','Master thesis','Data Paper', 'Thesis', 'Bachelor thesis', 'Conference object') and +not exists (select 1 from SOURCE.result_classifications rc where type ='Other literature type' and rc.id=p.id)) tmp on p.id=tmp.id; + +create table TARGET.indi_pub_doi_from_crossref stored as parquet as +select distinct p.id, coalesce(doi_from_crossref, 0) as doi_from_crossref +from SOURCE.publication p +left outer join +(select ri.id, 1 as doi_from_crossref from SOURCE.result_instance ri +join SOURCE.datasource d on d.id = ri.collectedfrom +where pidtype='Digital Object Identifier' and d.name ='Crossref') tmp +on tmp.id=p.id; + +create table TARGET.indi_pub_gold_oa stored as parquet as +select distinct p.id, coalesce(gold_oa, 0) as gold_oa +from SOURCE.publication p +left outer join ( +select p.id, 1 as gold_oa +from SOURCE.publication p +join SOURCE.result_instance ri on ri.id = p.id +join SOURCE.datasource on datasource.id = ri.hostedby +where SOURCE.datasource.id like '%doajarticles%') tmp +on p.id= tmp.id; + +compute stats TARGET.indi_pub_green_oa; +compute stats TARGET.indi_pub_grey_lit; +compute stats TARGET.indi_pub_doi_from_crossref; +compute stats TARGET.indi_pub_gold_oa; \ No newline at end of file diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/indicators.sh b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/indicators.sh new file mode 100644 index 000000000..306609e8a --- /dev/null +++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/scripts/indicators.sh @@ -0,0 +1,29 @@ +export PYTHON_EGG_CACHE=/home/$(whoami)/.python-eggs +export link_folder=/tmp/impala-shell-python-egg-cache-$(whoami) +if ! [ -L $link_folder ] +then + rm -Rf "$link_folder" + ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder} +fi + +export SOURCE=$1 +export TARGET=$2 +export SHADOW=$3 +export SCRIPT_PATH=$4 + +echo "Getting file from " $4 +hdfs dfs -copyToLocal $4 + +echo "Creating indicators database" +impala-shell -q "drop database if exists ${TARGET} cascade" +impala-shell -q "create database if not exists ${TARGET}" +impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/\(.*\)/create view ${TARGET}.\1 as select * from ${SOURCE}.\1;/" | impala-shell -f - +cat createIndicatorsTables.sql | sed s/SOURCE/$1/g | sed s/TARGET/$2/g1 | impala-shell -f - +echo "Indicators Database created" + + +echo "Updating Shadow indicators DB" +impala-shell -q "create database if not exists ${SHADOW}" +impala-shell -d ${SHADOW} -q "show tables" --delimited | sed "s/^/drop view if exists ${SHADOW}./" | sed "s/$/;/" | impala-shell -f - +impala-shell -d ${TARGET} -q "show tables" --delimited | sed "s/\(.*\)/create view ${SHADOW}.\1 as select * from ${TARGET}.\1;/" | impala-shell -f - +echo "Indicators Shadow DB ready!" 
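+# The view-cloning one-liners above work the same way in both blocks: `impala-shell --delimited -q "show tables"`
+# prints one table name per line, sed rewrites each name into a full `create view <db>.<name> as select * from ...;`
+# statement, and `impala-shell -f -` executes the generated statements from standard input. createIndicatorsTables.sql
+# is parameterised the same way, with sed substituting the SOURCE and TARGET placeholders before execution.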
\ No newline at end of file diff --git a/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml new file mode 100644 index 000000000..ec917b9a4 --- /dev/null +++ b/dhp-workflows/dhp-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/indicators/oozie_app/workflow.xml @@ -0,0 +1,101 @@ + + + + stats_db_name + the source stats database name + + + indicators_db_name + the target indicators database name + + + indicators_shadow_db_name + the name of the shadow schema + + + + hive_metastore_uris + hive server metastore URIs + + + hive_jdbc_url + hive server jdbc url + + + + + + + ${jobTracker} + ${nameNode} + + + hive.metastore.uris + ${hive_metastore_uris} + + + + + + + + + ${jobTracker} + ${nameNode} + indicators.sh + ${stats_db_name} + ${indicators_db_name} + ${indicators_shadow_db_name} + ${wf:appPath()}/scripts/createIndicatorsTables.sql + scripts/indicators.sh + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml b/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml new file mode 100644 index 000000000..a65c4514a --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/nb-configuration.xml @@ -0,0 +1,18 @@ + + + + + + JDK_1.8 + + diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml b/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml new file mode 100755 index 000000000..c623a12f0 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/pom.xml @@ -0,0 +1,121 @@ + + + + + + + + + dhp-workflows + eu.dnetlib.dhp + 1.2.4-SNAPSHOT + ../ + + 4.0.0 + dhp-usage-datasets-stats-update + + + + pl.project13.maven + git-commit-id-plugin + 2.1.15 + + + + revision + + + + + ${project.basedir}/../.git + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.8 + 1.8 + + + + + + UTF-8 + UTF-8 + 0.13.1-cdh5.2.1 + 2.5.0-cdh5.2.1 + + + + + org.apache.spark + spark-core_2.11 + 2.2.0 + + + org.apache.spark + spark-sql_2.11 + 2.4.5 + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + org.json + json + 20180130 + jar + + + org.apache.hive + hive-jdbc + ${cdh.hive.version} + + + org.apache.hadoop + hadoop-common + 2.7.4 + jar + + + eu.dnetlib.dhp + dhp-common + 1.2.4-SNAPSHOT + jar + + + com.mchange + c3p0 + 0.9.5.2 + + + c3p0 + c3p0 + 0.9.1.2 + jar + + + org.slf4j + slf4j-api + 1.7.26 + jar + + + dhp-usage-datasets-stats-update + diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh b/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh new file mode 100755 index 000000000..9b4325508 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/runworkflow.sh @@ -0,0 +1 @@ +mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/datasetsusagestats \ No newline at end of file diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java new file mode 100644 index 000000000..de9e44fbf --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ConnectDB.java @@ -0,0 +1,148 @@ +/* + * To change this license header, choose 
License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package eu.dnetlib.oa.graph.datasetsusagestats.export; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; + +import org.apache.log4j.Logger; + +/** + * @author D. Pierrakos + */ +/** + * @author D. Pierrakos + */ +import com.mchange.v2.c3p0.ComboPooledDataSource; + +public abstract class ConnectDB { + + public static Connection DB_HIVE_CONNECTION; + public static Connection DB_IMPALA_CONNECTION; + + private static String dbHiveUrl; + private static String dbImpalaUrl; + private static String datasetUsageStatsDBSchema; + private static String datasetsUsageStatsPermanentDBSchema; + private static String statsDBSchema; + private final static Logger logger = Logger.getLogger(ConnectDB.class); + private Statement stmt = null; + + static void init() throws ClassNotFoundException { + + dbHiveUrl = ExecuteWorkflow.dbHiveUrl; + dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl; + datasetUsageStatsDBSchema = ExecuteWorkflow.datasetUsageStatsDBSchema; + datasetsUsageStatsPermanentDBSchema = ExecuteWorkflow.datasetsUsageStatsPermanentDBSchema; + statsDBSchema = ExecuteWorkflow.statsDBSchema; + + Class.forName("org.apache.hive.jdbc.HiveDriver"); + } + + public static Connection getHiveConnection() throws SQLException { + if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) { + return DB_HIVE_CONNECTION; + } else { + DB_HIVE_CONNECTION = connectHive(); + + return DB_HIVE_CONNECTION; + } + } + + public static Connection getImpalaConnection() throws SQLException { + if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) { + return DB_IMPALA_CONNECTION; + } else { + DB_IMPALA_CONNECTION = connectImpala(); + + return DB_IMPALA_CONNECTION; + } + } + + public static String getDataSetUsageStatsDBSchema() { + String datePattern = "YYYYMMdd"; + DateFormat df = new SimpleDateFormat(datePattern); +// Get the today date using Calendar object. 
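+		// The returned schema name is suffixed with today's date (for example datasetusagestats_20210315),
+		// so every run writes into a fresh, date-stamped database; the permanent schema later exposes it
+		// through stable views. Note that "YYYYMMdd" is SimpleDateFormat's week-year pattern -- "yyyyMMdd"
+		// (calendar year) would avoid an off-by-one year in the last days of December.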
+ Date today = Calendar.getInstance().getTime(); + String todayAsString = df.format(today); + + return ConnectDB.datasetUsageStatsDBSchema + "_" + todayAsString; + } + + public static String getStatsDBSchema() { + return ConnectDB.statsDBSchema; + } + + public static String getDatasetsUsagestatsPermanentDBSchema() { + return ConnectDB.datasetsUsageStatsPermanentDBSchema; + } + + private static Connection connectHive() throws SQLException { + logger.info("trying to open Hive connection..."); + + ComboPooledDataSource cpds = new ComboPooledDataSource(); + cpds.setJdbcUrl(dbHiveUrl); + cpds.setUser("dimitris.pierrakos"); + cpds.setAcquireIncrement(1); + cpds.setMaxPoolSize(100); + cpds.setMinPoolSize(1); + cpds.setInitialPoolSize(1); + cpds.setMaxIdleTime(300); + cpds.setMaxConnectionAge(36000); + + cpds.setAcquireRetryAttempts(5); + cpds.setAcquireRetryDelay(2000); + cpds.setBreakAfterAcquireFailure(false); + + cpds.setCheckoutTimeout(0); + cpds.setPreferredTestQuery("SELECT 1"); + cpds.setIdleConnectionTestPeriod(60); + + logger.info("Opened HIVE successfully"); + + return cpds.getConnection(); +// Connection connection = DriverManager.getConnection(dbHiveUrl); +// logger.debug("Opened Hive successfully"); +// +// return connection; + + } + + private static Connection connectImpala() throws SQLException { + logger.info("trying to open Impala connection..."); + ComboPooledDataSource cpds = new ComboPooledDataSource(); + cpds.setJdbcUrl(dbImpalaUrl); + cpds.setUser("dimitris.pierrakos"); + cpds.setAcquireIncrement(1); + cpds.setMaxPoolSize(100); + cpds.setMinPoolSize(1); + cpds.setInitialPoolSize(1); + cpds.setMaxIdleTime(300); + cpds.setMaxConnectionAge(36000); + + cpds.setAcquireRetryAttempts(5); + cpds.setAcquireRetryDelay(2000); + cpds.setBreakAfterAcquireFailure(false); + + cpds.setCheckoutTimeout(0); + cpds.setPreferredTestQuery("SELECT 1"); + cpds.setIdleConnectionTestPeriod(60); + + logger.info("Opened Impala successfully"); + return cpds.getConnection(); +// Connection connection = DriverManager.getConnection(dbHiveUrl); +// logger.debug("Opened Impala successfully"); +// +// return connection; + + } +} diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java new file mode 100644 index 000000000..baffa39e0 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DatasetsStatsDB.java @@ -0,0 +1,144 @@ + +package eu.dnetlib.oa.graph.datasetsusagestats.export; + +import java.sql.Statement; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author D. 
Pierrakos + */ +public class DatasetsStatsDB { + + private String logPath; + private String logRepoPath; + private String logPortalPath; + + private Statement stmt = null; + + private static final Logger logger = LoggerFactory.getLogger(DatasetsStatsDB.class); + + public DatasetsStatsDB(String logRepoPath, String logPortalPath) throws Exception { + this.logRepoPath = logRepoPath; + this.logPortalPath = logPortalPath; + + } + + public void recreateDBAndTables() throws Exception { + this.createDatabase(); + this.createTables(); + } + + private void createDatabase() throws Exception { + try { + stmt = ConnectDB.getHiveConnection().createStatement(); + + logger.info("Dropping datasets DB: " + ConnectDB.getDataSetUsageStatsDBSchema()); + String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema() + " CASCADE"; + stmt.executeUpdate(dropDatabase); + } catch (Exception e) { + logger.error("Failed to drop database: " + e); + throw new Exception("Failed to drop database: " + e.toString(), e); + } + + try { + stmt = ConnectDB.getHiveConnection().createStatement(); + + logger.info("Creating datacite usagestats DB: " + ConnectDB.getDataSetUsageStatsDBSchema()); + String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema(); + stmt.executeUpdate(createDatabase); + + } catch (Exception e) { + logger.error("Failed to create database: " + e); + throw new Exception("Failed to create database: " + e.toString(), e); + } + try { + stmt = ConnectDB.getHiveConnection().createStatement(); + + logger + .info( + "Creating permanent datasets usagestats DB: " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema()); + String createPermanentDatabase = "CREATE DATABASE IF NOT EXISTS " + + ConnectDB.getDatasetsUsagestatsPermanentDBSchema(); + stmt.executeUpdate(createPermanentDatabase); + logger + .info( + "Created permanent datasets usagestats DB: " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema()); + + } catch (Exception e) { + logger.error("Failed to create database: " + e); + throw new Exception("Failed to create database: " + e.toString(), e); + } + } + + private void createTables() throws Exception { + try { + stmt = ConnectDB.getHiveConnection().createStatement(); + + // Create Reports table - This table should exist + logger.info("Creating Reports Tmp Table"); + String sqlCreateTableDataciteReports = "CREATE TABLE IF NOT EXISTS " + + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datacitereports_tmp(reportid STRING, \n" + + " name STRING, \n" + + " source STRING,\n" + + " release STRING,\n" + + " createdby STRING,\n" + + " report_start_date STRING,\n" + + " report_end_date STRING)\n" + + " CLUSTERED BY (reportid)\n" + + " into 100 buckets stored as orc tblproperties('transactional'='true')"; + + stmt.executeUpdate(sqlCreateTableDataciteReports); + logger.info("Reports Table Created"); + + // Create Datasets Performance Table + logger.info("Creating DataSetsPerformance Tmp Table"); + String sqlCreateTableDataSetsPerformance = "CREATE TABLE IF NOT EXISTS " + + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datasetsperformance_tmp(ds_type STRING,\n" + + " ds_title STRING,\n" + + " yop STRING,\n" + + " dataset_type STRING, \n" + + " uri STRING,\n" + + " platform STRING,\n" + + " publisher STRING,\n" + + " publisher_id array>,\n" + + " dataset_contributors array>,\n" + + " period_end STRING,\n" + + " period_from STRING,\n" + + " access_method STRING,\n" + + " metric_type STRING,\n" + + " count INT,\n" + + " reportid STRING)\n" + + " CLUSTERED BY 
(ds_type)\n" + + " into 100 buckets stored as orc tblproperties('transactional'='true')"; + stmt.executeUpdate(sqlCreateTableDataSetsPerformance); + logger.info("DataSetsPerformance Tmp Table Created"); + + logger.info("Creating Datacite Reports table"); + String createDataciteReportsTable = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datacitereports LIKE " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datacitereports_tmp STORED AS PARQUET"; + stmt.executeUpdate(createDataciteReportsTable); + logger.info("Datacite Reports Table created"); + + logger.info("Creating Datasets Performance table"); + String createDatasetPerformanceTable = "CREATE TABLE IF NOT EXISTS " + + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datasetsperformance LIKE " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datasetsperformance_tmp STORED AS PARQUET"; + stmt.executeUpdate(createDatasetPerformanceTable); + logger.info("DatasetsPerformance Table created"); + + stmt.close(); + ConnectDB.getHiveConnection().close(); + + } catch (Exception e) { + logger.error("Failed to create tables: " + e); + throw new Exception("Failed to create tables: " + e.toString(), e); + } + } + +} diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java new file mode 100644 index 000000000..02754e173 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/DownloadReportsListFromDatacite.java @@ -0,0 +1,100 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. 
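+ *
+ * DownloadReportsListFromDatacite requests the JSON list of available usage reports from the
+ * configured Datacite endpoint, collects the report ids from its "reports" array, then fetches each
+ * report by id and stores it as <reportId>.json under the configured HDFS report path.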
+ */ + +package eu.dnetlib.oa.graph.datasetsusagestats.export; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +/** + * @author D.Pierrakos + */ +public class DownloadReportsListFromDatacite { + + private String dataciteBaseURL; + private String dataciteReportPath; + private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class); + + public DownloadReportsListFromDatacite(String dataciteBaseURL, String dataciteReportPath) + throws MalformedURLException, Exception { + + this.dataciteBaseURL = dataciteBaseURL; + this.dataciteReportPath = dataciteReportPath; + } + + public void downloadReportsList() throws ParseException { + StringBuilder responseStrBuilder = new StringBuilder(); + + Gson gson = new Gson(); + + try { + BufferedInputStream in = new BufferedInputStream(new URL(dataciteBaseURL).openStream()); + BufferedReader streamReader = new BufferedReader(new InputStreamReader(in, "UTF-8")); + String inputStr; + + while ((inputStr = streamReader.readLine()) != null) { + responseStrBuilder.append(inputStr); + } + } catch (IOException e) { + logger.info(e.getMessage()); + } + JsonObject jsonObject = gson.fromJson(responseStrBuilder.toString(), JsonObject.class); + JsonArray dataArray = jsonObject.getAsJsonArray("reports"); + ArrayList reportsList = new ArrayList(); + for (JsonElement element : dataArray) { + reportsList.add(element.getAsJsonObject().get("id").getAsString()); + } + + Iterator it = reportsList.iterator(); + while (it.hasNext()) { + String reportId = it.next().toString(); + String url = dataciteBaseURL + reportId; + + try { + BufferedInputStream in = new BufferedInputStream(new URL(url).openStream()); + BufferedReader streamReader = new BufferedReader(new InputStreamReader(in, "UTF-8")); + String inputStr; + StringBuilder responseStrBuilder2 = new StringBuilder(); + while ((inputStr = streamReader.readLine()) != null) { + responseStrBuilder2.append(inputStr); + } + FileSystem fs = FileSystem.get(new Configuration()); + FSDataOutputStream fin = fs + .create( + new Path(dataciteReportPath + "/" + reportId + ".json"), + true); + byte[] jsonObjectRawBytes = responseStrBuilder2.toString().getBytes(); + fin.write(jsonObjectRawBytes); + fin.writeChar('\n'); + + fin.close(); + + fin.close(); + } catch (IOException e) { + System.out.println(e); + } + } + } +} diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java new file mode 100644 index 000000000..ffa8b8199 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ExecuteWorkflow.java @@ -0,0 +1,71 @@ +/* + * To change this license header, choose License Headers in Project Properties. 
+ * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package eu.dnetlib.oa.graph.datasetsusagestats.export; + +import org.apache.commons.io.IOUtils; +import org.apache.log4j.BasicConfigurator; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +/** + * @author D. Pierrakos, S. Zoupanos + */ +public class ExecuteWorkflow { + + static String dataciteBaseURL; + static String dataciteReportPath; + static String dbHiveUrl; + static String dbImpalaUrl; + static String datasetUsageStatsDBSchema; + static String datasetsUsageStatsPermanentDBSchema; + static String statsDBSchema; + static boolean recreateDbAndTables; + static boolean datasetsEmptyDirs; + static boolean finalTablesVisibleToImpala; + + public static void main(String args[]) throws Exception { + + // Sending the logs to the console + BasicConfigurator.configure(); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + UsageStatsExporter.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json"))); + parser.parseArgument(args); + + // Setting up the initial parameters + dataciteBaseURL = parser.get("dataciteBaseURL"); + dataciteReportPath = parser.get("dataciteReportPath"); + dbHiveUrl = parser.get("dbHiveUrl"); + dbImpalaUrl = parser.get("dbImpalaUrl"); + datasetUsageStatsDBSchema = parser.get("datasetUsageStatsDBSchema"); + datasetsUsageStatsPermanentDBSchema = parser.get("datasetsUsageStatsPermanentDBSchema"); + statsDBSchema = parser.get("statsDBSchema"); + + if (parser.get("recreateDbAndTables").toLowerCase().equals("true")) + recreateDbAndTables = true; + else + recreateDbAndTables = false; + + if (parser.get("datasetsEmptyDirs").toLowerCase().equals("true")) + datasetsEmptyDirs = true; + else + datasetsEmptyDirs = false; + + if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true")) + finalTablesVisibleToImpala = true; + else + finalTablesVisibleToImpala = false; + + UsageStatsExporter usagestatsExport = new UsageStatsExporter(); + usagestatsExport.export(); + } + +} diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java new file mode 100644 index 000000000..e89e2e5a4 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/ReadReportsListFromDatacite.java @@ -0,0 +1,388 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. 
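+ *
+ * ReadReportsListFromDatacite loads every downloaded report file into a temporary Hive table,
+ * extracts the report-header fields with get_json_object(), un-gzips the base64-encoded
+ * report-subsets payload when present, and fills datasetsperformance_tmp by exposing the
+ * report_datasets array through a JsonSerDe-backed temporary table and LATERAL VIEW explode().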
+ */ + +package eu.dnetlib.oa.graph.datasetsusagestats.export; + +import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Base64; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author D.Pierrakos + */ +public class ReadReportsListFromDatacite { + + private String dataciteReportPath; + private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class); + + public ReadReportsListFromDatacite(String dataciteReportPath) throws MalformedURLException, Exception { + + this.dataciteReportPath = dataciteReportPath; + } + + public void readReports() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + ArrayList jsonFiles = listHdfsDir(dataciteReportPath); + for (String jsonFile : jsonFiles) { + logger.info("Reading report file " + jsonFile); + this.createTmpReportsTable(jsonFile); + + String sqlSelectReportID = "SELECT get_json_object(json, '$.report.id') FROM " + + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable"; + stmt.execute(sqlSelectReportID); + ResultSet rstmpReportID = stmt.getResultSet(); + + String reportID = null; + while (rstmpReportID.next()) { + reportID = rstmpReportID.getString(1); + } + + logger.info("Checking report with id " + reportID); + String sqlCheckIfReportExists = "SELECT source FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datacitereports_tmp where reportid=?"; + PreparedStatement stGetReportID = ConnectDB.getHiveConnection().prepareStatement(sqlCheckIfReportExists); + stGetReportID.setString(1, reportID); + + ResultSet rsCheckIfReportExist = stGetReportID.executeQuery(); + + if (rsCheckIfReportExist.next()) { + logger.info("Report found with ID " + reportID); + dropTmpReportsTable(); + } else { + String sqlInsertReport = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema() + + " .datacitereports_tmp " + + "SELECT\n" + + " get_json_object(json, '$.report.id') AS reportid,\n" + + " get_json_object(json, '$.report.report-header.report-name') AS name,\n" + + " get_json_object(json, '$.report.report-header.report-id') AS source,\n" + + " get_json_object(json, '$.report.report-header.release') AS release,\n" + + " get_json_object(json, '$.report.report-header.created-by\') AS createdby,\n" + + " get_json_object(json, '$.report.report-header.reporting-period.begin-date') AS fromdate,\n" + + " get_json_object(json, '$.report.report-header.reporting-period.end-date') AS todate \n" + + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable"; + stmt.execute(sqlInsertReport); + + logger.info("Report added"); + + logger.info("Adding datasets"); + String sqlSelecteDatasetsArray = "SELECT get_json_object(json, '$.report.report-datasets') FROM " + + ConnectDB.getDataSetUsageStatsDBSchema() + 
".tmpjsonToTable"; + stmt.execute(sqlSelecteDatasetsArray); + ResultSet rstmpReportDatasets = stmt.getResultSet(); + + if (rstmpReportDatasets.next() && rstmpReportDatasets.getString(1).indexOf(',') > 0) { + // String[] listDatasets = rstmpReportDatasets.getString(1).split(","); + // String listDatasets = rstmpReportDatasets.getString(1); + String sqlSelectReport = "SELECT * FROM " + + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable"; + stmt.execute(sqlSelectReport); + ResultSet rstmpReportAll = stmt.getResultSet(); + if (rstmpReportAll.next()) { + String listDatasets = rstmpReportAll.getString(1); + logger.info("Adding uncompressed performance for " + reportID); + this.readDatasetsReport(listDatasets, reportID); + } + + } + logger.info("Adding gziped performance for datasets"); + String sqlSelecteReportSubsets = "SELECT get_json_object(json, '$.report.report-subsets.gzip[0]') FROM " + + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable"; + stmt.execute(sqlSelecteReportSubsets); + ResultSet rstmpReportSubsets = stmt.getResultSet(); + if (rstmpReportSubsets.next()) { + String unCompressedReport = uncompressString(rstmpReportSubsets.getString(1)); + this.readDatasetsReport(unCompressedReport, reportID); + } + } + } + this.dropTmpReportsTable(); + } + + public void readDatasetsReport(String prettyDatasetsReports, String reportId) throws Exception { + logger.info("Reading Datasets performance for report " + reportId); + logger.info("Write Performance Report To File"); + ConnectDB.getHiveConnection().setAutoCommit(false); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = objectMapper.readValue(prettyDatasetsReports, JsonNode.class); + String datasetsReports = jsonNode.toString(); + String report = datasetsReports + .replace("report-datasets", "report_datasets") + .replace("dataset-title", "dataset_title") + .replace("dataset-id", "dataset_id") + .replace("data-type", "data_type") + .replace("publisher-id", "publisher_id") + .replace("dataset-contributors", "dataset_contributors") + .replace("begin-date", "begin_date") + .replace("end-date", "end_date") + .replace("access-method", "access_method") + .replace("metric-type", "metric_type") + .replace("doi:", ""); + FileSystem fs = FileSystem.get(new Configuration()); + String tmpPath = dataciteReportPath + "/tmpjson"; + FSDataOutputStream fin = fs + .create(new Path(dataciteReportPath + "/tmpjson/" + reportId + "_Compressed.json"), true); + byte[] jsonObjectRawBytes = report.getBytes(); + + fin.write(jsonObjectRawBytes); + + fin.writeChar('\n'); + fin.close(); + + logger.info("Reading Performance Report From File..."); + + String sqlCreateTempTableForDatasets = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".tmpjsoncompressesed (report_datasets array>,dataset_title:string, data_type:string, " + + "uri:string, publisher:string, publisher_id:array>,platform:string, yop:string, " + + "dataset_contributors:array>," + + "performance:array, " + + "instance:array>>>>>) " + + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + + "LOCATION '" + tmpPath + "'"; + + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Adding JSON Serde jar"); + stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar"); + logger.info("Added JSON Serde jar"); + + logger.info("Inserting Datasets Performance"); + stmt.execute(sqlCreateTempTableForDatasets); + + String 
sqlInsertToDatasetsPerformance = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datasetsperformance_tmp SELECT dataset.dataset_id[0].value ds_type, " + + " dataset.dataset_title ds_title, " + + " dataset.yop yop, " + + " dataset.data_type dataset_type, " + + " dataset.uri uri, " + + " dataset.platform platform, " + + " dataset.publisher publisher, " + + " dataset.publisher_id publisher_id, " + + " dataset.dataset_contributors dataset_contributors, " + + " period.end_date period_end, " + + " period.begin_date period_from, " + + " performance.access_method access_method, " + + " performance.metric_type metric_type, " + + " performance.count count, " + + "'" + reportId + "' report_id " + + " FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed " + + " LATERAL VIEW explode(report_datasets) exploded_table as dataset LATERAL VIEW explode(dataset.performance[0].instance) exploded_table2 as performance " + + " LATERAL VIEW explode (array(dataset.performance[0].period)) exploded_table3 as period"; + + stmt.executeUpdate(sqlInsertToDatasetsPerformance); + + logger.info("Datasets Performance Inserted for Report " + reportId); + + stmt.execute("Drop table " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed"); + + logger.info("Datasets Report Added"); + + } + + private ArrayList listHdfsDir(String dir) throws Exception { + + FileSystem hdfs = FileSystem.get(new Configuration()); + RemoteIterator Files; + ArrayList fileNames = new ArrayList<>(); + + try { + Path exportPath = new Path(hdfs.getUri() + dir); + Files = hdfs.listFiles(exportPath, false); + while (Files.hasNext()) { + String fileName = Files.next().getPath().toString(); + fileNames.add(fileName); + } + + hdfs.close(); + } catch (Exception e) { + logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + dir)); + throw new Exception("HDFS file path with exported data does not exist : " + dir, e); + } + + return fileNames; + } + + private String readHDFSFile(String filename) throws Exception { + String result; + try { + + FileSystem fs = FileSystem.get(new Configuration()); + // log.info("reading file : " + filename); + + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename)))); + + StringBuilder sb = new StringBuilder(); + String line = br.readLine(); + + while (line != null) { + sb.append(line); + // sb.append(line); + line = br.readLine(); + } + // uncompressedReport = sb.toString().replace("][{\"idSite\"", ",{\"idSite\""); + result = sb.toString().trim(); + // fs.close(); + } catch (Exception e) { + throw new Exception(e); + } + + return result; + } + + public static String uncompressString(String zippedBase64Str) + throws IOException { + String uncompressedReport = null; + + byte[] bytes = Base64.getDecoder().decode(zippedBase64Str); + GZIPInputStream zi = null; + try { + zi = new GZIPInputStream(new ByteArrayInputStream(bytes)); + uncompressedReport = IOUtils.toString(zi); + } finally { + IOUtils.closeQuietly(zi); + } + logger.info("Report Succesfully Uncompressed..."); + return uncompressedReport; + } + + private void createTmpReportsTable(String jsonFile) throws SQLException { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + dropTmpReportsTable(); + String createTmpTable = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".tmpjsonToTable (json STRING)"; + stmt.executeUpdate(createTmpTable); + logger.info("Temporary Table for Json Report Created"); + + String 
insertJsonReport = "LOAD DATA INPATH '" + jsonFile + "' INTO TABLE " + + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable"; + stmt.execute(insertJsonReport); + logger.info("JSON Report File inserted to tmpjsonToTable Table"); + } + + private void dropTmpReportsTable() throws SQLException { + logger.info("Dropping tmpjson Table"); + String dropTmpTable = "DROP TABLE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsonToTable"; + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + stmt.executeUpdate(dropTmpTable); + logger.info("Dropped Table for Json Report Table"); + + } + + public void createUsageStatisticsTable() throws SQLException { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + + logger.info("Updating Datacite Reports table"); + String createDataciteReportsTable = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datacitereports " + + "SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports_tmp"; + stmt.executeUpdate(createDataciteReportsTable); + logger.info("Datacite Reports Table updated"); + + logger.info("Updating Datasets Performance table"); + String createDatasetPerformanceTable = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datasetsperformance " + + "SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance_tmp"; + stmt.executeUpdate(createDatasetPerformanceTable); + logger.info("DatasetsPerformance Table updated"); + + logger.info("Creating Downloads Stats table"); + String createDownloadsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datacite_downloads STORED AS PARQUET as " + + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire " + + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance " + + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform " + + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid " + + "where metric_type='total-dataset-requests' "; + stmt.executeUpdate(createDownloadsTable); + logger.info("Downloads Stats table created"); + + logger.info("Creating Views Stats table"); + String createViewsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() + + ".datacite_views STORED AS PARQUET as " + + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire " + + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance " + + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform " + + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid " + + "where metric_type='total-dataset-investigations' "; + stmt.executeUpdate(createViewsTable); + logger.info("Views Stats table created"); + + logger.info("Building Permanent Datasets Usage Stats DB"); + + logger.info("Dropping view datacitereports on permanent datacite usagestats DB"); + String sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports"; + stmt.executeUpdate(sql); + logger.info("Dropped view datacitereports on permanent datacite usagestats DB"); + + logger.info("Create view datacitereports on permanent datacite usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports" + + " AS SELECT * FROM " + 
ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports"; + stmt.executeUpdate(sql); + logger.info("Created view datacitereports on permanent datasets usagestats DB"); + + logger.info("Dropping view datasetsperformance on permanent datacite usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance"; + stmt.executeUpdate(sql); + logger.info("Dropped view datasetsperformance on permanent datacite usagestats DB"); + + logger.info("Create view datasetsperformance on permanent datacite usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance" + + " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance"; + stmt.executeUpdate(sql); + logger.info("Created view datasetsperformance on permanent datasets usagestats DB"); + + logger.info("Dropping view datacite_views on permanent datacite usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views"; + stmt.executeUpdate(sql); + logger.info("Dropped view datacite_views on permanent datacite usagestats DB"); + + logger.info("Create view datacite_views on permanent datacite usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views" + + " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views"; + stmt.executeUpdate(sql); + logger.info("Created view datacite_views on permanent datasets usagestats DB"); + + logger.info("Dropping view datacite_downloads on permanent datacite usagestats DB"); + sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads"; + stmt.executeUpdate(sql); + logger.info("Dropped view datacite_downloads on permanent datacite usagestats DB"); + + logger.info("Create view datacite_downloads on permanent datacite usagestats DB"); + sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads" + + " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_downloads"; + stmt.executeUpdate(sql); + logger.info("Created view datacite_downloads on permanent datasets usagestats DB"); + + stmt.close(); + ConnectDB.getHiveConnection().close(); + logger.info("Completed Building Permanent Datasets Usage Stats DB"); + } + +} diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java new file mode 100755 index 000000000..8d6e24333 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/java/eu/dnetlib/oa/graph/datasetsusagestats/export/UsageStatsExporter.java @@ -0,0 +1,117 @@ + +package eu.dnetlib.oa.graph.datasetsusagestats.export; + +import java.io.IOException; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main class for downloading and processing Usage statistics + * + * @author D. Pierrakos, S. 
Zoupanos + */ +public class UsageStatsExporter { + + private Statement stmt = null; + + public UsageStatsExporter() { + + } + + private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class); + + private void reCreateLogDirs() throws IllegalArgumentException, IOException { + FileSystem dfs = FileSystem.get(new Configuration()); + + logger.info("Deleting Log directory: " + ExecuteWorkflow.dataciteReportPath); + dfs.delete(new Path(ExecuteWorkflow.dataciteReportPath), true); + + logger.info("Creating Log directory: " + ExecuteWorkflow.dataciteReportPath); + dfs.mkdirs(new Path(ExecuteWorkflow.dataciteReportPath)); + + logger.info("Creating tmp directory: " + ExecuteWorkflow.dataciteReportPath + " " + "/tmpjson/"); + dfs.mkdirs(new Path(ExecuteWorkflow.dataciteReportPath + "/tmpjson/")); + + } + + public void export() throws Exception { + + logger.info("Initialising DB properties"); + ConnectDB.init(); + ConnectDB.getHiveConnection(); + + if (ExecuteWorkflow.recreateDbAndTables) { + DatasetsStatsDB datasetsDB = new DatasetsStatsDB("", ""); + datasetsDB.recreateDBAndTables(); + } + logger.info("Initializing the download logs module"); + DownloadReportsListFromDatacite downloadReportsListFromDatacite = new DownloadReportsListFromDatacite( + ExecuteWorkflow.dataciteBaseURL, + ExecuteWorkflow.dataciteReportPath); + + if (ExecuteWorkflow.datasetsEmptyDirs) { + logger.info("Downloading Reports List From Datacite"); + this.reCreateLogDirs(); + downloadReportsListFromDatacite.downloadReportsList(); + logger.info("Reports List has been downloaded"); + } + + ReadReportsListFromDatacite readReportsListFromDatacite = new ReadReportsListFromDatacite( + ExecuteWorkflow.dataciteReportPath); + logger.info("Store Reports To DB"); + readReportsListFromDatacite.readReports(); + logger.info("Reports Stored To DB"); + readReportsListFromDatacite.createUsageStatisticsTable(); + + // Make the tables available to Impala + if (ExecuteWorkflow.finalTablesVisibleToImpala) { + logger.info("Making tables visible to Impala"); + invalidateMetadata(); + } + + logger.info("End"); + } + + private void invalidateMetadata() throws SQLException { + Statement stmt = null; + + stmt = ConnectDB.getImpalaConnection().createStatement(); + + String sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_downloads"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports"; + stmt.executeUpdate(sql); + + sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance"; + stmt.executeUpdate(sql); + + stmt.close(); + try { + ConnectDB.getHiveConnection().close(); + } catch (Exception e) { + logger.info("Message at the end :" + e.getMessage()); + } + } +} diff --git 
a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json new file mode 100644 index 000000000..f67651627 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/export/datasets_usagestats_parameters.json @@ -0,0 +1,62 @@ +[ + { + "paramName": "dbu", + "paramLongName": "dataciteBaseURL", + "paramDescription": "URL of Datacite Reports Endpoint", + "paramRequired": true + }, + { + "paramName": "drp", + "paramLongName": "dataciteReportPath", + "paramDescription": "Path for Datacite Reports", + "paramRequired": true + }, + { + "paramName": "dbhu", + "paramLongName": "dbHiveUrl", + "paramDescription": "activate tranform-only mode. Only apply transformation step", + "paramRequired": true + }, + { + "paramName": "dbiu", + "paramLongName": "dbImpalaUrl", + "paramDescription": "activate tranform-only mode. Only apply transformation step", + "paramRequired": true + }, + { + "paramName": "dusdbs", + "paramLongName": "datasetUsageStatsDBSchema", + "paramDescription": "activate tranform-only mode. Only apply transformation step", + "paramRequired": true + }, + { + "paramName": "uspdbs", + "paramLongName": "datasetsUsageStatsPermanentDBSchema", + "paramDescription": "activate tranform-only mode. Only apply transformation step", + "paramRequired": true + }, + { + "paramName": "sdbs", + "paramLongName": "statsDBSchema", + "paramDescription": "activate tranform-only mode. Only apply transformation step", + "paramRequired": true + }, + { + "paramName": "rdbt", + "paramLongName": "recreateDbAndTables", + "paramDescription": "Re-create database and initial tables?", + "paramRequired": true + }, + { + "paramName": "pwed", + "paramLongName": "datasetsEmptyDirs", + "paramDescription": "Empty piwik directories?", + "paramRequired": true + }, + { + "paramName": "ftvi", + "paramLongName": "finalTablesVisibleToImpala", + "paramDescription": "Make the dataset_usage_stats, visible to Impala", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml new file mode 100644 index 000000000..b5c807378 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/config-default.xml @@ -0,0 +1,38 @@ + + + jobTracker + ${jobTracker} + + + nameNode + ${nameNode} + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1 + + + impalaJdbcUrl + jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl; + + + oozie.wf.workflow.notification.url + {serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status + + + oozie.use.system.libpath + true + + diff --git a/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml 
b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml new file mode 100644 index 000000000..22bf22c01 --- /dev/null +++ b/dhp-workflows/dhp-usage-datasets-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/datasetsusagestats/oozie_app/workflow.xml @@ -0,0 +1,72 @@ + + + + hiveMetastoreUris + Hive server metastore URIs + + + hiveJdbcUrl + Hive server jdbc url + + + impalaJdbcUrl + Impala server jdbc url + + + + + ${jobTracker} + ${nameNode} + + + hive.metastore.uris + ${hiveMetastoreUris} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + eu.dnetlib.oa.graph.datasetsusagestats.export.ExecuteWorkflow + --dataciteBaseURL + ${dataciteBaseURL} + --dataciteReportPath + ${dataciteReportPath} + --dbHiveUrl + ${hiveJdbcUrl} + --dbImpalaUrl + ${impalaJdbcUrl} + --datasetUsageStatsDBSchema + ${datasetUsageStatsDBSchema} + --datasetsUsageStatsPermanentDBSchema + ${datasetsUsageStatsPermanentDBSchema} + --statsDBSchema + ${statsDBSchema} + --recreateDbAndTables + ${recreateDbAndTables} + --datasetsEmptyDirs + ${datasetsEmptyDirs} + --finalTablesVisibleToImpala + ${finalTablesVisibleToImpala} + + + + + + + + diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java index e0e0d3687..d2884a4bb 100644 --- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/ExecuteWorkflow.java @@ -65,6 +65,8 @@ public class ExecuteWorkflow { static int numberOfDownloadThreads; + static int b2SSHAREID; + public static void main(String args[]) throws Exception { // Sending the logs to the console @@ -196,6 +198,8 @@ public class ExecuteWorkflow { numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads")); + b2SSHAREID = Integer.parseInt(parser.get("b2shareID")); + UsageStatsExporter usagestatsExport = new UsageStatsExporter(); usagestatsExport.export(); // usagestatsExport.createdDBWithTablesOnly(); diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java index a84d6743f..76412cd54 100644 --- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs.java @@ -191,7 +191,7 @@ public class PiwikDownloadLogs { ResultSet rs = statement .executeQuery( "SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema() - + ".datasource where piwik_id is not null and piwik_id <> 0 order by piwik_id"); + + ".datasource where piwik_id is not null and piwik_id <> 0 and piwik_id <> 196 order by piwik_id"); // Getting all the piwikids in a list for logging reasons & limitting the list // to the max number of piwikids diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java 
b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java new file mode 100644 index 000000000..9ec6fb72e --- /dev/null +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikDownloadLogs_B2SHARE.java @@ -0,0 +1,204 @@ + +package eu.dnetlib.oa.graph.usagerawdata.export; + +import java.io.*; +import java.net.URL; +import java.net.URLConnection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author D. Pierrakos + */ +public class PiwikDownloadLogs_B2SHARE { + + private final String piwikUrl; + private Date startDate; + private final String tokenAuth; + + /* + * The Piwik's API method + */ + private final String APImethod = "?module=API&method=Live.getLastVisitsDetails"; + private final String format = "&format=json"; + + private static final Logger logger = LoggerFactory.getLogger(PiwikDownloadLogs_B2SHARE.class); + + public PiwikDownloadLogs_B2SHARE(String piwikUrl, String tokenAuth) { + this.piwikUrl = piwikUrl; + this.tokenAuth = tokenAuth; + + } + + private String getPiwikLogUrl() { + return "https://" + piwikUrl + "/"; + } + + private String getJson(String url) throws Exception { + try { + logger.debug("Connecting to download the JSON: " + url); + URL website = new URL(url); + URLConnection connection = website.openConnection(); + + StringBuilder response; + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + response = new StringBuilder(); + String inputLine; + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + } + return response.toString(); + } catch (Exception e) { + logger.error("Failed to get URL: " + url + " Exception: " + e); + throw new Exception("Failed to get URL: " + url + " Exception: " + e.toString(), e); + } + } + + public void GetOpenAIREB2SHARELogs(String repoLogsPath) throws Exception { + + Statement statement = ConnectDB.getHiveConnection().createStatement(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + + List piwikIdToVisit = new ArrayList(); + piwikIdToVisit.add(ExecuteWorkflow.b2SSHAREID); + logger.info("B2SHARE piwikId for download: " + piwikIdToVisit); + + if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0 + && ExecuteWorkflow.numberOfPiwikIdsToDownload <= piwikIdToVisit.size()) { + logger.info("Trimming piwikIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload); + piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload); + } + + logger.info("Downloading for the followins piwikIds: " + piwikIdToVisit); + + // ExecutorService executor = Executors.newFixedThreadPool(ExecuteWorkflow.numberOfDownloadThreads); + for (int siteId : piwikIdToVisit) { + // Setting the starting period + Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone(); + logger.info("Starting period for log download: " + sdf.format(start.getTime())); + + // Setting the ending period (last 
day of the month) + // Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone(); + Calendar end = Calendar.getInstance(); + end.add(Calendar.DAY_OF_MONTH, -1); + // end.add(Calendar.MONTH, +1); +// end.add(Calendar.DAY_OF_MONTH, -1); + logger.info("Ending period for log download: " + sdf.format(end.getTime())); + + logger.info("Now working on piwikId: " + siteId); + + PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION + .prepareStatement( + "SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() + + ".piwiklog WHERE source=?"); + st.setInt(1, siteId); + Date dateMax = null; + ResultSet rs_date = st.executeQuery(); + while (rs_date.next()) { + logger.info("Found max date: " + rs_date.getString(1) + " for repository " + siteId); + + if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") + && !rs_date.getString(1).equals("")) { + start.setTime(sdf.parse(rs_date.getString(1))); + dateMax = sdf.parse(rs_date.getString(1)); + } + } + rs_date.close(); + + for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) { + // logger.info("Date used " + currDay.toString()); + // Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID); + // executor.execute(worker);// calling execute method of ExecutorService + logger.info("Date used " + currDay.getTime().toString()); + + if (dateMax != null && currDay.getTime().compareTo(dateMax) <= 0) { + logger.info("Date found in logs " + dateMax + " and not downloading Matomo logs for " + siteId); + } else { + GetOpenAIRELogsB2SHAREForDate(currDay, siteId, repoLogsPath); + } + + } + } + // executor.shutdown(); + // while (!executor.isTerminated()) { + // } + // System.out.println("Finished all threads"); + } + + public void GetOpenAIRELogsB2SHAREForDate(Calendar currDay, int siteId, String repoLogsPath) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + + Date date = currDay.getTime(); + logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date)); + + String period = "&period=day&date=" + sdf.format(date); + String outFolder = repoLogsPath; + + String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format + + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; + String content = ""; + + int i = 0; + + JSONParser parser = new JSONParser(); + StringBuffer totalContent = new StringBuffer(); + FileSystem fs = FileSystem.get(new Configuration()); + + do { + int writtenBytes = 0; + String apiUrl = baseApiUrl; + + if (i > 0) { + apiUrl += "&filter_offset=" + (i * 1000); + } + + content = getJson(apiUrl); + if (content.length() == 0 || content.equals("[]")) { + break; + } + + FSDataOutputStream fin = fs + .create( + new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i + + ".json"), + true); + JSONArray jsonArray = (JSONArray) parser.parse(content); + for (Object aJsonArray : jsonArray) { + JSONObject jsonObjectRaw = (JSONObject) aJsonArray; + byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes(); + fin.write(jsonObjectRawBytes); + fin.writeChar('\n'); + + writtenBytes += jsonObjectRawBytes.length + 1; + } + + fin.close(); + System.out + .println( + Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes + + " bytes.
Filename: " + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i + + ".json"); + + i++; + } while (true); + + fs.close(); + } +} diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java index 9144620b7..00378ca1f 100644 --- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB.java @@ -179,6 +179,10 @@ public class PiwikStatsDB { createPedocsOldUsageData(); logger.info("Pedocs Tables Created"); + logger.info("Create Datacite Tables"); + createDatasetsUsageData(); + logger.info("Datacite Tables Created"); + } catch (Exception e) { logger.error("Failed to process logs: " + e); throw new Exception("Failed to process logs: " + e.toString(), e); @@ -281,6 +285,7 @@ public class PiwikStatsDB { // clean view double clicks logger.info("Cleaning action double clicks"); + ConnectDB.getHiveConnection().setAutoCommit(false); sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " + "WHERE EXISTS (\n" + "SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n" @@ -750,6 +755,16 @@ public class PiwikStatsDB { stmt.executeUpdate(sql); logger.info("Dropped sarc_sushilogtmp_json_non_array"); + logger.info("Dropping piwiklogb2sharetmp"); + sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp"; + stmt.executeUpdate(sql); + logger.info("Dropped piwiklogb2sharetmp"); + + logger.info("Dropping piwiklog_b2share_tmp_json"); + sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog_b2share_tmp_json"; + stmt.executeUpdate(sql); + logger.info("Dropped piwiklog_b2share_tmp_json"); + stmt.close(); ConnectDB.getHiveConnection().close(); @@ -832,4 +847,32 @@ public class PiwikStatsDB { logger.info("PeDocs Old Downloads Table created"); } + + public void createDatasetsUsageData() throws SQLException { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Dropping datacite_views"); + String sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".datacite_views"; + stmt.executeUpdate(sql); + logger.info("Dropped datacite_views"); + + logger.info("Dropping datacite_downloads"); + sql = "DROP TABLE " + ConnectDB.getUsageStatsDBSchema() + ".datacite_downloads"; + stmt.executeUpdate(sql); + logger.info("Dropped datacite_downloads"); + + logger.info("Creating Datasets Views Table"); + sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".datacite_views as select * from openaire_prod_datacite_usage_stats.datacite_views"; + stmt.executeUpdate(sql); + logger.info("Datasets Views Table created"); + + logger.info("Creating Datasets Downloads Table"); + sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".datacite_downloads as select * from openaire_prod_datacite_usage_stats.datacite_downloads"; + stmt.executeUpdate(sql); + logger.info("Datasets Downloads Table created"); + + } } diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java new file mode 100644 index 
000000000..886079a23 --- /dev/null +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/PiwikStatsDB_B2SHARE.java @@ -0,0 +1,304 @@ + +package eu.dnetlib.oa.graph.usagerawdata.export; + +import java.io.*; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author D. Pierrakos, S. Zoupanos + */ +public class PiwikStatsDB_B2SHARE { + + private String logPath; + private String logRepoPath; + private String logPortalPath; + + private Statement stmt = null; + + private static final Logger logger = LoggerFactory.getLogger(PiwikStatsDB_B2SHARE.class); + + private String CounterRobotsURL; + private ArrayList robotsList; + + public PiwikStatsDB_B2SHARE(String logRepoPath, String logPortalPath) throws Exception { + this.logRepoPath = logRepoPath; + this.logPortalPath = logPortalPath; + + } + + public ArrayList getRobotsList() { + return robotsList; + } + + public void setRobotsList(ArrayList robotsList) { + this.robotsList = robotsList; + } + + public String getCounterRobotsURL() { + return CounterRobotsURL; + } + + public void setCounterRobotsURL(String CounterRobotsURL) { + this.CounterRobotsURL = CounterRobotsURL; + } + + public void processB2SHARELogs() throws Exception { + try { + + logger.info("Processing B2SHARE logs"); + processLog(); + logger.info("B2SHARE logs process done"); + + logger.info("Removing double clicks from B2SHARE logs"); + removeDoubleClicks(); + logger.info("Removing double clicks from B2SHARE logs done"); + + logger.info("Updating Production Tables"); + updateProdTables(); + logger.info("Updated Production Tables"); + + } catch (Exception e) { + logger.error("Failed to process logs: " + e); + throw new Exception("Failed to process logs: " + e.toString(), e); + } + } + + public void processLog() throws Exception { + + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Adding JSON Serde jar"); + stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar"); + logger.info("Added JSON Serde jar"); + + logger.info("Dropping piwiklog_b2share_tmp_json table"); + String drop_piwiklogtmp_json = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".piwiklog_b2share_tmp_json"; + stmt.executeUpdate(drop_piwiklogtmp_json); + logger.info("Dropped piwiklog_b2share_tmp_json table"); + + logger.info("Creating piwiklog_b2share_tmp_json"); + String create_piwiklogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".piwiklog_b2share_tmp_json(\n" + + " `idSite` STRING,\n" + + " `idVisit` STRING,\n" + + " `country` STRING,\n" + + " `referrerName` STRING,\n" + + " `browser` STRING,\n" + + " `actionDetails` ARRAY<\n" + + " struct<\n" + + " type: STRING,\n" + + " url: STRING,\n" + + " eventAction: STRING,\n" + + " eventName: STRING,\n" + + " timestamp: String\n" + + " >\n" + + " >\n" + + ")\n" + + "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" + + "LOCATION '" + ExecuteWorkflow.repoLogPath + "'\n" + + "TBLPROPERTIES (\"transactional\"=\"false\")"; + stmt.executeUpdate(create_piwiklogtmp_json); + logger.info("Created 
piwiklog_b2share_tmp_json"); + + logger.info("Dropping piwiklogtmp table"); + String drop_piwiklogtmp = "DROP TABLE IF EXISTS " + + ConnectDB.getUsageStatsDBSchema() + + ".piwiklogtmp"; + stmt.executeUpdate(drop_piwiklogtmp); + logger.info("Dropped piwiklogtmp"); + + logger.info("Creating piwiklogb2sharetmp"); + String create_piwiklogtmp = "CREATE TABLE " + + ConnectDB.getUsageStatsDBSchema() + + ".piwiklogb2sharetmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, " + + "entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " + + "clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true')"; + stmt.executeUpdate(create_piwiklogtmp); + logger.info("Created piwiklogb2sharetmp"); + + logger.info("Inserting into piwiklogb2sharetmp"); + String insert_piwiklogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp " + + "SELECT DISTINCT cast(idSite as BIGINT) as source, idVisit as id_Visit, country, " + + "actiondetail.eventAction as action, actiondetail.url as url, " + + "actiondetail.eventName as entity_id, " + + "'repItem' as source_item_type, from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, " + + "referrerName as referrer_name, browser as agent\n" + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog_b2share_tmp_json\n" + + "LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail"; + stmt.executeUpdate(insert_piwiklogtmp); + logger.info("Inserted into piwiklogb2sharetmp"); + + stmt.close(); + } + + public void removeDoubleClicks() throws Exception { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Cleaning download double clicks"); + // clean download double clicks + String sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp " + + "WHERE EXISTS (\n" + + "SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n" + + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp p1, " + + ConnectDB.getUsageStatsDBSchema() + ".piwiklogb2sharetmp p2\n" + + "WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id \n" + + "AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp \n" + + "AND p1.timestamp listHdfsDir(String dir) throws Exception { + + FileSystem hdfs = FileSystem.get(new Configuration()); + RemoteIterator Files; + ArrayList fileNames = new ArrayList<>(); + + try { + Path exportPath = new Path(hdfs.getUri() + dir); + Files = hdfs.listFiles(exportPath, false); + while (Files.hasNext()) { + String fileName = Files.next().getPath().toString(); + fileNames.add(fileName); + } + + hdfs.close(); + } catch (Exception e) { + logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath)); + throw new Exception("HDFS file path with exported data does not exist : " + logPath, e); + } + + return fileNames; + } + + private String readHDFSFile(String filename) throws Exception { + String result; + try { + + FileSystem fs = FileSystem.get(new Configuration()); + // log.info("reading file : " + filename); + + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename)))); + + StringBuilder sb = new StringBuilder(); + String line = br.readLine(); + + while (line != null) { + if (!line.equals("[]")) { + sb.append(line); + } + // sb.append(line); + line = br.readLine(); + } + 
result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\""); + if (result.equals("")) { + result = "[]"; + } + + // fs.close(); + } catch (Exception e) { + logger.error(e.getMessage()); + throw new Exception(e); + } + + return result; + } + + private Connection getConnection() throws SQLException { + return ConnectDB.getHiveConnection(); + } + + public void createPedocsOldUsageData() throws SQLException { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Creating PeDocs Old Views Table"); + String sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".pedocsoldviews as select * from default.pedocsviews"; + stmt.executeUpdate(sql); + logger.info("PeDocs Old Views Table created"); + + logger.info("Creating PeDocs Old Downloads Table"); + sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".pedocsolddownloads as select * from default.pedocsdownloads"; + stmt.executeUpdate(sql); + logger.info("PeDocs Old Downloads Table created"); + + } + + public void createDatasetsUsageData() throws SQLException { + Statement stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + logger.info("Creating Datasets Views Table"); + String sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".datacite_views as select * from datasetsusagestats_20210301.datacite_views"; + stmt.executeUpdate(sql); + logger.info("Datasets Views Table created"); + + logger.info("Creating Datasets Downloads Table"); + sql = "Create TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + + ".datacite_downloads as select * from datasetsusagestats_20210301.datacite_downloads"; + stmt.executeUpdate(sql); + logger.info("Datasets Downloads Table created"); + + } +} diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java index 07e15605f..2f10e4d2b 100644 --- a/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/java/eu/dnetlib/oa/graph/usagerawdata/export/UsageStatsExporter.java @@ -142,8 +142,20 @@ public class UsageStatsExporter { sarcStats.updateSarcLogs(); } logger.info("Sarc done"); - // finalize usagestats + PiwikDownloadLogs_B2SHARE b2sharePiwikID = new PiwikDownloadLogs_B2SHARE(ExecuteWorkflow.matomoBaseURL, + ExecuteWorkflow.matomoAuthToken); + b2sharePiwikID.GetOpenAIREB2SHARELogs(ExecuteWorkflow.repoLogPath); + logger.info("B2SHARE done"); + + PiwikStatsDB_B2SHARE piwikstatsB2SHAREdb = new PiwikStatsDB_B2SHARE(ExecuteWorkflow.repoLogPath, + ExecuteWorkflow.portalLogPath); + piwikstatsB2SHAREdb.setCounterRobotsURL(cRobotsUrl); + + logger.info("Processing B2SHARE logs"); + piwikstatsB2SHAREdb.processB2SHARELogs(); + + // finalize usagestats logger.info("Dropping tmp tables"); if (ExecuteWorkflow.finalizeStats) { piwikstatsdb.finalizeStats(); @@ -161,6 +173,7 @@ public class UsageStatsExporter { piwikstatsdb.recreateDBAndTables(); piwikstatsdb.createPedocsOldUsageData(); + Statement stmt = ConnectDB.getHiveConnection().createStatement(); logger.info("Creating LaReferencia tables"); diff --git 
a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json index 1aa5ad6f8..8c733c55b 100644 --- a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json @@ -215,5 +215,11 @@ "paramLongName": "numberOfDownloadThreads", "paramDescription": "Number of download threads", "paramRequired": true + }, + { + "paramName": "b2shareID", + "paramLongName": "b2shareID", + "paramDescription": "B2SHARE Matomo ID", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml index 022a107ab..80e1da478 100644 --- a/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-usage-raw-data-update/src/main/resources/eu/dnetlib/dhp/oa/graph/usagerawdata/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + hiveMetastoreUris @@ -78,6 +78,7 @@ --sarcNumberOfIssnToDownload${sarcNumberOfIssnToDownload} --finalizeStats${finalizeStats} --numberOfDownloadThreads${numberOfDownloadThreads} + --b2shareID${b2shareID} diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java index e53709f1a..ea07ed732 100644 --- a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java +++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/ConnectDB.java @@ -82,7 +82,7 @@ public abstract class ConnectDB { Date today = Calendar.getInstance().getTime(); String todayAsString = df.format(today); - return ConnectDB.usageStatsDBSchema + "_" + todayAsString; + return ConnectDB.usageStatsDBSchema + todayAsString; } public static String getStatsDBSchema() { diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java index 253dc03b5..7c6f28023 100644 --- a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java +++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/PiwikStatsDB.java @@ -35,20 +35,20 @@ public class PiwikStatsDB { private void createDatabase() throws Exception { -// try { -// -// stmt = ConnectDB.getHiveConnection().createStatement(); -// -// logger.info("Dropping usagestats DB: " + ConnectDB.getUsageStatsDBSchema()); -// String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + " CASCADE"; -// stmt.executeUpdate(dropDatabase); -// } catch (Exception e) { -// logger.error("Failed to drop database: " + e); -// throw new Exception("Failed to drop database: " + e.toString(), e); -// } -// try { + stmt = ConnectDB.getHiveConnection().createStatement(); + + 
logger.info("Dropping usagestats DB: " + ConnectDB.getUsageStatsDBSchema()); + String dropDatabase = "DROP DATABASE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + " CASCADE"; + stmt.executeUpdate(dropDatabase); + } catch (Exception e) { + logger.error("Failed to drop database: " + e); + throw new Exception("Failed to drop database: " + e.toString(), e); + } + + try { + logger.info("Creating usagestats DB: " + ConnectDB.getUsageStatsDBSchema()); String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema(); stmt.executeUpdate(createDatabase); @@ -132,7 +132,7 @@ public class PiwikStatsDB { + "max(views) AS count, max(openaire_referrer) AS openaire " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, " + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " - + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' " + + "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + "GROUP BY d.id, ro.id, month " + "ORDER BY d.id, ro.id, month "; stmt.executeUpdate(create_views_stats); @@ -145,7 +145,7 @@ public class PiwikStatsDB { + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_views_monthly_tmp p, " + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + "WHERE p.source=" + ExecuteWorkflow.portalMatomoID - + " AND p.source=d.piwik_id and p.id=ro.id AND ro.oid!='200' " + + " AND p.source=d.piwik_id and p.id=ro.id AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + "GROUP BY d.id, ro.id, month " + "ORDER BY d.id, ro.id, month "; stmt.executeUpdate(create_pageviews_stats); @@ -194,7 +194,7 @@ public class PiwikStatsDB { + "max(downloads) AS count, max(openaire_referrer) AS openaire " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".openaire_result_downloads_monthly_tmp p, " + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " - + "WHERE p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' " + + "WHERE p.source=d.piwik_id and p.id=ro.oid AND ro.oid!='200' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' " + "GROUP BY d.id, ro.id, month " + "ORDER BY d.id, ro.id, month "; stmt.executeUpdate(sql); @@ -337,6 +337,96 @@ public class PiwikStatsDB { } + public void uploadB2SHAREStats() throws Exception { + stmt = ConnectDB.getHiveConnection().createStatement(); + ConnectDB.getHiveConnection().setAutoCommit(false); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping B2SHARE b2share_result_views_monthly_tmp view"); + String sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp"; + logger.info("Dropped b2share_result_views_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping b2SHARE b2share_result_downloads_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp"; + logger.info("Dropped b2share_result_downloads_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_views_stats_tmp table + logger.info("Dropping B2SHARE b2share_views_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp"; + logger.info("Dropped b2share_views_stats_tmp table "); + 
stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_downloads_stats_tmp table + logger.info("Dropping B2SHARE b2share_downloads_stats_tmp table"); + sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp"; + logger.info("Dropped b2share_downloads_stats_tmp table "); + stmt.executeUpdate(sql); + + // Creating B2SHARE b2share_result_views_monthly_tmp view + logger.info("Creating B2SHARE b2share_result_views_monthly_tmp view"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp " + + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".piwiklog " + + "WHERE action='action' and (source_item_type='oaItem' or source_item_type='repItem') and source=412 " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id"; + stmt.executeUpdate(sql); + logger.info("Created b2share_result_views_monthly_tmp view "); + + // Creating B2SHARE b2share_views_stats_tmp table + logger.info("Creating B2SHARE b2share_views_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp AS " + + "SELECT 'B2SHARE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema() + + ".b2share_result_views_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.id=ro.oid and d.id='re3data_____::ad3609c351bd520edf6f10f5e0d9b877' " + + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id"; + stmt.executeUpdate(sql); + logger.info("Created B2SHARE b2share_views_stats_tmp table"); + + // Creating B2SHARE b2share_result_downloads_monthly_tmp view + logger.info("Creating B2SHARE b2share_result_downloads_monthly_tmp view"); + sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp " + + "AS SELECT entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, " + + "COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, " + + "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " + + "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".piwiklog " + + "WHERE action='download' and (source_item_type='oaItem' or source_item_type='repItem') and source=412 " + + "GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ORDER BY source, entity_id"; + stmt.executeUpdate(sql); + logger.info("Created b2share_result_downloads_monthly_tmp view "); + + // Creating B2SHARE b2share_downloads_stats_tmp table + logger.info("Creating B2SHARE b2share_downloads_stats_tmp table"); + sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp AS " + + "SELECT 'B2SHARE' as source, d.id as repository_id, ro.id as result_id, month as date, " + + "max(views) AS count, max(openaire_referrer) AS openaire FROM " + ConnectDB.getUsageStatsDBSchema() + + ".b2share_result_downloads_monthly_tmp p, " + + ConnectDB.getStatsDBSchema() + ".datasource d, " + 
ConnectDB.getStatsDBSchema() + ".result_oids ro " + + "WHERE p.id=ro.oid and d.id='re3data_____::ad3609c351bd520edf6f10f5e0d9b877' " + + "GROUP BY d.id, ro.id, month ORDER BY d.id, ro.id"; + stmt.executeUpdate(sql); + logger.info("Created B2SHARE b2share_downloads_stats_tmp table"); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping B2SHARE b2share_result_views_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_views_monthly_tmp"; + logger.info("Dropped b2share_result_views_monthly_tmp view "); + stmt.executeUpdate(sql); + + // Dropping B2SHARE b2share_result_views_monthly_tmp view + logger.info("Dropping B2SHARE b2share_result_downloads_monthly_tmp view"); + sql = "DROP view IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".b2share_result_downloads_monthly_tmp"; + logger.info("Dropped b2share_result_downloads_monthly_tmp view "); + stmt.executeUpdate(sql); + + } + public void finalizeStats() throws Exception { stmt = ConnectDB.getHiveConnection().createStatement(); ConnectDB.getHiveConnection().setAutoCommit(false); @@ -402,6 +492,13 @@ public class PiwikStatsDB { stmt.executeUpdate(sql); logger.info("LaReferencia views updated to views_stats"); + // Inserting B2SHARE views stats + logger.info("Inserting B2SHARE data to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".b2share_views_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("B2SHARE views updated to views_stats"); + logger.info("Creating downloads_stats table"); String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() @@ -425,12 +522,18 @@ public class PiwikStatsDB { logger.info("Inserted Pedocs data to downloads_stats"); // Inserting TUDELFT downloads stats - logger.info("Inserting TUDELFT old data to downloads_stats"); + logger.info("Inserting TUDELFT data to downloads_stats"); sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".tudelft_downloads_stats_tmp"; stmt.executeUpdate(sql); logger.info("Inserted TUDELFT data to downloads_stats"); + // Inserting B2SHARE downloads stats + logger.info("Inserting B2SHARE data to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".b2share_downloads_stats_tmp"; + stmt.executeUpdate(sql); + logger.info("Inserted B2SHARE data to downloads_stats"); // Inserting Lareferencia downloads stats logger.info("Inserting LaReferencia data to downloads_stats"); sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " @@ -452,6 +555,20 @@ public class PiwikStatsDB { stmt.executeUpdate(sql); logger.info("SARC-OJS downloads updated to downloads_stats"); + // Inserting Datacite views stats + logger.info("Inserting Datacite views to views_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " + + "SELECT * FROM " + ConnectDB.getUsageRawDataDBSchema() + ".datacite_views"; + stmt.executeUpdate(sql); + logger.info("Datacite views updated to views_stats"); + + // Inserting Datacite downloads stats + logger.info("Inserting Datacite downloads to downloads_stats"); + sql = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " + + "SELECT * FROM " + ConnectDB.getUsageRawDataDBSchema() + ".datacite_downloads"; + 
stmt.executeUpdate(sql); + logger.info("Datacite downloads updated to downloads_stats"); + logger.info("Creating pageviews_stats table"); String create_pageviews_stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats " diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java index 47986f52a..0df6c8b2d 100644 --- a/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java +++ b/dhp-workflows/dhp-usage-stats-build/src/main/java/eu/dnetlib/oa/graph/usagestatsbuild/export/UsageStatsExporter.java @@ -51,6 +51,9 @@ public class UsageStatsExporter { logger.info("Processing TUDELFT Stats"); piwikstatsdb.uploadTUDELFTStats(); logger.info("Processing TUDELFT Stats Done"); + logger.info("Processing B2SHARE Stats"); + piwikstatsdb.uploadB2SHAREStats(); + logger.info("Processing B2SHARE Stats Done"); } diff --git a/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml index 71e8a50d6..45a6abf3d 100644 --- a/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-usage-stats-build/src/main/resources/eu/dnetlib/dhp/oa/graph/usagestatsbuild/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + hiveMetastoreUris diff --git a/nbactions.xml b/nbactions.xml new file mode 100644 index 000000000..4b6f7519d --- /dev/null +++ b/nbactions.xml @@ -0,0 +1,15 @@ + + + + test + + * + + + test + + + true + + +