Making tables visible to Impala

This commit is contained in:
Spyros Zoupanos 2020-10-09 22:28:45 +03:00
parent dd9df4ae58
commit a852dd3a0d
2 changed files with 47 additions and 83 deletions

View File

@ -4,24 +4,16 @@ package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.URLDecoder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -511,83 +503,21 @@ public class PiwikStatsDB {
int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
// String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, "
+ diffMonth + ", 1) AS offs;";
String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', " +
"('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " +
diffMonth + ", 1) AS offs";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS full_dates_full_date ON full_dates USING btree(full_date);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS views_stats_source ON views_stats USING btree(source);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS views_stats_repository_id ON views_stats USING btree(repository_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS views_stats_result_id ON views_stats USING btree(result_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS views_stats_date ON views_stats USING btree(date);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_source ON pageviews_stats USING btree(source);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_date ON pageviews_stats USING btree(date);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS downloads_stats_source ON downloads_stats USING btree(source);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS downloads_stats_result_id ON downloads_stats USING btree(result_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS downloads_stats_date ON downloads_stats USING btree(date);";
stmt.executeUpdate(sql);
// sql = "SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views INTO usage_stats FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS usage_stats_source ON usage_stats USING btree(source);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS usage_stats_repository_id ON usage_stats USING btree(repository_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS usage_stats_result_id ON usage_stats USING btree(result_id);";
stmt.executeUpdate(sql);
sql = "CREATE INDEX IF NOT EXISTS usage_stats_date ON usage_stats USING btree(date);";
stmt.executeUpdate(sql);
sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
stmt.executeUpdate(sql);
sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
stmt.executeUpdate(sql);
sql = "DROP VIEW IF EXISTS result_views_monthly_tmp";
stmt.executeUpdate(sql);
sql = "DROP TABLE IF EXISTS piwiklogtmp;";
stmt.executeUpdate(sql);
sql = "DROP TABLE IF EXISTS sushilogtmp;";
sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, " +
"coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, "
+
"coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, " +
"coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views " +
"FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source " +
"AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date";
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getHiveConnection().commit();
ConnectDB.getHiveConnection().close();
}

View File

@ -3,7 +3,10 @@ package eu.dnetlib.oa.graph.usagestats.export;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import javax.sound.midi.SysexMessage;
@ -161,8 +164,39 @@ public class UsageStatsExporter {
}
logger.info("Sarc done");
// // finalize usagestats
// piwikstatsdb.finalizeStats();
// log.info("finalized stats");
// finalize usagestats
piwikstatsdb.finalizeStats();
logger.info("Finalized stats");
// Make the tables available to Impala
logger.info("Making tables visible to Impala");
invalidateMetadata();
logger.info("End");
}
/**
 * Issues an Impala {@code INVALIDATE METADATA} statement for each usage-stats
 * table so that the tables become visible to Impala.
 *
 * <p>The statements are executed over the connection returned by
 * {@code ConnectDB.getImpalaConnection()}. Both the statement and that
 * connection are released even when one of the statements fails. On success
 * the Hive connection is closed as well, as the previous implementation did.
 *
 * @throws SQLException if the Impala connection cannot be used, if any of the
 *         {@code INVALIDATE METADATA} statements fails, or if closing a
 *         connection fails
 */
private void invalidateMetadata() throws SQLException {
    // Tables to refresh, in the same order the statements were issued before.
    final String[] tables = {
        "piwiklog",
        "sushilog",
        "downloads_stats",
        "views_stats",
        "lareferencialog"
    };

    // try-with-resources closes the statement first and then the Impala
    // connection, also when executeUpdate throws half-way through the list.
    // NOTE(review): assumes this method may close the connection handed out by
    // ConnectDB once it is done with it, as was already done for Hive - confirm.
    // java.sql.Connection is fully qualified because the visible import list of
    // this file does not include it.
    try (java.sql.Connection impalaConnection = ConnectDB.getImpalaConnection();
        Statement stmt = impalaConnection.createStatement()) {
        for (String table : tables) {
            String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + "." + table;
            stmt.executeUpdate(sql);
        }
    }

    // Kept from the previous implementation: release the Hive connection on success.
    ConnectDB.getHiveConnection().close();
}
}