forked from D-Net/dnet-hadoop
More progress on LaReFerenciaLogs
This commit is contained in:
parent
053588c365
commit
2b2bac9b28
|
@ -1,10 +1,5 @@
|
||||||
package eu.dnetlib.oa.graph.usagestats.export;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
package eu.dnetlib.oa.graph.usagestats.export;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -13,182 +8,211 @@ import java.sql.PreparedStatement;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.Date;
|
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.json.simple.JSONArray;
|
import org.json.simple.JSONArray;
|
||||||
import org.json.simple.JSONObject;
|
import org.json.simple.JSONObject;
|
||||||
import org.json.simple.parser.JSONParser;
|
import org.json.simple.parser.JSONParser;
|
||||||
|
|
||||||
public class LaReferenciaDownloadLogs {
|
public class LaReferenciaDownloadLogs {
|
||||||
|
|
||||||
private final String piwikUrl;
|
private final String piwikUrl;
|
||||||
private Date startDate;
|
private Date startDate;
|
||||||
private final String tokenAuth;
|
private final String tokenAuth;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
The Piwik's API method
|
* The Piwik's API method
|
||||||
*/
|
*/
|
||||||
private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
|
private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
|
||||||
private final String format = "&format=json";
|
private final String format = "&format=json";
|
||||||
private final String ApimethodGetAllSites = "?module=API&method=SitesManager.getSitesWithViewAccess";
|
private final String ApimethodGetAllSites = "?module=API&method=SitesManager.getSitesWithViewAccess";
|
||||||
|
|
||||||
private final Logger log = Logger.getLogger(this.getClass());
|
private final Logger log = Logger.getLogger(this.getClass());
|
||||||
|
|
||||||
public LaReferenciaDownloadLogs(String piwikUrl, String tokenAuth) throws Exception {
|
public LaReferenciaDownloadLogs(String piwikUrl, String tokenAuth) throws Exception {
|
||||||
this.piwikUrl = piwikUrl;
|
this.piwikUrl = piwikUrl;
|
||||||
this.tokenAuth = tokenAuth;
|
this.tokenAuth = tokenAuth;
|
||||||
this.createTables();
|
this.createTables();
|
||||||
this.createTmpTables();
|
// this.createTmpTables();
|
||||||
}
|
}
|
||||||
private void createTables() throws Exception {
|
|
||||||
try {
|
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
|
||||||
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialog(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
|
||||||
String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
|
||||||
+ " ON INSERT TO lareferencialog "
|
|
||||||
+ " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
|
|
||||||
+ "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
|
|
||||||
+ "FROM lareferencialog "
|
|
||||||
+ "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
|
||||||
String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
|
|
||||||
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
|
|
||||||
stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
|
|
||||||
stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
|
|
||||||
|
|
||||||
stmt.close();
|
private void createTables() throws Exception {
|
||||||
ConnectDB.getConnection().close();
|
try {
|
||||||
log.info("Lareferencia Tables Created");
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
|
|
||||||
} catch (Exception e) {
|
System.out.println("====> Creating LaReferencia tables");
|
||||||
log.error("Failed to create tables: " + e);
|
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " +
|
||||||
throw new Exception("Failed to create tables: " + e.toString(), e);
|
ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, " +
|
||||||
//System.exit(0);
|
"source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
|
||||||
}
|
"source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
|
||||||
}
|
"clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
|
||||||
|
"stored as orc tblproperties('transactional'='true')";
|
||||||
|
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
|
||||||
|
System.out.println("====> Created LaReferencia tables");
|
||||||
|
// String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||||
|
// + " ON INSERT TO lareferencialog "
|
||||||
|
// + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
|
||||||
|
// + "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
|
||||||
|
// + "FROM lareferencialog "
|
||||||
|
// + "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||||
|
// String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
|
||||||
|
// stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
|
||||||
|
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
|
||||||
|
|
||||||
private void createTmpTables() throws Exception {
|
stmt.close();
|
||||||
|
ConnectDB.getConnection().close();
|
||||||
|
log.info("Lareferencia Tables Created");
|
||||||
|
|
||||||
try {
|
} catch (Exception e) {
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
log.error("Failed to create tables: " + e);
|
||||||
String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
throw new Exception("Failed to create tables: " + e.toString(), e);
|
||||||
String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
// System.exit(0);
|
||||||
+ " ON INSERT TO lareferencialogtmp "
|
}
|
||||||
+ " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
|
}
|
||||||
+ "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
|
|
||||||
+ "FROM lareferencialogtmp "
|
|
||||||
+ "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
|
||||||
stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
|
|
||||||
stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
|
|
||||||
|
|
||||||
stmt.close();
|
private void createTmpTables() throws Exception {
|
||||||
log.info("Lareferencia Tmp Tables Created");
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
try {
|
||||||
log.error("Failed to create tmptables: " + e);
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
throw new Exception("Failed to create tmp tables: " + e.toString(), e);
|
String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
||||||
//System.exit(0);
|
String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||||
}
|
+ " ON INSERT TO lareferencialogtmp "
|
||||||
}
|
+ " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
|
||||||
private String getPiwikLogUrl() {
|
+ "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
|
||||||
return piwikUrl + "/";
|
+ "FROM lareferencialogtmp "
|
||||||
}
|
+ "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||||
|
stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
|
||||||
|
stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
|
||||||
|
|
||||||
private String getJson(String url) throws Exception {
|
stmt.close();
|
||||||
try {
|
log.info("Lareferencia Tmp Tables Created");
|
||||||
URL website = new URL(url);
|
|
||||||
URLConnection connection = website.openConnection();
|
|
||||||
|
|
||||||
StringBuilder response;
|
} catch (Exception e) {
|
||||||
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
|
log.error("Failed to create tmptables: " + e);
|
||||||
response = new StringBuilder();
|
throw new Exception("Failed to create tmp tables: " + e.toString(), e);
|
||||||
String inputLine;
|
// System.exit(0);
|
||||||
while ((inputLine = in.readLine()) != null) {
|
}
|
||||||
response.append(inputLine);
|
}
|
||||||
response.append("\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return response.toString();
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Failed to get URL: " + e);
|
|
||||||
throw new Exception("Failed to get URL: " + e.toString(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void GetLaReferenciaRepos(String repoLogsPath) throws Exception {
|
private String getPiwikLogUrl() {
|
||||||
|
return piwikUrl + "/";
|
||||||
|
}
|
||||||
|
|
||||||
String baseApiUrl = getPiwikLogUrl() + ApimethodGetAllSites + format + "&token_auth=" + this.tokenAuth;
|
private String getJson(String url) throws Exception {
|
||||||
String content = "";
|
try {
|
||||||
|
URL website = new URL(url);
|
||||||
|
URLConnection connection = website.openConnection();
|
||||||
|
|
||||||
content = getJson(baseApiUrl);
|
StringBuilder response;
|
||||||
JSONParser parser = new JSONParser();
|
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
|
||||||
JSONArray jsonArray = (JSONArray) parser.parse(content);
|
response = new StringBuilder();
|
||||||
for (Object aJsonArray : jsonArray) {
|
String inputLine;
|
||||||
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
|
while ((inputLine = in.readLine()) != null) {
|
||||||
int idSite = Integer.parseInt(jsonObjectRow.get("idsite").toString());
|
response.append(inputLine);
|
||||||
this.GetLaReFerenciaLogs(repoLogsPath, idSite);
|
response.append("\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return response.toString();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to get URL: " + e);
|
||||||
|
throw new Exception("Failed to get URL: " + e.toString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void GetLaReFerenciaLogs(String repoLogsPath,
|
public void GetLaReferenciaRepos(String repoLogsPath) throws Exception {
|
||||||
int laReferencialMatomoID) throws Exception {
|
|
||||||
|
|
||||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
|
String baseApiUrl = getPiwikLogUrl() + ApimethodGetAllSites + format + "&token_auth=" + this.tokenAuth;
|
||||||
|
String content = "";
|
||||||
|
|
||||||
Calendar start = Calendar.getInstance();
|
content = getJson(baseApiUrl);
|
||||||
start.set(Calendar.YEAR, 2020);
|
JSONParser parser = new JSONParser();
|
||||||
start.set(Calendar.MONTH, Calendar.JANUARY);
|
JSONArray jsonArray = (JSONArray) parser.parse(content);
|
||||||
start.set(Calendar.DAY_OF_MONTH, 1);
|
for (Object aJsonArray : jsonArray) {
|
||||||
|
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
|
||||||
|
int idSite = Integer.parseInt(jsonObjectRow.get("idsite").toString());
|
||||||
|
this.GetLaReFerenciaLogs(repoLogsPath, idSite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Calendar end = Calendar.getInstance();
|
public void GetLaReFerenciaLogs(String repoLogsPath,
|
||||||
end.add(Calendar.DAY_OF_MONTH, -1);
|
int laReferencialMatomoID) throws Exception {
|
||||||
|
|
||||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
|
||||||
PreparedStatement st = ConnectDB.getConnection().prepareStatement("SELECT max(timestamp) FROM lareferencialog WHERE matomoid=? HAVING max(timestamp) is not null;");
|
|
||||||
st.setInt(1, laReferencialMatomoID);
|
|
||||||
|
|
||||||
ResultSet rs_date = st.executeQuery();
|
Calendar start = Calendar.getInstance();
|
||||||
while (rs_date.next()) {
|
start.set(Calendar.YEAR, 2020);
|
||||||
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") && !rs_date.getString(1).equals("")) {
|
start.set(Calendar.MONTH, Calendar.JANUARY);
|
||||||
start.setTime(sdf.parse(rs_date.getString(1)));
|
start.set(Calendar.DAY_OF_MONTH, 1);
|
||||||
}
|
|
||||||
}
|
|
||||||
rs_date.close();
|
|
||||||
|
|
||||||
for (Date date = start.getTime(); start.before(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
|
Calendar end = Calendar.getInstance();
|
||||||
log.info("Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for " + sdf.format(date));
|
end.add(Calendar.DAY_OF_MONTH, -1);
|
||||||
|
|
||||||
String period = "&period=day&date=" + sdf.format(date);
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||||
String outFolder = "";
|
PreparedStatement st = ConnectDB
|
||||||
outFolder = repoLogsPath;
|
.getConnection()
|
||||||
|
.prepareStatement(
|
||||||
|
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() +
|
||||||
|
".lareferencialog WHERE matomoid=? HAVING max(timestamp) is not null");
|
||||||
|
st.setInt(1, laReferencialMatomoID);
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(new Configuration());
|
ResultSet rs_date = st.executeQuery();
|
||||||
|
while (rs_date.next()) {
|
||||||
|
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
|
||||||
|
&& !rs_date.getString(1).equals("")) {
|
||||||
|
start.setTime(sdf.parse(rs_date.getString(1)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rs_date.close();
|
||||||
|
|
||||||
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
|
for (Date date = start.getTime(); start.before(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
|
||||||
String content = "";
|
log
|
||||||
|
.info(
|
||||||
|
"Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for "
|
||||||
|
+ sdf.format(date));
|
||||||
|
|
||||||
int i = 0;
|
String period = "&period=day&date=" + sdf.format(date);
|
||||||
|
String outFolder = "";
|
||||||
|
outFolder = repoLogsPath;
|
||||||
|
|
||||||
while (!content.equals("[]\n")) {
|
FileSystem fs = FileSystem.get(new Configuration());
|
||||||
|
|
||||||
FSDataOutputStream fin = fs.create(new Path(outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog" + sdf.format((date)) + "_" + i + ".json"), true);
|
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format
|
||||||
String apiUrl = baseApiUrl;
|
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
|
||||||
|
String content = "";
|
||||||
|
|
||||||
if (i > 0) {
|
int i = 0;
|
||||||
apiUrl += "&filter_offset=" + (i * 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
content = getJson(apiUrl);
|
while (!content.equals("[]\n")) {
|
||||||
|
|
||||||
fin.write(content.getBytes());
|
FSDataOutputStream fin = fs
|
||||||
|
.create(
|
||||||
|
new Path(outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog" + sdf.format((date)) + "_"
|
||||||
|
+ i + ".json"),
|
||||||
|
true);
|
||||||
|
String apiUrl = baseApiUrl;
|
||||||
|
|
||||||
i++;
|
if (i > 0) {
|
||||||
fin.close();
|
apiUrl += "&filter_offset=" + (i * 1000);
|
||||||
}
|
}
|
||||||
//fin.close();
|
|
||||||
//out.close();
|
|
||||||
|
|
||||||
}
|
content = getJson(apiUrl);
|
||||||
|
|
||||||
// }
|
fin.write(content.getBytes());
|
||||||
}
|
|
||||||
|
i++;
|
||||||
|
fin.close();
|
||||||
|
}
|
||||||
|
// fin.close();
|
||||||
|
// out.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
|
||||||
package eu.dnetlib.oa.graph.usagestats.export;
|
package eu.dnetlib.oa.graph.usagestats.export;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
@ -13,9 +14,9 @@ import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.json.simple.JSONArray;
|
import org.json.simple.JSONArray;
|
||||||
|
@ -24,310 +25,324 @@ import org.json.simple.parser.JSONParser;
|
||||||
|
|
||||||
public class LaReferenciaStats {
|
public class LaReferenciaStats {
|
||||||
|
|
||||||
private String logRepoPath;
|
private String logRepoPath;
|
||||||
|
|
||||||
private Statement stmt = null;
|
private Statement stmt = null;
|
||||||
|
|
||||||
private final Logger log = Logger.getLogger(this.getClass());
|
private final Logger log = Logger.getLogger(this.getClass());
|
||||||
private String CounterRobotsURL;
|
private String CounterRobotsURL;
|
||||||
private ArrayList robotsList;
|
private ArrayList robotsList;
|
||||||
|
|
||||||
public LaReferenciaStats(String logRepoPath) throws Exception {
|
public LaReferenciaStats(String logRepoPath) throws Exception {
|
||||||
this.logRepoPath = logRepoPath;
|
this.logRepoPath = logRepoPath;
|
||||||
this.createTables();
|
this.createTables();
|
||||||
this.createTmpTables();
|
this.createTmpTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
private void connectDB() throws Exception {
|
* private void connectDB() throws Exception { try { ConnectDB connectDB = new ConnectDB(); } catch (Exception e) {
|
||||||
try {
|
* log.error("Connect to db failed: " + e); throw new Exception("Failed to connect to db: " + e.toString(), e); } }
|
||||||
ConnectDB connectDB = new ConnectDB();
|
*/
|
||||||
} catch (Exception e) {
|
private void createTables() throws Exception {
|
||||||
log.error("Connect to db failed: " + e);
|
try {
|
||||||
throw new Exception("Failed to connect to db: " + e.toString(), e);
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
}
|
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialog(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
||||||
}
|
String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||||
*/
|
+ " ON INSERT TO lareferencialog "
|
||||||
private void createTables() throws Exception {
|
+ " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
|
||||||
try {
|
+ "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
+ "FROM lareferencialog "
|
||||||
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialog(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
+ "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||||
String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
|
||||||
+ " ON INSERT TO lareferencialog "
|
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
|
||||||
+ " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
|
stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
|
||||||
+ "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
|
stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
|
||||||
+ "FROM lareferencialog "
|
|
||||||
+ "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
|
||||||
String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
|
|
||||||
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
|
|
||||||
stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
|
|
||||||
stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
|
|
||||||
|
|
||||||
stmt.close();
|
stmt.close();
|
||||||
ConnectDB.getConnection().close();
|
ConnectDB.getConnection().close();
|
||||||
log.info("Lareferencia Tables Created");
|
log.info("Lareferencia Tables Created");
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to create tables: " + e);
|
log.error("Failed to create tables: " + e);
|
||||||
throw new Exception("Failed to create tables: " + e.toString(), e);
|
throw new Exception("Failed to create tables: " + e.toString(), e);
|
||||||
//System.exit(0);
|
// System.exit(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createTmpTables() throws Exception {
|
private void createTmpTables() throws Exception {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
||||||
String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||||
+ " ON INSERT TO lareferencialogtmp "
|
+ " ON INSERT TO lareferencialogtmp "
|
||||||
+ " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
|
+ " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
|
||||||
+ "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
|
+ "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
|
||||||
+ "FROM lareferencialogtmp "
|
+ "FROM lareferencialogtmp "
|
||||||
+ "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
+ "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
||||||
stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
|
stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
|
||||||
stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
|
stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
|
||||||
|
|
||||||
stmt.close();
|
stmt.close();
|
||||||
log.info("Lareferencia Tmp Tables Created");
|
log.info("Lareferencia Tmp Tables Created");
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to create tmptables: " + e);
|
log.error("Failed to create tmptables: " + e);
|
||||||
throw new Exception("Failed to create tmp tables: " + e.toString(), e);
|
throw new Exception("Failed to create tmp tables: " + e.toString(), e);
|
||||||
//System.exit(0);
|
// System.exit(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void processLogs() throws Exception {
|
public void processLogs() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
processlaReferenciaLog();
|
processlaReferenciaLog();
|
||||||
log.info("LaReferencia repository process done");
|
log.info("LaReferencia repository process done");
|
||||||
removeDoubleClicks();
|
removeDoubleClicks();
|
||||||
log.info("LaReferencia removing double clicks done");
|
log.info("LaReferencia removing double clicks done");
|
||||||
viewsStats();
|
viewsStats();
|
||||||
log.info("LaReferencia views done");
|
log.info("LaReferencia views done");
|
||||||
downloadsStats();
|
downloadsStats();
|
||||||
log.info("LaReferencia downloads done");
|
log.info("LaReferencia downloads done");
|
||||||
updateProdTables();
|
updateProdTables();
|
||||||
log.info("LaReferencia update productions tables done");
|
log.info("LaReferencia update productions tables done");
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to process logs: " + e);
|
log.error("Failed to process logs: " + e);
|
||||||
throw new Exception("Failed to process logs: " + e.toString(), e);
|
throw new Exception("Failed to process logs: " + e.toString(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void processlaReferenciaLog() throws Exception {
|
public void processlaReferenciaLog() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
ConnectDB.getConnection().setAutoCommit(false);
|
ConnectDB.getConnection().setAutoCommit(false);
|
||||||
ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
|
ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
|
||||||
|
|
||||||
//File dir = new File(this.logRepoPath);
|
// File dir = new File(this.logRepoPath);
|
||||||
//File[] jsonFiles = dir.listFiles();
|
// File[] jsonFiles = dir.listFiles();
|
||||||
PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO lareferencialogtmp (matomoid, source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?,?)");
|
PreparedStatement prepStatem = ConnectDB
|
||||||
int batch_size = 0;
|
.getConnection()
|
||||||
|
.prepareStatement(
|
||||||
|
"INSERT INTO lareferencialogtmp (matomoid, source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?,?)");
|
||||||
|
int batch_size = 0;
|
||||||
|
|
||||||
JSONParser parser = new JSONParser();
|
JSONParser parser = new JSONParser();
|
||||||
for (String jsonFile : jsonFiles) {
|
for (String jsonFile : jsonFiles) {
|
||||||
System.out.println(jsonFile);
|
System.out.println(jsonFile);
|
||||||
JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
|
JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
|
||||||
for (Object aJsonArray : jsonArray) {
|
for (Object aJsonArray : jsonArray) {
|
||||||
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
|
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
|
||||||
int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
|
int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
|
||||||
String idVisit = jsonObjectRow.get("idVisit").toString();
|
String idVisit = jsonObjectRow.get("idVisit").toString();
|
||||||
String country = jsonObjectRow.get("country").toString();
|
String country = jsonObjectRow.get("country").toString();
|
||||||
String referrerName = jsonObjectRow.get("referrerName").toString();
|
String referrerName = jsonObjectRow.get("referrerName").toString();
|
||||||
String agent = jsonObjectRow.get("browser").toString();
|
String agent = jsonObjectRow.get("browser").toString();
|
||||||
String sourceItemType = "repItem";
|
String sourceItemType = "repItem";
|
||||||
|
|
||||||
JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
|
JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
|
||||||
for (Object actionDetail : actionDetails) {
|
for (Object actionDetail : actionDetails) {
|
||||||
JSONObject actionDetailsObj = (JSONObject) actionDetail;
|
JSONObject actionDetailsObj = (JSONObject) actionDetail;
|
||||||
|
|
||||||
if (actionDetailsObj.get("customVariables") != null) {
|
if (actionDetailsObj.get("customVariables") != null) {
|
||||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
|
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||||
Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
|
Timestamp timestamp = new Timestamp(
|
||||||
String url = actionDetailsObj.get("url").toString();
|
Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
|
||||||
String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
|
String url = actionDetailsObj.get("url").toString();
|
||||||
String opendoar = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("2")).get("customVariablePageValue2").toString();
|
String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1"))
|
||||||
String action = actionDetailsObj.get("type").toString();
|
.get("customVariablePageValue1")
|
||||||
prepStatem.setInt(1, idSite);
|
.toString();
|
||||||
prepStatem.setString(2, "opendoar____::" + opendoar);
|
String opendoar = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("2"))
|
||||||
prepStatem.setString(3, idVisit);
|
.get("customVariablePageValue2")
|
||||||
prepStatem.setString(4, country);
|
.toString();
|
||||||
prepStatem.setString(5, action);
|
String action = actionDetailsObj.get("type").toString();
|
||||||
prepStatem.setString(6, url);
|
prepStatem.setInt(1, idSite);
|
||||||
prepStatem.setString(7, oaipmh);
|
prepStatem.setString(2, "opendoar____::" + opendoar);
|
||||||
prepStatem.setString(8, sourceItemType);
|
prepStatem.setString(3, idVisit);
|
||||||
prepStatem.setString(9, simpleDateFormat.format(timestamp));
|
prepStatem.setString(4, country);
|
||||||
prepStatem.setString(10, referrerName);
|
prepStatem.setString(5, action);
|
||||||
prepStatem.setString(11, agent);
|
prepStatem.setString(6, url);
|
||||||
//prepStatem.setString(11, );
|
prepStatem.setString(7, oaipmh);
|
||||||
prepStatem.addBatch();
|
prepStatem.setString(8, sourceItemType);
|
||||||
batch_size++;
|
prepStatem.setString(9, simpleDateFormat.format(timestamp));
|
||||||
if (batch_size == 10000) {
|
prepStatem.setString(10, referrerName);
|
||||||
prepStatem.executeBatch();
|
prepStatem.setString(11, agent);
|
||||||
ConnectDB.getConnection().commit();
|
// prepStatem.setString(11, );
|
||||||
batch_size = 0;
|
prepStatem.addBatch();
|
||||||
}
|
batch_size++;
|
||||||
}
|
if (batch_size == 10000) {
|
||||||
}
|
prepStatem.executeBatch();
|
||||||
}
|
ConnectDB.getConnection().commit();
|
||||||
}
|
batch_size = 0;
|
||||||
try {
|
}
|
||||||
prepStatem.executeBatch();
|
}
|
||||||
ConnectDB.getConnection().commit();
|
}
|
||||||
stmt.close();
|
}
|
||||||
} catch (Exception e) {
|
}
|
||||||
|
try {
|
||||||
|
prepStatem.executeBatch();
|
||||||
|
ConnectDB.getConnection().commit();
|
||||||
|
stmt.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
|
||||||
if (e instanceof java.sql.SQLException) {
|
if (e instanceof java.sql.SQLException) {
|
||||||
java.sql.SQLException ne = ((java.sql.SQLException) e).getNextException();
|
java.sql.SQLException ne = ((java.sql.SQLException) e).getNextException();
|
||||||
System.out.println(ne.getMessage());
|
System.out.println(ne.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeDoubleClicks() throws Exception {
|
public void removeDoubleClicks() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
ConnectDB.getConnection().setAutoCommit(false);
|
ConnectDB.getConnection().setAutoCommit(false);
|
||||||
|
|
||||||
//clean download double clicks
|
// clean download double clicks
|
||||||
String sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
|
String sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
|
||||||
stmt.executeUpdate(sql);
|
stmt.executeUpdate(sql);
|
||||||
stmt.close();
|
stmt.close();
|
||||||
ConnectDB.getConnection().commit();
|
ConnectDB.getConnection().commit();
|
||||||
|
|
||||||
stmt = ConnectDB.getConnection().createStatement();
|
stmt = ConnectDB.getConnection().createStatement();
|
||||||
|
|
||||||
//clean view double clicks
|
// clean view double clicks
|
||||||
sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
|
sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
|
||||||
stmt.executeUpdate(sql);
|
stmt.executeUpdate(sql);
|
||||||
stmt.close();
|
stmt.close();
|
||||||
ConnectDB.getConnection().commit();
|
ConnectDB.getConnection().commit();
|
||||||
//conn.close();
|
// conn.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void viewsStats() throws Exception {
|
public void viewsStats() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
ConnectDB.getConnection().setAutoCommit(false);
|
ConnectDB.getConnection().setAutoCommit(false);
|
||||||
|
|
||||||
//String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
// String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as
|
||||||
String sql = "CREATE OR REPLACE VIEW la_result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
// views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS
|
||||||
stmt.executeUpdate(sql);
|
// VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='action' and (source_item_type='oaItem'
|
||||||
|
// or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
||||||
|
String sql = "CREATE OR REPLACE VIEW la_result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
||||||
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date,
|
||||||
sql = "CREATE TABLE IF NOT EXISTS la_views_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM la_result_views_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
// max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p,
|
||||||
stmt.executeUpdate(sql);
|
// datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by
|
||||||
|
// repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
||||||
|
sql = "CREATE TABLE IF NOT EXISTS la_views_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM la_result_views_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
||||||
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
stmt.close();
|
stmt.close();
|
||||||
ConnectDB.getConnection().commit();
|
ConnectDB.getConnection().commit();
|
||||||
ConnectDB.getConnection().close();
|
ConnectDB.getConnection().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void downloadsStats() throws Exception {
|
private void downloadsStats() throws Exception {
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
ConnectDB.getConnection().setAutoCommit(false);
|
ConnectDB.getConnection().setAutoCommit(false);
|
||||||
|
|
||||||
//String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
// String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as
|
||||||
String sql = "CREATE OR REPLACE VIEW la_result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
// downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS
|
||||||
stmt.executeUpdate(sql);
|
// VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='download' and
|
||||||
|
// (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id,
|
||||||
|
// month;";
|
||||||
|
String sql = "CREATE OR REPLACE VIEW la_result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
||||||
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
//sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date,
|
||||||
|
// max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro
|
||||||
|
// where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY
|
||||||
|
// repository_id, result_id, date;";
|
||||||
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
||||||
sql = "CREATE TABLE IF NOT EXISTS la_downloads_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM la_result_downloads_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
sql = "CREATE TABLE IF NOT EXISTS la_downloads_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM la_result_downloads_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
||||||
stmt.executeUpdate(sql);
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
stmt.close();
|
stmt.close();
|
||||||
ConnectDB.getConnection().commit();
|
ConnectDB.getConnection().commit();
|
||||||
ConnectDB.getConnection().close();
|
ConnectDB.getConnection().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateProdTables() throws SQLException, Exception {
|
||||||
|
|
||||||
private void updateProdTables() throws SQLException, Exception {
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
||||||
|
ConnectDB.getConnection().setAutoCommit(false);
|
||||||
|
String sql = "insert into lareferencialog select * from lareferencialogtmp;";
|
||||||
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
Statement stmt = ConnectDB.getConnection().createStatement();
|
sql = "insert into views_stats select * from la_views_stats_tmp;";
|
||||||
ConnectDB.getConnection().setAutoCommit(false);
|
stmt.executeUpdate(sql);
|
||||||
String sql = "insert into lareferencialog select * from lareferencialogtmp;";
|
|
||||||
stmt.executeUpdate(sql);
|
|
||||||
|
|
||||||
sql = "insert into views_stats select * from la_views_stats_tmp;";
|
sql = "insert into public.views_stats select * from la_views_stats_tmp;";
|
||||||
stmt.executeUpdate(sql);
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
sql = "insert into public.views_stats select * from la_views_stats_tmp;";
|
sql = "insert into downloads_stats select * from la_downloads_stats_tmp;";
|
||||||
stmt.executeUpdate(sql);
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
sql = "insert into downloads_stats select * from la_downloads_stats_tmp;";
|
sql = "insert into public.downloads_stats select * from la_downloads_stats_tmp;";
|
||||||
stmt.executeUpdate(sql);
|
stmt.executeUpdate(sql);
|
||||||
|
|
||||||
sql = "insert into public.downloads_stats select * from la_downloads_stats_tmp;";
|
stmt.close();
|
||||||
stmt.executeUpdate(sql);
|
ConnectDB.getConnection().commit();
|
||||||
|
ConnectDB.getConnection().close();
|
||||||
|
|
||||||
stmt.close();
|
}
|
||||||
ConnectDB.getConnection().commit();
|
|
||||||
ConnectDB.getConnection().close();
|
|
||||||
|
|
||||||
}
|
private ArrayList<String> listHdfsDir(String dir) throws Exception {
|
||||||
|
FileSystem hdfs = FileSystem.get(new Configuration());
|
||||||
|
RemoteIterator<LocatedFileStatus> Files;
|
||||||
|
ArrayList<String> fileNames = new ArrayList<>();
|
||||||
|
|
||||||
private ArrayList<String> listHdfsDir(String dir) throws Exception {
|
try {
|
||||||
FileSystem hdfs = FileSystem.get(new Configuration());
|
Path exportPath = new Path(hdfs.getUri() + dir);
|
||||||
RemoteIterator<LocatedFileStatus> Files;
|
Files = hdfs.listFiles(exportPath, false);
|
||||||
ArrayList<String> fileNames = new ArrayList<>();
|
while (Files.hasNext()) {
|
||||||
|
String fileName = Files.next().getPath().toString();
|
||||||
|
// log.info("Found hdfs file " + fileName);
|
||||||
|
fileNames.add(fileName);
|
||||||
|
}
|
||||||
|
// hdfs.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath));
|
||||||
|
throw new Exception("HDFS file path with exported data does not exist : " + logRepoPath, e);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
return fileNames;
|
||||||
Path exportPath = new Path(hdfs.getUri() + dir);
|
}
|
||||||
Files = hdfs.listFiles(exportPath, false);
|
|
||||||
while (Files.hasNext()) {
|
|
||||||
String fileName = Files.next().getPath().toString();
|
|
||||||
//log.info("Found hdfs file " + fileName);
|
|
||||||
fileNames.add(fileName);
|
|
||||||
}
|
|
||||||
//hdfs.close();
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath));
|
|
||||||
throw new Exception("HDFS file path with exported data does not exist : " + logRepoPath, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return fileNames;
|
private String readHDFSFile(String filename) throws Exception {
|
||||||
}
|
String result;
|
||||||
|
try {
|
||||||
|
|
||||||
private String readHDFSFile(String filename) throws Exception {
|
FileSystem fs = FileSystem.get(new Configuration());
|
||||||
String result;
|
// log.info("reading file : " + filename);
|
||||||
try {
|
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(new Configuration());
|
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
|
||||||
//log.info("reading file : " + filename);
|
|
||||||
|
|
||||||
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
|
StringBuilder sb = new StringBuilder();
|
||||||
|
String line = br.readLine();
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
while (line != null) {
|
||||||
String line = br.readLine();
|
if (!line.equals("[]")) {
|
||||||
|
sb.append(line);
|
||||||
|
}
|
||||||
|
// sb.append(line);
|
||||||
|
line = br.readLine();
|
||||||
|
}
|
||||||
|
result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
|
||||||
|
if (result.equals("")) {
|
||||||
|
result = "[]";
|
||||||
|
}
|
||||||
|
|
||||||
while (line != null) {
|
// fs.close();
|
||||||
if (!line.equals("[]")) {
|
} catch (Exception e) {
|
||||||
sb.append(line);
|
log.error(e);
|
||||||
}
|
throw new Exception(e);
|
||||||
//sb.append(line);
|
}
|
||||||
line = br.readLine();
|
|
||||||
}
|
|
||||||
result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
|
|
||||||
if (result.equals("")) {
|
|
||||||
result = "[]";
|
|
||||||
}
|
|
||||||
|
|
||||||
//fs.close();
|
return result;
|
||||||
} catch (Exception e) {
|
}
|
||||||
log.error(e);
|
|
||||||
throw new Exception(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,21 +57,22 @@ public class UsageStatsExporter {
|
||||||
// piwikstatsdb.processLogs();
|
// piwikstatsdb.processLogs();
|
||||||
log.info("process logs done");
|
log.info("process logs done");
|
||||||
|
|
||||||
// LaReferenciaDownloadLogs lrf = new LaReferenciaDownloadLogs(lareferenciaBaseURL,lareferenciaAuthToken);
|
System.out.println("====> Creating LaReferencia tables");
|
||||||
// lrf.GetLaReferenciaRepos(lareferenciaLogPath);
|
LaReferenciaDownloadLogs lrf = new LaReferenciaDownloadLogs(lareferenciaBaseURL, lareferenciaAuthToken);
|
||||||
|
lrf.GetLaReferenciaRepos(lareferenciaLogPath);
|
||||||
// LaReferenciaStats lastats = new LaReferenciaStats(lareferenciaLogPath);
|
// LaReferenciaStats lastats = new LaReferenciaStats(lareferenciaLogPath);
|
||||||
// lastats.processLogs();
|
// lastats.processLogs();
|
||||||
// log.info("LaReferencia logs done");
|
// log.info("LaReferencia logs done");
|
||||||
|
|
||||||
IrusStats irusstats = new IrusStats(irusUKBaseURL);
|
// IrusStats irusstats = new IrusStats(irusUKBaseURL);
|
||||||
// irusstats.getIrusRRReport(irusUKReportPath);
|
// irusstats.getIrusRRReport(irusUKReportPath);
|
||||||
|
|
||||||
// irusstats.processIrusStats();
|
// irusstats.processIrusStats();
|
||||||
// log.info("irus done");
|
// log.info("irus done");
|
||||||
|
|
||||||
SarcStats sarcStats = new SarcStats();
|
// SarcStats sarcStats = new SarcStats();
|
||||||
// sarcStats.getAndProcessSarc(sarcsReportPathArray, sarcsReportPathNonArray);
|
// sarcStats.getAndProcessSarc(sarcsReportPathArray, sarcsReportPathNonArray);
|
||||||
sarcStats.finalizeSarcStats();
|
// sarcStats.finalizeSarcStats();
|
||||||
// log.info("sarc done");
|
// log.info("sarc done");
|
||||||
|
|
||||||
// // finalize usagestats
|
// // finalize usagestats
|
||||||
|
|
Loading…
Reference in New Issue