forked from D-Net/dnet-hadoop
Multithreaded download for piwiklogs
This commit is contained in:
parent
6cc58e2720
commit
7fdf994eb6
|
@ -48,6 +48,16 @@ public class LaReferenciaDownloadLogs {
|
||||||
// this.createTmpTables();
|
// this.createTmpTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void reCreateLogDirs() throws IllegalArgumentException, IOException {
|
||||||
|
FileSystem dfs = FileSystem.get(new Configuration());
|
||||||
|
|
||||||
|
logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
|
||||||
|
dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true);
|
||||||
|
|
||||||
|
logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
|
||||||
|
dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath));
|
||||||
|
}
|
||||||
|
|
||||||
private void createTables() throws Exception {
|
private void createTables() throws Exception {
|
||||||
try {
|
try {
|
||||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||||
|
|
|
@ -15,6 +15,8 @@ import java.util.ArrayList;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import javax.net.ssl.HostnameVerifier;
|
import javax.net.ssl.HostnameVerifier;
|
||||||
import javax.net.ssl.HttpsURLConnection;
|
import javax.net.ssl.HttpsURLConnection;
|
||||||
|
@ -86,6 +88,103 @@ public class PiwikDownloadLogs {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class WorkerThread implements Runnable {
|
||||||
|
private Calendar currDay;
|
||||||
|
private int siteId;
|
||||||
|
private String repoLogsPath;
|
||||||
|
private String portalLogPath;
|
||||||
|
private String portalMatomoID;
|
||||||
|
|
||||||
|
public WorkerThread(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath,
|
||||||
|
String portalMatomoID) throws IOException {
|
||||||
|
this.currDay = (Calendar) currDay.clone();
|
||||||
|
this.siteId = new Integer(siteId);
|
||||||
|
this.repoLogsPath = new String(repoLogsPath);
|
||||||
|
this.portalLogPath = new String(portalLogPath);
|
||||||
|
this.portalMatomoID = new String(portalMatomoID);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
System.out
|
||||||
|
.println(
|
||||||
|
Thread.currentThread().getName() + " (Start) Thread for "
|
||||||
|
+ "parameters: currDay=" + currDay + ", siteId=" + siteId +
|
||||||
|
", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath +
|
||||||
|
", portalLogPath=" + portalLogPath + ", portalMatomoID=" + portalMatomoID);
|
||||||
|
try {
|
||||||
|
GetOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
System.out
|
||||||
|
.println(
|
||||||
|
Thread.currentThread().getName() + " (End) Thread for "
|
||||||
|
+ "parameters: currDay=" + currDay + ", siteId=" + siteId +
|
||||||
|
", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath +
|
||||||
|
", portalLogPath=" + portalLogPath + ", portalMatomoID=" + portalMatomoID);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void GetOpenAIRELogsForDate(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath,
|
||||||
|
String portalMatomoID) throws Exception {
|
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||||
|
|
||||||
|
Date date = currDay.getTime();
|
||||||
|
logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));
|
||||||
|
|
||||||
|
String period = "&period=day&date=" + sdf.format(date);
|
||||||
|
String outFolder = "";
|
||||||
|
// portal siteId = 109;
|
||||||
|
if (siteId == Integer.parseInt(portalMatomoID)) {
|
||||||
|
outFolder = portalLogPath;
|
||||||
|
} else {
|
||||||
|
outFolder = repoLogsPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
|
||||||
|
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
|
||||||
|
String content = "";
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
|
||||||
|
JSONParser parser = new JSONParser();
|
||||||
|
StringBuffer totalContent = new StringBuffer();
|
||||||
|
FileSystem fs = FileSystem.get(new Configuration());
|
||||||
|
FSDataOutputStream fin = fs
|
||||||
|
.create(new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + ".json"), true);
|
||||||
|
do {
|
||||||
|
String apiUrl = baseApiUrl;
|
||||||
|
|
||||||
|
if (i > 0) {
|
||||||
|
apiUrl += "&filter_offset=" + (i * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
content = getJson(apiUrl);
|
||||||
|
if (content.length() == 0 || content.equals("[]"))
|
||||||
|
break;
|
||||||
|
|
||||||
|
JSONArray jsonArray = (JSONArray) parser.parse(content);
|
||||||
|
for (Object aJsonArray : jsonArray) {
|
||||||
|
JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
|
||||||
|
fin.write(jsonObjectRaw.toJSONString().getBytes());
|
||||||
|
fin.writeChar('\n');
|
||||||
|
// totalContent.append(jsonObjectRaw.toJSONString());
|
||||||
|
// totalContent.append('\n');
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
// FileSystem fs = FileSystem.get(new Configuration());
|
||||||
|
// FSDataOutputStream fin = fs
|
||||||
|
// .create(new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + ".json"), true);
|
||||||
|
//
|
||||||
|
// fin.write(totalContent.toString().getBytes());
|
||||||
|
fin.close();
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void GetOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception {
|
public void GetOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception {
|
||||||
|
|
||||||
Statement statement = ConnectDB.getHiveConnection().createStatement();
|
Statement statement = ConnectDB.getHiveConnection().createStatement();
|
||||||
|
@ -121,6 +220,8 @@ public class PiwikDownloadLogs {
|
||||||
end.add(Calendar.DAY_OF_MONTH, -1);
|
end.add(Calendar.DAY_OF_MONTH, -1);
|
||||||
logger.info("Ending period for log download: " + sdf.format(end.getTime()));
|
logger.info("Ending period for log download: " + sdf.format(end.getTime()));
|
||||||
|
|
||||||
|
// FileSystem fs = FileSystem.get(new Configuration());
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(20);// creating a pool of 5 threadsσ
|
||||||
for (int siteId : piwikIdToVisit) {
|
for (int siteId : piwikIdToVisit) {
|
||||||
|
|
||||||
logger.info("Now working on piwikId: " + siteId);
|
logger.info("Now working on piwikId: " + siteId);
|
||||||
|
@ -141,51 +242,15 @@ public class PiwikDownloadLogs {
|
||||||
rs_date.close();
|
rs_date.close();
|
||||||
|
|
||||||
for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
|
for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
|
||||||
Date date = currDay.getTime();
|
// Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID, fs);
|
||||||
logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));
|
Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
|
||||||
|
executor.execute(worker);// calling execute method of ExecutorService
|
||||||
String period = "&period=day&date=" + sdf.format(date);
|
|
||||||
String outFolder = "";
|
|
||||||
// portal siteId = 109;
|
|
||||||
if (siteId == Integer.parseInt(portalMatomoID)) {
|
|
||||||
outFolder = portalLogPath;
|
|
||||||
} else {
|
|
||||||
outFolder = repoLogsPath;
|
|
||||||
}
|
|
||||||
FileSystem fs = FileSystem.get(new Configuration());
|
|
||||||
FSDataOutputStream fin = fs
|
|
||||||
.create(new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + ".json"), true);
|
|
||||||
|
|
||||||
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
|
|
||||||
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
|
|
||||||
String content = "";
|
|
||||||
|
|
||||||
int i = 0;
|
|
||||||
|
|
||||||
JSONParser parser = new JSONParser();
|
|
||||||
do {
|
|
||||||
String apiUrl = baseApiUrl;
|
|
||||||
|
|
||||||
if (i > 0) {
|
|
||||||
apiUrl += "&filter_offset=" + (i * 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
content = getJson(apiUrl);
|
|
||||||
if (content.length() == 0 || content.equals("[]"))
|
|
||||||
break;
|
|
||||||
|
|
||||||
JSONArray jsonArray = (JSONArray) parser.parse(content);
|
|
||||||
for (Object aJsonArray : jsonArray) {
|
|
||||||
JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
|
|
||||||
fin.write(jsonObjectRaw.toJSONString().getBytes());
|
|
||||||
fin.writeChar('\n');
|
|
||||||
}
|
|
||||||
|
|
||||||
i++;
|
|
||||||
} while (true);
|
|
||||||
fin.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
executor.shutdown();
|
||||||
|
while (!executor.isTerminated()) {
|
||||||
|
}
|
||||||
|
System.out.println("Finished all threads");
|
||||||
|
// fs.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
package eu.dnetlib.oa.graph.usagestats.export;
|
package eu.dnetlib.oa.graph.usagestats.export;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.sql.ResultSet;
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
|
|
||||||
|
@ -94,7 +93,7 @@ public class UsageStatsExporter {
|
||||||
|
|
||||||
if (ExecuteWorkflow.laReferenciaEmptyDirs) {
|
if (ExecuteWorkflow.laReferenciaEmptyDirs) {
|
||||||
logger.info("Recreating LaReferencia log directories");
|
logger.info("Recreating LaReferencia log directories");
|
||||||
piwikstatsdb.reCreateLogDirs();
|
lrf.reCreateLogDirs();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ExecuteWorkflow.downloadLaReferenciaLogs) {
|
if (ExecuteWorkflow.downloadLaReferenciaLogs) {
|
||||||
|
|
Loading…
Reference in New Issue