forked from D-Net/dnet-hadoop
Corrections on final steps
parent 8826684130
commit 8f24a6388e
@@ -58,6 +58,9 @@ public class ExecuteWorkflow {
 	static boolean sarcProcessStats;
+	static int sarcNumberOfIssnToDownload;
 
+	static boolean finalizeStats;
+	static boolean finalTablesVisibleToImpala;
 
 	public static void main(String args[]) throws Exception {
 
 		// Sending the logs to the console
@@ -154,6 +157,15 @@ public class ExecuteWorkflow {
 		sarcProcessStats = false;
+		sarcNumberOfIssnToDownload = Integer.parseInt(parser.get("sarcNumberOfIssnToDownload"));
 
+		if (parser.get("finalizeStats").toLowerCase().equals("true"))
+			finalizeStats = true;
+		else
+			finalizeStats = false;
+		if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true"))
+			finalTablesVisibleToImpala = true;
+		else
+			finalTablesVisibleToImpala = false;
 
 		UsageStatsExporter usagestatsExport = new UsageStatsExporter();
 		usagestatsExport.export();
 	}
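A side note on the flag parsing added above: Boolean.parseBoolean already compares case-insensitively and returns false for a null argument, so each four-line if/else could collapse to one call. A minimal sketch, not part of the commit (the class name is hypothetical):

// Sketch only: Boolean.parseBoolean replaces the toLowerCase()/if/else pattern.
public class FlagParsingSketch {
	public static void main(String[] args) {
		String finalizeStatsArg = "True"; // stand-in for parser.get("finalizeStats")
		boolean finalizeStats = Boolean.parseBoolean(finalizeStatsArg);
		System.out.println(finalizeStats); // prints: true
	}
}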
@@ -497,25 +497,65 @@ public class PiwikStatsDB {
 		stmt = ConnectDB.getHiveConnection().createStatement();
 		ConnectDB.getHiveConnection().setAutoCommit(false);
 
+		logger.info("Dropping full_dates table");
+		String dropFullDates = "DROP TABLE IF EXISTS " +
+			ConnectDB.getUsageStatsDBSchema() +
+			".full_dates";
+		stmt.executeUpdate(dropFullDates);
+		logger.info("Dropped full_dates table");
+
 		Calendar startCalendar = Calendar.getInstance();
 		startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
 		Calendar endCalendar = Calendar.getInstance();
 		int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
 		int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
 
-		String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', " +
-			"('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " +
-			diffMonth + ", 1) AS offs";
+		logger.info("Creating full_dates table");
+		String sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".full_dates AS " +
+			"SELECT from_unixtime(unix_timestamp(cast(add_months(from_date,i) AS DATE)), 'yyyy/MM') AS txn_date " +
+			"FROM (SELECT DATE '2016-01-01' AS from_date) p " +
+			"LATERAL VIEW " +
+			"posexplode(split(space(" + diffMonth + "),' ')) pe AS i,x";
 		stmt.executeUpdate(sql);
+		logger.info("Created full_dates table");
 
-		sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, " +
-			"coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, " +
-			"coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, " +
-			"coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views " +
-			"FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source " +
+		logger.info("Creating downloads_stats table");
+		String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " +
+			ConnectDB.getUsageStatsDBSchema() +
+			".downloads_stats " +
+			"(`source` string, " +
+			"`repository_id` string, " +
+			"`result_id` string, " +
+			"`date` string, " +
+			"`count` bigint, " +
+			"`openaire` bigint)";
+		stmt.executeUpdate(createDownloadsStats);
+		logger.info("Created downloads_stats table");
+
+		logger.info("Creating views_stats table");
+		String createViewsStats = "CREATE TABLE IF NOT EXISTS " +
+			ConnectDB.getUsageStatsDBSchema() +
+			".views_stats " +
+			"(`source` string, " +
+			"`repository_id` string, " +
+			"`result_id` string, " +
+			"`date` string, " +
+			"`count` bigint, " +
+			"`openaire` bigint)";
+		stmt.executeUpdate(createViewsStats);
+		logger.info("Created views_stats table");
+
+		String createUsageStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats " +
+			"AS SELECT coalesce(ds.source, vs.source) as source, " +
+			"coalesce(ds.repository_id, vs.repository_id) as repository_id, " +
+			"coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, " +
+			"coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, " +
+			"coalesce(ds.openaire, 0) as openaire_downloads, " +
+			"coalesce(vs.openaire, 0) as openaire_views " +
+			"FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats AS ds FULL OUTER JOIN " +
+			ConnectDB.getUsageStatsDBSchema() + ".views_stats AS vs ON ds.source=vs.source " +
 			"AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date";
-		stmt.executeUpdate(sql);
+		stmt.executeUpdate(createUsageStats);
 
 		stmt.close();
 		ConnectDB.getHiveConnection().close();
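The full_dates rewrite above is forced by the move from PostgreSQL to Hive: Hive has no generate_series, so the commit builds the month series with split(space(n), ' '), which yields n+1 empty strings, and posexplode, which turns their positions into the integers 0..n that add_months shifts from the start date. A minimal, self-contained sketch of the idiom (hypothetical class name; the SQL is only printed, not executed):

// Sketch only: the Hive month-series idiom used in finalizeStats above.
public class FullDatesSketch {
	public static void main(String[] args) {
		int diffMonth = 2; // e.g. 2016-01 .. 2016-03
		String sql = "SELECT from_unixtime(unix_timestamp(cast(add_months(from_date,i) AS DATE)), 'yyyy/MM') AS txn_date "
			+ "FROM (SELECT DATE '2016-01-01' AS from_date) p "
			+ "LATERAL VIEW posexplode(split(space(" + diffMonth + "),' ')) pe AS i,x";
		System.out.println(sql); // run in Hive: returns 2016/01, 2016/02, 2016/03
	}
}

The usage_stats query keeps the same shape as before: a FULL OUTER JOIN of downloads_stats and views_stats with coalesce on the join keys, so a record survives even when only one side counted it.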
@@ -346,7 +346,7 @@ public class SarcStats {
 			ConnectDB.getStatsDBSchema() + ".datasource_results dr, " +
 			ConnectDB.getStatsDBSchema() + ".result_pids ro " +
 			"WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id AND " +
-			"s.rid=ro.pid AND ro.type='doi' AND metric_type='ft_total' AND s.source='SARC-OJS'";
+			"s.rid=ro.pid AND ro.type='Digital Object Identifier' AND metric_type='ft_total' AND s.source='SARC-OJS'";
 		stmtImpala.executeUpdate(createDownloadsStatsImpala);
 		logger.info("Creating downloads_stats_impala");
 
@@ -525,14 +525,11 @@ public class SarcStats {
 	private void renameKeysRecursively(String delimiter, JSONObject givenJsonObj) throws Exception {
 		Set<String> jkeys = new HashSet<String>(givenJsonObj.keySet());
 		for (String jkey : jkeys) {
-			System.out.println("++++> " + jkey);
 
 			String[] splitArray = jkey.split(delimiter);
 			String newJkey = splitArray[splitArray.length - 1];
-			System.out.println("New jkey: " + jkey);
 
 			Object jval = givenJsonObj.get(jkey);
-			System.out.println("jval ===> " + jval.getClass().getName());
 			givenJsonObj.remove(jkey);
 			givenJsonObj.put(newJkey, jval);
 
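The removed debug prints also carried a small bug: "New jkey: " + jkey printed the old key, not newJkey. A flat, self-contained sketch of the renaming idiom that remains, assuming the json-simple JSONObject (a HashMap subclass) that the surrounding code appears to use; the real method also recurses into nested objects:

import java.util.HashSet;
import java.util.Set;

import org.json.simple.JSONObject;

// Sketch only: strips namespace prefixes from JSON keys. Copying keySet() into a
// new HashSet avoids ConcurrentModificationException while the loop removes and
// re-inserts entries on the same object.
public class RenameKeysSketch {
	public static void main(String[] args) {
		JSONObject obj = new JSONObject();
		obj.put("ns:ItemIdentifier", "1234-5678");
		Set<String> jkeys = new HashSet<String>(obj.keySet());
		for (String jkey : jkeys) {
			String[] splitArray = jkey.split(":");
			String newJkey = splitArray[splitArray.length - 1];
			Object jval = obj.get(jkey);
			obj.remove(jkey);
			obj.put(newJkey, jval);
		}
		System.out.println(obj); // prints: {"ItemIdentifier":"1234-5678"}
	}
}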
@@ -5,10 +5,6 @@ import java.io.IOException;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
 
-import javax.sound.midi.SysexMessage;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,31 +25,6 @@ public class UsageStatsExporter {
 
 	private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
 
-	public void runImpalaQuery() throws Exception {
-		Statement stmt = ConnectDB.getImpalaConnection().createStatement();
-		ConnectDB.getImpalaConnection().setAutoCommit(false);
-
-		logger.info("Executing Impala query");
-		Statement statement = ConnectDB.getImpalaConnection().createStatement();
-
-		ResultSet rs = statement
-			.executeQuery(
-//				"CREATE TABLE usagestats_20200913.spyros_tmp5 AS\n" +
-//				"SELECT s.source, d.id AS repository_id, ro.id as result_id, s.count, '0' \n" +
-//				"FROM usagestats_20200913.sarc_sushilogtmp2 s, \n" +
-//				"openaire_prod_stats_shadow_20200821.datasource_oids d, \n" +
-//				"openaire_prod_stats_shadow_20200821.datasource_results dr, \n" +
-//				"openaire_prod_stats_shadow_20200821.result_pids ro \n" +
-//				"WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id \n" +
-//				"AND s.rid=ro.pid AND ro.type='doi' AND metric_type='ft_total' AND s.source='SARC-OJS' ");
-
-				"CREATE TABLE usagestats_20200913.spyros_tmp6 AS\n" +
-				"SELECT * \n" +
-				"FROM usagestats_20200913.sarc_sushilogtmp2");
-
-		stmt.close();
-	}
-
 	private void reCreateLogDirs() throws IllegalArgumentException, IOException {
 		FileSystem dfs = FileSystem.get(new Configuration());
@@ -165,12 +136,16 @@ public class UsageStatsExporter {
 		logger.info("Sarc done");
 
 		// finalize usagestats
-		piwikstatsdb.finalizeStats();
-		logger.info("Finalized stats");
+		if (ExecuteWorkflow.finalizeStats) {
+			piwikstatsdb.finalizeStats();
+			logger.info("Finalized stats");
+		}
 
 		// Make the tables available to Impala
-		logger.info("Making tables visible to Impala");
-		invalidateMetadata();
+		if (ExecuteWorkflow.finalTablesVisibleToImpala) {
+			logger.info("Making tables visible to Impala");
+			invalidateMetadata();
+		}
 
 		logger.info("End");
 	}
@@ -180,23 +155,16 @@ public class UsageStatsExporter {
 
 		stmt = ConnectDB.getImpalaConnection().createStatement();
 
-		String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".piwiklog";
-		stmt.executeUpdate(sql);
-
-		sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".sushilog";
-		stmt.executeUpdate(sql);
-
-		sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats";
+		String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats";
 		stmt.executeUpdate(sql);
 
 		sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".views_stats";
 		stmt.executeUpdate(sql);
 
-		sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog";
+		sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats";
 		stmt.executeUpdate(sql);
 
 		stmt.close();
 		ConnectDB.getHiveConnection().close();
 	}
 
 }
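INVALIDATE METADATA is needed here because the final tables are created through Hive, and Impala does not see new Hive metastore entries until its metadata cache is refreshed; note that the method opens an Impala statement but closes the Hive connection at the end. A sketch of how the repeated calls could be table-driven (hypothetical helper, not part of the commit):

import java.sql.Connection;
import java.sql.Statement;

// Sketch only: drives the per-table INVALIDATE METADATA calls from a list, so
// adding a table to the workflow means adding one name at the call site.
public class InvalidateMetadataSketch {
	static void invalidateAll(Connection impala, String schema, String... tables) throws Exception {
		try (Statement stmt = impala.createStatement()) {
			for (String table : tables) {
				// Tells Impala to reload the Hive metastore entry for this table
				stmt.executeUpdate("INVALIDATE METADATA " + schema + "." + table);
			}
		}
	}
}

Called as invalidateAll(ConnectDB.getImpalaConnection(), ConnectDB.getUsageStatsDBSchema(), "downloads_stats", "views_stats", "usage_stats").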
@@ -196,5 +196,18 @@
 	"paramLongName": "sarcNumberOfIssnToDownload",
 	"paramDescription": "Limit the number of the downloaded ISSN (Sarc) to the first sarcNumberOfIssnToDownload",
 	"paramRequired": true
-	}
+	},
+
+	{
+		"paramName": "fs",
+		"paramLongName": "finalizeStats",
+		"paramDescription": "Create the usage_stats table?",
+		"paramRequired": true
+	},
+	{
+		"paramName": "ftvi",
+		"paramLongName": "finalTablesVisibleToImpala",
+		"paramDescription": "Make the usage_stats, views_stats and downloads_stats tables visible to Impala",
+		"paramRequired": true
+	}
 ]
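The two new entries wire the flags into the parameter file that ExecuteWorkflow reads. A sketch of the round trip, assuming the ArgumentApplicationParser used elsewhere in dnet-hadoop (its exact signatures are an assumption here): each entry binds a --paramLongName argument to parser.get(name), and values arrive as strings, which is why ExecuteWorkflow parses the booleans by hand.

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

// Sketch only: hypothetical class, single-parameter JSON for brevity.
public class ParamSketch {
	public static void main(String[] args) throws Exception {
		String json = "[{\"paramName\":\"fs\",\"paramLongName\":\"finalizeStats\","
			+ "\"paramDescription\":\"Create the usage_stats table?\",\"paramRequired\":true}]";
		ArgumentApplicationParser parser = new ArgumentApplicationParser(json);
		parser.parseArgument(new String[] { "--finalizeStats", "true" });
		System.out.println(parser.get("finalizeStats")); // prints: true
	}
}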
@@ -75,6 +75,8 @@
 	<arg>--sarcDownloadReports</arg><arg>${sarcDownloadReports}</arg>
 	<arg>--sarcProcessStats</arg><arg>${sarcProcessStats}</arg>
 	<arg>--sarcNumberOfIssnToDownload</arg><arg>${sarcNumberOfIssnToDownload}</arg>
+	<arg>--finalizeStats</arg><arg>${finalizeStats}</arg>
+	<arg>--finalTablesVisibleToImpala</arg><arg>${finalTablesVisibleToImpala}</arg>
 	<capture-output/>
 </java>
 <ok to="End" />
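The ${...} placeholders are Oozie EL expressions resolved from the job configuration at submission time, so each new <arg> needs a matching property. A minimal job.properties sketch (the values are assumptions):

finalizeStats=true
finalTablesVisibleToImpala=true
sarcNumberOfIssnToDownload=100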