More progress on adding piwiklogtmp to the code

This commit is contained in:
Spyros Zoupanos 2020-09-01 19:05:38 +03:00
parent f3dda9858c
commit 293d6accd4
1 changed file with 116 additions and 75 deletions

View File

@ -198,81 +198,122 @@ public class PiwikStatsDB {
Statement stmt = ConnectDB.getConnection().createStatement(); Statement stmt = ConnectDB.getConnection().createStatement();
ConnectDB.getConnection().setAutoCommit(false); ConnectDB.getConnection().setAutoCommit(false);
ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath); String stm_piwiklogtmp_json =
// File dir = new File(this.logRepoPath); "CREATE EXTERNAL TABLE IF NOT EXISTS " +
// File[] jsonFiles = dir.listFiles(); ConnectDB.getUsageStatsDBSchema() +
".piwiklogtmp_json(\n" +
" `idSite` STRING,\n" +
" `idVisit` STRING,\n" +
" `country` STRING,\n" +
" `referrerName` STRING,\n" +
" `browser` STRING,\n" +
" `actionDetails` ARRAY<\n" +
" struct<\n" +
" type: STRING,\n" +
" url: STRING,\n" +
" `customVariables`: struct<\n" +
" `1`: struct<\n" +
" `customVariablePageValue1`: STRING\n" +
" >\n" +
" >,\n" +
" timestamp: String\n" +
" >\n" +
" >\n" +
")\n" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '/user/spyros/logs/usage_stats_logs/Repologs2/'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\");\n" +
"";
stmt.executeUpdate(stm_piwiklogtmp_json);
PreparedStatement prepStatem = ConnectDB
.getConnection()
.prepareStatement(
"INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
int batch_size = 0;
JSONParser parser = new JSONParser();
for (String jsonFile : jsonFiles) {
System.out.println(jsonFile);
JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
String idVisit = jsonObjectRow.get("idVisit").toString();
String country = jsonObjectRow.get("country").toString();
String referrerName = jsonObjectRow.get("referrerName").toString();
String agent = jsonObjectRow.get("browser").toString();
boolean botFound = false;
Iterator it = robotsList.iterator();
while (it.hasNext()) {
// Create a Pattern object
Pattern r = Pattern.compile(it.next().toString());
// Now create matcher object.
Matcher m = r.matcher(agent);
if (m.find()) {
// System.out.println("Found value: " + m.group(0));
botFound = true;
break;
}
}
if (botFound == false) {
String sourceItemType = "repItem";
JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails")); String stm_piwiklogtmp =
for (Object actionDetail : actionDetails) { "CREATE TABLE " +
JSONObject actionDetailsObj = (JSONObject) actionDetail; ConnectDB.getUsageStatsDBSchema() +
".piwiklogtmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, " +
"entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true');";
stmt.executeUpdate(processRepositoryLog);
if (actionDetailsObj.get("customVariables") != null) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Timestamp timestamp = new Timestamp(
Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
String url = actionDetailsObj.get("url").toString();
String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables"))
.get("1")).get("customVariablePageValue1").toString();
String action = actionDetailsObj.get("type").toString();
prepStatem.setInt(1, idSite);
prepStatem.setString(2, idVisit);
prepStatem.setString(3, country);
prepStatem.setString(4, action);
prepStatem.setString(5, url);
prepStatem.setString(6, oaipmh);
prepStatem.setString(7, sourceItemType);
prepStatem.setString(8, simpleDateFormat.format(timestamp));
prepStatem.setString(9, referrerName);
prepStatem.setString(10, agent);
prepStatem.addBatch();
batch_size++;
if (batch_size == 10000) {
prepStatem.executeBatch();
ConnectDB.getConnection().commit();
batch_size = 0;
}
}
}
}
}
}
prepStatem.executeBatch();
ConnectDB.getConnection().commit();
stmt.close(); stmt.close();
// ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
//// File dir = new File(this.logRepoPath);
//// File[] jsonFiles = dir.listFiles();
//
//
// PreparedStatement prepStatem = ConnectDB
// .getConnection()
// .prepareStatement(
// "INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
// int batch_size = 0;
// JSONParser parser = new JSONParser();
// for (String jsonFile : jsonFiles) {
// System.out.println(jsonFile);
// JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
// for (Object aJsonArray : jsonArray) {
// JSONObject jsonObjectRow = (JSONObject) aJsonArray;
// int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
// String idVisit = jsonObjectRow.get("idVisit").toString();
// String country = jsonObjectRow.get("country").toString();
// String referrerName = jsonObjectRow.get("referrerName").toString();
// String agent = jsonObjectRow.get("browser").toString();
// boolean botFound = false;
// Iterator it = robotsList.iterator();
// while (it.hasNext()) {
// // Create a Pattern object
// Pattern r = Pattern.compile(it.next().toString());
// // Now create matcher object.
// Matcher m = r.matcher(agent);
// if (m.find()) {
// // System.out.println("Found value: " + m.group(0));
// botFound = true;
// break;
// }
// }
// if (botFound == false) {
// String sourceItemType = "repItem";
//
// JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
// for (Object actionDetail : actionDetails) {
// JSONObject actionDetailsObj = (JSONObject) actionDetail;
//
// if (actionDetailsObj.get("customVariables") != null) {
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
// Timestamp timestamp = new Timestamp(
// Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
// String url = actionDetailsObj.get("url").toString();
// String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables"))
// .get("1")).get("customVariablePageValue1").toString();
// String action = actionDetailsObj.get("type").toString();
//
// prepStatem.setInt(1, idSite);
// prepStatem.setString(2, idVisit);
// prepStatem.setString(3, country);
// prepStatem.setString(4, action);
// prepStatem.setString(5, url);
// prepStatem.setString(6, oaipmh);
// prepStatem.setString(7, sourceItemType);
// prepStatem.setString(8, simpleDateFormat.format(timestamp));
// prepStatem.setString(9, referrerName);
// prepStatem.setString(10, agent);
// prepStatem.addBatch();
// batch_size++;
// if (batch_size == 10000) {
// prepStatem.executeBatch();
// ConnectDB.getConnection().commit();
// batch_size = 0;
// }
// }
// }
// }
// }
// }
// prepStatem.executeBatch();
// ConnectDB.getConnection().commit();
// stmt.close();
} }
public void removeDoubleClicks() throws Exception { public void removeDoubleClicks() throws Exception {