data-miner-manager-tester/src/main/java/org/gcube/portlets/user/dataminermanagertester/server/DataMinerTesterBatchDaemon....

150 lines
4.5 KiB
Java
Executable File

package org.gcube.portlets.user.dataminermanagertester.server;
import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContextEvent;
import org.gcube.portlets.user.dataminermanagertester.server.task.TaskInProgress;
import org.gcube.portlets.user.dataminermanagertester.server.task.TaskRequest;
import org.gcube.portlets.user.dataminermanagertester.shared.Constants;
import org.gcube.portlets.user.dataminermanagertester.shared.task.TaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Background daemon that executes queued {@link TaskRequest}s on a fixed-size
 * thread pool and cancels the ones that run longer than the configured
 * timeout.
 * <p>
 * When {@link #run()} starts it publishes a fresh request queue in the servlet
 * context under {@code SessionConstants.TASK_REQUEST_QUEUE}; it then polls that
 * queue every {@code Constants.DAEMON_SLEEP_MILLIS} milliseconds. The current
 * timeout is published under
 * {@code SessionConstants.DATAMINERTESTER_MONITOR_TIME_OUT_PERIODMILLIS} and is
 * refreshed periodically by an internal {@link Timer}.
 * <p>
 * Threading: the list of in-flight tasks is touched only by the thread that
 * executes {@link #run()}. The {@code running} flag and the {@code timeout}
 * value are {@code volatile} because they are written by other threads
 * ({@link #terminate()} callers and the timer thread respectively).
 *
 * @author Giancarlo Panichi
 */
public class DataMinerTesterBatchDaemon implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(DataMinerTesterBatchDaemon.class);

    /** Worker pool size; chosen to match the Web services capacity. */
    private static final int THREAD_POOL_SIZE = 20;

    private final ServletContextEvent sce;

    /** Period, in milliseconds, between two refreshes of {@link #timeout}. */
    private final long timeoutUpdate = Constants.SERVICE_CLIENT_THREAD_POOL_TIME_OUT_UPDATE_MILLIS;

    /** Tasks submitted to the pool and not yet finished or cancelled. */
    private final List<TaskInProgress> tasks = new ArrayList<>();

    /**
     * Maximum run time of a task in milliseconds. Written by the timer thread,
     * read by the daemon thread, hence volatile.
     */
    private volatile long timeout = Constants.SERVICE_CLIENT_TIMEOUT_DEFAULT_MILLIS;

    private volatile boolean running = true;

    private Timer threadPoolTimeoutUpdateTimer = null;

    /**
     * Creates the daemon, publishes the initial timeout in the servlet context
     * and starts the periodic timeout refresh.
     *
     * @param sce
     *            servlet context event giving access to the servlet context
     *            used to share the request queue and the timeout
     */
    public DataMinerTesterBatchDaemon(ServletContextEvent sce) {
        this.sce = sce;
        initTimeout();
    }

    /**
     * Publishes the default timeout, then loads the effective one and schedules
     * its periodic refresh.
     */
    private void initTimeout() {
        sce.getServletContext().setAttribute(SessionConstants.DATAMINERTESTER_MONITOR_TIME_OUT_PERIODMILLIS,
                Long.valueOf(timeout));
        retrieveTimeOut();
        startThreadPoolTimeoutUpdateTimer();
    }

    /**
     * Asks the daemon to stop and cancels the timeout refresh timer. The
     * polling loop in {@link #run()} notices the request on its next iteration
     * and then shuts the worker pool down.
     */
    public void terminate() {
        running = false;
        if (threadPoolTimeoutUpdateTimer != null) {
            threadPoolTimeoutUpdateTimer.cancel();
        }
    }

    /**
     * Main loop: publishes the request queue, then repeatedly submits queued
     * requests to the pool and reaps finished or timed-out tasks until
     * {@link #terminate()} is called or the thread is interrupted. The worker
     * pool is always shut down on exit so that its threads do not leak.
     */
    @Override
    public void run() {
        Queue<TaskRequest> jobQueue = new ConcurrentLinkedQueue<>();
        sce.getServletContext().setAttribute(SessionConstants.TASK_REQUEST_QUEUE, jobQueue);

        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        try {
            while (running) {
                submitPendingRequests(jobQueue, executorService);
                reapFinishedOrExpiredTasks();
                try {
                    Thread.sleep(Constants.DAEMON_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag for whoever owns this thread and stop:
                    // looping on would make every following sleep fail at once.
                    Thread.currentThread().interrupt();
                    logger.info("DataMinerTesterBatchDaemon interrupted, stopping");
                    running = false;
                }
            }
        } finally {
            // Interrupts the workers still running and drops the queued ones.
            executorService.shutdownNow();
            tasks.clear();
        }
    }

    /** Moves every request currently in the queue onto the worker pool. */
    private void submitPendingRequests(Queue<TaskRequest> jobQueue, ExecutorService executorService) {
        TaskRequest taskRequest;
        // poll() returning null is the only reliable "empty" signal on a
        // concurrent queue.
        while ((taskRequest = jobQueue.poll()) != null) {
            DataMinerTesterCallable callable = new DataMinerTesterCallable(taskRequest);
            Future<TaskStatus> futureResult = executorService.submit(callable);
            tasks.add(new TaskInProgress(new GregorianCalendar(), futureResult));
        }
    }

    /**
     * Removes completed tasks (logging their outcome) and cancels and removes
     * the tasks that have been running longer than the current timeout.
     */
    private void reapFinishedOrExpiredTasks() {
        // Read once so that a concurrent refresh cannot change the limit in
        // the middle of a pass.
        long currentTimeout = timeout;
        long now = System.currentTimeMillis();

        Iterator<TaskInProgress> iterator = tasks.iterator();
        while (iterator.hasNext()) {
            TaskInProgress taskInProgress = iterator.next();
            Future<TaskStatus> futureResult = taskInProgress.getFuture();
            if (futureResult.isDone()) {
                logOutcome(futureResult);
                iterator.remove();
            } else {
                long elapsed = now - taskInProgress.getStartTime().getTimeInMillis();
                if (elapsed > currentTimeout) {
                    logger.error("DataMinerTesterTask No response after {} milliseconds!", currentTimeout);
                    futureResult.cancel(true);
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Logs the result, or the failure, of a task that is already done.
     *
     * @param futureResult
     *            a completed future; {@code get()} therefore does not block
     */
    private void logOutcome(Future<TaskStatus> futureResult) {
        try {
            TaskStatus result = futureResult.get();
            logger.debug("DataMinerTesterTask: {}", result);
        } catch (ExecutionException e) {
            logger.error("DataMinerTesterTask: " + e.getLocalizedMessage(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("DataMinerTesterTask: " + e.getLocalizedMessage(), e);
        }
    }

    /**
     * Loads the task timeout and, when it is positive, stores it and publishes
     * it in the servlet context. Currently the value always comes from
     * {@code Constants.SERVICE_CLIENT_TIMEOUT_DEFAULT_MILLIS}.
     */
    private void retrieveTimeOut() {
        logger.info("DataMinerTester use default configuration for threadpool");
        long timeo = Constants.SERVICE_CLIENT_TIMEOUT_DEFAULT_MILLIS;
        if (timeo > 0) {
            timeout = timeo;
            sce.getServletContext().setAttribute(SessionConstants.DATAMINERTESTER_MONITOR_TIME_OUT_PERIODMILLIS,
                    Long.valueOf(timeo));
        }
    }

    /**
     * Schedules {@link #retrieveTimeOut()} to run every {@code timeoutUpdate}
     * milliseconds. A scheduling failure is logged and otherwise ignored: the
     * daemon then keeps working with the timeout it already has.
     */
    private void startThreadPoolTimeoutUpdateTimer() {
        try {
            // Daemon timer thread: it must not keep the JVM alive if
            // terminate() is never called.
            threadPoolTimeoutUpdateTimer = new Timer("DataMinerTesterTimeoutUpdate", true);
            threadPoolTimeoutUpdateTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    logger.debug("ThreadPool request update of timeout");
                    retrieveTimeOut();
                }
            }, timeoutUpdate, timeoutUpdate);
        } catch (RuntimeException e) {
            logger.error("Error retrieving thread pool timeout!", e);
        }
    }
}