Workflow usage-stats-update created

This commit is contained in:
dimitrispie 2023-01-13 15:21:04 +02:00
parent 733abdffe3
commit 566e2459a8
12 changed files with 2351 additions and 0 deletions

View File

@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-usage-stats-build</artifactId>
<build>
<plugins>
<plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
<version>2.1.15</version>
<executions>
<execution>
<goals>
<goal>revision</goal>
</goals>
</execution>
</executions>
<configuration>
<dotGitDirectory>${project.basedir}/../.git</dotGitDirectory>
<!-- more config here as you see fit -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
<cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180130</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${cdh.hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
<type>jar</type>
</dependency>
</dependencies>
<name>dhp-usage-stats-build</name>
</project>

View File

@ -0,0 +1 @@
mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/usagestatsbuild

View File

@ -0,0 +1,134 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestatsbuild.export;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* @author D. Pierrakos, S. Zoupanos
*/
import com.mchange.v2.c3p0.ComboPooledDataSource;
public abstract class ConnectDB {
public static Connection DB_HIVE_CONNECTION;
public static Connection DB_IMPALA_CONNECTION;
private static String dbHiveUrl;
private static String dbImpalaUrl;
private static String usageRawDataDBSchema;
private static String usageStatsDBSchema;
private static String usagestatsPermanentDBSchema;
private static String statsDBSchema;
private ConnectDB() {
}
static void init() throws ClassNotFoundException {
dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl;
usageStatsDBSchema = ExecuteWorkflow.usageStatsDBSchema;
statsDBSchema = ExecuteWorkflow.statsDBSchema;
usageRawDataDBSchema = ExecuteWorkflow.usageRawDataDBSchema;
usagestatsPermanentDBSchema = ExecuteWorkflow.usagestatsPermanentDBSchema;
Class.forName("org.apache.hive.jdbc.HiveDriver");
}
public static Connection getHiveConnection() throws SQLException {
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
return DB_HIVE_CONNECTION;
} else {
DB_HIVE_CONNECTION = connectHive();
return DB_HIVE_CONNECTION;
}
}
public static Connection getImpalaConnection() throws SQLException {
if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
return DB_IMPALA_CONNECTION;
} else {
DB_IMPALA_CONNECTION = connectImpala();
return DB_IMPALA_CONNECTION;
}
}
public static String getUsageRawDataDBSchema() {
return ConnectDB.usageRawDataDBSchema;
}
public static String getUsageStatsDBSchema() {
// String datePattern = "YYYYMMdd";
// DateFormat df = new SimpleDateFormat(datePattern);
//// Get the today date using Calendar object.
// Date today = Calendar.getInstance().getTime();
// String todayAsString = df.format(today);
// return ConnectDB.usageStatsDBSchema + todayAsString;
return ConnectDB.usageStatsDBSchema;
}
public static String getStatsDBSchema() {
return ConnectDB.statsDBSchema;
}
public static String getUsagestatsPermanentDBSchema() {
return ConnectDB.usagestatsPermanentDBSchema;
}
private static Connection connectHive() throws SQLException {
ComboPooledDataSource cpds = new ComboPooledDataSource();
cpds.setJdbcUrl(dbHiveUrl);
cpds.setAcquireIncrement(1);
cpds.setMaxPoolSize(100);
cpds.setMinPoolSize(1);
cpds.setInitialPoolSize(1);
cpds.setMaxIdleTime(300);
cpds.setMaxConnectionAge(36000);
cpds.setAcquireRetryAttempts(30);
cpds.setAcquireRetryDelay(2000);
cpds.setBreakAfterAcquireFailure(false);
cpds.setCheckoutTimeout(0);
cpds.setPreferredTestQuery("SELECT 1");
cpds.setIdleConnectionTestPeriod(60);
return cpds.getConnection();
}
private static Connection connectImpala() throws SQLException {
ComboPooledDataSource cpds = new ComboPooledDataSource();
cpds.setJdbcUrl(dbImpalaUrl);
cpds.setAcquireIncrement(1);
cpds.setMaxPoolSize(100);
cpds.setMinPoolSize(1);
cpds.setInitialPoolSize(1);
cpds.setMaxIdleTime(300);
cpds.setMaxConnectionAge(36000);
cpds.setAcquireRetryAttempts(30);
cpds.setAcquireRetryDelay(2000);
cpds.setBreakAfterAcquireFailure(false);
cpds.setCheckoutTimeout(0);
cpds.setPreferredTestQuery("SELECT 1");
cpds.setIdleConnectionTestPeriod(60);
return cpds.getConnection();
}
}

