Added Datasets from Datacite WF

This commit is contained in:
Dimitris 2021-02-17 09:46:38 +02:00 committed by dimitrispie
parent 4125b71661
commit 1e06815cdb
15 changed files with 335 additions and 263 deletions

View File

@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- <parent>
<artifactId>dhp-workflows</artifactId >
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<groupId>eu.dnetlib</groupId> -->
<!-- <parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-usage-stats-update</artifactId> -->
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-indicators</artifactId>
<build>
<plugins>
<plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
<version>2.1.15</version>
<executions>
<execution>
<goals>
<goal>revision</goal>
</goals>
</execution>
</executions>
<configuration>
<dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
<!-- more config here as you see fit -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
<cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180130</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${cdh.hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
<type>jar</type>
</dependency>
</dependencies>
<name>dhp-indicators</name>
</project>

View File

@ -0,0 +1 @@
mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/indicators

View File

@ -0,0 +1,35 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.indicators.export;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.BasicConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
* @author D. Pierrakos
*/
public class ExecuteWorkflow {
private static final Logger logger = LoggerFactory.getLogger(ExecuteWorkflow.class);
public static void main(String args[]) throws Exception {
// Sending the logs to the console
BasicConfigurator.configure();
logger.info("Workflow Executed");
}
}

View File

@ -0,0 +1,38 @@
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1</value>
</property>
<property>
<name>impalaJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;</value>
</property>
<property>
<name>oozie.wf.workflow.notification.url</name>
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,5 @@
#! /usr/bin/env python
import sys
print "this is a Python script"
print "Python Interpreter Version: " + sys.version

View File

@ -0,0 +1,58 @@
<workflow-app name="Python sample" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hiveMetastoreUris</name>
<description>Hive server metastore URIs</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>Hive server jdbc url</description>
</property>
<property>
<name>impalaJdbcUrl</name>
<description>Impala server jdbc url</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="python-check"/>
<action name="python-check">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>testpython.py</exec>
<file>python/testpython.py</file>
<capture-output/>
</shell>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Python action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

View File

@ -19,7 +19,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version> <version>1.1.7-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -96,7 +96,7 @@
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId> <artifactId>dhp-common</artifactId>
<version>1.2.4-SNAPSHOT</version> <version>1.1.7-SNAPSHOT</version>
<type>jar</type> <type>jar</type>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -9,10 +9,6 @@ package eu.dnetlib.oa.graph.datasetsusagestats.export;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -32,7 +28,6 @@ public abstract class ConnectDB {
private static String dbHiveUrl; private static String dbHiveUrl;
private static String dbImpalaUrl; private static String dbImpalaUrl;
private static String datasetUsageStatsDBSchema; private static String datasetUsageStatsDBSchema;
private static String datasetsUsageStatsPermanentDBSchema;
private static String statsDBSchema; private static String statsDBSchema;
private final static Logger logger = Logger.getLogger(ConnectDB.class); private final static Logger logger = Logger.getLogger(ConnectDB.class);
private Statement stmt = null; private Statement stmt = null;
@ -42,7 +37,6 @@ public abstract class ConnectDB {
dbHiveUrl = ExecuteWorkflow.dbHiveUrl; dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl; dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl;
datasetUsageStatsDBSchema = ExecuteWorkflow.datasetUsageStatsDBSchema; datasetUsageStatsDBSchema = ExecuteWorkflow.datasetUsageStatsDBSchema;
datasetsUsageStatsPermanentDBSchema = ExecuteWorkflow.datasetsUsageStatsPermanentDBSchema;
statsDBSchema = ExecuteWorkflow.statsDBSchema; statsDBSchema = ExecuteWorkflow.statsDBSchema;
Class.forName("org.apache.hive.jdbc.HiveDriver"); Class.forName("org.apache.hive.jdbc.HiveDriver");
@ -69,25 +63,14 @@ public abstract class ConnectDB {
} }
public static String getDataSetUsageStatsDBSchema() { public static String getDataSetUsageStatsDBSchema() {
String datePattern = "YYYYMMdd"; return ConnectDB.datasetUsageStatsDBSchema;
DateFormat df = new SimpleDateFormat(datePattern);
// Get the today date using Calendar object.
Date today = Calendar.getInstance().getTime();
String todayAsString = df.format(today);
return ConnectDB.datasetUsageStatsDBSchema + "_" + todayAsString;
} }
public static String getStatsDBSchema() { public static String getStatsDBSchema() {
return ConnectDB.statsDBSchema; return ConnectDB.statsDBSchema;
} }
public static String getDatasetsUsagestatsPermanentDBSchema() {
return ConnectDB.datasetsUsageStatsPermanentDBSchema;
}
private static Connection connectHive() throws SQLException { private static Connection connectHive() throws SQLException {
logger.info("trying to open Hive connection...");
ComboPooledDataSource cpds = new ComboPooledDataSource(); ComboPooledDataSource cpds = new ComboPooledDataSource();
cpds.setJdbcUrl(dbHiveUrl); cpds.setJdbcUrl(dbHiveUrl);
@ -107,18 +90,14 @@ public abstract class ConnectDB {
cpds.setPreferredTestQuery("SELECT 1"); cpds.setPreferredTestQuery("SELECT 1");
cpds.setIdleConnectionTestPeriod(60); cpds.setIdleConnectionTestPeriod(60);
logger.info("Opened HIVE successfully"); logger.info("Opened database successfully");
return cpds.getConnection(); return cpds.getConnection();
// Connection connection = DriverManager.getConnection(dbHiveUrl);
// logger.debug("Opened Hive successfully");
//
// return connection;
} }
private static Connection connectImpala() throws SQLException { private static Connection connectImpala() throws SQLException {
logger.info("trying to open Impala connection...");
ComboPooledDataSource cpds = new ComboPooledDataSource(); ComboPooledDataSource cpds = new ComboPooledDataSource();
cpds.setJdbcUrl(dbImpalaUrl); cpds.setJdbcUrl(dbImpalaUrl);
cpds.setUser("dimitris.pierrakos"); cpds.setUser("dimitris.pierrakos");
@ -137,12 +116,8 @@ public abstract class ConnectDB {
cpds.setPreferredTestQuery("SELECT 1"); cpds.setPreferredTestQuery("SELECT 1");
cpds.setIdleConnectionTestPeriod(60); cpds.setIdleConnectionTestPeriod(60);
logger.info("Opened Impala successfully"); logger.info("Opened database successfully");
return cpds.getConnection(); return cpds.getConnection();
// Connection connection = DriverManager.getConnection(dbHiveUrl);
// logger.debug("Opened Impala successfully");
//
// return connection;
} }
} }

View File

@ -1,6 +1,8 @@
package eu.dnetlib.oa.graph.datasetsusagestats.export; package eu.dnetlib.oa.graph.datasetsusagestats.export;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -45,7 +47,7 @@ public class DatasetsStatsDB {
try { try {
stmt = ConnectDB.getHiveConnection().createStatement(); stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating datacite usagestats DB: " + ConnectDB.getDataSetUsageStatsDBSchema()); logger.info("Creating usagestats DB: " + ConnectDB.getDataSetUsageStatsDBSchema());
String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema(); String createDatabase = "CREATE DATABASE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema();
stmt.executeUpdate(createDatabase); stmt.executeUpdate(createDatabase);
@ -53,23 +55,6 @@ public class DatasetsStatsDB {
logger.error("Failed to create database: " + e); logger.error("Failed to create database: " + e);
throw new Exception("Failed to create database: " + e.toString(), e); throw new Exception("Failed to create database: " + e.toString(), e);
} }
try {
stmt = ConnectDB.getHiveConnection().createStatement();
logger
.info(
"Creating permanent datasets usagestats DB: " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema());
String createPermanentDatabase = "CREATE DATABASE IF NOT EXISTS "
+ ConnectDB.getDatasetsUsagestatsPermanentDBSchema();
stmt.executeUpdate(createPermanentDatabase);
logger
.info(
"Created permanent datasets usagestats DB: " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema());
} catch (Exception e) {
logger.error("Failed to create database: " + e);
throw new Exception("Failed to create database: " + e.toString(), e);
}
} }
private void createTables() throws Exception { private void createTables() throws Exception {
@ -77,10 +62,10 @@ public class DatasetsStatsDB {
stmt = ConnectDB.getHiveConnection().createStatement(); stmt = ConnectDB.getHiveConnection().createStatement();
// Create Reports table - This table should exist // Create Reports table - This table should exist
logger.info("Creating Reports Tmp Table"); logger.info("Creating Reports Table");
String sqlCreateTableDataciteReports = "CREATE TABLE IF NOT EXISTS " String sqlCreateTableDataciteReports = "CREATE TABLE IF NOT EXISTS "
+ ConnectDB.getDataSetUsageStatsDBSchema() + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datacitereports_tmp(reportid STRING, \n" + ".datacitereports(reportid STRING, \n"
+ " name STRING, \n" + " name STRING, \n"
+ " source STRING,\n" + " source STRING,\n"
+ " release STRING,\n" + " release STRING,\n"
@ -94,10 +79,10 @@ public class DatasetsStatsDB {
logger.info("Reports Table Created"); logger.info("Reports Table Created");
// Create Datasets Performance Table // Create Datasets Performance Table
logger.info("Creating DataSetsPerformance Tmp Table"); logger.info("Creating DataSetsPerformance Table");
String sqlCreateTableDataSetsPerformance = "CREATE TABLE IF NOT EXISTS " String sqlCreateTableDataSetsPerformance = "CREATE TABLE IF NOT EXISTS "
+ ConnectDB.getDataSetUsageStatsDBSchema() + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datasetsperformance_tmp(ds_type STRING,\n" + ".datasetsperformance(ds_type STRING,\n"
+ " ds_title STRING,\n" + " ds_title STRING,\n"
+ " yop STRING,\n" + " yop STRING,\n"
+ " dataset_type STRING, \n" + " dataset_type STRING, \n"
@ -115,22 +100,7 @@ public class DatasetsStatsDB {
+ " CLUSTERED BY (ds_type)\n" + " CLUSTERED BY (ds_type)\n"
+ " into 100 buckets stored as orc tblproperties('transactional'='true')"; + " into 100 buckets stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTableDataSetsPerformance); stmt.executeUpdate(sqlCreateTableDataSetsPerformance);
logger.info("DataSetsPerformance Tmp Table Created"); logger.info("DataSetsPerformance Table Created");
logger.info("Creating Datacite Reports table");
String createDataciteReportsTable = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datacitereports LIKE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datacitereports_tmp STORED AS PARQUET";
stmt.executeUpdate(createDataciteReportsTable);
logger.info("Datacite Reports Table created");
logger.info("Creating Datasets Performance table");
String createDatasetPerformanceTable = "CREATE TABLE IF NOT EXISTS "
+ ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datasetsperformance LIKE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datasetsperformance_tmp STORED AS PARQUET";
stmt.executeUpdate(createDatasetPerformanceTable);
logger.info("DatasetsPerformance Table created");
stmt.close(); stmt.close();
ConnectDB.getHiveConnection().close(); ConnectDB.getHiveConnection().close();

View File

@ -21,7 +21,6 @@ public class ExecuteWorkflow {
static String dbHiveUrl; static String dbHiveUrl;
static String dbImpalaUrl; static String dbImpalaUrl;
static String datasetUsageStatsDBSchema; static String datasetUsageStatsDBSchema;
static String datasetsUsageStatsPermanentDBSchema;
static String statsDBSchema; static String statsDBSchema;
static boolean recreateDbAndTables; static boolean recreateDbAndTables;
static boolean datasetsEmptyDirs; static boolean datasetsEmptyDirs;
@ -46,7 +45,6 @@ public class ExecuteWorkflow {
dbHiveUrl = parser.get("dbHiveUrl"); dbHiveUrl = parser.get("dbHiveUrl");
dbImpalaUrl = parser.get("dbImpalaUrl"); dbImpalaUrl = parser.get("dbImpalaUrl");
datasetUsageStatsDBSchema = parser.get("datasetUsageStatsDBSchema"); datasetUsageStatsDBSchema = parser.get("datasetUsageStatsDBSchema");
datasetsUsageStatsPermanentDBSchema = parser.get("datasetsUsageStatsPermanentDBSchema");
statsDBSchema = parser.get("statsDBSchema"); statsDBSchema = parser.get("statsDBSchema");
if (parser.get("recreateDbAndTables").toLowerCase().equals("true")) if (parser.get("recreateDbAndTables").toLowerCase().equals("true"))
@ -59,11 +57,11 @@ public class ExecuteWorkflow {
else else
datasetsEmptyDirs = false; datasetsEmptyDirs = false;
if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true")) // if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true"))
finalTablesVisibleToImpala = true; // finalTablesVisibleToImpala = true;
else // else
finalTablesVisibleToImpala = false; // finalTablesVisibleToImpala = false;
//
UsageStatsExporter usagestatsExport = new UsageStatsExporter(); UsageStatsExporter usagestatsExport = new UsageStatsExporter();
usagestatsExport.export(); usagestatsExport.export();
} }

View File

@ -65,7 +65,7 @@ public class ReadReportsListFromDatacite {
logger.info("Checking report with id " + reportID); logger.info("Checking report with id " + reportID);
String sqlCheckIfReportExists = "SELECT source FROM " + ConnectDB.getDataSetUsageStatsDBSchema() String sqlCheckIfReportExists = "SELECT source FROM " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datacitereports_tmp where reportid=?"; + ".datacitereports where reportid=?";
PreparedStatement stGetReportID = ConnectDB.getHiveConnection().prepareStatement(sqlCheckIfReportExists); PreparedStatement stGetReportID = ConnectDB.getHiveConnection().prepareStatement(sqlCheckIfReportExists);
stGetReportID.setString(1, reportID); stGetReportID.setString(1, reportID);
@ -76,7 +76,7 @@ public class ReadReportsListFromDatacite {
dropTmpReportsTable(); dropTmpReportsTable();
} else { } else {
String sqlInsertReport = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema() String sqlInsertReport = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ " .datacitereports_tmp " + " .datacitereports "
+ "SELECT\n" + "SELECT\n"
+ " get_json_object(json, '$.report.id') AS reportid,\n" + " get_json_object(json, '$.report.id') AS reportid,\n"
+ " get_json_object(json, '$.report.report-header.report-name') AS name,\n" + " get_json_object(json, '$.report.report-header.report-name') AS name,\n"
@ -105,7 +105,7 @@ public class ReadReportsListFromDatacite {
ResultSet rstmpReportAll = stmt.getResultSet(); ResultSet rstmpReportAll = stmt.getResultSet();
if (rstmpReportAll.next()) { if (rstmpReportAll.next()) {
String listDatasets = rstmpReportAll.getString(1); String listDatasets = rstmpReportAll.getString(1);
logger.info("Adding uncompressed performance for " + reportID); logger.info("No compressed performance found");
this.readDatasetsReport(listDatasets, reportID); this.readDatasetsReport(listDatasets, reportID);
} }
@ -125,9 +125,6 @@ public class ReadReportsListFromDatacite {
} }
public void readDatasetsReport(String prettyDatasetsReports, String reportId) throws Exception { public void readDatasetsReport(String prettyDatasetsReports, String reportId) throws Exception {
logger.info("Reading Datasets performance for report " + reportId);
logger.info("Write Performance Report To File");
ConnectDB.getHiveConnection().setAutoCommit(false);
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readValue(prettyDatasetsReports, JsonNode.class); JsonNode jsonNode = objectMapper.readValue(prettyDatasetsReports, JsonNode.class);
String datasetsReports = jsonNode.toString(); String datasetsReports = jsonNode.toString();
@ -154,7 +151,8 @@ public class ReadReportsListFromDatacite {
fin.writeChar('\n'); fin.writeChar('\n');
fin.close(); fin.close();
logger.info("Reading Performance Report From File..."); logger.info("Write Compress Report To File");
logger.info("Reading Compress Report From File...");
String sqlCreateTempTableForDatasets = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() String sqlCreateTempTableForDatasets = "CREATE TEMPORARY TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".tmpjsoncompressesed (report_datasets array<struct<dataset_id:array<struct<value:string>>,dataset_title:string, data_type:string, " + ".tmpjsoncompressesed (report_datasets array<struct<dataset_id:array<struct<value:string>>,dataset_title:string, data_type:string, "
@ -177,7 +175,7 @@ public class ReadReportsListFromDatacite {
stmt.execute(sqlCreateTempTableForDatasets); stmt.execute(sqlCreateTempTableForDatasets);
String sqlInsertToDatasetsPerformance = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema() String sqlInsertToDatasetsPerformance = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datasetsperformance_tmp SELECT dataset.dataset_id[0].value ds_type, " + ".datasetsperformance SELECT dataset.dataset_id[0].value ds_type, "
+ " dataset.dataset_title ds_title, " + " dataset.dataset_title ds_title, "
+ " dataset.yop yop, " + " dataset.yop yop, "
+ " dataset.data_type dataset_type, " + " dataset.data_type dataset_type, "
@ -198,7 +196,7 @@ public class ReadReportsListFromDatacite {
stmt.executeUpdate(sqlInsertToDatasetsPerformance); stmt.executeUpdate(sqlInsertToDatasetsPerformance);
logger.info("Datasets Performance Inserted for Report " + reportId); logger.info("Datasets Performance Inserted ");
stmt.execute("Drop table " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed"); stmt.execute("Drop table " + ConnectDB.getDataSetUsageStatsDBSchema() + ".tmpjsoncompressesed");
@ -296,93 +294,32 @@ public class ReadReportsListFromDatacite {
} }
public void createUsageStatisticsTable() throws SQLException { public void createUsageStatisticsTable() throws SQLException {
logger.info("Dropping Downloads Stats table");
Statement stmt = ConnectDB.getHiveConnection().createStatement(); Statement stmt = ConnectDB.getHiveConnection().createStatement();
String dropDownloadsTable = "DROP TABLE IF EXISTS " + ConnectDB.getDataSetUsageStatsDBSchema()
logger.info("Updating Datacite Reports table"); + ".datacite_downloads";
String createDataciteReportsTable = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema() stmt.executeUpdate(dropDownloadsTable);
+ ".datacitereports "
+ "SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports_tmp";
stmt.executeUpdate(createDataciteReportsTable);
logger.info("Datacite Reports Table updated");
logger.info("Updating Datasets Performance table");
String createDatasetPerformanceTable = "INSERT INTO " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datasetsperformance "
+ "SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance_tmp";
stmt.executeUpdate(createDatasetPerformanceTable);
logger.info("DatasetsPerformance Table updated");
logger.info("Creating Downloads Stats table"); logger.info("Creating Downloads Stats table");
String createDownloadsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() String createDownloadsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema()
+ ".datacite_downloads STORED AS PARQUET as " + ".datacite_downloads as "
+ "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire " + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire "
+ "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance " + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance "
+ "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform " + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform "
+ "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid " + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid "
+ "where metric_type='total-dataset-requests' "; + "where metric_type='total-dataset-requests'";
stmt.executeUpdate(createDownloadsTable); stmt.executeUpdate(createDownloadsTable);
logger.info("Downloads Stats table created"); logger.info("Downloads Stats table created");
logger.info("Creating Views Stats table"); logger.info("Creating Views Stats table");
String createViewsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() String createViewsTable = "CREATE TABLE " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views as "
+ ".datacite_views STORED AS PARQUET as "
+ "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire " + "SELECT 'Datacite' source, d.id repository_id, od.id result_id, regexp_replace(substring(string(period_end),0,7),'-','/') date, count, '0' openaire "
+ "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance " + "FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance "
+ "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform " + "JOIN " + ConnectDB.getStatsDBSchema() + ".datasource d on name=platform "
+ "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid " + "JOIN " + ConnectDB.getStatsDBSchema() + ".result_oids od on string(ds_type)=od.oid "
+ "where metric_type='total-dataset-investigations' "; + "where metric_type='total-dataset-investigations'";
stmt.executeUpdate(createViewsTable); stmt.executeUpdate(createViewsTable);
logger.info("Views Stats table created"); logger.info("Views Stats table created");
logger.info("Building Permanent Datasets Usage Stats DB");
logger.info("Dropping view datacitereports on permanent datacite usagestats DB");
String sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports";
stmt.executeUpdate(sql);
logger.info("Dropped view datacitereports on permanent datacite usagestats DB");
logger.info("Create view datacitereports on permanent datacite usagestats DB");
sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports"
+ " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports";
stmt.executeUpdate(sql);
logger.info("Created view datacitereports on permanent datasets usagestats DB");
logger.info("Dropping view datasetsperformance on permanent datacite usagestats DB");
sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance";
stmt.executeUpdate(sql);
logger.info("Dropped view datasetsperformance on permanent datacite usagestats DB");
logger.info("Create view datasetsperformance on permanent datacite usagestats DB");
sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance"
+ " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance";
stmt.executeUpdate(sql);
logger.info("Created view datasetsperformance on permanent datasets usagestats DB");
logger.info("Dropping view datacite_views on permanent datacite usagestats DB");
sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views";
stmt.executeUpdate(sql);
logger.info("Dropped view datacite_views on permanent datacite usagestats DB");
logger.info("Create view datacite_views on permanent datacite usagestats DB");
sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views"
+ " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views";
stmt.executeUpdate(sql);
logger.info("Created view datacite_views on permanent datasets usagestats DB");
logger.info("Dropping view datacite_downloads on permanent datacite usagestats DB");
sql = "DROP VIEW IF EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads";
stmt.executeUpdate(sql);
logger.info("Dropped view datacite_downloads on permanent datacite usagestats DB");
logger.info("Create view datacite_downloads on permanent datacite usagestats DB");
sql = "CREATE VIEW IF NOT EXISTS " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads"
+ " AS SELECT * FROM " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_downloads";
stmt.executeUpdate(sql);
logger.info("Created view datacite_downloads on permanent datasets usagestats DB");
stmt.close();
ConnectDB.getHiveConnection().close();
logger.info("Completed Building Permanent Datasets Usage Stats DB");
} }
} }

View File

@ -2,7 +2,6 @@
package eu.dnetlib.oa.graph.datasetsusagestats.export; package eu.dnetlib.oa.graph.datasetsusagestats.export;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -68,50 +67,5 @@ public class UsageStatsExporter {
readReportsListFromDatacite.readReports(); readReportsListFromDatacite.readReports();
logger.info("Reports Stored To DB"); logger.info("Reports Stored To DB");
readReportsListFromDatacite.createUsageStatisticsTable(); readReportsListFromDatacite.createUsageStatisticsTable();
// Make the tables available to Impala
if (ExecuteWorkflow.finalTablesVisibleToImpala) {
logger.info("Making tables visible to Impala");
invalidateMetadata();
}
logger.info("End");
}
private void invalidateMetadata() throws SQLException {
Statement stmt = null;
stmt = ConnectDB.getImpalaConnection().createStatement();
String sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_downloads";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacite_views";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datacitereports";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getDataSetUsageStatsDBSchema() + ".datasetsperformance";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_downloads";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacite_views";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datacitereports";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getDatasetsUsagestatsPermanentDBSchema() + ".datasetsperformance";
stmt.executeUpdate(sql);
stmt.close();
try {
ConnectDB.getHiveConnection().close();
} catch (Exception e) {
logger.info("Message at the end :" + e.getMessage());
}
} }
} }

View File

@ -29,12 +29,6 @@
"paramDescription": "activate tranform-only mode. Only apply transformation step", "paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "uspdbs",
"paramLongName": "datasetsUsageStatsPermanentDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{ {
"paramName": "sdbs", "paramName": "sdbs",
"paramLongName": "statsDBSchema", "paramLongName": "statsDBSchema",

View File

@ -1,4 +1,4 @@
<workflow-app name="Datacite Datasets Usage Stats" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="Usage Datasets Stats" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>hiveMetastoreUris</name> <name>hiveMetastoreUris</name>
@ -52,8 +52,6 @@
<arg>${impalaJdbcUrl}</arg> <arg>${impalaJdbcUrl}</arg>
<arg>--datasetUsageStatsDBSchema</arg> <arg>--datasetUsageStatsDBSchema</arg>
<arg>${datasetUsageStatsDBSchema}</arg> <arg>${datasetUsageStatsDBSchema}</arg>
<arg>--datasetsUsageStatsPermanentDBSchema</arg>
<arg>${datasetsUsageStatsPermanentDBSchema}</arg>
<arg>--statsDBSchema</arg> <arg>--statsDBSchema</arg>
<arg>${statsDBSchema}</arg> <arg>${statsDBSchema}</arg>
<arg>--recreateDbAndTables</arg> <arg>--recreateDbAndTables</arg>