Commit 30102020

This commit is contained in:
Dimitris 2020-10-30 14:07:21 +02:00
parent 1d9fdb7367
commit b8a3392b59
14 changed files with 4235 additions and 0 deletions

View File

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- <parent>
<artifactId>dhp-workflows</artifactId >
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<groupId>eu.dnetlib</groupId> -->
<!-- <parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-usage-stats-update</artifactId> -->
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-usage-stats-update</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
<cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180130</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${cdh.hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
<type>jar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,125 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestats.export;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.log4j.Logger;
/**
* @author D. Pierrakos, S. Zoupanos
*/
/**
* @author D. Pierrakos, S. Zoupanos
*/
import com.mchange.v2.c3p0.ComboPooledDataSource;
public abstract class ConnectDB {
public static Connection DB_HIVE_CONNECTION;
public static Connection DB_IMPALA_CONNECTION;
private static String dbHiveUrl;
private static String dbImpalaUrl;
private static String usageStatsDBSchema;
private static String statsDBSchema;
private final static Logger log = Logger.getLogger(ConnectDB.class);
static void init() throws ClassNotFoundException {
dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl;
usageStatsDBSchema = ExecuteWorkflow.usageStatsDBSchema;
statsDBSchema = ExecuteWorkflow.statsDBSchema;
Class.forName("org.apache.hive.jdbc.HiveDriver");
}
public static Connection getHiveConnection() throws SQLException {
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
return DB_HIVE_CONNECTION;
} else {
DB_HIVE_CONNECTION = connectHive();
return DB_HIVE_CONNECTION;
}
}
public static Connection getImpalaConnection() throws SQLException {
if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
return DB_IMPALA_CONNECTION;
} else {
DB_IMPALA_CONNECTION = connectImpala();
return DB_IMPALA_CONNECTION;
}
}
public static String getUsageStatsDBSchema() {
return ConnectDB.usageStatsDBSchema;
}
public static String getStatsDBSchema() {
return ConnectDB.statsDBSchema;
}
private static Connection connectHive() throws SQLException {
/*
* Connection connection = DriverManager.getConnection(dbHiveUrl); Statement stmt =
* connection.createStatement(); log.debug("Opened database successfully"); return connection;
*/
ComboPooledDataSource cpds = new ComboPooledDataSource();
cpds.setJdbcUrl(dbHiveUrl);
cpds.setAcquireIncrement(1);
cpds.setMaxPoolSize(100);
cpds.setMinPoolSize(1);
cpds.setInitialPoolSize(1);
cpds.setMaxIdleTime(300);
cpds.setMaxConnectionAge(36000);
cpds.setAcquireRetryAttempts(5);
cpds.setAcquireRetryDelay(2000);
cpds.setBreakAfterAcquireFailure(false);
cpds.setCheckoutTimeout(30000);
cpds.setPreferredTestQuery("SELECT 1");
cpds.setIdleConnectionTestPeriod(60);
return cpds.getConnection();
}
private static Connection connectImpala() throws SQLException {
/*
* Connection connection = DriverManager.getConnection(dbImpalaUrl); Statement stmt =
* connection.createStatement(); log.debug("Opened database successfully"); return connection;
*/
ComboPooledDataSource cpds = new ComboPooledDataSource();
cpds.setJdbcUrl(dbImpalaUrl);
cpds.setAcquireIncrement(1);
cpds.setMaxPoolSize(100);
cpds.setMinPoolSize(1);
cpds.setInitialPoolSize(1);
cpds.setMaxIdleTime(300);
cpds.setMaxConnectionAge(36000);
cpds.setAcquireRetryAttempts(5);
cpds.setAcquireRetryDelay(2000);
cpds.setBreakAfterAcquireFailure(false);
cpds.setCheckoutTimeout(30000);
cpds.setPreferredTestQuery("SELECT 1");
cpds.setIdleConnectionTestPeriod(60);
return cpds.getConnection();
}
}

View File

@ -0,0 +1,197 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestats.export;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.BasicConfigurator;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class ExecuteWorkflow {
static String matomoAuthToken;
static String matomoBaseURL;
static String repoLogPath;
static String portalLogPath;
static String portalMatomoID;
static String irusUKBaseURL;
static String irusUKReportPath;
static String sarcsReportPathArray;
static String sarcsReportPathNonArray;
static String lareferenciaLogPath;
static String lareferenciaBaseURL;
static String lareferenciaAuthToken;
static String dbHiveUrl;
static String dbImpalaUrl;
static String usageStatsDBSchema;
static String statsDBSchema;
static boolean recreateDbAndTables;
static boolean piwikEmptyDirs;
static boolean downloadPiwikLogs;
static boolean processPiwikLogs;
static Calendar startingLogPeriod;
static Calendar endingLogPeriod;
static int numberOfPiwikIdsToDownload;
static int numberOfSiteIdsToDownload;
static boolean laReferenciaEmptyDirs;
static boolean downloadLaReferenciaLogs;
static boolean processLaReferenciaLogs;
static boolean irusCreateTablesEmptyDirs;
static boolean irusDownloadReports;
static boolean irusProcessStats;
static int irusNumberOfOpendoarsToDownload;
static boolean sarcCreateTablesEmptyDirs;
static boolean sarcDownloadReports;
static boolean sarcProcessStats;
static int sarcNumberOfIssnToDownload;
static boolean finalizeStats;
static boolean finalTablesVisibleToImpala;
static int numberOfDownloadThreads;
public static void main(String args[]) throws Exception {
// Sending the logs to the console
BasicConfigurator.configure();
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
UsageStatsExporter.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/usagestats/export/usagestats_parameters.json")));
parser.parseArgument(args);
// Setting up the initial parameters
matomoAuthToken = parser.get("matomoAuthToken");
matomoBaseURL = parser.get("matomoBaseURL");
repoLogPath = parser.get("repoLogPath");
portalLogPath = parser.get("portalLogPath");
portalMatomoID = parser.get("portalMatomoID");
irusUKBaseURL = parser.get("irusUKBaseURL");
irusUKReportPath = parser.get("irusUKReportPath");
sarcsReportPathArray = parser.get("sarcsReportPathArray");
sarcsReportPathNonArray = parser.get("sarcsReportPathNonArray");
lareferenciaLogPath = parser.get("lareferenciaLogPath");
lareferenciaBaseURL = parser.get("lareferenciaBaseURL");
lareferenciaAuthToken = parser.get("lareferenciaAuthToken");
dbHiveUrl = parser.get("dbHiveUrl");
dbImpalaUrl = parser.get("dbImpalaUrl");
usageStatsDBSchema = parser.get("usageStatsDBSchema");
statsDBSchema = parser.get("statsDBSchema");
if (parser.get("recreateDbAndTables").toLowerCase().equals("true"))
recreateDbAndTables = true;
else
recreateDbAndTables = false;
if (parser.get("piwikEmptyDirs").toLowerCase().equals("true"))
piwikEmptyDirs = true;
else
piwikEmptyDirs = false;
if (parser.get("downloadPiwikLogs").toLowerCase().equals("true"))
downloadPiwikLogs = true;
else
downloadPiwikLogs = false;
if (parser.get("processPiwikLogs").toLowerCase().equals("true"))
processPiwikLogs = true;
else
processPiwikLogs = false;
String startingLogPeriodStr = parser.get("startingLogPeriod");
Date startingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(startingLogPeriodStr);
startingLogPeriod = startingLogPeriodStr(startingLogPeriodDate);
String endingLogPeriodStr = parser.get("endingLogPeriod");
Date endingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(endingLogPeriodStr);
endingLogPeriod = startingLogPeriodStr(endingLogPeriodDate);
numberOfPiwikIdsToDownload = Integer.parseInt(parser.get("numberOfPiwikIdsToDownload"));
numberOfSiteIdsToDownload = Integer.parseInt(parser.get("numberOfSiteIdsToDownload"));
if (parser.get("laReferenciaEmptyDirs").toLowerCase().equals("true"))
laReferenciaEmptyDirs = true;
else
laReferenciaEmptyDirs = false;
if (parser.get("downloadLaReferenciaLogs").toLowerCase().equals("true"))
downloadLaReferenciaLogs = true;
else
downloadLaReferenciaLogs = false;
if (parser.get("processLaReferenciaLogs").toLowerCase().equals("true"))
processLaReferenciaLogs = true;
else
processLaReferenciaLogs = false;
if (parser.get("irusCreateTablesEmptyDirs").toLowerCase().equals("true"))
irusCreateTablesEmptyDirs = true;
else
irusCreateTablesEmptyDirs = false;
if (parser.get("irusDownloadReports").toLowerCase().equals("true"))
irusDownloadReports = true;
else
irusDownloadReports = false;
if (parser.get("irusProcessStats").toLowerCase().equals("true"))
irusProcessStats = true;
else
irusProcessStats = false;
irusNumberOfOpendoarsToDownload = Integer.parseInt(parser.get("irusNumberOfOpendoarsToDownload"));
if (parser.get("sarcCreateTablesEmptyDirs").toLowerCase().equals("true"))
sarcCreateTablesEmptyDirs = true;
else
sarcCreateTablesEmptyDirs = false;
if (parser.get("sarcDownloadReports").toLowerCase().equals("true"))
sarcDownloadReports = true;
else
sarcDownloadReports = false;
if (parser.get("sarcProcessStats").toLowerCase().equals("true"))
sarcProcessStats = true;
else
sarcProcessStats = false;
sarcNumberOfIssnToDownload = Integer.parseInt(parser.get("sarcNumberOfIssnToDownload"));
if (parser.get("finalizeStats").toLowerCase().equals("true"))
finalizeStats = true;
else
finalizeStats = false;
if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true"))
finalTablesVisibleToImpala = true;
else
finalTablesVisibleToImpala = false;
numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads"));
UsageStatsExporter usagestatsExport = new UsageStatsExporter();
usagestatsExport.export();
}
private static Calendar startingLogPeriodStr(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
return calendar;
}
}