View File

@ -0,0 +1,140 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestatsbuild.export;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.BasicConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class ExecuteWorkflow {
// static String matomoAuthToken;
static String portalMatomoID;
// static String irusUKBaseURL;
// static String lareferenciaBaseURL;
// static String lareferenciaAuthToken;
static String dbHiveUrl;
static String dbImpalaUrl;
static String usageRawDataDBSchema;
static String usageStatsDBSchema;
static String usagestatsPermanentDBSchema;
static String statsDBSchema;
static boolean recreateDbAndTables;
static boolean processPiwikLogs;
static boolean processLaReferenciaLogs;
static boolean irusProcessStats;
static boolean sarcProcessStats;
static boolean finalizeStats;
static boolean finalTablesVisibleToImpala;
static int numberOfDownloadThreads;
private static final Logger logger = LoggerFactory.getLogger(PiwikStatsDB.class);
public static void main(String args[]) throws Exception {
// Sending the logs to the console
BasicConfigurator.configure();
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
UsageStatsExporter.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/usagestatsbuild/export/usagestatsbuild_parameters.json")));
parser.parseArgument(args);
// Setting up the initial parameters
// matomoAuthToken = parser.get("matomoAuthToken");
// matomoBaseURL = parser.get("matomoBaseURL");
portalMatomoID = parser.get("portalMatomoID");
// irusUKBaseURL = parser.get("irusUKBaseURL");
// lareferenciaBaseURL = parser.get("lareferenciaBaseURL");
// lareferenciaAuthToken = parser.get("lareferenciaAuthToken");
dbHiveUrl = parser.get("dbHiveUrl");
dbImpalaUrl = parser.get("dbImpalaUrl");
usageRawDataDBSchema = parser.get("usageRawDataDBSchema");
usageStatsDBSchema = parser.get("usageStatsDBSchema");
usagestatsPermanentDBSchema = parser.get("usagestatsPermanentDBSchema");
statsDBSchema = parser.get("statsDBSchema");
if (parser.get("processPiwikLogs").toLowerCase().equals("true")) {
processPiwikLogs = true;
} else {
processPiwikLogs = false;
}
// String startingLogPeriodStr = parser.get("startingLogPeriod");
// Date startingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(startingLogPeriodStr);
// startingLogPeriod = startingLogPeriodStr(startingLogPeriodDate);
//
// String endingLogPeriodStr = parser.get("endingLogPeriod");
// Date endingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(endingLogPeriodStr);
// endingLogPeriod = startingLogPeriodStr(endingLogPeriodDate);
if (parser.get("recreateDbAndTables").toLowerCase().equals("true")) {
recreateDbAndTables = true;
} else {
recreateDbAndTables = false;
}
if (parser.get("processLaReferenciaLogs").toLowerCase().equals("true")) {
processLaReferenciaLogs = true;
} else {
processLaReferenciaLogs = false;
}
if (parser.get("irusProcessStats").toLowerCase().equals("true")) {
irusProcessStats = true;
} else {
irusProcessStats = false;
}
if (parser.get("sarcProcessStats").toLowerCase().equals("true")) {
sarcProcessStats = true;
} else {
sarcProcessStats = false;
}
if (parser.get("finalizeStats").toLowerCase().equals("true")) {
finalizeStats = true;
} else {
finalizeStats = false;
}
if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true")) {
finalTablesVisibleToImpala = true;
} else {
numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads"));
}
UsageStatsExporter usagestatsExport = new UsageStatsExporter();
usagestatsExport.export();
}
private static Calendar startingLogPeriodStr(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
return calendar;
}
}

