package org.gcube.portlets.user.dataminermanagertester.server; import java.util.ArrayList; import java.util.GregorianCalendar; import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.servlet.ServletContextEvent; import org.gcube.portlets.user.dataminermanagertester.server.task.TaskInProgress; import org.gcube.portlets.user.dataminermanagertester.server.task.TaskRequest; ***REMOVED*** import org.gcube.portlets.user.dataminermanagertester.shared.task.TaskStatus; ***REMOVED*** ***REMOVED*** /** * * @author Giancarlo Panichi * * */ public class DataMinerTesterBatchDaemon implements Runnable { private static Logger logger = LoggerFactory.getLogger(DataMinerTesterBatchDaemon.class); private long timeout = Constants.SERVICE_CLIENT_TIMEOUT_DEFAULT_MILLIS; private long timeoutUpdate = Constants.SERVICE_CLIENT_THREAD_POOL_TIME_OUT_UPDATE_MILLIS; private ServletContextEvent sce; private volatile boolean running = true; private ArrayList tasks; private Timer threadPoolTimeoutUpdateTimer = null; public DataMinerTesterBatchDaemon(ServletContextEvent sce) { this.sce = sce; tasks = new ArrayList<>(); initTimeout(); ***REMOVED*** private void initTimeout() { sce.getServletContext().setAttribute(SessionConstants.DATAMINERTESTER_MONITOR_TIME_OUT_PERIODMILLIS, Long.valueOf(timeout)); retrieveTimeOut(); startThreadPoolTimeoutUpdateTimer(); ***REMOVED*** public void terminate() { running = false; if (threadPoolTimeoutUpdateTimer != null) { threadPoolTimeoutUpdateTimer.cancel(); ***REMOVED*** ***REMOVED*** public void run() { Queue jobQueue = new ConcurrentLinkedQueue<>(); sce.getServletContext().setAttribute(SessionConstants.TASK_REQUEST_QUEUE, jobQueue); // pool size matching Web services capacity ExecutorService executorService = Executors.newFixedThreadPool(20); while (running) { while (!jobQueue.isEmpty()) { TaskRequest taskRequest = jobQueue.poll(); DataMinerTesterCallable accountingClientCallable = new DataMinerTesterCallable(taskRequest); Future futureResult = executorService.submit(accountingClientCallable); TaskInProgress taskInProgress = new TaskInProgress(new GregorianCalendar(), futureResult); tasks.add(taskInProgress); ***REMOVED*** if (!tasks.isEmpty()) { ArrayList dones = new ArrayList<>(); for (TaskInProgress taskInProgress : tasks) { Future futureResult = taskInProgress.getFuture(); if (futureResult.isDone()) { TaskStatus result = null; ***REMOVED*** result = futureResult.get(timeout, TimeUnit.MILLISECONDS); logger.debug("DataMinerTesterTask: " + result); ***REMOVED*** catch (InterruptedException | ExecutionException e) { logger.error("DataMinerTesterTask: " + e.getLocalizedMessage(), e); ***REMOVED*** catch (TimeoutException e) { logger.error("DataMinerTesterTask No response after " + timeout + " milliseconds!"); futureResult.cancel(true); ***REMOVED*** dones.add(taskInProgress); ***REMOVED*** else { GregorianCalendar now = new GregorianCalendar(); long diff = now.getTimeInMillis() - taskInProgress.getStartTime().getTimeInMillis(); if (diff > timeout) { futureResult.cancel(true); dones.add(taskInProgress); ***REMOVED*** ***REMOVED*** ***REMOVED*** tasks.removeAll(dones); ***REMOVED*** ***REMOVED*** Thread.sleep(Constants.DAEMON_SLEEP_MILLIS); ***REMOVED*** catch (InterruptedException e) { ***REMOVED*** ***REMOVED*** ***REMOVED*** private void retrieveTimeOut() { long timeo = 0; logger.info("DataMinerTester use default configuration for threadpool"); timeo = Constants.SERVICE_CLIENT_TIMEOUT_DEFAULT_MILLIS; if (timeo > 0) { timeout = timeo; sce.getServletContext().setAttribute(SessionConstants.DATAMINERTESTER_MONITOR_TIME_OUT_PERIODMILLIS, Long.valueOf(timeout)); ***REMOVED*** ***REMOVED*** private void startThreadPoolTimeoutUpdateTimer() { ***REMOVED*** threadPoolTimeoutUpdateTimer = new Timer(); threadPoolTimeoutUpdateTimer.schedule(new TimerTask() { @Override public void run() { logger.debug("ThreadPool request update of timeout"); retrieveTimeOut(); ***REMOVED*** ***REMOVED***, timeoutUpdate, timeoutUpdate); ***REMOVED*** logger.error("Error retrieving thread pool timeout!", e); return; ***REMOVED*** ***REMOVED*** ***REMOVED***