View File

@ -0,0 +1,417 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class IrusStats {
private String irusUKURL;
private static final Logger logger = LoggerFactory.getLogger(IrusStats.class);
public IrusStats(String irusUKURL) throws Exception {
this.irusUKURL = irusUKURL;
// The following may not be needed - It will be created when JSON tables are created
// createTmpTables();
}
public void reCreateLogDirs() throws Exception {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath);
dfs.delete(new Path(ExecuteWorkflow.irusUKReportPath), true);
logger.info("Creating irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath);
dfs.mkdirs(new Path(ExecuteWorkflow.irusUKReportPath));
}
public void createTables() throws Exception {
try {
logger.info("Creating sushilog");
Statement stmt = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog(source STRING, " +
"repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, " +
"repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTableSushiLog);
logger.info("Created sushilog");
// To see how to apply to the ignore duplicate rules and indexes
// stmt.executeUpdate(sqlCreateTableSushiLog);
// String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO sushilog "
// + " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
// + "sushilog.rid, sushilog.date "
// + "FROM sushilog "
// + "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlcreateRuleSushiLog);
// String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
// stmt.executeUpdate(createSushiIndex);
stmt.close();
ConnectDB.getHiveConnection().close();
logger.info("Sushi Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
}
}
// // The following may not be needed - It will be created when JSON tables are created
// private void createTmpTables() throws Exception {
// try {
//
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilogtmp(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
// stmt.executeUpdate(sqlCreateTableSushiLog);
//
// // stmt.executeUpdate("CREATE TABLE IF NOT EXISTS public.sushilog AS TABLE sushilog;");
// // String sqlCopyPublicSushiLog = "INSERT INTO sushilog SELECT * FROM public.sushilog;";
// // stmt.executeUpdate(sqlCopyPublicSushiLog);
// String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO sushilogtmp "
// + " WHERE (EXISTS ( SELECT sushilogtmp.source, sushilogtmp.repository,"
// + "sushilogtmp.rid, sushilogtmp.date "
// + "FROM sushilogtmp "
// + "WHERE sushilogtmp.source = new.source AND sushilogtmp.repository = new.repository AND sushilogtmp.rid = new.rid AND sushilogtmp.date = new.date AND sushilogtmp.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlcreateRuleSushiLog);
//
// stmt.close();
// ConnectDB.getConnection().close();
// log.info("Sushi Tmp Tables Created");
// } catch (Exception e) {
// log.error("Failed to create tables: " + e);
// throw new Exception("Failed to create tables: " + e.toString(), e);
// }
// }
public void processIrusStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
logger.info("Added JSON Serde jar");
logger.info("Dropping sushilogtmp_json table");
String dropSushilogtmpJson = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sushilogtmp_json";
stmt.executeUpdate(dropSushilogtmpJson);
logger.info("Dropped sushilogtmp_json table");
logger.info("Creating irus_sushilogtmp_json table");
String createSushilogtmpJson = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json(\n" +
" `ItemIdentifier` ARRAY<\n" +
" struct<\n" +
" Type: STRING,\n" +
" Value: STRING\n" +
" >\n" +
" >,\n" +
" `ItemPerformance` ARRAY<\n" +
" struct<\n" +
" `Period`: struct<\n" +
" `Begin`: STRING,\n" +
" `End`: STRING\n" +
" >,\n" +
" `Instance`: struct<\n" +
" `Count`: STRING,\n" +
" `MetricType`: STRING\n" +
" >\n" +
" >\n" +
" >\n" +
")\n" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + ExecuteWorkflow.irusUKReportPath + "'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(createSushilogtmpJson);
logger.info("Created irus_sushilogtmp_json table");
logger.info("Dropping irus_sushilogtmp table");
String dropSushilogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".irus_sushilogtmp";
stmt.executeUpdate(dropSushilogtmp);
logger.info("Dropped irus_sushilogtmp table");
logger.info("Creating irus_sushilogtmp table");
String createSushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
+ ".irus_sushilogtmp(source STRING, repository STRING, " +
"rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
+
"tblproperties('transactional'='true')";
stmt.executeUpdate(createSushilogtmp);
logger.info("Created irus_sushilogtmp table");
logger.info("Inserting to irus_sushilogtmp table");
String insertSushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp " +
"SELECT 'IRUS-UK', CONCAT('opendoar____::', split(split(INPUT__FILE__NAME,'IrusIRReport_')[1],'_')[0]), " +
"`ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, " +
"`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json " +
"LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " +
"LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf " +
"WHERE `ItemIdent`.`Type`= 'OAI'";
stmt.executeUpdate(insertSushilogtmp);
logger.info("Inserted to irus_sushilogtmp table");
logger.info("Creating downloads_stats table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats " +
"(`source` string, " +
"`repository_id` string, " +
"`result_id` string, " +
"`date` string, " +
"`count` bigint, " +
"`openaire` bigint)";
stmt.executeUpdate(createDownloadsStats);
logger.info("Created downloads_stats table");
logger.info("Inserting into downloads_stats");
String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
"SELECT s.source, d.id AS repository_id, " +
"ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp s, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " +
ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'";
stmt.executeUpdate(insertDStats);
logger.info("Inserted into downloads_stats");
logger.info("Creating sushilog table");
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog " +
"(`source` string, " +
"`repository_id` string, " +
"`rid` string, " +
"`date` string, " +
"`metric_type` string, " +
"`count` int)";
stmt.executeUpdate(createSushilog);
logger.info("Created sushilog table");
logger.info("Inserting to sushilog table");
String insertToShushilog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilog SELECT * FROM " +
ConnectDB.getUsageStatsDBSchema()
+ ".irus_sushilogtmp";
stmt.executeUpdate(insertToShushilog);
logger.info("Inserted to sushilog table");
ConnectDB.getHiveConnection().close();
}
public void getIrusRRReport(String irusUKReportPath) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("(getIrusRRReport) Starting period for log download: " + sdf.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("(getIrusRRReport) Ending period for log download: " + sdf.format(end.getTime()));
String reportUrl = irusUKURL + "GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate=" +
sdf.format(start.getTime()) + "&EndDate=" + sdf.format(end.getTime()) +
"&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback=";
logger.info("(getIrusRRReport) Getting report: " + reportUrl);
String text = getJson(reportUrl, "", "");
List<String> opendoarsToVisit = new ArrayList<String>();
JSONParser parser = new JSONParser();
JSONObject jsonObject = (JSONObject) parser.parse(text);
jsonObject = (JSONObject) jsonObject.get("ReportResponse");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Customer");
JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems");
int i = 0;
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("ItemIdentifier");
for (Object identifier : itemIdentifier) {
JSONObject opendoar = (JSONObject) identifier;
if (opendoar.get("Type").toString().equals("OpenDOAR")) {
i++;
opendoarsToVisit.add(opendoar.get("Value").toString());
break;
}
}
// break;
}
logger.info("(getIrusRRReport) Found the following opendoars for download: " + opendoarsToVisit);
if (ExecuteWorkflow.irusNumberOfOpendoarsToDownload > 0 &&
ExecuteWorkflow.irusNumberOfOpendoarsToDownload <= opendoarsToVisit.size()) {
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.irusNumberOfOpendoarsToDownload);
opendoarsToVisit = opendoarsToVisit.subList(0, ExecuteWorkflow.irusNumberOfOpendoarsToDownload);
}
logger.info("(getIrusRRReport) Downloading the followins opendoars: " + opendoarsToVisit);
for (String opendoar : opendoarsToVisit) {
logger.info("Now working on openDoar: " + opendoar);
this.getIrusIRReport(opendoar, irusUKReportPath);
}
logger.info("(getIrusRRReport) Finished with report: " + reportUrl);
}
private void getIrusIRReport(String opendoar, String irusUKReportPath) throws Exception {
logger.info("(getIrusIRReport) Getting report(s) with opendoar: " + opendoar);
ConnectDB.getHiveConnection().setAutoCommit(false);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("(getIrusIRReport) Starting period for log download: " + simpleDateFormat.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("(getIrusIRReport) Ending period for log download: " + simpleDateFormat.format(end.getTime()));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB
.getHiveConnection()
.prepareStatement(
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
st.setString(1, "opendoar____::" + opendoar);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
int batch_size = 0;
while (start.before(end)) {
// log.info("date: " + simpleDateFormat.format(start.getTime()));
String reportUrl = this.irusUKURL + "GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate="
+ simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime())
+ "&RepositoryIdentifier=opendoar%3A" + opendoar
+ "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback=";
start.add(Calendar.MONTH, 1);
logger.info("Downloading file: " + reportUrl);
String text = getJson(reportUrl, "", "");
if (text == null) {
continue;
}
FileSystem fs = FileSystem.get(new Configuration());
String filePath = irusUKReportPath + "/" + "IrusIRReport_" +
opendoar + "_" + simpleDateFormat.format(start.getTime()) + ".json";
logger.info("Storing to file: " + filePath);
FSDataOutputStream fin = fs.create(new Path(filePath), true);
JSONParser parser = new JSONParser();
JSONObject jsonObject = (JSONObject) parser.parse(text);
jsonObject = (JSONObject) jsonObject.get("ReportResponse");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Customer");
JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems");
if (jsonArray == null) {
continue;
}
String oai = "";
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
fin.write(jsonObjectRow.toJSONString().getBytes());
fin.writeChar('\n');
}
fin.close();
}
ConnectDB.getHiveConnection().close();
logger.info("(getIrusIRReport) Finished downloading report(s) with opendoar: " + opendoar);
}
private String getJson(String url) throws Exception {
try {
System.out.println("===> Connecting to: " + url);
URL website = new URL(url);
System.out.println("Connection url -----> " + url);
URLConnection connection = website.openConnection();
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
// response.append("\n");
}
}
System.out.println("response ====> " + response.toString());
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL: " + e);
System.out.println("Failed to get URL: " + e);
throw new Exception("Failed to get URL: " + e.toString(), e);
}
}
private String getJson(String url, String username, String password) throws Exception {
// String cred=username+":"+password;
// String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes());
try {
URL website = new URL(url);
URLConnection connection = website.openConnection();
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL", e);
return null;
}
}
}