View File

@ -0,0 +1,95 @@
package eu.dnetlib.oa.graph.usagestatsbuild.export;
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class IrusStats {
private String irusUKURL;
private static final Logger logger = LoggerFactory.getLogger(IrusStats.class);
public IrusStats() throws Exception {
}
public void processIrusStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Creating irus_downloads_stats_tmp table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".irus_downloads_stats_tmp "
+ "(`source` string, "
+ "`repository_id` string, "
+ "`result_id` string, "
+ "`date` string, "
+ "`count` bigint, "
+ "`openaire` bigint)";
stmt.executeUpdate(createDownloadsStats);
logger.info("Created irus_downloads_stats_tmp table");
logger.info("Inserting into irus_downloads_stats_tmp");
String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_downloads_stats_tmp "
+ "SELECT s.source, d.id AS repository_id, "
+ "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' "
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog s, "
+ ConnectDB.getStatsDBSchema() + ".datasource_oids d, "
+ ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'";
stmt.executeUpdate(insertDStats);
logger.info("Inserted into irus_downloads_stats_tmp");
String createR5Stats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".irus_R5_stats_tmp "
+ "(`source` string, "
+ "`repository_id` string, "
+ "`result_id` string, "
+ "`date` string, "
+ "`views` bigint, "
+ "`downloads` bigint, "
+ "`openaire` bigint)";
stmt.executeUpdate(createR5Stats);
logger.info("Created irus_R5_stats_tmp table");
logger.info("Inserting into irus_R5_stats_tmp");
String insertΡ5Stats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_R5_stats_tmp "
+ "SELECT s.source, d.id AS repository_id, "
+ "ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, "
+ "(s.total_item_investigations-s.total_item_requests) as views, s.total_item_requests as downloads, '0' "
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog_cop_r5 s, "
+ ConnectDB.getStatsDBSchema() + ".datasource_oids d, "
+ ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE s.repository=d.oid AND s.rid=ro.oid AND s.source='IRUS-UK'";
stmt.executeUpdate(insertΡ5Stats);
logger.info("Inserted into irus_R5_stats_tmp");
stmt.close();
// ConnectDB.getHiveConnection().close();
}
//// to add create table sushilog_cop_r5 as select * from openaire_prod_usage_raw.sushilog_cop_r5
//// to add create table sushilog_cop_r5 as select * from openaire_prod_usage_raw.sushilog_cop_r5
}

View File

