accounting-manager/src/main/java/org/gcube/portlets/admin/accountingmanager/server/AccountingClientDaemon.java

114 lines
3.5 KiB
Java

package org.gcube.portlets.admin.accountingmanager.server;
import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContextEvent;
import org.gcube.portlets.admin.accountingmanager.server.amservice.cache.AccountingCache;
import org.gcube.portlets.admin.accountingmanager.server.util.TaskInProgress;
import org.gcube.portlets.admin.accountingmanager.server.util.TaskRequest;
import org.gcube.portlets.admin.accountingmanager.server.util.TaskStatus;
import org.gcube.portlets.admin.accountingmanager.shared.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Background worker that publishes a task-request queue in the servlet
 * context, submits every queued {@link TaskRequest} to a fixed-size thread
 * pool and watches the resulting futures, cancelling the ones that run longer
 * than the configured timeout.
 *
 * <p>
 * The instance is meant to be executed by a single dedicated thread: the list
 * of tasks in progress is only touched from {@link #run()}. Other threads
 * interact with the daemon through the published queue and through
 * {@link #terminate()}.
 * </p>
 *
 * @author Giancarlo Panichi email: <a
 *         href="mailto:g.panichi@isti.cnr.it">g.panichi@isti.cnr.it</a>
 *
 */
public class AccountingClientDaemon implements Runnable {

	private static final Logger logger = LoggerFactory
			.getLogger(AccountingClientDaemon.class);

	/** Worker pool size, matching the Web services capacity. */
	private static final int POOL_SIZE = 20;

	/**
	 * Pause between two polling passes when no request was dequeued. Keeps the
	 * daemon thread from spinning at full CPU while it has nothing new to
	 * submit.
	 */
	private static final long IDLE_SLEEP_MILLIS = 100L;

	private final ServletContextEvent sce;
	private final AccountingCache accountingCache;

	/** Tasks submitted to the pool and not yet completed or cancelled. */
	private final ArrayList<TaskInProgress> tasks;

	/** Maximum time, in milliseconds, a task may run before being cancelled. */
	private final long timeout = Constants.SERVICE_CLIENT_TIMEOUT_DEFAULT_MILLIS;

	/** Cleared by {@link #terminate()} to make {@link #run()} return. */
	private volatile boolean running = true;

	/**
	 * Creates the daemon.
	 *
	 * @param sce
	 *            servlet context event whose context receives the request
	 *            queue
	 * @param accountingCache
	 *            cache handed to every task that is executed
	 */
	public AccountingClientDaemon(ServletContextEvent sce,
			AccountingCache accountingCache) {
		this.sce = sce;
		this.accountingCache = accountingCache;
		this.tasks = new ArrayList<>();
	}

	/**
	 * Asks the daemon to stop. The polling loop exits at its next pass and the
	 * worker pool is shut down.
	 */
	public void terminate() {
		running = false;
	}

	/**
	 * Publishes the request queue under
	 * {@code SessionConstants.TASK_REQUEST_QUEUE} and then polls it until
	 * {@link #terminate()} is called or the executing thread is interrupted.
	 * On exit the worker pool is shut down, interrupting any task still
	 * running, so that no pool thread outlives the daemon.
	 */
	@Override
	public void run() {
		Queue<TaskRequest> jobQueue = new ConcurrentLinkedQueue<>();
		sce.getServletContext().setAttribute(
				SessionConstants.TASK_REQUEST_QUEUE, jobQueue);

		ExecutorService executorService = Executors
				.newFixedThreadPool(POOL_SIZE);
		try {
			while (running) {
				// poll() alone is enough: it returns null on an empty queue,
				// so no separate isEmpty() check-then-act is needed.
				TaskRequest taskRequest = jobQueue.poll();
				if (taskRequest != null) {
					submitTask(executorService, taskRequest);
				}

				reapTasks();

				if (taskRequest == null) {
					// Nothing new to submit: back off instead of busy-waiting.
					try {
						TimeUnit.MILLISECONDS.sleep(IDLE_SLEEP_MILLIS);
					} catch (InterruptedException e) {
						// Treat an interrupt as a request to stop and keep the
						// flag set for whoever owns this thread.
						Thread.currentThread().interrupt();
						break;
					}
				}
			}
		} finally {
			// The pool threads are non-daemon: without this they would
			// survive the daemon and leak.
			executorService.shutdownNow();
		}
	}

	/** Submits one request to the pool and records it as in progress. */
	private void submitTask(ExecutorService executorService,
			TaskRequest taskRequest) {
		AccountingClientCallable accountingClientCallable = new AccountingClientCallable(
				taskRequest, accountingCache);
		Future<TaskStatus> futureResult = executorService
				.submit(accountingClientCallable);
		tasks.add(new TaskInProgress(new GregorianCalendar(), futureResult));
	}

	/**
	 * Removes from the in-progress list every task that has completed (after
	 * logging its outcome) or that has exceeded the timeout (after cancelling
	 * it).
	 */
	private void reapTasks() {
		if (tasks.isEmpty()) {
			return;
		}

		long nowMillis = System.currentTimeMillis();
		// Collected separately because the list cannot be modified while it is
		// being iterated.
		ArrayList<TaskInProgress> dones = new ArrayList<>();
		for (TaskInProgress taskInProgress : tasks) {
			Future<TaskStatus> futureResult = taskInProgress.getFuture();
			if (futureResult.isDone()) {
				logOutcome(futureResult);
				dones.add(taskInProgress);
			} else {
				long elapsedMillis = nowMillis
						- taskInProgress.getStartTime().getTimeInMillis();
				if (elapsedMillis > timeout) {
					logger.error("AccountingClientTask No response after "
							+ timeout + " milliseconds!");
					futureResult.cancel(true);
					dones.add(taskInProgress);
				}
			}
		}
		tasks.removeAll(dones);
	}

	/**
	 * Logs the result, or the failure, of a future that is already done. The
	 * call to {@code get()} therefore returns immediately and needs no
	 * timeout.
	 */
	private void logOutcome(Future<TaskStatus> futureResult) {
		try {
			TaskStatus result = futureResult.get();
			logger.info("AccountingClientTask: " + result);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			logger.error("AccountingClientTask: " + e.getLocalizedMessage(),
					e);
		} catch (ExecutionException e) {
			logger.error("AccountingClientTask: " + e.getLocalizedMessage(),
					e);
		}
	}
}