View File

@ -0,0 +1,261 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class LaReferenciaDownloadLogs {
private final String piwikUrl;
private Date startDate;
private final String tokenAuth;
/*
* The Piwik's API method
*/
private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
private final String format = "&format=json";
private final String ApimethodGetAllSites = "?module=API&method=SitesManager.getSitesWithViewAccess";
private static final Logger logger = LoggerFactory.getLogger(LaReferenciaDownloadLogs.class);
public LaReferenciaDownloadLogs(String piwikUrl, String tokenAuth) throws Exception {
this.piwikUrl = piwikUrl;
this.tokenAuth = tokenAuth;
this.createTables();
// this.createTmpTables();
}
public void reCreateLogDirs() throws IllegalArgumentException, IOException {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true);
logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath));
}
private void createTables() throws Exception {
try {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating LaReferencia tables");
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, " +
"source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
"source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
"stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
logger.info("Created LaReferencia tables");
// String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialog "
// + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
// + "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
// + "FROM lareferencialog "
// + "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
// stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
stmt.close();
ConnectDB.getHiveConnection().close();
logger.info("Lareferencia Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
// System.exit(0);
}
}
// private void createTmpTables() throws Exception {
//
// try {
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
// String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialogtmp "
// + " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
// + "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
// + "FROM lareferencialogtmp "
// + "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
// stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
//
// stmt.close();
// log.info("Lareferencia Tmp Tables Created");
//
// } catch (Exception e) {
// log.error("Failed to create tmptables: " + e);
// throw new Exception("Failed to create tmp tables: " + e.toString(), e);
// // System.exit(0);
// }
// }
private String getPiwikLogUrl() {
return piwikUrl + "/";
}
private String getJson(String url) throws Exception {
try {
URL website = new URL(url);
URLConnection connection = website.openConnection();
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
// response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL: " + e);
throw new Exception("Failed to get URL: " + e.toString(), e);
}
}
public void GetLaReferenciaRepos(String repoLogsPath) throws Exception {
String baseApiUrl = getPiwikLogUrl() + ApimethodGetAllSites + format + "&token_auth=" + this.tokenAuth;
String content = "";
List<Integer> siteIdsToVisit = new ArrayList<Integer>();
// Getting all the siteIds in a list for logging reasons & limiting the list
// to the max number of siteIds
content = getJson(baseApiUrl);
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(content);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
siteIdsToVisit.add(Integer.parseInt(jsonObjectRow.get("idsite").toString()));
}
logger.info("Found the following siteIds for download: " + siteIdsToVisit);
if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0 &&
ExecuteWorkflow.numberOfPiwikIdsToDownload <= siteIdsToVisit.size()) {
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
siteIdsToVisit = siteIdsToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
}
logger.info("Downloading from repos with the followins siteIds: " + siteIdsToVisit);
for (int siteId : siteIdsToVisit) {
logger.info("Now working on piwikId: " + siteId);
this.GetLaReFerenciaLogs(repoLogsPath, siteId);
}
}
public void GetLaReFerenciaLogs(String repoLogsPath,
int laReferencialMatomoID) throws Exception {
logger.info("Downloading logs for LaReferencia repoid " + laReferencialMatomoID);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("Starting period for log download: " + sdf.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("Ending period for log download: " + sdf.format(end.getTime()));
PreparedStatement st = ConnectDB
.getHiveConnection()
.prepareStatement(
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() +
".lareferencialog WHERE matomoid=? GROUP BY timestamp HAVING max(timestamp) is not null");
st.setInt(1, laReferencialMatomoID);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
Date date = currDay.getTime();
logger
.info(
"Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for "
+ sdf.format(date));
String period = "&period=day&date=" + sdf.format(date);
String outFolder = "";
outFolder = repoLogsPath;
FileSystem fs = FileSystem.get(new Configuration());
FSDataOutputStream fin = fs
.create(
new Path(outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog" + sdf.format((date)) + ".json"),
true);
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
String content = "";
int i = 0;
JSONParser parser = new JSONParser();
do {
String apiUrl = baseApiUrl;
if (i > 0) {
apiUrl += "&filter_offset=" + (i * 1000);
}
content = getJson(apiUrl);
if (content.length() == 0 || content.equals("[]"))
break;
JSONArray jsonArray = (JSONArray) parser.parse(content);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
fin.write(jsonObjectRaw.toJSONString().getBytes());
fin.writeChar('\n');
}
logger
.info(
"Downloaded part " + i + " of logs for LaReferencia repoid " + laReferencialMatomoID
+ " and for "
+ sdf.format(date));
i++;
} while (true);
fin.close();
}
}
}

View File