@ -0,0 +1,321 @@
package eu.dnetlib.oa.graph.usagestatsbuild.export;
import java.io.*;
import java.net.URLDecoder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class LaReferenciaStats {
private static final Logger logger = LoggerFactory.getLogger(LaReferenciaStats.class);
private String logRepoPath;
private Statement stmt = null;
private String CounterRobotsURL;
private ArrayList robotsList;
public LaReferenciaStats() throws Exception {
}
public void processLogs() throws Exception {
try {
logger.info("LaReferencia creating viewsStats");
viewsStats();
logger.info("LaReferencia created viewsStats");
logger.info("LaReferencia creating downloadsStats");
downloadsStats();
logger.info("LaReferencia created downloadsStats");
logger.info("LaReferencia creating COUNTER CoP R5 metrics");
createCoPR5TablesForLareferencia();
logger.info("LaReferencia created COUNTER CoP R5 metrics");
// logger.info("LaReferencia updating Production Tables");
// updateProdTables();
// logger.info("LaReferencia updated Production Tables");
} catch (Exception e) {
logger.error("Failed to process logs: " + e);
throw new Exception("Failed to process logs: " + e.toString(), e);
}
}
public void viewsStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Creating la_result_views_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS "
+
"SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
"THEN 1 ELSE 0 END) AS openaire_referrer, " +
"CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
"FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog where action='action' and " +
"(source_item_type='oaItem' or source_item_type='repItem') " +
"GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
"source ORDER BY source, entity_id";
stmt.executeUpdate(sql);
logger.info("Created la_result_views_monthly_tmp view");
logger.info("Dropping la_views_stats_tmp table");
sql = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".la_views_stats_tmp";
stmt.executeUpdate(sql);
logger.info("Dropped la_views_stats_tmp table");
logger.info("Creating la_views_stats_tmp table");
sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp " +
"AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
"max(views) AS count, max(openaire_referrer) AS openaire " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp p, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE p.source=d.oid AND p.id=ro.oid " +
"GROUP BY d.id, ro.id, month " +
"ORDER BY d.id, ro.id, month";
stmt.executeUpdate(sql);
logger.info("Created la_views_stats_tmp table");
stmt.close();
// ConnectDB.getHiveConnection().close();
}
private void downloadsStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Creating la_result_downloads_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
+ ".la_result_downloads_monthly_tmp AS " +
"SELECT entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
"THEN 1 ELSE 0 END) AS openaire_referrer, " +
"CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
"FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog where action='download' and " +
"(source_item_type='oaItem' or source_item_type='repItem') " +
"GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
"source ORDER BY source, entity_id";
stmt.executeUpdate(sql);
logger.info("Created la_result_downloads_monthly_tmp view");
logger.info("Dropping la_downloads_stats_tmp table");
sql = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".la_downloads_stats_tmp";
stmt.executeUpdate(sql);
logger.info("Dropped la_downloads_stats_tmp table");
logger.info("Creating la_downloads_stats_tmp table");
sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp " +
"AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
"max(downloads) AS count, max(openaire_referrer) AS openaire " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_downloads_monthly_tmp p, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE p.source=d.oid AND p.id=ro.oid " +
"GROUP BY d.id, ro.id, month " +
"ORDER BY d.id, ro.id, month";
stmt.executeUpdate(sql);
logger.info("Created la_downloads_stats_tmp table");
stmt.close();
// ConnectDB.getHiveConnection().close();
}
private void createCoPR5TablesForLareferencia() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
// Unique Item Investigations
logger.info("Create View Unique_Item_Investigations");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
+ ".lr_view_unique_item_investigations "
+ "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, "
+ "CASE WHEN COUNT(entity_id)>1 THEN 1 ELSE 1 END AS unique_item_investigations, "
+ "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source "
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog "
+ "WHERE (source_item_type='oaItem' or source_item_type='repItem') "
+ "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ";
stmt.executeUpdate(sql);
logger.info("Created View Unique_Item_Investigations");
logger.info("Drop Table Unique_Item_Investigations");
sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_investigations ";
stmt.executeUpdate(sql);
logger.info("Dropped Table Unique_Item_Investigations");
logger.info("Create Table tbl_unique_item_investigations");
sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_investigations as "
+ "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, "
+ "sum(unique_item_investigations) AS unique_item_investigations, sum(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_unique_item_investigations p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' "
+ "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month ";
stmt.executeUpdate(sql);
logger.info("Created Table tbl_unique_item_investigations");
// Total Item Investigations
logger.info("Create View lr_view_total_item_investigations");
sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_total_item_investigations "
+ "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, "
+ "COUNT(entity_id) AS total_item_investigations, "
+ "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source "
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog "
+ "WHERE (source_item_type='oaItem' or source_item_type='repItem') "
+ "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ";
stmt.executeUpdate(sql);
logger.info("Created View lr_view_total_item_investigations");
logger.info("Drop Table lr_tbl_total_item_investigations");
sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_investigations ";
stmt.executeUpdate(sql);
logger.info("Dropped Table lr_tbl_total_item_investigations");
logger.info("Create Table lr_tbl_total_item_investigations");
sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_investigations as "
+ "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, "
+ "sum(total_item_investigations) AS total_item_investigations, sum(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_total_item_investigations p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' "
+ "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month ";
stmt.executeUpdate(sql);
logger.info("Created Table lr_tbl_total_item_investigations");
// Unique Item Requests
logger.info("Create View lr_view_unique_item_requests");
sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_unique_item_requests AS "
+ "SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, "
+ "CASE WHEN COUNT(entity_id)>1 THEN 1 ELSE 1 END AS unique_item_requests, "
+ "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source "
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog "
+ "WHERE action='download' AND (source_item_type='oaItem' or source_item_type='repItem') "
+ "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ";
stmt.executeUpdate(sql);
logger.info("Created View lr_view_unique_item_requests");
logger.info("Drop Table Unique_Item_Requests");
sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_requests ";
stmt.executeUpdate(sql);
logger.info("Dropped Table Unique_Item_Requests");
logger.info("Create Table lr_tbl_unique_item_requests");
sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_requests as "
+ "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, "
+ "sum(unique_item_requests) AS unique_item_requests, sum(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_unique_item_requests p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' "
+ "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month ";
stmt.executeUpdate(sql);
logger.info("Created Table lr_tbl_unique_item_requests");
// Total Item Requests
logger.info("Create View lr_view_total_item_requests");
sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".lr_view_total_item_requests "
+ "AS SELECT id_visit, entity_id, reflect('java.net.URLDecoder', 'decode', entity_id) AS id, "
+ "COUNT(entity_id) AS total_item_requests, "
+ "SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source "
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".lareferencialog "
+ "WHERE action='download' AND (source_item_type='oaItem' or source_item_type='repItem') "
+ "AND entity_id is NOT NULL GROUP BY id_visit, entity_id, "
+ "CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), source ";
stmt.executeUpdate(sql);
logger.info("Created View lr_view_total_item_requests");
logger.info("Drop Table lr_tbl_total_item_requests");
sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_requests ";
stmt.executeUpdate(sql);
logger.info("Dropped Table lr_tbl_total_item_requests");
logger.info("Create Table lr_tbl_total_item_requests");
sql = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_requests as "
+ "SELECT 'OpenAIRE' as source, d.id as repository_id, ro.id as result_id, month as date, "
+ "sum(total_item_requests) AS total_item_requests, sum(openaire_referrer) AS openaire "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".view_total_item_requests p, "
+ ConnectDB.getStatsDBSchema() + ".datasource d," + ConnectDB.getStatsDBSchema() + ".result_oids ro "
+ "WHERE p.source=d.piwik_id AND p.id=ro.oid AND ro.oid!='200' AND ro.oid!='204' AND ro.oid!='404' "
+ "AND ro.oid!='400' AND ro.oid!='503' AND d.id!='re3data_____::7b0ad08687b2c960d5aeef06f811d5e6' "
+ "GROUP BY d.id, ro.id, month ";
stmt.executeUpdate(sql);
logger.info("Created Table lr_tbl_total_item_requests");
// All CoP R5 metrics Table
logger.info("Drop Table lr_tbl_all_r5_metrics");
sql = "DROP TABLE IF EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_all_r5_metrics ";
stmt.executeUpdate(sql);
logger.info("Dropped Table lr_tbl_all_r5_metrics");
logger.info("Create Table lr_tbl_all_r5_metrics");
sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_all_r5_metrics as "
+ "WITH tmp1 as (SELECT coalesce(ds.repository_id, vs.repository_id) as repository_id, "
+ "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, "
+ "coalesce(vs.unique_item_investigations, 0) as unique_item_investigations, "
+ "coalesce(ds.total_item_investigations, 0) as total_item_investigations "
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_investigations AS vs "
+ "FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_investigations AS ds "
+ " ON ds.source=vs.source AND ds.result_id=vs.result_id AND ds.date=vs.date), "
+ "tmp2 AS (select coalesce (ds.repository_id, vs.repository_id) as repository_id, "
+ "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, "
+ "coalesce(ds.total_item_investigations, 0) as total_item_investigations, "
+ "coalesce(ds.unique_item_investigations, 0) as unique_item_investigations, "
+ " coalesce(vs.unique_item_requests, 0) as unique_item_requests FROM tmp1 "
+ "AS ds FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_unique_item_requests AS vs "
+ "ON ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date) "
+ "SELECT 'LaReferencia' as source, coalesce (ds.repository_id, vs.repository_id) as repository_id, "
+ "coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, "
+ "coalesce(ds.unique_item_investigations, 0) as unique_item_investigations, "
+ "coalesce(ds.total_item_investigations, 0) as total_item_investigations, "
+ "coalesce(ds.unique_item_requests, 0) as unique_item_requests, "
+ "coalesce(vs.total_item_requests, 0) as total_item_requests "
+ "FROM tmp2 AS ds FULL OUTER JOIN " + ConnectDB.getUsageStatsDBSchema() + ".lr_tbl_total_item_requests "
+ "AS vs ON ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date";
stmt.executeUpdate(sql);
logger.info("Created Table tbl_all_r5_metrics");
stmt.close();
ConnectDB.getHiveConnection().close();
}
}

