2020-09-19 23:00:59 +02:00
2020-09-19 23:59:33 +02:00
package eu.dnetlib.oa.graph.usagestats.export ;
2020-09-19 23:00:59 +02:00
import java.io.* ;
import java.net.URL ;
import java.net.URLConnection ;
import java.sql.PreparedStatement ;
import java.sql.ResultSet ;
import java.sql.Statement ;
import java.text.SimpleDateFormat ;
2020-10-04 16:03:01 +02:00
import java.util.ArrayList ;
2020-09-19 23:00:59 +02:00
import java.util.Calendar ;
2020-09-19 23:59:33 +02:00
import java.util.Date ;
2020-10-04 16:03:01 +02:00
import java.util.List ;
2020-09-19 23:59:33 +02:00
import org.apache.hadoop.conf.Configuration ;
import org.apache.hadoop.fs.FSDataOutputStream ;
import org.apache.hadoop.fs.FileSystem ;
import org.apache.hadoop.fs.Path ;
2020-09-19 23:00:59 +02:00
import org.json.simple.JSONArray ;
import org.json.simple.JSONObject ;
import org.json.simple.parser.JSONParser ;
2020-10-02 15:25:21 +02:00
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
2020-09-19 23:00:59 +02:00
2020-10-02 15:25:21 +02:00
/ * *
* @author D . Pierrakos , S . Zoupanos
* /
2020-09-19 23:00:59 +02:00
public class LaReferenciaDownloadLogs {
2020-09-19 23:59:33 +02:00
private final String piwikUrl ;
private Date startDate ;
private final String tokenAuth ;
/ *
* The Piwik ' s API method
* /
private final String APImethod = " ?module=API&method=Live.getLastVisitsDetails " ;
private final String format = " &format=json " ;
private final String ApimethodGetAllSites = " ?module=API&method=SitesManager.getSitesWithViewAccess " ;
2020-10-02 15:25:21 +02:00
private static final Logger logger = LoggerFactory . getLogger ( LaReferenciaDownloadLogs . class ) ;
2020-09-19 23:59:33 +02:00
public LaReferenciaDownloadLogs ( String piwikUrl , String tokenAuth ) throws Exception {
this . piwikUrl = piwikUrl ;
this . tokenAuth = tokenAuth ;
this . createTables ( ) ;
// this.createTmpTables();
}
private void createTables ( ) throws Exception {
try {
2020-09-27 12:19:45 +02:00
Statement stmt = ConnectDB . getHiveConnection ( ) . createStatement ( ) ;
2020-09-19 23:59:33 +02:00
2020-10-02 15:25:21 +02:00
logger . info ( " Creating LaReferencia tables " ) ;
2020-09-19 23:59:33 +02:00
String sqlCreateTableLareferenciaLog = " CREATE TABLE IF NOT EXISTS " +
ConnectDB . getUsageStatsDBSchema ( ) + " .lareferencialog(matomoid INT, " +
" source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
" source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
" clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
" stored as orc tblproperties('transactional'='true') " ;
stmt . executeUpdate ( sqlCreateTableLareferenciaLog ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Created LaReferencia tables " ) ;
2020-09-19 23:59:33 +02:00
// String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialog "
// + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
// + "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
// + "FROM lareferencialog "
// + "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
// stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
stmt . close ( ) ;
2020-09-27 12:19:45 +02:00
ConnectDB . getHiveConnection ( ) . close ( ) ;
2020-10-02 15:25:21 +02:00
logger . info ( " Lareferencia Tables Created " ) ;
2020-09-19 23:59:33 +02:00
} catch ( Exception e ) {
2020-10-02 15:25:21 +02:00
logger . error ( " Failed to create tables: " + e ) ;
2020-09-19 23:59:33 +02:00
throw new Exception ( " Failed to create tables: " + e . toString ( ) , e ) ;
// System.exit(0);
}
}
2020-09-20 10:27:27 +02:00
// private void createTmpTables() throws Exception {
//
// try {
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
// String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialogtmp "
// + " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
// + "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
// + "FROM lareferencialogtmp "
// + "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
// stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
//
// stmt.close();
// log.info("Lareferencia Tmp Tables Created");
//
// } catch (Exception e) {
// log.error("Failed to create tmptables: " + e);
// throw new Exception("Failed to create tmp tables: " + e.toString(), e);
// // System.exit(0);
// }
// }
2020-09-19 23:59:33 +02:00
private String getPiwikLogUrl ( ) {
return piwikUrl + " / " ;
}
private String getJson ( String url ) throws Exception {
try {
URL website = new URL ( url ) ;
URLConnection connection = website . openConnection ( ) ;
StringBuilder response ;
try ( BufferedReader in = new BufferedReader ( new InputStreamReader ( connection . getInputStream ( ) ) ) ) {
response = new StringBuilder ( ) ;
String inputLine ;
while ( ( inputLine = in . readLine ( ) ) ! = null ) {
response . append ( inputLine ) ;
2020-09-20 11:56:04 +02:00
// response.append("\n");
2020-09-19 23:59:33 +02:00
}
}
2020-09-20 11:56:04 +02:00
2020-09-19 23:59:33 +02:00
return response . toString ( ) ;
} catch ( Exception e ) {
2020-10-02 15:25:21 +02:00
logger . error ( " Failed to get URL: " + e ) ;
2020-09-19 23:59:33 +02:00
throw new Exception ( " Failed to get URL: " + e . toString ( ) , e ) ;
}
}
public void GetLaReferenciaRepos ( String repoLogsPath ) throws Exception {
String baseApiUrl = getPiwikLogUrl ( ) + ApimethodGetAllSites + format + " &token_auth= " + this . tokenAuth ;
String content = " " ;
2020-10-06 22:44:25 +02:00
List < Integer > siteIdsToVisit = new ArrayList < Integer > ( ) ;
2020-10-04 16:03:01 +02:00
// Getting all the siteIds in a list for logging reasons & limiting the list
// to the max number of siteIds
2020-09-19 23:59:33 +02:00
content = getJson ( baseApiUrl ) ;
JSONParser parser = new JSONParser ( ) ;
JSONArray jsonArray = ( JSONArray ) parser . parse ( content ) ;
for ( Object aJsonArray : jsonArray ) {
JSONObject jsonObjectRow = ( JSONObject ) aJsonArray ;
2020-10-06 22:44:25 +02:00
siteIdsToVisit . add ( Integer . parseInt ( jsonObjectRow . get ( " idsite " ) . toString ( ) ) ) ;
2020-10-04 16:03:01 +02:00
}
2020-10-06 22:44:25 +02:00
logger . info ( " Found the following siteIds for download: " + siteIdsToVisit ) ;
2020-10-04 16:03:01 +02:00
if ( ExecuteWorkflow . numberOfPiwikIdsToDownload > 0 & &
2020-10-06 22:44:25 +02:00
ExecuteWorkflow . numberOfPiwikIdsToDownload < = siteIdsToVisit . size ( ) ) {
2020-10-04 16:03:01 +02:00
logger . info ( " Trimming siteIds list to the size of: " + ExecuteWorkflow . numberOfPiwikIdsToDownload ) ;
2020-10-06 22:44:25 +02:00
siteIdsToVisit = siteIdsToVisit . subList ( 0 , ExecuteWorkflow . numberOfPiwikIdsToDownload ) ;
2020-10-04 16:03:01 +02:00
}
2020-10-06 22:44:25 +02:00
logger . info ( " Downloading from repos with the followins siteIds: " + siteIdsToVisit ) ;
2020-10-04 16:03:01 +02:00
2020-10-06 22:44:25 +02:00
for ( int siteId : siteIdsToVisit ) {
2020-10-04 16:03:01 +02:00
logger . info ( " Now working on piwikId: " + siteId ) ;
this . GetLaReFerenciaLogs ( repoLogsPath , siteId ) ;
2020-09-19 23:59:33 +02:00
}
}
public void GetLaReFerenciaLogs ( String repoLogsPath ,
int laReferencialMatomoID ) throws Exception {
2020-10-02 15:25:21 +02:00
logger . info ( " Downloading logs for LaReferencia repoid " + laReferencialMatomoID ) ;
2020-09-20 11:56:04 +02:00
2020-10-04 16:03:01 +02:00
SimpleDateFormat sdf = new SimpleDateFormat ( " yyyy-MM-dd " ) ;
// Setting the starting period
2020-10-05 18:09:31 +02:00
Calendar start = ( Calendar ) ExecuteWorkflow . startingLogPeriod . clone ( ) ;
2020-10-04 16:03:01 +02:00
logger . info ( " Starting period for log download: " + sdf . format ( start . getTime ( ) ) ) ;
2020-09-19 23:59:33 +02:00
2020-10-04 16:03:01 +02:00
// Setting the ending period (last day of the month)
2020-10-05 18:09:31 +02:00
Calendar end = ( Calendar ) ExecuteWorkflow . endingLogPeriod . clone ( ) ;
2020-10-04 16:03:01 +02:00
end . add ( Calendar . MONTH , + 1 ) ;
2020-09-19 23:59:33 +02:00
end . add ( Calendar . DAY_OF_MONTH , - 1 ) ;
2020-10-05 18:09:31 +02:00
logger . info ( " Ending period for log download: " + sdf . format ( end . getTime ( ) ) ) ;
2020-09-19 23:59:33 +02:00
PreparedStatement st = ConnectDB
2020-09-27 12:19:45 +02:00
. getHiveConnection ( )
2020-09-19 23:59:33 +02:00
. prepareStatement (
" SELECT max(timestamp) FROM " + ConnectDB . getUsageStatsDBSchema ( ) +
2020-09-20 10:27:27 +02:00
" .lareferencialog WHERE matomoid=? GROUP BY timestamp HAVING max(timestamp) is not null " ) ;
2020-09-19 23:59:33 +02:00
st . setInt ( 1 , laReferencialMatomoID ) ;
ResultSet rs_date = st . executeQuery ( ) ;
while ( rs_date . next ( ) ) {
if ( rs_date . getString ( 1 ) ! = null & & ! rs_date . getString ( 1 ) . equals ( " null " )
& & ! rs_date . getString ( 1 ) . equals ( " " ) ) {
start . setTime ( sdf . parse ( rs_date . getString ( 1 ) ) ) ;
}
}
rs_date . close ( ) ;
2020-10-04 16:03:01 +02:00
for ( Calendar currDay = ( Calendar ) start . clone ( ) ; currDay . before ( end ) ; currDay . add ( Calendar . DATE , 1 ) ) {
Date date = currDay . getTime ( ) ;
2020-10-02 15:25:21 +02:00
logger
2020-09-19 23:59:33 +02:00
. info (
" Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for "
+ sdf . format ( date ) ) ;
2020-09-20 11:56:04 +02:00
2020-09-19 23:59:33 +02:00
String period = " &period=day&date= " + sdf . format ( date ) ;
String outFolder = " " ;
outFolder = repoLogsPath ;
FileSystem fs = FileSystem . get ( new Configuration ( ) ) ;
2020-09-20 11:56:04 +02:00
FSDataOutputStream fin = fs
. create (
new Path ( outFolder + " / " + laReferencialMatomoID + " _LaRefPiwiklog " + sdf . format ( ( date ) ) + " .json " ) ,
true ) ;
2020-09-19 23:59:33 +02:00
String baseApiUrl = getPiwikLogUrl ( ) + APImethod + " &idSite= " + laReferencialMatomoID + period + format
+ " &expanded=5&filter_limit=1000&token_auth= " + tokenAuth ;
String content = " " ;
int i = 0 ;
2020-09-20 11:56:04 +02:00
JSONParser parser = new JSONParser ( ) ;
2020-10-15 21:08:24 +02:00
do {
2020-09-19 23:59:33 +02:00
String apiUrl = baseApiUrl ;
if ( i > 0 ) {
apiUrl + = " &filter_offset= " + ( i * 1000 ) ;
}
content = getJson ( apiUrl ) ;
2020-10-15 21:08:24 +02:00
if ( content . length ( ) = = 0 | | content . equals ( " [] " ) )
break ;
2020-09-19 23:59:33 +02:00
2020-09-20 11:56:04 +02:00
JSONArray jsonArray = ( JSONArray ) parser . parse ( content ) ;
for ( Object aJsonArray : jsonArray ) {
JSONObject jsonObjectRaw = ( JSONObject ) aJsonArray ;
fin . write ( jsonObjectRaw . toJSONString ( ) . getBytes ( ) ) ;
fin . writeChar ( '\n' ) ;
}
2020-09-19 23:59:33 +02:00
2020-10-02 15:25:21 +02:00
logger
. info (
" Downloaded part " + i + " of logs for LaReferencia repoid " + laReferencialMatomoID
2020-09-20 11:56:04 +02:00
+ " and for "
+ sdf . format ( date ) ) ;
2020-09-19 23:59:33 +02:00
i + + ;
2020-10-15 21:08:24 +02:00
} while ( true ) ;
2020-09-20 11:56:04 +02:00
fin . close ( ) ;
2020-09-19 23:59:33 +02:00
}
}
2020-09-19 23:00:59 +02:00
}