More progress on loading JSON Serde jar

Spyros Zoupanos 2020-09-07 00:01:05 +03:00
parent 5af2abbea5
commit e2c70f64ed
2 changed files with 69 additions and 6 deletions

PiwikStatsDB.java

@@ -213,7 +213,7 @@ public class PiwikStatsDB {
".piwiklogtmp_json";
stmt.executeUpdate(drop_piwiklogtmp_json);
- String stm_piwiklogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
+ String create_piwiklogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".piwiklogtmp_json(\n" +
" `idSite` STRING,\n" +
@@ -237,22 +237,36 @@
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + UsageStatsExporter.repoLogPath + "'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
- stmt.executeUpdate(stm_piwiklogtmp_json);
+ stmt.executeUpdate(create_piwiklogtmp_json);
String drop_piwiklogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".piwiklogtmp";
stmt.executeUpdate(drop_piwiklogtmp);
- String stm_piwiklogtmp = "CREATE TABLE " +
+ String create_piwiklogtmp = "CREATE TABLE " +
ConnectDB.getUsageStatsDBSchema() +
".piwiklogtmp (source BIGINT, id_Visit STRING, country STRING, action STRING, url STRING, " +
"entity_id STRING, source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source) into 100 buckets stored as orc tblproperties('transactional'='true')";
- stmt.executeUpdate(stm_piwiklogtmp);
+ stmt.executeUpdate(create_piwiklogtmp);
+ stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
+ String insert_piwiklogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
+ "SELECT DISTINCT cast(idSite as BIGINT) as source, idVisit as id_Visit, country, " +
+ "actiondetail.type as action, actiondetail.url as url, " +
+ "actiondetail.customVariables.`1`.`customVariablePageValue1` as entity_id, " +
+ "'repItem' as source_item_type, from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, " +
+ "referrerName as referrer_name, browser as agent\n" +
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp_json\n" +
+ "LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail";
+ stmt.executeUpdate(insert_piwiklogtmp);
+ stmt.close();
+ System.exit(0);
// ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
//// File dir = new File(this.logRepoPath);
//// File[] jsonFiles = dir.listFiles();
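Taken together, this hunk sets up the JSON-loading pipeline: the HCatalog SerDe jar is registered in the Hive session with add jar, the raw Piwik logs are exposed through an external table backed by org.apache.hive.hcatalog.data.JsonSerDe, and the nested actiondetails array is flattened into one row per action with LATERAL VIEW explode. Below is a minimal, self-contained sketch of the same pattern over plain Hive JDBC; the JDBC URL, jar path, HDFS location, and table names are placeholders, not values from this repository.

// A sketch, not project code: the add jar + JsonSerDe + explode pattern in isolation.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class JsonSerdeSketch {
	public static void main(String[] args) throws Exception {
		// Placeholder URL; the real project obtains its connection via ConnectDB.
		try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
			Statement stmt = conn.createStatement()) {
			// Without the HCatalog jar, the CREATE TABLE below fails with "SerDe not found".
			stmt.execute("add jar /path/to/hive-hcatalog-core.jar");
			// External table: Hive stores only the schema; the JSON files stay in HDFS.
			stmt.execute("CREATE EXTERNAL TABLE IF NOT EXISTS logs_json ("
				+ " idSite STRING, idVisit STRING, country STRING,"
				+ " actiondetails ARRAY<STRUCT<type:STRING, url:STRING>>)"
				+ " ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'"
				+ " LOCATION '/tmp/piwik/repolog'");
			stmt.execute("CREATE TABLE IF NOT EXISTS logs_flat ("
				+ " source BIGINT, id_visit STRING, country STRING, action STRING, url STRING)");
			// explode() emits one row per array element: a visit with N actions becomes N rows.
			stmt.execute("INSERT INTO logs_flat"
				+ " SELECT cast(idSite AS BIGINT), idVisit, country, a.type, a.url"
				+ " FROM logs_json LATERAL VIEW explode(actiondetails) t AS a");
		}
	}
}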
@@ -336,10 +350,21 @@ public class PiwikStatsDB {
ConnectDB.getConnection().setAutoCommit(false);
// clean download double clicks
String sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
String sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp " +
"WHERE EXISTS (\n" +
"SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp \n" +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp p1, " +
ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp p2\n" +
"WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id \n"
+
"AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp \n" +
"AND p1.timestamp<p2.timestamp AND ((unix_timestamp(p2.timestamp)-unix_timestamp(p1.timestamp))/60)<30 \n" +
"AND piwiklogtmp.source=p1.source AND piwiklogtmp.id_visit=p1.id_visit \n" +
"AND piwiklogtmp.action=p1.action AND piwiklogtmp.entity_id=p1.entity_id AND piwiklogtmp.timestamp=p1.timestamp)";
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getConnection().commit();
+ System.exit(0);
stmt = ConnectDB.getConnection().createStatement();
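A unit subtlety in the rewritten delete: Postgres's extract(EPOCH FROM ...) yields a difference in seconds, which the old query compared directly against 30, while the Hive version divides the unix_timestamp difference by 60 before the same < 30 comparison, so the double-click window is now 30 minutes rather than 30 seconds. A small standalone check of the arithmetic (the timestamps are made up):

// A sketch verifying the window arithmetic; not project code.
import java.time.Instant;

public class WindowCheck {
	public static void main(String[] args) {
		// Two hypothetical download events 25 seconds apart.
		long t1 = Instant.parse("2020-09-06T10:00:00Z").getEpochSecond();
		long t2 = Instant.parse("2020-09-06T10:00:25Z").getEpochSecond();

		// Postgres original: extract(EPOCH FROM p2 - p1) < 30, i.e. seconds.
		System.out.println("seconds apart: " + (t2 - t1) + " -> " + ((t2 - t1) < 30));

		// Hive rewrite: (unix_timestamp(p2) - unix_timestamp(p1)) / 60 < 30, i.e. minutes.
		double minutes = (t2 - t1) / 60.0;
		System.out.println("minutes apart: " + minutes + " -> " + (minutes < 30));
		// Both guards pass here, but events e.g. 10 minutes apart pass only the minute-based one.
	}
}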

UsageStatsExporter.java

@@ -1,6 +1,11 @@
package eu.dnetlib.oa.graph.usagestats.export;
+ import java.io.File;
+ import java.io.IOException;
+ import java.lang.reflect.Method;
+ import java.net.URL;
+ import java.net.URLClassLoader;
import java.util.Properties;
import org.apache.log4j.Logger;
@@ -20,10 +25,41 @@ public class UsageStatsExporter {
static String irusUKReportPath = "/user/spyros/logs/usage_stats_logs2/irusUKReports";
static String sarcsReportPath = "/user/spyros/logs/usage_stats_logs2/sarcReports";
+ private static final Class[] parameters = new Class[] {
+ URL.class
+ };
public UsageStatsExporter(Properties properties) {
this.properties = properties;
}
+ public static void addFile(String s) throws IOException {
+ File f = new File(s);
+ addFile(f);
+ }// end method
+ public static void addFile(File f) throws IOException {
+ addURL(f.toURL());
+ }// end method
+ public static void addURL(URL u) throws IOException {
+ URLClassLoader sysloader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+ Class sysclass = URLClassLoader.class;
+ try {
+ Method method = sysclass.getDeclaredMethod("addURL", parameters);
+ method.setAccessible(true);
+ method.invoke(sysloader, new Object[] {
+ u
+ });
+ } catch (Throwable t) {
+ t.printStackTrace();
+ throw new IOException("Error, could not add URL to system classloader");
+ } // end try catch
+ }// end method
// public void export() throws Exception {
public void export() throws Exception {
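The addFile/addURL helpers above work by casting the system class loader to URLClassLoader and invoking its protected addURL method reflectively. That holds on Java 8 (the CDH 5 era this code targets) but breaks on Java 9+, where the system class loader is no longer a URLClassLoader and the cast throws ClassCastException. A hedged standalone variant with that failure mode made explicit; the jar path in main is hypothetical:

// A sketch, not project code: runtime classpath patching via URLClassLoader.addURL.
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class ClasspathPatcher {
	// Appends a jar to the system class loader at runtime. Works only while the
	// system class loader is a URLClassLoader, i.e. on Java 8 and earlier.
	public static void addJar(String path) throws IOException {
		ClassLoader loader = ClassLoader.getSystemClassLoader();
		if (!(loader instanceof URLClassLoader)) {
			throw new IOException("System class loader is not a URLClassLoader (Java 9+?)");
		}
		try {
			Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
			addURL.setAccessible(true); // addURL is protected
			addURL.invoke(loader, new File(path).toURI().toURL());
		} catch (ReflectiveOperationException e) {
			throw new IOException("Could not add " + path + " to the classpath", e);
		}
	}

	public static void main(String[] args) throws IOException {
		// Hypothetical path; the commit points this at the CDH hive-hcatalog-core jar.
		addJar("/tmp/hive-hcatalog-core.jar");
	}
}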
@@ -35,6 +71,8 @@ public class UsageStatsExporter {
// String portalMatomoID = properties.getProperty("portal_MatomoID");
// String irusUKBaseURL = properties.getProperty("IRUS_UK_BaseUrl");
addFile("/usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
// connect to DB
ConnectDB.init(properties);