@ -0,0 +1,423 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.URLDecoder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class LaReferenciaStats {
private static final Logger logger = LoggerFactory.getLogger(LaReferenciaStats.class);
private String logRepoPath;
private Statement stmt = null;
private String CounterRobotsURL;
private ArrayList robotsList;
public LaReferenciaStats(String logRepoPath) throws Exception {
this.logRepoPath = logRepoPath;
this.createTables();
// this.createTmpTables();
}
/*
* private void connectDB() throws Exception { try { ConnectDB connectDB = new ConnectDB(); } catch (Exception e) {
* log.error("Connect to db failed: " + e); throw new Exception("Failed to connect to db: " + e.toString(), e); } }
*/
private void createTables() throws Exception {
try {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating LaReferencia tables");
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, " +
"source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
"source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
"stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
logger.info("Created LaReferencia tables");
// String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialog "
// + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
// + "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
// + "FROM lareferencialog "
// + "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
// stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
stmt.close();
ConnectDB.getHiveConnection().close();
logger.info("Lareferencia Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
// System.exit(0);
}
}
// private void createTmpTables() throws Exception {
//
// try {
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
// String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialogtmp "
// + " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
// + "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
// + "FROM lareferencialogtmp "
// + "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
// stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
//
// stmt.close();
// log.info("Lareferencia Tmp Tables Created");
//
// } catch (Exception e) {
// log.error("Failed to create tmptables: " + e);
// throw new Exception("Failed to create tmp tables: " + e.toString(), e);
// // System.exit(0);
// }
// }
public void processLogs() throws Exception {
try {
logger.info("Processing LaReferencia repository logs");
processlaReferenciaLog();
logger.info("LaReferencia repository logs process done");
logger.info("LaReferencia removing double clicks");
removeDoubleClicks();
logger.info("LaReferencia removed double clicks");
logger.info("LaReferencia creating viewsStats");
viewsStats();
logger.info("LaReferencia created viewsStats");
logger.info("LaReferencia creating downloadsStats");
downloadsStats();
logger.info("LaReferencia created downloadsStats");
logger.info("LaReferencia updating Production Tables");
updateProdTables();
logger.info("LaReferencia updated Production Tables");
} catch (Exception e) {
logger.error("Failed to process logs: " + e);
throw new Exception("Failed to process logs: " + e.toString(), e);
}
}
public void processlaReferenciaLog() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
logger.info("Added JSON Serde jar");
logger.info("Dropping lareferencialogtmp_json table");
String drop_lareferencialogtmp_json = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".lareferencialogtmp_json";
stmt.executeUpdate(drop_lareferencialogtmp_json);
logger.info("Dropped lareferencialogtmp_json table");
logger.info("Creating lareferencialogtmp_json");
String create_lareferencialogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".lareferencialogtmp_json(\n" +
" `idSite` STRING,\n" +
" `idVisit` STRING,\n" +
" `country` STRING,\n" +
" `referrerName` STRING,\n" +
" `browser` STRING,\n" +
" `repItem` STRING,\n" +
" `actionDetails` ARRAY<\n" +
" struct<\n" +
" timestamp: STRING,\n" +
" type: STRING,\n" +
" url: STRING,\n" +
" `customVariables`: struct<\n" +
" `1`: struct<\n" +
" `customVariablePageValue1`: STRING\n" +
" >,\n" +
" `2`: struct<\n" +
" `customVariablePageValue2`: STRING\n" +
" >\n" +
" >\n" +
" >\n" +
" >" +
")\n" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + ExecuteWorkflow.lareferenciaLogPath + "'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(create_lareferencialogtmp_json);
logger.info("Created lareferencialogtmp_json");
logger.info("Dropping lareferencialogtmp table");
String drop_lareferencialogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".lareferencialogtmp";
stmt.executeUpdate(drop_lareferencialogtmp);
logger.info("Dropped lareferencialogtmp table");
logger.info("Creating lareferencialogtmp");
String create_lareferencialogtmp = "CREATE TABLE " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp(matomoid INT, " +
"source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
"source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
"stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(create_lareferencialogtmp);
logger.info("Created lareferencialogtmp");
logger.info("Inserting into lareferencialogtmp");
String insert_lareferencialogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp " +
"SELECT DISTINCT cast(idSite as INT) as matomoid, CONCAT('opendoar____::', " +
"actiondetail.customVariables.`2`.customVariablePageValue2) as source, idVisit as id_Visit, country, " +
"actiondetail.type as action, actiondetail.url as url, " +
"actiondetail.customVariables.`1`.`customVariablePageValue1` as entity_id, " +
"'repItem' as source_item_type, from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, " +
"referrerName as referrer_name, browser as agent " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp_json " +
"LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail";
stmt.executeUpdate(insert_lareferencialogtmp);
logger.info("Inserted into lareferencialogtmp");
stmt.close();
}
public void removeDoubleClicks() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Cleaning download double clicks");
// clean download double clicks
String sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp WHERE EXISTS (" +
"SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p1, " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p2 " +
"WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id " +
"AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp " +
"AND p1.timestamp<p2.timestamp AND ((unix_timestamp(p2.timestamp)-unix_timestamp(p1.timestamp))/60)<30 " +
"AND lareferencialogtmp.source=p1.source AND lareferencialogtmp.id_visit=p1.id_visit " +
"AND lareferencialogtmp.action=p1.action AND lareferencialogtmp.entity_id=p1.entity_id " +
"AND lareferencialogtmp.timestamp=p1.timestamp)";
stmt.executeUpdate(sql);
stmt.close();
logger.info("Cleaned download double clicks");
stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Cleaning action double clicks");
// clean view double clicks
sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp WHERE EXISTS (" +
"SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p1, " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p2 " +
"WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id " +
"AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp " +
"AND p1.timestamp<p2.timestamp AND ((unix_timestamp(p2.timestamp)-unix_timestamp(p1.timestamp))/60)<10 " +
"AND lareferencialogtmp.source=p1.source AND lareferencialogtmp.id_visit=p1.id_visit " +
"AND lareferencialogtmp.action=p1.action AND lareferencialogtmp.entity_id=p1.entity_id " +
"AND lareferencialogtmp.timestamp=p1.timestamp)";
stmt.executeUpdate(sql);
stmt.close();
logger.info("Cleaned action double clicks");
// conn.close();
}
public void viewsStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Creating la_result_views_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS "
+
"SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
"THEN 1 ELSE 0 END) AS openaire_referrer, " +
"CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp where action='action' and " +
"(source_item_type='oaItem' or source_item_type='repItem') " +
"GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
"source ORDER BY source, entity_id";
stmt.executeUpdate(sql);
logger.info("Created la_result_views_monthly_tmp view");
logger.info("Dropping la_views_stats_tmp table");
sql = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".la_views_stats_tmp";
stmt.executeUpdate(sql);
logger.info("Dropped la_views_stats_tmp table");
logger.info("Creating la_views_stats_tmp table");
sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp " +
"AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
"max(views) AS count, max(openaire_referrer) AS openaire " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp p, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE p.source=d.oid AND p.id=ro.oid " +
"GROUP BY d.id, ro.id, month " +
"ORDER BY d.id, ro.id, month";
stmt.executeUpdate(sql);
logger.info("Created la_views_stats_tmp table");
stmt.close();
ConnectDB.getHiveConnection().close();
}
private void downloadsStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Creating la_result_downloads_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
+ ".la_result_downloads_monthly_tmp AS " +
"SELECT entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
"THEN 1 ELSE 0 END) AS openaire_referrer, " +
"CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp where action='download' and " +
"(source_item_type='oaItem' or source_item_type='repItem') " +
"GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
"source ORDER BY source, entity_id";
stmt.executeUpdate(sql);
logger.info("Created la_result_downloads_monthly_tmp view");
logger.info("Dropping la_downloads_stats_tmp table");
sql = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".la_downloads_stats_tmp";
stmt.executeUpdate(sql);
logger.info("Dropped la_downloads_stats_tmp table");
logger.info("Creating la_downloads_stats_tmp table");
sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp " +
"AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
"max(downloads) AS count, max(openaire_referrer) AS openaire " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_downloads_monthly_tmp p, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE p.source=d.oid AND p.id=ro.oid " +
"GROUP BY d.id, ro.id, month " +
"ORDER BY d.id, ro.id, month";
stmt.executeUpdate(sql);
logger.info("Created la_downloads_stats_tmp table");
stmt.close();
ConnectDB.getHiveConnection().close();
}
private void updateProdTables() throws SQLException, Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Updating lareferencialog");
String sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp";
stmt.executeUpdate(sql);
logger.info("Updating views_stats");
sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp";
stmt.executeUpdate(sql);
// sql = "insert into public.views_stats select * from la_views_stats_tmp;";
// stmt.executeUpdate(sql);
logger.info("Updating downloads_stats");
sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp";
stmt.executeUpdate(sql);
// sql = "insert into public.downloads_stats select * from la_downloads_stats_tmp;";
// stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getHiveConnection().close();
}
private ArrayList<String> listHdfsDir(String dir) throws Exception {
FileSystem hdfs = FileSystem.get(new Configuration());
RemoteIterator<LocatedFileStatus> Files;
ArrayList<String> fileNames = new ArrayList<>();
try {
Path exportPath = new Path(hdfs.getUri() + dir);
Files = hdfs.listFiles(exportPath, false);
while (Files.hasNext()) {
String fileName = Files.next().getPath().toString();
// log.info("Found hdfs file " + fileName);
fileNames.add(fileName);
}
// hdfs.close();
} catch (Exception e) {
logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath));
throw new Exception("HDFS file path with exported data does not exist : " + logRepoPath, e);
}
return fileNames;
}
private String readHDFSFile(String filename) throws Exception {
String result;
try {
FileSystem fs = FileSystem.get(new Configuration());
// log.info("reading file : " + filename);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
StringBuilder sb = new StringBuilder();
String line = br.readLine();
while (line != null) {
if (!line.equals("[]")) {
sb.append(line);
}
// sb.append(line);
line = br.readLine();
}
result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
if (result.equals("")) {
result = "[]";
}
// fs.close();
} catch (Exception e) {
logger.error(e.getMessage());
throw new Exception(e);
}
return result;
}
}

View File