View File

@ -0,0 +1,107 @@
package eu.dnetlib.oa.graph.usagestatsbuild.export;
import java.io.*;
// import java.io.BufferedReader;
// import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class SarcStats {
private Statement stmtHive = null;
private Statement stmtImpala = null;
private static final Logger logger = LoggerFactory.getLogger(SarcStats.class);
public SarcStats() throws Exception {
// createTables();
}
private void createTables() throws Exception {
try {
stmtHive = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
stmtHive.executeUpdate(sqlCreateTableSushiLog);
// String sqlCopyPublicSushiLog="INSERT INTO sushilog SELECT * FROM public.sushilog;";
// stmt.executeUpdate(sqlCopyPublicSushiLog);
String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ " ON INSERT TO sushilog "
+ " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
+ "sushilog.rid, sushilog.date "
+ "FROM sushilog "
+ "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
stmtHive.executeUpdate(sqlcreateRuleSushiLog);
String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
stmtHive.executeUpdate(createSushiIndex);
stmtHive.close();
ConnectDB.getHiveConnection().close();
logger.info("Sushi Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
}
}
public void processSarc() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Creating sarc_downloads_stats_tmp table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_downloads_stats_tmp "
+ "(`source` string, "
+ "`repository_id` string, "
+ "`result_id` string, "
+ "`date` string, "
+ "`count` bigint, "
+ "`openaire` bigint)";
stmt.executeUpdate(createDownloadsStats);
logger.info("Created sarc_downloads_stats_tmp table");
logger.info("Inserting into sarc_downloads_stats_tmp");
String insertSarcStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_downloads_stats_tmp "
+ "SELECT s.source, d.id AS repository_id, "
+ "ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', "
+ "LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' "
+ "FROM " + ConnectDB.getUsageRawDataDBSchema() + ".sushilog s, "
+ ConnectDB.getStatsDBSchema() + ".datasource_oids d, "
+ ConnectDB.getStatsDBSchema() + ".result_pids ro "
+ "WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND d.id like CONCAT('%', 'sarcservicod', '%') "
+ "AND s.rid=ro.pid AND ro.type='Digital Object Identifier' AND s.metric_type='ft_total' AND s.source='SARC-OJS'";
stmt.executeUpdate(insertSarcStats);
logger.info("Inserted into sarc_downloads_stats_tmp");
stmt.close();
// ConnectDB.getHiveConnection().close();
}
}

