From 7fdf994eb6a4eb604b10b3d2d0916139dcb63f91 Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Mon, 19 Oct 2020 17:54:50 +0300 Subject: [PATCH] Multithreaded download for piwiklogs --- .../export/LaReferenciaDownloadLogs.java | 10 ++ .../usagestats/export/PiwikDownloadLogs.java | 153 +++++++++++++----- .../usagestats/export/UsageStatsExporter.java | 3 +- 3 files changed, 120 insertions(+), 46 deletions(-) diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java index 0a2854729..0e0e013cf 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/LaReferenciaDownloadLogs.java @@ -48,6 +48,16 @@ public class LaReferenciaDownloadLogs { // this.createTmpTables(); } + public void reCreateLogDirs() throws IllegalArgumentException, IOException { + FileSystem dfs = FileSystem.get(new Configuration()); + + logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath); + dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true); + + logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath); + dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath)); + } + private void createTables() throws Exception { try { Statement stmt = ConnectDB.getHiveConnection().createStatement(); diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java index 4d54a344e..cd531f868 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/PiwikDownloadLogs.java @@ -15,6 +15,8 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; @@ -86,6 +88,103 @@ public class PiwikDownloadLogs { } } + class WorkerThread implements Runnable { + private Calendar currDay; + private int siteId; + private String repoLogsPath; + private String portalLogPath; + private String portalMatomoID; + + public WorkerThread(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath, + String portalMatomoID) throws IOException { + this.currDay = (Calendar) currDay.clone(); + this.siteId = new Integer(siteId); + this.repoLogsPath = new String(repoLogsPath); + this.portalLogPath = new String(portalLogPath); + this.portalMatomoID = new String(portalMatomoID); + } + + public void run() { + System.out + .println( + Thread.currentThread().getName() + " (Start) Thread for " + + "parameters: currDay=" + currDay + ", siteId=" + siteId + + ", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath + + ", portalLogPath=" + portalLogPath + ", portalMatomoID=" + portalMatomoID); + try { + GetOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID); + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + System.out + .println( + Thread.currentThread().getName() + " (End) Thread for " + + "parameters: currDay=" + currDay + ", siteId=" + siteId + + ", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath + + ", portalLogPath=" + portalLogPath + ", portalMatomoID=" + portalMatomoID); + } + + public void GetOpenAIRELogsForDate(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath, + String portalMatomoID) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + + Date date = currDay.getTime(); + logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date)); + + String period = "&period=day&date=" + sdf.format(date); + String outFolder = ""; + // portal siteId = 109; + if (siteId == Integer.parseInt(portalMatomoID)) { + outFolder = portalLogPath; + } else { + outFolder = repoLogsPath; + } + + String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format + + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; + String content = ""; + + int i = 0; + + JSONParser parser = new JSONParser(); + StringBuffer totalContent = new StringBuffer(); + FileSystem fs = FileSystem.get(new Configuration()); + FSDataOutputStream fin = fs + .create(new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + ".json"), true); + do { + String apiUrl = baseApiUrl; + + if (i > 0) { + apiUrl += "&filter_offset=" + (i * 1000); + } + + content = getJson(apiUrl); + if (content.length() == 0 || content.equals("[]")) + break; + + JSONArray jsonArray = (JSONArray) parser.parse(content); + for (Object aJsonArray : jsonArray) { + JSONObject jsonObjectRaw = (JSONObject) aJsonArray; + fin.write(jsonObjectRaw.toJSONString().getBytes()); + fin.writeChar('\n'); +// totalContent.append(jsonObjectRaw.toJSONString()); +// totalContent.append('\n'); + } + i++; + } while (true); + +// FileSystem fs = FileSystem.get(new Configuration()); +// FSDataOutputStream fin = fs +// .create(new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + ".json"), true); +// +// fin.write(totalContent.toString().getBytes()); + fin.close(); + fs.close(); + } + } + public void GetOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception { Statement statement = ConnectDB.getHiveConnection().createStatement(); @@ -121,6 +220,8 @@ public class PiwikDownloadLogs { end.add(Calendar.DAY_OF_MONTH, -1); logger.info("Ending period for log download: " + sdf.format(end.getTime())); +// FileSystem fs = FileSystem.get(new Configuration()); + ExecutorService executor = Executors.newFixedThreadPool(20);// creating a pool of 5 threadsσ for (int siteId : piwikIdToVisit) { logger.info("Now working on piwikId: " + siteId); @@ -141,51 +242,15 @@ public class PiwikDownloadLogs { rs_date.close(); for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) { - Date date = currDay.getTime(); - logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date)); - - String period = "&period=day&date=" + sdf.format(date); - String outFolder = ""; - // portal siteId = 109; - if (siteId == Integer.parseInt(portalMatomoID)) { - outFolder = portalLogPath; - } else { - outFolder = repoLogsPath; - } - FileSystem fs = FileSystem.get(new Configuration()); - FSDataOutputStream fin = fs - .create(new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + ".json"), true); - - String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format - + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; - String content = ""; - - int i = 0; - - JSONParser parser = new JSONParser(); - do { - String apiUrl = baseApiUrl; - - if (i > 0) { - apiUrl += "&filter_offset=" + (i * 1000); - } - - content = getJson(apiUrl); - if (content.length() == 0 || content.equals("[]")) - break; - - JSONArray jsonArray = (JSONArray) parser.parse(content); - for (Object aJsonArray : jsonArray) { - JSONObject jsonObjectRaw = (JSONObject) aJsonArray; - fin.write(jsonObjectRaw.toJSONString().getBytes()); - fin.writeChar('\n'); - } - - i++; - } while (true); - fin.close(); +// Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID, fs); + Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID); + executor.execute(worker);// calling execute method of ExecutorService } - } + executor.shutdown(); + while (!executor.isTerminated()) { + } + System.out.println("Finished all threads"); +// fs.close(); } } diff --git a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java index 1c6035543..46fc82ee6 100644 --- a/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java +++ b/dhp-workflows/dhp-usage-stats-update/src/main/java/eu/dnetlib/oa/graph/usagestats/export/UsageStatsExporter.java @@ -2,7 +2,6 @@ package eu.dnetlib.oa.graph.usagestats.export; import java.io.IOException; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -94,7 +93,7 @@ public class UsageStatsExporter { if (ExecuteWorkflow.laReferenciaEmptyDirs) { logger.info("Recreating LaReferencia log directories"); - piwikstatsdb.reCreateLogDirs(); + lrf.reCreateLogDirs(); } if (ExecuteWorkflow.downloadLaReferenciaLogs) {