@ -0,0 +1,316 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.Authenticator;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class PiwikDownloadLogs {
private final String piwikUrl;
private Date startDate;
private final String tokenAuth;
/*
* The Piwik's API method
*/
private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
private final String format = "&format=json";
private static final Logger logger = LoggerFactory.getLogger(PiwikDownloadLogs.class);
public PiwikDownloadLogs(String piwikUrl, String tokenAuth) {
this.piwikUrl = piwikUrl;
this.tokenAuth = tokenAuth;
}
private String getPiwikLogUrl() {
return "https://" + piwikUrl + "/";
}
private String getJson(String url) throws Exception {
try {
logger.debug("Connecting to download the JSON: " + url);
URL website = new URL(url);
URLConnection connection = website.openConnection();
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
}
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL: " + url + " Exception: " + e);
throw new Exception("Failed to get URL: " + url + " Exception: " + e.toString(), e);
}
}
class WorkerThread implements Runnable {
private Calendar currDay;
private int siteId;
private String repoLogsPath;
private String portalLogPath;
private String portalMatomoID;
public WorkerThread(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath,
String portalMatomoID) throws IOException {
this.currDay = (Calendar) currDay.clone();
this.siteId = new Integer(siteId);
this.repoLogsPath = new String(repoLogsPath);
this.portalLogPath = new String(portalLogPath);
this.portalMatomoID = new String(portalMatomoID);
}
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
System.out
.println(
Thread.currentThread().getName() + " (Start) Thread for "
+ "parameters: currDay=" + sdf.format(currDay.getTime()) + ", siteId=" + siteId +
", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath +
", portalLogPath=" + portalLogPath + ", portalMatomoID=" + portalMatomoID);
try {
GetOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out
.println(
Thread.currentThread().getName() + " (End) Thread for "
+ "parameters: currDay=" + sdf.format(currDay.getTime()) + ", siteId=" + siteId +
", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath +
", portalLogPath=" + portalLogPath + ", portalMatomoID=" + portalMatomoID);
}
public void GetOpenAIRELogsForDate(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath,
String portalMatomoID) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date = currDay.getTime();
logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));
String period = "&period=day&date=" + sdf.format(date);
String outFolder = "";
if (siteId == Integer.parseInt(portalMatomoID)) {
outFolder = portalLogPath;
} else {
outFolder = repoLogsPath;
}
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
String content = "";
int i = 0;
JSONParser parser = new JSONParser();
StringBuffer totalContent = new StringBuffer();
FileSystem fs = FileSystem.get(new Configuration());
do {
int writtenBytes = 0;
String apiUrl = baseApiUrl;
if (i > 0) {
apiUrl += "&filter_offset=" + (i * 1000);
}
content = getJson(apiUrl);
if (content.length() == 0 || content.equals("[]"))
break;
FSDataOutputStream fin = fs
.create(
new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ ".json"),
true);
JSONArray jsonArray = (JSONArray) parser.parse(content);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes();
fin.write(jsonObjectRawBytes);
fin.writeChar('\n');
writtenBytes += jsonObjectRawBytes.length + 1;
}
fin.close();
System.out
.println(
Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes
+ " bytes. Filename: " + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ ".json");
i++;
} while (true);
fs.close();
}
}
public void GetOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception {
Statement statement = ConnectDB.getHiveConnection().createStatement();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
ResultSet rs = statement
.executeQuery(
"SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema()
+ ".datasource where piwik_id is not null and piwik_id <> 0 order by piwik_id");
// Getting all the piwikids in a list for logging reasons & limitting the list
// to the max number of piwikids
List<Integer> piwikIdToVisit = new ArrayList<Integer>();
while (rs.next())
piwikIdToVisit.add(rs.getInt(1));
logger.info("Found the following piwikIds for download: " + piwikIdToVisit);
if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0 &&
ExecuteWorkflow.numberOfPiwikIdsToDownload <= piwikIdToVisit.size()) {
logger.info("Trimming piwikIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
}
logger.info("Downloading from repos with the followins piwikIds: " + piwikIdToVisit);
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("Starting period for log download: " + sdf.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("Ending period for log download: " + sdf.format(end.getTime()));
//ExecutorService executor = Executors.newFixedThreadPool(ExecuteWorkflow.numberOfDownloadThreads);
for (int siteId : piwikIdToVisit) {
logger.info("Now working on piwikId: " + siteId);
PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION
.prepareStatement(
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
+ ".piwiklog WHERE source=?");
st.setInt(1, siteId);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
logger.info("Found max date: " + rs_date.getString(1) + " for repository " + siteId);
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
//logger.info("Date used " + currDay.toString());
//Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
//executor.execute(worker);// calling execute method of ExecutorService
GetOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
}
}
//executor.shutdown();
//while (!executor.isTerminated()) {
//}
//System.out.println("Finished all threads");
}
public void GetOpenAIRELogsForDate(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath,
String portalMatomoID) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date = currDay.getTime();
logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));
String period = "&period=day&date=" + sdf.format(date);
String outFolder = "";
if (siteId == Integer.parseInt(portalMatomoID)) {
outFolder = portalLogPath;
} else {
outFolder = repoLogsPath;
}
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
String content = "";
int i = 0;
JSONParser parser = new JSONParser();
StringBuffer totalContent = new StringBuffer();
FileSystem fs = FileSystem.get(new Configuration());
do {
int writtenBytes = 0;
String apiUrl = baseApiUrl;
if (i > 0) {
apiUrl += "&filter_offset=" + (i * 1000);
}
content = getJson(apiUrl);
if (content.length() == 0 || content.equals("[]"))
break;
FSDataOutputStream fin = fs
.create(
new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ ".json"),
true);
JSONArray jsonArray = (JSONArray) parser.parse(content);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes();
fin.write(jsonObjectRawBytes);
fin.writeChar('\n');
writtenBytes += jsonObjectRawBytes.length + 1;
}
fin.close();
System.out
.println(
Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes
+ " bytes. Filename: " + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ ".json");
i++;
} while (true);
fs.close();
}
}

View File

@ -0,0 +1,54 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestats.export;
/**
* @author D. Pierrakos, S. Zoupanos
*/
/**
* @author D. Pierrakos, S. Zoupanos
*/
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.json.JSONException;
import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class ReadCounterRobotsList {
private ArrayList robotsPatterns = new ArrayList();
private String COUNTER_ROBOTS_URL;
public ReadCounterRobotsList(String url) throws IOException, JSONException, ParseException {
COUNTER_ROBOTS_URL = url;
robotsPatterns = readRobotsPartners(COUNTER_ROBOTS_URL);
}
private ArrayList readRobotsPartners(String url) throws MalformedURLException, IOException, ParseException {
InputStream is = new URL(url).openStream();
JSONParser parser = new JSONParser();
BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("ISO-8859-1")));
JSONArray jsonArray = (JSONArray) parser.parse(reader);
for (Object aJsonArray : jsonArray) {
org.json.simple.JSONObject jsonObjectRow = (org.json.simple.JSONObject) aJsonArray;
robotsPatterns.add(jsonObjectRow.get("pattern").toString().replace("\\", "\\\\"));
}
return robotsPatterns;
}
public ArrayList getRobotsPatterns() {
return robotsPatterns;
}
}

View File