View File

@ -0,0 +1,137 @@
package eu.dnetlib.oa.graph.usagestatsbuild.export;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Main class for downloading and processing Usage statistics
*
* @author D. Pierrakos, S. Zoupanos
*/
public class UsageStatsExporter {
public UsageStatsExporter() {
}
private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
public void export() throws Exception {
logger.info("Initialising DB properties");
ConnectDB.init();
// runImpalaQuery();
PiwikStatsDB piwikstatsdb = new PiwikStatsDB();
logger.info("Re-creating database and tables");
if (ExecuteWorkflow.recreateDbAndTables) {
piwikstatsdb.recreateDBAndTables();
logger.info("DB-Tables are created ");
}
// else {
// piwikstatsdb.createTmpTables();
// logger.info("TmpTables are created ");
// }
if (ExecuteWorkflow.processPiwikLogs) {
logger.info("Creating distinct piwik log");
piwikstatsdb.createDistinctPiwikLog();
logger.info("Processing OpenAIRE logs");
piwikstatsdb.processLogs();
logger.info("OpenAIRE logs Done");
logger.info("Processing Episciences logs");
piwikstatsdb.processEpisciencesLogs();
logger.info("Episciences logs Done");
logger.info("Processing Pedocs Old Stats");
piwikstatsdb.uploadOldPedocs();
logger.info("Processing Pedocs Old Stats Done");
logger.info("Processing TUDELFT Stats");
piwikstatsdb.uploadTUDELFTStats();
logger.info("Processing TUDELFT Stats Done");
logger.info("Processing B2SHARE Stats");
piwikstatsdb.uploadB2SHAREStats();
logger.info("Processing B2SHARE Stats Done");
}
LaReferenciaStats lastats = new LaReferenciaStats();
if (ExecuteWorkflow.processLaReferenciaLogs) {
logger.info("Processing LaReferencia logs");
lastats.processLogs();
logger.info("LaReferencia logs done");
}
IrusStats irusstats = new IrusStats();
if (ExecuteWorkflow.irusProcessStats) {
logger.info("Processing IRUS");
irusstats.processIrusStats();
logger.info("Irus done");
}
SarcStats sarcStats = new SarcStats();
if (ExecuteWorkflow.sarcProcessStats) {
sarcStats.processSarc();
}
logger.info("Sarc done");
// finalize usagestats
if (ExecuteWorkflow.finalizeStats) {
piwikstatsdb.finalizeStats();
logger.info("Finalized stats");
}
// Make the tables available to Impala
if (ExecuteWorkflow.finalTablesVisibleToImpala) {
logger.info("Making tables visible to Impala");
invalidateMetadata();
}
logger.info("End");
}
private void invalidateMetadata() throws SQLException {
Statement stmt = null;
stmt = ConnectDB.getImpalaConnection().createStatement();
String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".views_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".downloads_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".views_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".usage_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".pageviews_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsagestatsPermanentDBSchema() + ".counter_r5_stats_with_metrics";
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getHiveConnection().close();
}
}

