diff --git a/pom.xml b/pom.xml index 08a1f8e..e7a86f0 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ + org.gcube.resources.discovery ic-client @@ -68,20 +69,14 @@ discovery-client provided - org.gcube.core - common-generic-clients + common-scope provided org.gcube.core - common-gcube-calls - provided - - - org.gcube.core - common-jaxws-calls + common-scope-maps provided @@ -95,10 +90,10 @@ provided - org.gcube.core - common-smartgears-app + org.gcube.resources + common-gcore-resources + provided - org.gcube.common authorization-client @@ -114,40 +109,57 @@ smart-executor-client [1.4.0-SNAPSHOT,2.0.0-SNAPSHOT] - - - + + + com.fasterxml.jackson.core + jackson-core + provided + + + + + org.gcube.core + common-generic-clients + + + org.gcube.core + common-smartgears-app + + com.orientechnologies orientdb-client - org.gcube.vremanagement smart-executor-api [1.4.0-SNAPSHOT, 2.0.0-SNAPSHOT) + + com.sun.xml.ws jaxws-rt 2.1.7 + org.slf4j slf4j-api diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java index b3f1250..b225298 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java @@ -19,6 +19,7 @@ import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFact import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; +import org.gcube.vremanagement.executor.scheduler.SmartExecutorSchedulerFactory; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,12 @@ public class SmartExecutorImpl implements SmartExecutor { logger.info("Launch requested {}", parameter); - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + SmartExecutorScheduler smartExecutorScheduler; + try { + smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); + } catch (SchedulerException e) { + throw new ExecutorException(e); + } UUID uuid = smartExecutorScheduler.schedule(parameter, null); logger.info( String.format( @@ -61,7 +67,7 @@ public class SmartExecutorImpl implements SmartExecutor { @Override public boolean stop(String executionIdentifier) throws ExecutorException { logger.info("Stop requested for {}", executionIdentifier); - boolean ret = unSchedule(executionIdentifier, true, false); + boolean ret = unSchedule(executionIdentifier, false); logger.info("{} was{} stopped succesfully", executionIdentifier, ret? "" : " not"); return ret; } @@ -72,18 +78,18 @@ public class SmartExecutorImpl implements SmartExecutor { throws ExecutorException { logger.info("UnSchedule requested for {} globally : {}", executionIdentifier, globally); - boolean ret = unSchedule(executionIdentifier, false, globally); + boolean ret = unSchedule(executionIdentifier, globally); logger.info("{} was{} unscheduled {} succesfully", executionIdentifier, ret? "" : " not", globally? "globally": "locally"); return ret; } // TODO Manage better exception to to advise the caller - protected boolean unSchedule(String executionIdentifier, boolean stopOnly, boolean globally) throws ExecutorException { + protected boolean reallyUnSchedule(String executionIdentifier, boolean globally) throws ExecutorException { boolean currentStopped = true; try { - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); UUID uuid = UUID.fromString(executionIdentifier); - smartExecutorScheduler.stop(uuid, stopOnly, globally); + smartExecutorScheduler.stop(uuid, globally); } catch (SchedulerNotFoundException e) { // currentStopped = true; logger.error("Error unscheduling task {}", executionIdentifier, e); diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java index 83523d8..2d8ee68 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java @@ -38,6 +38,8 @@ import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; +import org.gcube.vremanagement.executor.scheduler.SmartExecutorSchedulerFactory; +import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -297,6 +299,7 @@ public class SmartExecutorInitializator implements ApplicationManager { @Override public void onInit() { String scope = getCurrentScope(); + logger.trace( "\n-------------------------------------------------------\n" + "Smart Executor is Starting on scope {}\n" @@ -330,54 +333,60 @@ public class SmartExecutorInitializator implements ApplicationManager { // Persistence to clean previous situation of a failure of HostingNode try { - logger.debug("Going to get Orphan Scheduled Tasks"); + logger.debug("Going to get Orphan Scheduled Tasks in scope {}", scope); List scheduledTasks = smartExecutorPersistenceConnector.getOrphanScheduledTasks(availablePlugins.values()); + if(scheduledTasks.size()==0){ + logger.debug("No Orphan Scheduled Tasks this instance can take in charge in scope {}", scope); + } + for(final ScheduledTask scheduledTask : scheduledTasks){ + final ObjectMapper mapper = ObjectMapperManager.getObjectMapper(); + + String taskAsString = mapper.writeValueAsString(scheduledTask); + try { // Reserving the task. smartExecutorPersistenceConnector.reserveScheduledTask(scheduledTask); }catch (Exception e) { - logger.debug("someone else is going to take in charge the scheduled task. Skipping."); + logger.debug("({}) Someone else is going to take in charge the scheduled task {}. Skipping.", scope, taskAsString); continue; } - - final ObjectMapper mapper = ObjectMapperManager.getObjectMapper(); Thread thread = new Thread(){ @Override public void run(){ LaunchParameter launchParameter = scheduledTask.getLaunchParameter(); + try { - logger.info("Going to schedule an already scheduled task with the following parameters {}", mapper.writeValueAsString(launchParameter)); + logger.info("({}) Going to schedule an already scheduled task with the following parameters {}", scope, mapper.writeValueAsString(launchParameter)); } catch (Exception e1) { } - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); - String scheduledTasktoken = scheduledTask.getToken(); try { setContext(scheduledTasktoken); + + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); // A new Scheduled Task will be persisted due to launch. Removing it smartExecutorPersistenceConnector.removeScheduledTask(scheduledTask); smartExecutorScheduler.schedule(launchParameter, scheduledTask.getUUID()); } catch (Exception e) { - logger.error("Error while trying to relaunch scheduled task.", e); + logger.error("({}) Error while trying to relaunch scheduled task.", scope, e); try { smartExecutorPersistenceConnector.addScheduledTask(scheduledTask); } catch (Exception ex) { - logger.error("Unable to "); + logger.error("({}) Unable to add back scheduled task {}", scope, taskAsString); } } } }; - + thread.start(); - } } catch (Exception e) { @@ -409,14 +418,21 @@ public class SmartExecutorInitializator implements ApplicationManager { + "-------------------------------------------------------", getCurrentScope()); - // TODO release scheduled tasks - SmartExecutorScheduler.getInstance().stopAll(); - + + SmartExecutorScheduler scheduler; + try { + scheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); + scheduler.stopAll(); + SmartExecutorSchedulerFactory.remove(); + } catch (SchedulerException e) { + logger.error("", e); + } cleanServiceEndpoints(); + try { - SmartExecutorPersistenceFactory.getPersistenceConnector().close(); - } catch (Exception e) { + SmartExecutorPersistenceFactory.closePersistenceConnector(); + } catch (Throwable e) { logger.error("Unable to correctly close {} for scope {}", SmartExecutorPersistenceConnector.class.getSimpleName(), getCurrentScope(), e); @@ -427,6 +443,5 @@ public class SmartExecutorInitializator implements ApplicationManager { + "Smart Executor Stopped Successfully on scope {}\n" + "-------------------------------------------------------", getCurrentScope()); - } } diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java index c2e91bf..2e688f5 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java @@ -6,11 +6,13 @@ package org.gcube.vremanagement.executor.persistence; import java.util.HashMap; import java.util.UUID; +import org.gcube.common.clients.exceptions.DiscoveryException; import org.gcube.common.resources.gcore.HostingNode; import org.gcube.smartgears.ContextProvider; import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; +import org.gcube.vremanagement.executor.exception.ExecutorException; import org.gcube.vremanagement.executor.json.ObjectMapperManager; import org.gcube.vremanagement.executor.plugin.Plugin; import org.gcube.vremanagement.executor.plugin.PluginState; @@ -79,7 +81,6 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif } }catch (Exception e) { logger.error("Unable to check if current hosting node is the same of the one in ScheduledTask", e); - } String address = runOn.getEService().getAddress(); @@ -95,18 +96,26 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif .getExecutorProxy(pluginName, null, null, specificEndpointDiscoveryFilter).build(); proxy.getStateEvolution(uuid.toString()); - } catch (Exception e) { + logger.trace("{} is not orphan.", ObjectMapperManager + .getObjectMapper().writeValueAsString(scheduledTask)); + return false; + } catch (DiscoveryException | ExecutorException e) { // The instance was not found or the request failed. // The scheduledTask is considered orphan logger.trace("{} is considered orphan.", ObjectMapperManager - .getObjectMapper().writeValueAsString(scheduledTask)); + .getObjectMapper().writeValueAsString(scheduledTask), e); return true; + } catch (Throwable e) { + // The scheduledTask is NOT considered orphan + logger.trace("{} is NOT considered orphan.", ObjectMapperManager + .getObjectMapper().writeValueAsString(scheduledTask), e); + return false; } } catch (Exception e) { String string = ObjectMapperManager.getObjectMapper() .writeValueAsString(scheduledTask); logger.error("Error while checking orphanity of " + string - + ". Considering as not orphan."); + + ". Considering as not orphan.", e); } return false; diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java index 3dca539..433eaf0 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java @@ -25,7 +25,7 @@ public abstract class SmartExecutorPersistenceFactory { persistenceConnectors = new HashMap(); } - public static SmartExecutorPersistenceConnector getPersistenceConnector(String scope){ + private static SmartExecutorPersistenceConnector getPersistenceConnector(String scope) throws Exception { if(scope==null){ String error = "No Scope available."; logger.error(error); @@ -35,16 +35,7 @@ public abstract class SmartExecutorPersistenceFactory { logger.trace("Retrieving {} for scope {}", SmartExecutorPersistenceConnector.class.getSimpleName(), scope); - return persistenceConnectors.get(scope); - } - - /** - * @return the persistenceConnector - */ - public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception { - String scope = SmartExecutorInitializator.getCurrentScope(); - SmartExecutorPersistenceConnector persistence = - getPersistenceConnector(scope); + SmartExecutorPersistenceConnector persistence = persistenceConnectors.get(scope); if(persistence==null){ logger.trace("Retrieving {} for scope {} not found on internal {}. Intializing it.", @@ -55,7 +46,7 @@ public abstract class SmartExecutorPersistenceFactory { SmartExecutorPersistenceConfiguration configuration = new SmartExecutorPersistenceConfiguration(className); - persistence = new OrientDBPersistenceConnector(scope, configuration); + persistence = new OrientDBPersistenceConnector(configuration); persistenceConnectors.put(SmartExecutorInitializator.getCurrentScope(), persistence); } @@ -63,6 +54,14 @@ public abstract class SmartExecutorPersistenceFactory { return persistence; } + /** + * @return the persistenceConnector + */ + public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception { + String scope = SmartExecutorInitializator.getCurrentScope(); + return getPersistenceConnector(scope); + } + public static synchronized void closePersistenceConnector() throws Exception { String scope = SmartExecutorInitializator.getCurrentScope(); SmartExecutorPersistenceConnector persistence = diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java index 1205320..2a4aa9d 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java @@ -42,8 +42,6 @@ public class OrientDBPersistenceConnector extends protected final static int LAST = -1; - protected final String scope; - protected final String SCOPE = "scope"; protected final String UUID = "uuid"; protected final String ITERATION = "iteration"; @@ -54,21 +52,13 @@ public class OrientDBPersistenceConnector extends protected OPartitionedDatabasePool oPartitionedDatabasePool; protected ObjectMapper mapper; - public OrientDBPersistenceConnector(String scope, - SmartExecutorPersistenceConfiguration configuration) + public OrientDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception { super(); - this.scope = scope; prepareConnection(configuration); this.mapper = ObjectMapperManager.getObjectMapper(); } - public OrientDBPersistenceConnector( - SmartExecutorPersistenceConfiguration configuration) - throws Exception { - this(SmartExecutorInitializator.getCurrentScope(), configuration); - } - protected void prepareConnection( SmartExecutorPersistenceConfiguration configuration) throws Exception { @@ -99,7 +89,7 @@ public class OrientDBPersistenceConnector extends String type = PluginStateEvolution.class.getSimpleName(); Map params = new HashMap(); params.put(UUID, uuid.toString()); - params.put(SCOPE, scope); + params.put(SCOPE, SmartExecutorInitializator.getCurrentScope()); OSQLSynchQuery query = null; if (iterationNumber != LAST) { @@ -169,7 +159,7 @@ public class OrientDBPersistenceConnector extends PluginStateEvolution.class.getSimpleName()); String json = mapper.writeValueAsString(pluginStateEvolution); doc.fromJSON(json); - doc.field(SCOPE, scope); + doc.field(SCOPE, SmartExecutorInitializator.getCurrentScope()); doc.save(); db.commit(); @@ -263,7 +253,7 @@ public class OrientDBPersistenceConnector extends } catch (Exception e) { logger.error( "An Exception occurred while evaluating if {} is orphan", - json); + json, e); } } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java index 4347477..85952ca 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java @@ -5,9 +5,10 @@ package org.gcube.vremanagement.executor.scheduler; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; @@ -16,9 +17,7 @@ import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.LaunchException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException; -import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException; -import org.gcube.vremanagement.executor.exception.SchedulerRemoveException; import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException; import org.gcube.vremanagement.executor.json.ObjectMapperManager; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; @@ -34,11 +33,9 @@ import org.quartz.JobKey; import org.quartz.ScheduleBuilder; import org.quartz.Scheduler; import org.quartz.SchedulerException; -import org.quartz.SchedulerFactory; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; -import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,26 +47,15 @@ public class SmartExecutorScheduler { private static Logger logger = LoggerFactory .getLogger(SmartExecutorScheduler.class); - /** - * Contains running plugin instances. The key is the associated random UUID. - * This is needed to correctly stop the running plugin execution if the - * container is stopped in the proper way - */ - protected Map activeSchedulers; - - private static SmartExecutorScheduler smartExecutorScheduler; - - public synchronized static SmartExecutorScheduler getInstance() { - if (smartExecutorScheduler == null) { - smartExecutorScheduler = new SmartExecutorScheduler(); - } - return smartExecutorScheduler; - } - - private SmartExecutorScheduler() { - activeSchedulers = new HashMap(); - } + protected Set scheduledJobs; + protected final Scheduler shedul; + + SmartExecutorScheduler(Scheduler scheduler) throws SchedulerException { + this.shedul = scheduler; + this.shedul.start(); + this.scheduledJobs = new HashSet<>(); + } protected TriggerBuilder createTriggerBuilder(UUID uuid, ScheduleBuilder sb){ return TriggerBuilder.newTrigger().withIdentity(uuid.toString()) @@ -115,9 +101,7 @@ public class SmartExecutorScheduler { * @throws SchedulerException if the scheduler cannot be created by the * scheduler factory */ - protected Scheduler reallySchedule(final UUID uuid, LaunchParameter parameter) throws LaunchException, SchedulerException { - SchedulerFactory schedulerFactory = new StdSchedulerFactory(); - Scheduler scheduler = schedulerFactory.getScheduler(); + protected void reallySchedule(final UUID uuid, LaunchParameter parameter) throws LaunchException, SchedulerException { JobKey jobKey = new JobKey(uuid.toString()); JobDetail jobDetail = JobBuilder.newJob(SmartExecutorTask.class). @@ -183,13 +167,11 @@ public class SmartExecutorScheduler { try { SmartExecutorTaskListener sejl = new SmartExecutorTaskListener(); - scheduler.getListenerManager().addJobListener(sejl); - scheduler.scheduleJob(jobDetail, triggerBuilder.build()); + shedul.getListenerManager().addJobListener(sejl); + shedul.scheduleJob(jobDetail, triggerBuilder.build()); } catch (SchedulerException e) { throw new RuntimeException(e); } - - return scheduler; } /** @@ -220,75 +202,39 @@ public class SmartExecutorScheduler { } try { - Scheduler scheduler = reallySchedule(uuid, parameter); - activeSchedulers.put(uuid, scheduler); - scheduler.start(); + reallySchedule(uuid, parameter); + scheduledJobs.add(uuid); } catch (SchedulerException e) { throw new LaunchException(e); } return uuid; } - - public Scheduler getScheduler(UUID key){ - return activeSchedulers.get(key); - } - protected void stopLastcurrentExecution(Scheduler scheduler, UUID uuid) + protected void stopTask(UUID uuid) throws UnableToInterruptTaskException{ JobKey jobKey = new JobKey(uuid.toString()); try { logger.debug("Going to stop current SmartExecutor Task {} execution if any", uuid); - if(!scheduler.checkExists(jobKey)){ - logger.debug("No SmartExecutor Task {} was found. That's all folk.", uuid); + if(!shedul.checkExists(jobKey)){ + logger.trace("SmartExecutor Task {} does not have any instaces associated. Cleaning the environment. That's all folk.", uuid); + scheduledJobs.remove(uuid); throw new SchedulerNotFoundException("Scheduler Not Found"); } - boolean interrupted = scheduler.interrupt(jobKey); + boolean interrupted = shedul.interrupt(jobKey); + shedul.deleteJob(jobKey); if (interrupted) { logger.debug("SmartExecutor Task {} interrupted successfully.", uuid); } else { - List list = getCurrentlyExecutingJobs(scheduler); - if(list!=null && list.size()>0){ - logger.debug("SmartExecutor Task {} was not interrupted.", uuid); - throw new UnableToInterruptTaskException(uuid); - } + logger.debug("SmartExecutor Task {} was not interrupted.", uuid); } - } catch (UnableToInterruptTaskException e) { - throw e; + } catch(Exception e){ throw new UnableToInterruptTaskException(uuid, e); - } - } - - - protected void deleteScheduler(Scheduler scheduler, UUID uuid) throws SchedulerRemoveException { - - JobKey jobKey = new JobKey(uuid.toString()); - - try { - logger.debug("Going to delete SmartExecutor Scheduled Task {}", uuid); - boolean deleted = scheduler.deleteJob(jobKey); - if (deleted) { - logger.debug("SmartExecutor Task {} deleted successfully", uuid); - } else { - logger.debug("SmartExecutor Task {} was not deleted", uuid); - throw new SchedulerRemoveException(uuid); - } - } catch(SchedulerRemoveException e){ - throw e; - } catch(Exception e1){ - throw new SchedulerRemoveException(uuid, e1); - } finally { - activeSchedulers.remove(uuid); - try { - scheduler.clear(); - } catch(SchedulerException e){ - throw new SchedulerRemoveException(uuid, e); - } - } + } } protected List getCurrentlyExecutingJobs(Scheduler scheduler) throws SchedulerException{ @@ -298,35 +244,13 @@ public class SmartExecutorScheduler { return cej; } - public LaunchParameter getLaunchParameter(Scheduler scheduler, JobKey jobKey) throws SchedulerException{ - JobDetail jobDetail = scheduler.getJobDetail(jobKey); + public LaunchParameter getLaunchParameter(JobKey jobKey) throws SchedulerException{ + JobDetail jobDetail = shedul.getJobDetail(jobKey); JobDataMap jobDataMap = jobDetail.getJobDataMap(); return (LaunchParameter) jobDataMap.get(SmartExecutorTask.LAUNCH_PARAMETER); } - protected void removeFromPersistence(boolean global, UUID uuid, boolean remove) throws SchedulePersistenceException{ - try { - ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getScheduledTaskPersistence(); - if(remove){ - logger.debug("Going to remove the SmartExecutor Scheduled Task {} from global scheduling", uuid); - stc.removeScheduledTask(uuid); - }else{ - if(global){ - logger.debug("Going to release the SmartExecutor Scheduled Task {}. The Task can be take in charge from another SmartExecutor instance", uuid); - stc.releaseScheduledTask(uuid); - }else{ - logger.debug("Going to remove the SmartExecutor Scheduled Task {} from local scheduling", uuid); - stc.removeScheduledTask(uuid); - } - } - }catch(Exception e){ - throw new SchedulePersistenceException( - String.format("Unable to Remove Scheduled Task %s from global scheduling", - uuid.toString()), e); - } - } - /** * Stop the execution of the Task identified by UUID * @param uuid which identify the Task @@ -334,59 +258,40 @@ public class SmartExecutorScheduler { * @param remove : when the Task is a Scheduled one indicate if the Task * has to be released or to be removed (the argument is set to true when * an explicit request arrive to remove the scheduled task) - * @throws UnableToInterruptTaskException - * @throws SchedulerRemoveException - * @throws SchedulePersistenceException + * @throws Exception * @throws SchedulerNotFoundException - * @throws SchedulerException */ - public synchronized void stop(UUID uuid, boolean stopOnly, boolean remove) - throws UnableToInterruptTaskException, SchedulerRemoveException, - SchedulePersistenceException, SchedulerException { - - Scheduler scheduler = activeSchedulers.get(uuid); - if(scheduler==null){ - logger.debug("No SmartExecutor Task {} was found. That's all folk.", uuid); - removeFromPersistence(true, uuid, remove); - return; - } + public synchronized void stop(UUID uuid, boolean remove) + throws Exception { JobKey jobKey = new JobKey(uuid.toString()); - boolean exist = scheduler.checkExists(jobKey); - if(!exist){ - logger.trace("SmartExecutor Task {} does not have any instaces associated. Cleaning the environment. That's all folk.", uuid); - activeSchedulers.remove(uuid); - return; - }else{ - logger.trace("SmartExecutor Task {} to stop exist", uuid); - } - - LaunchParameter launchParameter = getLaunchParameter(scheduler, jobKey); + LaunchParameter launchParameter = getLaunchParameter(jobKey); Scheduling scheduling = launchParameter.getScheduling(); - boolean scheduled = launchParameter.getScheduling() != null ? true : false; + boolean scheduled = scheduling != null ? true : false; - stopLastcurrentExecution(scheduler, uuid); + stopTask(uuid); - try { - if(stopOnly ^ scheduled){ - deleteScheduler(scheduler, uuid); - } - }catch(Exception e){ - throw e; - } finally { - if(!stopOnly && scheduled){ - /* Removing scheduling from persistence */ - removeFromPersistence(scheduling.getGlobal(), uuid, remove); + ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getScheduledTaskPersistence(); + + if(scheduled){ + if(remove){ + logger.debug("Going to remove the SmartExecutor Scheduled Task {} from global scheduling", uuid); + stc.removeScheduledTask(uuid); + }else{ + if(scheduling.getGlobal()){ + logger.debug("Going to release the SmartExecutor Scheduled Task {}. The Task can be take in charge from another SmartExecutor instance", uuid); + stc.releaseScheduledTask(uuid); + } } } } public void stopAll() { - List set = new ArrayList(activeSchedulers.keySet()); + List set = new ArrayList(scheduledJobs); for (UUID uuid : set) { try { - stop(uuid, true, false); + stop(uuid, false); } catch (Exception e) { logger.error("Error stopping plugin instace with UUID {}", uuid, e); diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorSchedulerFactory.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorSchedulerFactory.java new file mode 100644 index 0000000..53dad23 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorSchedulerFactory.java @@ -0,0 +1,70 @@ +package org.gcube.vremanagement.executor.scheduler; + +import java.util.HashMap; +import java.util.Map; + +import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SchedulerFactory; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SmartExecutorSchedulerFactory { + + private static Logger logger = LoggerFactory.getLogger(SmartExecutorScheduler.class); + + private static Map smartExecutorSchedulers = new HashMap<>(); + + protected static SchedulerFactory schedulerFactory; + + static { + schedulerFactory = new StdSchedulerFactory(); + smartExecutorSchedulers = new HashMap<>(); + } + + private static SmartExecutorScheduler getSmartExecutorScheduler(String scope) throws SchedulerException { + if(scope==null){ + String error = "No Scope available."; + logger.error(error); + throw new RuntimeException(error); + } + + logger.trace("Retrieving {} for scope {}", + SmartExecutorPersistenceConnector.class.getSimpleName(), scope); + + SmartExecutorScheduler smartExecutorScheduler = smartExecutorSchedulers.get(scope); + + if(smartExecutorScheduler==null){ + logger.trace("Retrieving {} for scope {} not found on internal {}. Intializing it.", + SmartExecutorScheduler.class.getSimpleName(), + scope, Map.class.getSimpleName()); + + Scheduler scheduler = schedulerFactory.getScheduler(); + smartExecutorScheduler = new SmartExecutorScheduler(scheduler); + + smartExecutorSchedulers.put(SmartExecutorInitializator.getCurrentScope(), + smartExecutorScheduler); + } + + return smartExecutorScheduler; + } + + /** + * @return the persistenceConnector + * @throws SchedulerException + */ + public static synchronized SmartExecutorScheduler getSmartExecutorScheduler() throws SchedulerException { + String scope = SmartExecutorInitializator.getCurrentScope(); + return getSmartExecutorScheduler(scope); + } + + + public static void remove(){ + String scope = SmartExecutorInitializator.getCurrentScope(); + smartExecutorSchedulers.remove(scope); + } + +} diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java index 938ce72..c63032b 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java @@ -16,9 +16,6 @@ import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException; import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException; -import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; -import org.gcube.vremanagement.executor.exception.SchedulerRemoveException; -import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; import org.gcube.vremanagement.executor.plugin.Plugin; @@ -31,7 +28,6 @@ import org.quartz.InterruptableJob; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; -import org.quartz.SchedulerException; import org.quartz.UnableToInterruptJobException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -300,9 +296,8 @@ public class SmartExecutorTask implements InterruptableJob { } protected void deschedule(boolean globally) - throws UnableToInterruptTaskException, SchedulerRemoveException, - SchedulePersistenceException, SchedulerException { - SmartExecutorScheduler.getInstance().stop(uuid, false, globally); + throws Exception { + SmartExecutorSchedulerFactory.getSmartExecutorScheduler().stop(uuid, globally); } @Override diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java index 2e22284..a932ce4 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java @@ -20,6 +20,7 @@ import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFact import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; +import org.gcube.vremanagement.executor.scheduler.SmartExecutorSchedulerFactory; import org.junit.Assert; import org.junit.Test; import org.quartz.CronExpression; @@ -32,470 +33,457 @@ import org.slf4j.LoggerFactory; public class SmartExecutorSchedulerTest extends ScopedTest { private static Logger logger = LoggerFactory.getLogger(SmartExecutorSchedulerTest.class); - + public static final String START = "START"; public static final String END = "END"; - + public static SmartExecutorPersistenceConnector pc; - + public UUID scheduleTest(Scheduling scheduling, Long sleepTime) throws Exception { Map inputs = new HashMap(); - if(sleepTime==null){ - sleepTime = new Long(10*1000); // 10 sec = 10 * 1000 millisec + if (sleepTime == null) { + sleepTime = new Long(10 * 1000); // 10 sec = 10 * 1000 millisec } inputs.put(HelloWorldPlugin.SLEEP_TIME, sleepTime); inputs.put("Test UUID", UUID.randomUUID()); logger.debug("Inputs : {}", inputs); - + LaunchParameter parameter = new LaunchParameter(HelloWorldPluginDeclaration.NAME, inputs); parameter.setScheduling(scheduling); - - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); UUID uuid = smartExecutorScheduler.schedule(parameter, null); logger.debug("Scheduled Job with ID {}", uuid); - + pc = SmartExecutorPersistenceFactory.getPersistenceConnector(); - + return uuid; } - - /* DeprecatedHelloWorldPlugin dependency needed - @Test - public void deprecatedConstructorTest() throws Exception { - Map inputs = new HashMap(); - Long sleepTime = new Long(10*1000); // 10 sec = 10 * 1000 millisec - inputs.put(DeprecatedHelloWorldPlugin.SLEEP_TIME, sleepTime); - inputs.put("Test UUID", UUID.randomUUID()); - logger.debug("Inputs : {}", inputs); - - LaunchParameter parameter = new LaunchParameter(DeprecatedHelloWorldPluginDeclaration.class.newInstance(), inputs); - parameter.setScheduling(null); - - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); - UUID uuid = smartExecutorScheduler.schedule(parameter); - logger.debug("Scheduled Job with ID {}", uuid); - - long startTime = Calendar.getInstance().getTimeInMillis(); - long endTime = startTime; - while(endTime <= (startTime + 12000)){ - endTime = Calendar.getInstance().getTimeInMillis(); - } - PluginState pluginState = pc.getLastPluginInstanceState(uuid); - Assert.assertEquals(PluginState.DONE, pluginState); - } - */ - + + /* + * DeprecatedHelloWorldPlugin dependency needed + * + * @Test public void deprecatedConstructorTest() throws Exception { + * Map inputs = new HashMap(); Long + * sleepTime = new Long(10*1000); // 10 sec = 10 * 1000 millisec + * inputs.put(DeprecatedHelloWorldPlugin.SLEEP_TIME, sleepTime); + * inputs.put("Test UUID", UUID.randomUUID()); logger.debug("Inputs : {}", + * inputs); + * + * LaunchParameter parameter = new + * LaunchParameter(DeprecatedHelloWorldPluginDeclaration.class.newInstance() + * , inputs); parameter.setScheduling(null); + * + * SmartExecutorScheduler smartExecutorScheduler = + * SmartExecutorScheduler.getInstance(); UUID uuid = + * smartExecutorScheduler.schedule(parameter); + * logger.debug("Scheduled Job with ID {}", uuid); + * + * long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = + * startTime; while(endTime <= (startTime + 12000)){ endTime = + * Calendar.getInstance().getTimeInMillis(); } PluginState pluginState = + * pc.getLastPluginInstanceState(uuid); + * Assert.assertEquals(PluginState.DONE, pluginState); } + */ + @Test public void schedulingTest() throws Exception { UUID uuid = scheduleTest(null, null); long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 12000)){ + while (endTime <= (startTime + 12000)) { endTime = Calendar.getInstance().getTimeInMillis(); } PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(uuid); Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState()); } - + @Test public void earlyStopTest() throws Exception { - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); UUID uuid = scheduleTest(null, null); try { - smartExecutorScheduler.stop(uuid, true, false); - } catch(UnableToInterruptTaskException e){ + smartExecutorScheduler.stop(uuid, false); + } catch (UnableToInterruptTaskException e) { logger.error("UnableToInterruptTaskException this is the normal behaviour.", e); return; } - + long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 12000)){ + while (endTime <= (startTime + 12000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - - try{ + + try { PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, 5); Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState()); - }catch(PluginStateNotRetrievedException e){ + } catch (PluginStateNotRetrievedException e) { // OK logger.error("PluginStateNotRetrievedException this can be acceptable in some tests", e); } - + } - + @Test public void middleStopTest() throws Exception { UUID uuid = scheduleTest(null, null); - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); - + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); + long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 2000)){ + while (endTime <= (startTime + 2000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - - smartExecutorScheduler.stop(uuid, true, false); - + + smartExecutorScheduler.stop(uuid, false); + startTime = Calendar.getInstance().getTimeInMillis(); endTime = startTime; - while(endTime <= (startTime + 10000)){ + while (endTime <= (startTime + 10000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - + PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(uuid); Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState()); } - + @Test public void lateStopTest() throws Exception { UUID uuid = scheduleTest(null, null); - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 12000)){ + while (endTime <= (startTime + 12000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - smartExecutorScheduler.stop(uuid, true, false); - + smartExecutorScheduler.stop(uuid, false); + PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(uuid); Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState()); } - - + @Test public void doubleLaunchfirstStoppedSchedulingTest() throws Exception { UUID first = scheduleTest(null, null); logger.debug("First scheduled id {}", first); UUID second = scheduleTest(null, null); logger.debug("Second scheduled id {}", second); - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 2000)){ + while (endTime <= (startTime + 2000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - smartExecutorScheduler.stop(first, true, false); - + smartExecutorScheduler.stop(first, false); + startTime = Calendar.getInstance().getTimeInMillis(); endTime = startTime; - while(endTime <= (startTime + 12000)){ + while (endTime <= (startTime + 12000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - + PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(first); Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState()); - + pluginStateEvolution = pc.getLastPluginInstanceState(second); Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState()); } - + @Test public void delayed() throws Exception { Scheduling scheduling = new Scheduling(20); UUID uuid = scheduleTest(scheduling, new Long(10 * 1000)); - - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 83 * 1000)){ + while (endTime <= (startTime + 83 * 1000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - smartExecutorScheduler.stop(uuid, true, false); - + smartExecutorScheduler.stop(uuid, false); + startTime = Calendar.getInstance().getTimeInMillis(); endTime = startTime; - while(endTime <= (startTime + 30 * 1000)){ + while (endTime <= (startTime + 30 * 1000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - for(int i=1; i<5; i++){ + for (int i = 1; i < 5; i++) { PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, i); Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState()); } - + PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, 5); Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState()); - + } - + @Test public void delayedPreviousMustBeTerminated() throws Exception { Scheduling scheduling = new Scheduling(20, true); UUID uuid = scheduleTest(scheduling, new Long(22 * 1000)); - - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 80 * 1000)){ + while (endTime <= (startTime + 80 * 1000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - smartExecutorScheduler.stop(uuid, true, false); - + smartExecutorScheduler.stop(uuid, false); + startTime = Calendar.getInstance().getTimeInMillis(); endTime = startTime; - while(endTime <= (startTime + 30 * 1000)){ + while (endTime <= (startTime + 30 * 1000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - for(int i=1; i<5; i++){ + for (int i = 1; i < 5; i++) { PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, i); - if(i%2!=0){ + if (i % 2 != 0) { Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState()); - }else{ + } else { Assert.assertEquals(PluginState.DISCARDED, pluginStateEvolution.getPluginState()); } } - + PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, 5); Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState()); - + } - + @Test public void delayedAllPreviousMustBeTerminated() throws Exception { Scheduling scheduling = new Scheduling(20, true); UUID uuid = scheduleTest(scheduling, new Long(45 * 1000)); - - SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; - while(endTime <= (startTime + 1.5 * 60 * 1000)){ + while (endTime <= (startTime + 1.5 * 60 * 1000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - smartExecutorScheduler.stop(uuid, true, false); - + smartExecutorScheduler.stop(uuid, false); + startTime = Calendar.getInstance().getTimeInMillis(); endTime = startTime; - while(endTime <= (startTime + 30 * 1000)){ + while (endTime <= (startTime + 30 * 1000)) { endTime = Calendar.getInstance().getTimeInMillis(); } - PluginState[] expectedStates = new PluginState[]{ - PluginState.DONE, - PluginState.DISCARDED, - PluginState.DISCARDED, - PluginState.STOPPED - }; - - for(int i=0; i