@ -0,0 +1,571 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
// import java.io.BufferedReader;
// import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class SarcStats {
private Statement stmtHive = null;
private Statement stmtImpala = null;
private static final Logger logger = LoggerFactory.getLogger(SarcStats.class);
public SarcStats() throws Exception {
// createTables();
}
private void createTables() throws Exception {
try {
stmtHive = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
stmtHive.executeUpdate(sqlCreateTableSushiLog);
// String sqlCopyPublicSushiLog="INSERT INTO sushilog SELECT * FROM public.sushilog;";
// stmt.executeUpdate(sqlCopyPublicSushiLog);
String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ " ON INSERT TO sushilog "
+ " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
+ "sushilog.rid, sushilog.date "
+ "FROM sushilog "
+ "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
stmtHive.executeUpdate(sqlcreateRuleSushiLog);
String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
stmtHive.executeUpdate(createSushiIndex);
stmtHive.close();
ConnectDB.getHiveConnection().close();
logger.info("Sushi Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
}
}
public void reCreateLogDirs() throws IOException {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting sarcsReport (Array) directory: " + ExecuteWorkflow.sarcsReportPathArray);
dfs.delete(new Path(ExecuteWorkflow.sarcsReportPathArray), true);
logger.info("Deleting sarcsReport (NonArray) directory: " + ExecuteWorkflow.sarcsReportPathNonArray);
dfs.delete(new Path(ExecuteWorkflow.sarcsReportPathNonArray), true);
logger.info("Creating sarcsReport (Array) directory: " + ExecuteWorkflow.sarcsReportPathArray);
dfs.mkdirs(new Path(ExecuteWorkflow.sarcsReportPathArray));
logger.info("Creating sarcsReport (NonArray) directory: " + ExecuteWorkflow.sarcsReportPathNonArray);
dfs.mkdirs(new Path(ExecuteWorkflow.sarcsReportPathNonArray));
}
public void processSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
logger.info("Added JSON Serde jar");
logger.info("Dropping sarc_sushilogtmp_json_array table");
String drop_sarc_sushilogtmp_json_array = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array";
stmt.executeUpdate(drop_sarc_sushilogtmp_json_array);
logger.info("Dropped sarc_sushilogtmp_json_array table");
logger.info("Creating sarc_sushilogtmp_json_array table");
String create_sarc_sushilogtmp_json_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array(\n" +
" `ItemIdentifier` ARRAY<\n" +
" struct<\n" +
" `Type`: STRING,\n" +
" `Value`: STRING\n" +
" >\n" +
" >,\n" +
" `ItemPerformance` struct<\n" +
" `Period`: struct<\n" +
" `Begin`: STRING,\n" +
" `End`: STRING\n" +
" >,\n" +
" `Instance`: struct<\n" +
" `Count`: STRING,\n" +
" `MetricType`: STRING\n" +
" >\n" +
" >\n" +
")" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + sarcsReportPathArray + "/'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(create_sarc_sushilogtmp_json_array);
logger.info("Created sarc_sushilogtmp_json_array table");
logger.info("Dropping sarc_sushilogtmp_json_non_array table");
String drop_sarc_sushilogtmp_json_non_array = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp_json_non_array";
stmt.executeUpdate(drop_sarc_sushilogtmp_json_non_array);
logger.info("Dropped sarc_sushilogtmp_json_non_array table");
logger.info("Creating sarc_sushilogtmp_json_non_array table");
String create_sarc_sushilogtmp_json_non_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array (\n" +
" `ItemIdentifier` struct<\n" +
" `Type`: STRING,\n" +
" `Value`: STRING\n" +
" >,\n" +
" `ItemPerformance` struct<\n" +
" `Period`: struct<\n" +
" `Begin`: STRING,\n" +
" `End`: STRING\n" +
" >,\n" +
" `Instance`: struct<\n" +
" `Count`: STRING,\n" +
" `MetricType`: STRING\n" +
" >\n" +
" >" +
")" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + sarcsReportPathNonArray + "/'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(create_sarc_sushilogtmp_json_non_array);
logger.info("Created sarc_sushilogtmp_json_non_array table");
logger.info("Creating sarc_sushilogtmp table");
String create_sarc_sushilogtmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp(source STRING, repository STRING, " +
"rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
+
"tblproperties('transactional'='true')";
stmt.executeUpdate(create_sarc_sushilogtmp);
logger.info("Created sarc_sushilogtmp table");
logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)");
String insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " +
"SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " +
" `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, " +
"`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array " +
"LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " +
"WHERE `ItemIdent`.`Type`='DOI'";
stmt.executeUpdate(insert_sarc_sushilogtmp);
logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)");
logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)");
insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " +
"SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " +
"`ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, " +
"`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array";
stmt.executeUpdate(insert_sarc_sushilogtmp);
logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)");
ConnectDB.getHiveConnection().close();
}
public void getAndProcessSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
logger.info("Creating sushilog table");
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog " +
"(`source` string, " +
"`repository` string, " +
"`rid` string, " +
"`date` string, " +
"`metric_type` string, " +
"`count` int)";
stmt.executeUpdate(createSushilog);
logger.info("Created sushilog table");
logger.info("Dropping sarc_sushilogtmp table");
String drop_sarc_sushilogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp";
stmt.executeUpdate(drop_sarc_sushilogtmp);
logger.info("Dropped sarc_sushilogtmp table");
ConnectDB.getHiveConnection().close();
List<String[]> issnAndUrls = new ArrayList<String[]>();
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/motricidade/sushiLite/v1_7/", "1646-107X"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/antropologicas/sushiLite/v1_7/", "0873-819X"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/interaccoes/sushiLite/v1_7/", "1646-2335"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/cct/sushiLite/v1_7/", "2182-3030"
});
issnAndUrls.add(new String[] {
"https://actapediatrica.spp.pt/sushiLite/v1_7/", "0873-9781"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/sociologiapp/sushiLite/v1_7/", "0873-6529"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/finisterra/sushiLite/v1_7/", "0430-5027"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/sisyphus/sushiLite/v1_7/", "2182-8474"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/anestesiologia/sushiLite/v1_7/", "0871-6099"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/rpe/sushiLite/v1_7/", "0871-9187"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/psilogos/sushiLite/v1_7/", "1646-091X"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/juridica/sushiLite/v1_7/", "2183-5799"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/ecr/sushiLite/v1_7/", "1647-2098"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/nascercrescer/sushiLite/v1_7/", "0872-0754"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/cea/sushiLite/v1_7/", "1645-3794"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/proelium/sushiLite/v1_7/", "1645-8826"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/millenium/sushiLite/v1_7/", "0873-3015"
});
if (ExecuteWorkflow.sarcNumberOfIssnToDownload > 0 &&
ExecuteWorkflow.sarcNumberOfIssnToDownload <= issnAndUrls.size()) {
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.sarcNumberOfIssnToDownload);
issnAndUrls = issnAndUrls.subList(0, ExecuteWorkflow.sarcNumberOfIssnToDownload);
}
logger.info("(getAndProcessSarc) Downloading the followins opendoars: " + issnAndUrls);
for (String[] issnAndUrl : issnAndUrls) {
logger.info("Now working on ISSN: " + issnAndUrl[1]);
getARReport(sarcsReportPathArray, sarcsReportPathNonArray, issnAndUrl[0], issnAndUrl[1]);
}
}
public void finalizeSarcStats() throws Exception {
stmtHive = ConnectDB.getHiveConnection().createStatement();
ConnectDB.getHiveConnection().setAutoCommit(false);
stmtImpala = ConnectDB.getImpalaConnection().createStatement();
logger.info("Creating downloads_stats table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats " +
"(`source` string, " +
"`repository_id` string, " +
"`result_id` string, " +
"`date` string, " +
"`count` bigint, " +
"`openaire` bigint)";
stmtHive.executeUpdate(createDownloadsStats);
logger.info("Created downloads_stats table");
logger.info("Dropping sarc_sushilogtmp_impala table");
String drop_sarc_sushilogtmp_impala = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp_impala";
stmtHive.executeUpdate(drop_sarc_sushilogtmp_impala);
logger.info("Dropped sarc_sushilogtmp_impala table");
logger.info("Creating sarc_sushilogtmp_impala, a table readable by impala");
String createSarcSushilogtmpImpala = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp_impala " +
"STORED AS PARQUET AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
stmtHive.executeUpdate(createSarcSushilogtmpImpala);
logger.info("Created sarc_sushilogtmp_impala");
logger.info("Making sarc_sushilogtmp visible to impala");
String invalidateMetadata = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp_impala;";
stmtImpala.executeUpdate(invalidateMetadata);
logger.info("Dropping downloads_stats_impala table");
String drop_downloads_stats_impala = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".downloads_stats_impala";
stmtHive.executeUpdate(drop_downloads_stats_impala);
logger.info("Dropped downloads_stats_impala table");
logger.info("Making downloads_stats_impala deletion visible to impala");
try {
String invalidateMetadataDownloadsStatsImpala = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats_impala;";
stmtImpala.executeUpdate(invalidateMetadataDownloadsStatsImpala);
} catch (SQLException sqle) {
}
// We run the following query in Impala because it is faster
logger.info("Creating downloads_stats_impala");
String createDownloadsStatsImpala = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats_impala AS " +
"SELECT s.source, d.id AS repository_id, " +
"ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', " +
"LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_impala s, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " +
ConnectDB.getStatsDBSchema() + ".datasource_results dr, " +
ConnectDB.getStatsDBSchema() + ".result_pids ro " +
"WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id AND " +
"s.rid=ro.pid AND ro.type='Digital Object Identifier' AND metric_type='ft_total' AND s.source='SARC-OJS'";
stmtImpala.executeUpdate(createDownloadsStatsImpala);
logger.info("Creating downloads_stats_impala");
// Insert into downloads_stats
logger.info("Inserting data from downloads_stats_impala into downloads_stats");
String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats SELECT * " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_impala";
stmtHive.executeUpdate(insertDStats);
logger.info("Inserted into downloads_stats");
logger.info("Creating sushilog table");
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog " +
"(`source` string, " +
"`repository_id` string, " +
"`rid` string, " +
"`date` string, " +
"`metric_type` string, " +
"`count` int)";
stmtHive.executeUpdate(createSushilog);
logger.info("Created sushilog table");
// Insert into sushilog
logger.info("Inserting into sushilog");
String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog SELECT * " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
stmtHive.executeUpdate(insertSushiLog);
logger.info("Inserted into sushilog");
stmtHive.close();
ConnectDB.getHiveConnection().close();
}
public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray,
String url, String issn) throws Exception {
logger.info("Processing SARC! issn: " + issn + " with url: " + url);
ConnectDB.getHiveConnection().setAutoCommit(false);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("(getARReport) Starting period for log download: " + simpleDateFormat.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("(getARReport) Ending period for log download: " + simpleDateFormat.format(end.getTime()));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB
.getHiveConnection()
.prepareStatement(
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
st.setString(1, issn);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
// Creating the needed configuration for the correct storing of data
Configuration config = new Configuration();
config.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
config.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
config
.set(
"fs.hdfs.impl",
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
config
.set(
"fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem dfs = FileSystem.get(config);
while (start.before(end)) {
String reportUrl = url + "GetReport/?Report=AR1&Format=json&BeginDate="
+ simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime());
start.add(Calendar.MONTH, 1);
logger.info("(getARReport) Getting report: " + reportUrl);
String text = getJson(reportUrl);
if (text == null) {
continue;
}
JSONParser parser = new JSONParser();
JSONObject jsonObject = null;
try {
jsonObject = (JSONObject) parser.parse(text);
}
// if there is a parsing error continue with the next url
catch (ParseException pe) {
continue;
}
jsonObject = (JSONObject) jsonObject.get("sc:ReportResponse");
jsonObject = (JSONObject) jsonObject.get("sc:Report");
if (jsonObject == null) {
continue;
}
jsonObject = (JSONObject) jsonObject.get("c:Report");
jsonObject = (JSONObject) jsonObject.get("c:Customer");
Object obj = jsonObject.get("c:ReportItems");
JSONArray jsonArray = new JSONArray();
if (obj instanceof JSONObject) {
jsonArray.add(obj);
} else {
jsonArray = (JSONArray) obj;
// jsonArray = (JSONArray) jsonObject.get("c:ReportItems");
}
if (jsonArray == null) {
continue;
}
// Creating the file in the filesystem for the ItemIdentifier as array object
String filePathArray = sarcsReportPathArray + "/SarcsARReport_" + issn + "_" +
simpleDateFormat.format(start.getTime()) + ".json";
logger.info("Storing to file: " + filePathArray);
FSDataOutputStream finArray = dfs.create(new Path(filePathArray), true);
// Creating the file in the filesystem for the ItemIdentifier as array object
String filePathNonArray = sarcsReportPathNonArray + "/SarcsARReport_" + issn + "_" +
simpleDateFormat.format(start.getTime()) + ".json";
logger.info("Storing to file: " + filePathNonArray);
FSDataOutputStream finNonArray = dfs.create(new Path(filePathNonArray), true);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
renameKeysRecursively(":", jsonObjectRow);
if (jsonObjectRow.get("ItemIdentifier") instanceof JSONObject) {
finNonArray.write(jsonObjectRow.toJSONString().getBytes());
finNonArray.writeChar('\n');
} else {
finArray.write(jsonObjectRow.toJSONString().getBytes());
finArray.writeChar('\n');
}
}
finArray.close();
finNonArray.close();
// Check the file size and if it is too big, delete it
File fileArray = new File(filePathArray);
if (fileArray.length() == 0)
fileArray.delete();
File fileNonArray = new File(filePathNonArray);
if (fileNonArray.length() == 0)
fileNonArray.delete();
}
dfs.close();
ConnectDB.getHiveConnection().close();
}
private void renameKeysRecursively(String delimiter, JSONArray givenJsonObj) throws Exception {
for (Object jjval : givenJsonObj) {
if (jjval instanceof JSONArray)
renameKeysRecursively(delimiter, (JSONArray) jjval);
else if (jjval instanceof JSONObject)
renameKeysRecursively(delimiter, (JSONObject) jjval);
// All other types of vals
else
;
}
}
private void renameKeysRecursively(String delimiter, JSONObject givenJsonObj) throws Exception {
Set<String> jkeys = new HashSet<String>(givenJsonObj.keySet());
for (String jkey : jkeys) {
String[] splitArray = jkey.split(delimiter);
String newJkey = splitArray[splitArray.length - 1];
Object jval = givenJsonObj.get(jkey);
givenJsonObj.remove(jkey);
givenJsonObj.put(newJkey, jval);
if (jval instanceof JSONObject)
renameKeysRecursively(delimiter, (JSONObject) jval);
if (jval instanceof JSONArray) {
renameKeysRecursively(delimiter, (JSONArray) jval);
}
}
}
private String getJson(String url) throws Exception {
// String cred=username+":"+password;
// String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes());
try {
URL website = new URL(url);
URLConnection connection = website.openConnection();
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
// Logging error and silently continuing
logger.error("Failed to get URL: " + e);
System.out.println("Failed to get URL: " + e);
// return null;
// throw new Exception("Failed to get URL: " + e.toString(), e);
}
return "";
}
}

View File

@ -0,0 +1,179 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Main class for downloading and processing Usage statistics
*
* @author D. Pierrakos, S. Zoupanos
*/
public class UsageStatsExporter {
public UsageStatsExporter() {
}
private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
private void reCreateLogDirs() throws IllegalArgumentException, IOException {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting repoLog directory: " + ExecuteWorkflow.repoLogPath);
dfs.delete(new Path(ExecuteWorkflow.repoLogPath), true);
logger.info("Deleting portalLog directory: " + ExecuteWorkflow.portalLogPath);
dfs.delete(new Path(ExecuteWorkflow.portalLogPath), true);
logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true);
logger.info("Creating repoLog directory: " + ExecuteWorkflow.repoLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.repoLogPath));
logger.info("Creating portalLog directory: " + ExecuteWorkflow.portalLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.portalLogPath));
logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath));
}
public void export() throws Exception {
logger.info("Initialising DB properties");
ConnectDB.init();
// runImpalaQuery();
PiwikStatsDB piwikstatsdb = new PiwikStatsDB(ExecuteWorkflow.repoLogPath, ExecuteWorkflow.portalLogPath);
logger.info("Re-creating database and tables");
if (ExecuteWorkflow.recreateDbAndTables)
piwikstatsdb.recreateDBAndTables();
;
logger.info("Initializing the download logs module");
PiwikDownloadLogs piwd = new PiwikDownloadLogs(ExecuteWorkflow.matomoBaseURL, ExecuteWorkflow.matomoAuthToken);
if (ExecuteWorkflow.piwikEmptyDirs) {
logger.info("Recreating Piwik log directories");
piwikstatsdb.reCreateLogDirs();
}
// Downloading piwik logs (also managing directory creation)
if (ExecuteWorkflow.downloadPiwikLogs) {
logger.info("Downloading piwik logs");
piwd
.GetOpenAIRELogs(
ExecuteWorkflow.repoLogPath,
ExecuteWorkflow.portalLogPath, ExecuteWorkflow.portalMatomoID);
}
logger.info("Downloaded piwik logs");
// Create DB tables, insert/update statistics
String cRobotsUrl = "https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json";
piwikstatsdb.setCounterRobotsURL(cRobotsUrl);
if (ExecuteWorkflow.processPiwikLogs) {
logger.info("Processing logs");
piwikstatsdb.processLogs();
}
logger.info("Creating LaReferencia tables");
LaReferenciaDownloadLogs lrf = new LaReferenciaDownloadLogs(ExecuteWorkflow.lareferenciaBaseURL,
ExecuteWorkflow.lareferenciaAuthToken);
if (ExecuteWorkflow.laReferenciaEmptyDirs) {
logger.info("Recreating LaReferencia log directories");
lrf.reCreateLogDirs();
}
if (ExecuteWorkflow.downloadLaReferenciaLogs) {
logger.info("Downloading LaReferencia logs");
lrf.GetLaReferenciaRepos(ExecuteWorkflow.lareferenciaLogPath);
logger.info("Downloaded LaReferencia logs");
}
LaReferenciaStats lastats = new LaReferenciaStats(ExecuteWorkflow.lareferenciaLogPath);
if (ExecuteWorkflow.processLaReferenciaLogs) {
logger.info("Processing LaReferencia logs");
lastats.processLogs();
logger.info("LaReferencia logs done");
}
IrusStats irusstats = new IrusStats(ExecuteWorkflow.irusUKBaseURL);
if (ExecuteWorkflow.irusCreateTablesEmptyDirs) {
logger.info("Creating Irus Stats tables");
irusstats.createTables();
logger.info("Created Irus Stats tables");
logger.info("Re-create log dirs");
irusstats.reCreateLogDirs();
logger.info("Re-created log dirs");
}
if (ExecuteWorkflow.irusDownloadReports) {
irusstats.getIrusRRReport(ExecuteWorkflow.irusUKReportPath);
}
if (ExecuteWorkflow.irusProcessStats) {
irusstats.processIrusStats();
logger.info("Irus done");
}
SarcStats sarcStats = new SarcStats();
if (ExecuteWorkflow.sarcCreateTablesEmptyDirs) {
sarcStats.reCreateLogDirs();
}
if (ExecuteWorkflow.sarcDownloadReports) {
sarcStats.getAndProcessSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray);
}
if (ExecuteWorkflow.sarcProcessStats) {
sarcStats.processSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray);
sarcStats.finalizeSarcStats();
}
logger.info("Sarc done");
// finalize usagestats
if (ExecuteWorkflow.finalizeStats) {
piwikstatsdb.finalizeStats();
logger.info("Finalized stats");
}
// Make the tables available to Impala
if (ExecuteWorkflow.finalTablesVisibleToImpala) {
logger.info("Making tables visible to Impala");
invalidateMetadata();
}
logger.info("End");
}
private void invalidateMetadata() throws SQLException {
Statement stmt = null;
stmt = ConnectDB.getImpalaConnection().createStatement();
String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".views_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".pageviews_stats";
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getHiveConnection().close();
}
}