View File

@ -0,0 +1,92 @@
[
{
"paramName": "pmi",
"paramLongName": "portalMatomoID",
"paramDescription": "namoNode of the target cluster",
"paramRequired": true
},
{
"paramName": "dbhu",
"paramLongName": "dbHiveUrl",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "dbiu",
"paramLongName": "dbImpalaUrl",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "urdbs",
"paramLongName": "usageRawDataDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "usdbs",
"paramLongName": "usageStatsDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "sdbs",
"paramLongName": "statsDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "uspdbs",
"paramLongName": "usagestatsPermanentDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "rdbt",
"paramLongName": "recreateDbAndTables",
"paramDescription": "Re-create database and initial tables?",
"paramRequired": true
},
{
"paramName": "ppwl",
"paramLongName": "processPiwikLogs",
"paramDescription": "Process the piwiklogs (create & fill in the needed tables and process the data) based on the downloaded data",
"paramRequired": true
},
{
"paramName": "plrl",
"paramLongName": "processLaReferenciaLogs",
"paramDescription": "Process the La Referencia logs (create & fill in the needed tables and process the data) based on the downloaded data",
"paramRequired": true
},
{
"paramName": "ipr",
"paramLongName": "irusProcessStats",
"paramDescription": "Irus section: Process stats?",
"paramRequired": true
},
{
"paramName": "ipr",
"paramLongName": "sarcProcessStats",
"paramDescription": "Sarc section: Process stats?",
"paramRequired": true
},
{
"paramName": "fs",
"paramLongName": "finalizeStats",
"paramDescription": "Create the usage_stats table?",
"paramRequired": true
},
{
"paramName": "ftvi",
"paramLongName": "finalTablesVisibleToImpala",
"paramDescription": "Make the usage_stats, views_stats and downloads_stats tables visible to Impala",
"paramRequired": true
},
{
"paramName": "nodt",
"paramLongName": "numberOfDownloadThreads",
"paramDescription": "Number of download threads",
"paramRequired": true
}
]