View File

@ -0,0 +1,231 @@
[
{
"paramName": "mat",
"paramLongName": "matomoAuthToken",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "mbu",
"paramLongName": "matomoBaseURL",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName": "rlp",
"paramLongName": "repoLogPath",
"paramDescription": "nameNode of the source cluster",
"paramRequired": true
},
{
"paramName": "plp",
"paramLongName": "portalLogPath",
"paramDescription": "namoNode of the target cluster",
"paramRequired": true
},
{
"paramName": "pmi",
"paramLongName": "portalMatomoID",
"paramDescription": "namoNode of the target cluster",
"paramRequired": true
},
{
"paramName": "iukbuw",
"paramLongName": "irusUKBaseURL",
"paramDescription": "working directory",
"paramRequired": true
},
{
"paramName": "iukrp",
"paramLongName": "irusUKReportPath",
"paramDescription": "maximum number of map tasks used in the distcp process",
"paramRequired": true
},
{
"paramName": "srpa",
"paramLongName": "sarcsReportPathArray",
"paramDescription": "memory for distcp action copying actionsets from remote cluster",
"paramRequired": true
},
{
"paramName": "srpna",
"paramLongName": "sarcsReportPathNonArray",
"paramDescription": "timeout for distcp copying actions from remote cluster",
"paramRequired": true
},
{
"paramName": "llp",
"paramLongName": "lareferenciaLogPath",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "lbu",
"paramLongName": "lareferenciaBaseURL",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "lat",
"paramLongName": "lareferenciaAuthToken",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "dbhu",
"paramLongName": "dbHiveUrl",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "dbiu",
"paramLongName": "dbImpalaUrl",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "usdbs",
"paramLongName": "usageStatsDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "sdbs",
"paramLongName": "statsDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "rdbt",
"paramLongName": "recreateDbAndTables",
"paramDescription": "Re-create database and initial tables?",
"paramRequired": true
},
{
"paramName": "pwed",
"paramLongName": "piwikEmptyDirs",
"paramDescription": "Empty piwik directories?",
"paramRequired": true
},
{
"paramName": "ppwl",
"paramLongName": "processPiwikLogs",
"paramDescription": "Process the piwiklogs (create & fill in the needed tables and process the data) based on the downloaded data",
"paramRequired": true
},
{
"paramName": "dpwl",
"paramLongName": "downloadPiwikLogs",
"paramDescription": "download piwik logs?",
"paramRequired": true
},
{
"paramName": "slp",
"paramLongName": "startingLogPeriod",
"paramDescription": "Starting log period",
"paramRequired": true
},
{
"paramName": "elp",
"paramLongName": "endingLogPeriod",
"paramDescription": "Ending log period",
"paramRequired": true
},
{
"paramName": "npidd",
"paramLongName": "numberOfPiwikIdsToDownload",
"paramDescription": "Limit the number of the downloaded piwikids to the first numberOfPiwikIdsToDownload",
"paramRequired": true
},
{
"paramName": "nsidd",
"paramLongName": "numberOfSiteIdsToDownload",
"paramDescription": "Limit the number of the downloaded siteids (La Referencia logs) to the first numberOfSiteIdsToDownload",
"paramRequired": true
},
{
"paramName": "lerd",
"paramLongName": "laReferenciaEmptyDirs",
"paramDescription": "Empty LaReferencia directories?",
"paramRequired": true
},
{
"paramName": "plrl",
"paramLongName": "processLaReferenciaLogs",
"paramDescription": "Process the La Referencia logs (create & fill in the needed tables and process the data) based on the downloaded data",
"paramRequired": true
},
{
"paramName": "dlrl",
"paramLongName": "downloadLaReferenciaLogs",
"paramDescription": "download La Referencia logs?",
"paramRequired": true
},
{
"paramName": "icted",
"paramLongName": "irusCreateTablesEmptyDirs",
"paramDescription": "Irus section: Create tables and empty JSON directories?",
"paramRequired": true
},
{
"paramName": "idr",
"paramLongName": "irusDownloadReports",
"paramDescription": "Irus section: Download reports?",
"paramRequired": true
},
{
"paramName": "ipr",
"paramLongName": "irusProcessStats",
"paramDescription": "Irus section: Process stats?",
"paramRequired": true
},
{
"paramName": "inod",
"paramLongName": "irusNumberOfOpendoarsToDownload",
"paramDescription": "Limit the number of the downloaded Opendoars (Irus) to the first irusNumberOfOpendoarsToDownload",
"paramRequired": true
},
{
"paramName": "icted",
"paramLongName": "sarcCreateTablesEmptyDirs",
"paramDescription": "Sarc section: Create tables and empty JSON directories?",
"paramRequired": true
},
{
"paramName": "idr",
"paramLongName": "sarcDownloadReports",
"paramDescription": "Sarc section: Download reports?",
"paramRequired": true
},
{
"paramName": "ipr",
"paramLongName": "sarcProcessStats",
"paramDescription": "Sarc section: Process stats?",
"paramRequired": true
},
{
"paramName": "inod",
"paramLongName": "sarcNumberOfIssnToDownload",
"paramDescription": "Limit the number of the downloaded ISSN (Sarc) to the first sarcNumberOfIssnToDownload",
"paramRequired": true
},
{
"paramName": "fs",
"paramLongName": "finalizeStats",
"paramDescription": "Create the usage_stats table?",
"paramRequired": true
},
{
"paramName": "ftvi",
"paramLongName": "finalTablesVisibleToImpala",
"paramDescription": "Make the usage_stats, views_stats and downloads_stats tables visible to Impala",
"paramRequired": true
},
{
"paramName": "nodt",
"paramLongName": "numberOfDownloadThreads",
"paramDescription": "Number of download threads",
"paramRequired": true
}
]