View File

@ -0,0 +1,38 @@
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1;?spark.executor.memory=19166291558;spark.yarn.executor.memoryOverhead=3225;spark.driver.memory=11596411699;spark.yarn.driver.memoryOverhead=1228</value>
</property>
<property>
<name>impalaJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;</value>
</property>
<property>
<name>oozie.wf.workflow.notification.url</name>
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,83 @@
<workflow-app name="Usage Stats Update" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hiveMetastoreUris</name>
<description>Hive server metastore URIs</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>Hive server jdbc url</description>
</property>
<property>
<name>impalaJdbcUrl</name>
<description>Impala server jdbc url</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>spark.executor.memory</name>
<value>19166291558</value>
</property>
<property>
<name>spark.yarn.executor.memoryOverhead</name>
<value>3225</value>
</property>
<property>
<name>spark.driver.memory</name>
<value>11596411699</value>
</property>
<property>
<name>spark.yarn.driver.memoryOverhead</name>
<value>1228</value>
</property>
</configuration>
</global>
<start to="Step1"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name='Step1'>
<java>
<main-class>eu.dnetlib.oa.graph.usagestatsbuild.export.ExecuteWorkflow</main-class>
<arg>--portalMatomoID</arg><arg>${portalMatomoID}</arg>
<arg>--dbHiveUrl</arg><arg>${hiveJdbcUrl}</arg>
<arg>--dbImpalaUrl</arg><arg>${impalaJdbcUrl}</arg>
<arg>--usageRawDataDBSchema</arg><arg>${usageRawDataDBSchema}</arg>
<arg>--usageStatsDBSchema</arg><arg>${usageStatsDBSchema}</arg>
<arg>--usagestatsPermanentDBSchema</arg><arg>${usagestatsPermanentDBSchema}</arg>
<arg>--statsDBSchema</arg><arg>${statsDBSchema}</arg>
<arg>--recreateDbAndTables</arg><arg>${recreateDbAndTables}</arg>
<arg>--processPiwikLogs</arg><arg>${processPiwikLogs}</arg>
<arg>--processLaReferenciaLogs</arg><arg>${processLaReferenciaLogs}</arg>
<arg>--irusProcessStats</arg><arg>${irusProcessStats}</arg>
<arg>--sarcProcessStats</arg><arg>${sarcProcessStats}</arg>
<arg>--finalizeStats</arg><arg>${finalizeStats}</arg>
<arg>--finalTablesVisibleToImpala</arg><arg>${finalTablesVisibleToImpala}</arg>
<arg>--numberOfDownloadThreads</arg><arg>${numberOfDownloadThreads}</arg>
<capture-output/>
</java>
<ok to="End" />
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>