View File

@ -0,0 +1,38 @@
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1</value>
</property>
<property>
<name>impalaJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;</value>
</property>
<property>
<name>oozie.wf.workflow.notification.url</name>
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,90 @@
<workflow-app name="Usage Graph Stats" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hiveMetastoreUris</name>
<description>Hive server metastore URIs</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>Hive server jdbc url</description>
</property>
<property>
<name>impalaJdbcUrl</name>
<description>Impala server jdbc url</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="Step1"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name='Step1'>
<java>
<main-class>eu.dnetlib.oa.graph.usagestats.export.ExecuteWorkflow</main-class>
<arg>--matomoAuthToken</arg><arg>${matomoAuthToken}</arg>
<arg>--matomoBaseURL</arg><arg>${matomoBaseURL}</arg>
<arg>--repoLogPath</arg><arg>${repoLogPath}</arg>
<arg>--portalLogPath</arg><arg>${portalLogPath}</arg>
<arg>--portalMatomoID</arg><arg>${portalMatomoID}</arg>
<arg>--irusUKBaseURL</arg><arg>${irusUKBaseURL}</arg>
<arg>--irusUKReportPath</arg><arg>${irusUKReportPath}</arg>
<arg>--sarcsReportPathArray</arg><arg>${sarcsReportPathArray}</arg>
<arg>--sarcsReportPathNonArray</arg><arg>${sarcsReportPathNonArray}</arg>
<arg>--lareferenciaLogPath</arg><arg>${lareferenciaLogPath}</arg>
<arg>--lareferenciaBaseURL</arg><arg>${lareferenciaBaseURL}</arg>
<arg>--lareferenciaAuthToken</arg><arg>${lareferenciaAuthToken}</arg>
<arg>--dbHiveUrl</arg><arg>${hiveJdbcUrl}</arg>
<arg>--dbImpalaUrl</arg><arg>${impalaJdbcUrl}</arg>
<arg>--usageStatsDBSchema</arg><arg>${usageStatsDBSchema}</arg>
<arg>--statsDBSchema</arg><arg>${statsDBSchema}</arg>
<arg>--recreateDbAndTables</arg><arg>${recreateDbAndTables}</arg>
<arg>--piwikEmptyDirs</arg><arg>${piwikEmptyDirs}</arg>
<arg>--downloadPiwikLogs</arg><arg>${downloadPiwikLogs}</arg>
<arg>--processPiwikLogs</arg><arg>${processPiwikLogs}</arg>
<arg>--startingLogPeriod</arg><arg>${startingLogPeriod}</arg>
<arg>--endingLogPeriod</arg><arg>${endingLogPeriod}</arg>
<arg>--numberOfPiwikIdsToDownload</arg><arg>${numberOfPiwikIdsToDownload}</arg>
<arg>--numberOfSiteIdsToDownload</arg><arg>${numberOfSiteIdsToDownload}</arg>
<arg>--laReferenciaEmptyDirs</arg><arg>${laReferenciaEmptyDirs}</arg>
<arg>--downloadLaReferenciaLogs</arg><arg>${downloadLaReferenciaLogs}</arg>
<arg>--processLaReferenciaLogs</arg><arg>${processLaReferenciaLogs}</arg>
<arg>--irusCreateTablesEmptyDirs</arg><arg>${irusCreateTablesEmptyDirs}</arg>
<arg>--irusDownloadReports</arg><arg>${irusDownloadReports}</arg>
<arg>--irusProcessStats</arg><arg>${irusProcessStats}</arg>
<arg>--irusNumberOfOpendoarsToDownload</arg><arg>${irusNumberOfOpendoarsToDownload}</arg>
<arg>--sarcCreateTablesEmptyDirs</arg><arg>${sarcCreateTablesEmptyDirs}</arg>
<arg>--sarcDownloadReports</arg><arg>${sarcDownloadReports}</arg>
<arg>--sarcProcessStats</arg><arg>${sarcProcessStats}</arg>
<arg>--sarcNumberOfIssnToDownload</arg><arg>${sarcNumberOfIssnToDownload}</arg>
<arg>--finalizeStats</arg><arg>${finalizeStats}</arg>
<arg>--finalTablesVisibleToImpala</arg><arg>${finalTablesVisibleToImpala}</arg>
<arg>--numberOfDownloadThreads</arg><arg>${numberOfDownloadThreads}</arg>
<capture-output/>
</java>
<ok to="End" />
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>