2015-07-28 15:41:35 +02:00
|
|
|
/**
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
package org.gcube.vremanagement.executor.scheduler;
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
2015-09-28 17:44:12 +02:00
|
|
|
import java.util.Calendar;
|
2015-07-28 15:41:35 +02:00
|
|
|
import java.util.Date;
|
|
|
|
import java.util.HashMap;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.UUID;
|
|
|
|
|
2015-09-28 17:44:12 +02:00
|
|
|
import org.gcube.common.resources.gcore.GCoreEndpoint;
|
2016-09-22 11:31:05 +02:00
|
|
|
import org.gcube.smartgears.ContextProvider;
|
2015-07-28 15:41:35 +02:00
|
|
|
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
|
|
|
import org.gcube.vremanagement.executor.api.types.Scheduling;
|
2015-09-28 17:44:12 +02:00
|
|
|
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
|
|
|
|
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfigurationFactory;
|
2015-07-28 15:41:35 +02:00
|
|
|
import org.gcube.vremanagement.executor.exception.InputsNullException;
|
|
|
|
import org.gcube.vremanagement.executor.exception.LaunchException;
|
|
|
|
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
|
2015-10-08 12:10:55 +02:00
|
|
|
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
|
2015-07-28 15:41:35 +02:00
|
|
|
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
|
2015-11-30 12:25:05 +01:00
|
|
|
import org.gcube.vremanagement.executor.exception.SchedulerRemoveException;
|
|
|
|
import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException;
|
|
|
|
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
|
2015-07-28 15:41:35 +02:00
|
|
|
import org.quartz.CronScheduleBuilder;
|
|
|
|
import org.quartz.JobBuilder;
|
|
|
|
import org.quartz.JobDataMap;
|
|
|
|
import org.quartz.JobDetail;
|
|
|
|
import org.quartz.JobExecutionContext;
|
|
|
|
import org.quartz.JobKey;
|
|
|
|
import org.quartz.ScheduleBuilder;
|
|
|
|
import org.quartz.Scheduler;
|
|
|
|
import org.quartz.SchedulerException;
|
|
|
|
import org.quartz.SchedulerFactory;
|
|
|
|
import org.quartz.SimpleScheduleBuilder;
|
|
|
|
import org.quartz.Trigger;
|
|
|
|
import org.quartz.TriggerBuilder;
|
|
|
|
import org.quartz.impl.StdSchedulerFactory;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
|
|
|
*/
|
|
|
|
public class SmartExecutorScheduler {
|
|
|
|
|
|
|
|
private static Logger logger = LoggerFactory
|
|
|
|
.getLogger(SmartExecutorScheduler.class);
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Contains running plugin instances. The key is the associated random UUID.
|
|
|
|
* This is needed to correctly stop the running plugin execution if the
|
|
|
|
* container is stopped in the proper way
|
|
|
|
*/
|
|
|
|
protected Map<UUID, Scheduler> activeSchedulers;
|
|
|
|
|
|
|
|
private static SmartExecutorScheduler smartExecutorScheduler;
|
|
|
|
|
|
|
|
public synchronized static SmartExecutorScheduler getInstance() {
|
|
|
|
if (smartExecutorScheduler == null) {
|
|
|
|
smartExecutorScheduler = new SmartExecutorScheduler();
|
|
|
|
}
|
|
|
|
return smartExecutorScheduler;
|
|
|
|
}
|
|
|
|
|
|
|
|
private SmartExecutorScheduler() {
|
|
|
|
activeSchedulers = new HashMap<UUID, Scheduler>();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected TriggerBuilder<? extends Trigger> createTriggerBuilder(UUID uuid, ScheduleBuilder<? extends Trigger> sb){
|
|
|
|
return TriggerBuilder.newTrigger().withIdentity(uuid.toString())
|
|
|
|
.withSchedule(sb);
|
|
|
|
}
|
|
|
|
|
|
|
|
protected TriggerBuilder<? extends Trigger> getTriggerBuilderWithScheduling(UUID uuid, Scheduling scheduling) throws LaunchException{
|
|
|
|
|
|
|
|
final int times = scheduling.getSchedulingTimes();
|
|
|
|
|
|
|
|
if (scheduling.getCronExpression() != null) {
|
|
|
|
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
|
|
|
|
.cronSchedule(scheduling.getCronExpression());
|
|
|
|
|
|
|
|
return createTriggerBuilder(uuid, cronScheduleBuilder);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (scheduling.getDelay() != null) {
|
|
|
|
SimpleScheduleBuilder simpleScheduleBuilder;
|
|
|
|
|
|
|
|
if (times != 0) {
|
|
|
|
simpleScheduleBuilder = SimpleScheduleBuilder
|
|
|
|
.repeatSecondlyForTotalCount(times, scheduling.getDelay());
|
|
|
|
}else{
|
|
|
|
simpleScheduleBuilder = SimpleScheduleBuilder.
|
|
|
|
repeatSecondlyForever(scheduling.getDelay());
|
|
|
|
}
|
|
|
|
|
|
|
|
return createTriggerBuilder(uuid, simpleScheduleBuilder);
|
|
|
|
}
|
|
|
|
|
|
|
|
throw new LaunchException("Invalid Scheduling");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
/**
|
|
|
|
* Create the Scheduler using the strategy provided by LaunchParameter
|
|
|
|
* @param uuid the UUID will be used to identify the task
|
|
|
|
* @param parameter LaunchParameter requested in service invocation
|
|
|
|
* @return the created scheduler
|
|
|
|
* @throws LaunchException if the LaunchParameter does not contains a valid
|
|
|
|
* scheduling strategy
|
|
|
|
* @throws SchedulerException if the scheduler cannot be created by the
|
|
|
|
* scheduler factory
|
|
|
|
*/
|
|
|
|
protected Scheduler reallySchedule(final UUID uuid, LaunchParameter parameter) throws LaunchException, SchedulerException {
|
|
|
|
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
|
|
|
|
Scheduler scheduler = schedulerFactory.getScheduler();
|
2015-07-28 15:41:35 +02:00
|
|
|
|
|
|
|
JobKey jobKey = new JobKey(uuid.toString());
|
2015-11-30 12:25:05 +01:00
|
|
|
JobDetail jobDetail = JobBuilder.newJob(SmartExecutorTask.class).
|
2015-07-28 15:41:35 +02:00
|
|
|
withIdentity(jobKey).build();
|
|
|
|
JobDataMap jobDataMap = jobDetail.getJobDataMap();
|
2015-11-30 12:25:05 +01:00
|
|
|
jobDataMap.put(SmartExecutorTask.UUID, uuid);
|
|
|
|
jobDataMap.put(SmartExecutorTask.LAUNCH_PARAMETER, parameter);
|
2015-07-28 15:41:35 +02:00
|
|
|
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
|
TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger()
|
|
|
|
.withIdentity(uuid.toString());
|
|
|
|
|
|
|
|
Scheduling scheduling = parameter.getScheduling();
|
|
|
|
|
|
|
|
if (scheduling != null) {
|
|
|
|
|
|
|
|
triggerBuilder = getTriggerBuilderWithScheduling(uuid, scheduling);
|
|
|
|
|
|
|
|
if (scheduling.getFirtStartTime() != null && scheduling.getFirtStartTime().longValue()!=0) {
|
|
|
|
Date triggerStartTime = new Date(scheduling.getFirtStartTime());
|
|
|
|
triggerBuilder.startAt(triggerStartTime);
|
|
|
|
} else {
|
|
|
|
triggerBuilder.startNow();
|
2015-09-28 17:44:12 +02:00
|
|
|
scheduling.setFirstStartTime(Calendar.getInstance().getTimeInMillis());
|
2015-07-28 15:41:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if (scheduling.getEndTime() != null && scheduling.getEndTime().longValue()!=0) {
|
|
|
|
Date triggerEndTime = new Date(scheduling.getEndTime());
|
|
|
|
triggerBuilder.endAt(triggerEndTime);
|
|
|
|
}
|
|
|
|
|
2015-09-25 11:20:54 +02:00
|
|
|
try {
|
2016-09-22 11:31:05 +02:00
|
|
|
String runningInstanceID = ContextProvider.get().profile(GCoreEndpoint.class).id();
|
2015-11-30 12:25:05 +01:00
|
|
|
logger.debug("Going to persist Scheduled Task {} which will be assigned to Running Instance {}. LaunchParameters : {} ",
|
|
|
|
uuid.toString(), runningInstanceID, parameter);
|
|
|
|
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
2015-09-28 17:44:12 +02:00
|
|
|
stc.addScheduledTask(uuid, runningInstanceID, parameter);
|
|
|
|
} catch (Exception e) {
|
2015-11-30 12:25:05 +01:00
|
|
|
logger.error("Unable to persist Scheduled Task {}", uuid.toString(), e.getCause());
|
2015-09-28 17:44:12 +02:00
|
|
|
}
|
2015-09-25 11:20:54 +02:00
|
|
|
|
2015-07-28 15:41:35 +02:00
|
|
|
} else {
|
|
|
|
triggerBuilder.startNow();
|
|
|
|
}
|
|
|
|
|
|
|
|
try {
|
2015-11-30 12:25:05 +01:00
|
|
|
SmartExecutorTaskListener sejl = new SmartExecutorTaskListener();
|
2015-07-28 15:41:35 +02:00
|
|
|
scheduler.getListenerManager().addJobListener(sejl);
|
|
|
|
scheduler.scheduleJob(jobDetail, triggerBuilder.build());
|
|
|
|
} catch (SchedulerException e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
|
|
|
|
return scheduler;
|
|
|
|
}
|
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
/**
|
|
|
|
* Schedule a task execution
|
|
|
|
* @param parameter LaunchParameter requested in service invocation
|
|
|
|
* @return the UUID which will identify the task
|
|
|
|
* @throws LaunchException if the LaunchParameter does not contains a valid
|
|
|
|
* scheduling strategy
|
|
|
|
* @throws InputsNullException if provided input map is null
|
|
|
|
* @throws PluginNotFoundException if the request plugin is not available on
|
|
|
|
* this smart executor instance
|
|
|
|
*/
|
|
|
|
public synchronized UUID schedule(LaunchParameter parameter)
|
|
|
|
throws InputsNullException, PluginNotFoundException, LaunchException {
|
2015-07-28 15:41:35 +02:00
|
|
|
Map<String, Object> inputs = parameter.getInputs();
|
|
|
|
if (inputs == null) {
|
2015-11-30 12:25:05 +01:00
|
|
|
throw new InputsNullException();
|
2015-07-28 15:41:35 +02:00
|
|
|
}
|
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
/*
|
|
|
|
* Checking if the requested plugin is available on this smart executor
|
|
|
|
* instance
|
|
|
|
*/
|
|
|
|
PluginManager.getPluginDeclaration(parameter.getPluginName());
|
|
|
|
|
2015-07-28 15:41:35 +02:00
|
|
|
final UUID uuid = UUID.randomUUID();
|
|
|
|
|
|
|
|
try {
|
2015-11-30 12:25:05 +01:00
|
|
|
Scheduler scheduler = reallySchedule(uuid, parameter);
|
2015-07-28 15:41:35 +02:00
|
|
|
activeSchedulers.put(uuid, scheduler);
|
|
|
|
scheduler.start();
|
|
|
|
} catch (SchedulerException e) {
|
|
|
|
throw new LaunchException(e);
|
|
|
|
}
|
|
|
|
|
|
|
|
return uuid;
|
|
|
|
}
|
|
|
|
|
|
|
|
public Scheduler getScheduler(UUID key){
|
|
|
|
return activeSchedulers.get(key);
|
|
|
|
}
|
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
protected void stopLastcurrentExecution(Scheduler scheduler, UUID uuid)
|
|
|
|
throws UnableToInterruptTaskException{
|
2015-07-28 15:41:35 +02:00
|
|
|
|
|
|
|
JobKey jobKey = new JobKey(uuid.toString());
|
2015-11-30 12:25:05 +01:00
|
|
|
|
|
|
|
try {
|
|
|
|
logger.debug("Going to stop current SmartExecutor Task {} execution if any", uuid);
|
|
|
|
if(!scheduler.checkExists(jobKey)){
|
|
|
|
logger.debug("No SmartExecutor Task {} was found. That's all folk.", uuid);
|
|
|
|
throw new SchedulerNotFoundException("Scheduler Not Found");
|
|
|
|
}
|
2016-09-22 11:31:05 +02:00
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
boolean interrupted = scheduler.interrupt(jobKey);
|
|
|
|
if (interrupted) {
|
|
|
|
logger.debug("SmartExecutor Task {} interrupted successfully.", uuid);
|
|
|
|
} else {
|
2016-09-22 11:31:05 +02:00
|
|
|
List<JobExecutionContext> list = getCurrentlyExecutingJobs(scheduler);
|
|
|
|
if(list!=null && list.size()>0){
|
|
|
|
logger.debug("SmartExecutor Task {} was not interrupted.", uuid);
|
|
|
|
throw new UnableToInterruptTaskException(uuid);
|
|
|
|
}
|
2015-11-30 12:25:05 +01:00
|
|
|
}
|
|
|
|
} catch (UnableToInterruptTaskException e) {
|
|
|
|
throw e;
|
2016-09-22 11:31:05 +02:00
|
|
|
} catch(Exception e){
|
|
|
|
throw new UnableToInterruptTaskException(uuid, e);
|
2015-07-28 15:41:35 +02:00
|
|
|
}
|
2015-11-30 12:25:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected void deleteScheduler(Scheduler scheduler, UUID uuid) throws SchedulerRemoveException {
|
|
|
|
|
|
|
|
JobKey jobKey = new JobKey(uuid.toString());
|
2015-07-28 15:41:35 +02:00
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
try {
|
|
|
|
logger.debug("Going to delete SmartExecutor Scheduled Task {}", uuid);
|
|
|
|
boolean deleted = scheduler.deleteJob(jobKey);
|
|
|
|
if (deleted) {
|
|
|
|
logger.debug("SmartExecutor Task {} deleted successfully", uuid);
|
|
|
|
} else {
|
|
|
|
logger.debug("SmartExecutor Task {} was not deleted", uuid);
|
|
|
|
throw new SchedulerRemoveException(uuid);
|
|
|
|
}
|
|
|
|
} catch(SchedulerRemoveException e){
|
|
|
|
throw e;
|
|
|
|
} catch(Exception e1){
|
|
|
|
throw new SchedulerRemoveException(uuid, e1);
|
|
|
|
} finally {
|
|
|
|
activeSchedulers.remove(uuid);
|
|
|
|
try {
|
|
|
|
scheduler.clear();
|
|
|
|
} catch(SchedulerException e){
|
|
|
|
throw new SchedulerRemoveException(uuid, e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
protected List<JobExecutionContext> getCurrentlyExecutingJobs(Scheduler scheduler) throws SchedulerException{
|
2015-07-28 15:41:35 +02:00
|
|
|
logger.trace("Getting {} list", JobExecutionContext.class.getSimpleName());
|
|
|
|
List<JobExecutionContext> cej = scheduler.getCurrentlyExecutingJobs();
|
2015-11-30 12:25:05 +01:00
|
|
|
logger.trace("{} list got {}", JobExecutionContext.class.getSimpleName(), cej);
|
|
|
|
return cej;
|
|
|
|
}
|
|
|
|
|
|
|
|
public LaunchParameter getLaunchParameter(Scheduler scheduler, JobKey jobKey) throws SchedulerException{
|
|
|
|
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
|
|
|
|
JobDataMap jobDataMap = jobDetail.getJobDataMap();
|
|
|
|
return (LaunchParameter) jobDataMap.get(SmartExecutorTask.LAUNCH_PARAMETER);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2016-09-22 11:31:05 +02:00
|
|
|
protected void removeFromPersistence(boolean global, UUID uuid, boolean remove) throws SchedulePersistenceException{
|
|
|
|
try {
|
|
|
|
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
|
|
|
if(remove){
|
|
|
|
logger.debug("Going to remove the SmartExecutor Scheduled Task {} from global scheduling", uuid);
|
|
|
|
stc.removeScheduledTask(uuid);
|
|
|
|
}else{
|
|
|
|
if(global){
|
|
|
|
logger.debug("Going to release the SmartExecutor Scheduled Task {}. The Task can be take in charge from another SmartExecutor instance", uuid);
|
|
|
|
stc.releaseScheduledTask(uuid);
|
|
|
|
}else{
|
|
|
|
logger.debug("Going to remove the SmartExecutor Scheduled Task {} from local scheduling", uuid);
|
|
|
|
stc.removeScheduledTask(uuid);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}catch(Exception e){
|
|
|
|
throw new SchedulePersistenceException(
|
|
|
|
String.format("Unable to Remove Scheduled Task %s from global scheduling",
|
|
|
|
uuid.toString()), e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
/**
|
|
|
|
* Stop the execution of the Task identified by UUID
|
|
|
|
* @param uuid which identify the Task
|
|
|
|
* @param stopOnly
|
|
|
|
* @param remove : when the Task is a Scheduled one indicate if the Task
|
|
|
|
* has to be released or to be removed (the argument is set to true when
|
|
|
|
* an explicit request arrive to remove the scheduled task)
|
|
|
|
* @throws UnableToInterruptTaskException
|
|
|
|
* @throws SchedulerRemoveException
|
|
|
|
* @throws SchedulePersistenceException
|
|
|
|
* @throws SchedulerNotFoundException
|
|
|
|
* @throws SchedulerException
|
|
|
|
*/
|
|
|
|
public synchronized void stop(UUID uuid, boolean stopOnly, boolean remove)
|
|
|
|
throws UnableToInterruptTaskException, SchedulerRemoveException,
|
|
|
|
SchedulePersistenceException, SchedulerException {
|
2015-07-28 15:41:35 +02:00
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
Scheduler scheduler = activeSchedulers.get(uuid);
|
|
|
|
if(scheduler==null){
|
|
|
|
logger.debug("No SmartExecutor Task {} was found. That's all folk.", uuid);
|
2016-09-22 11:31:05 +02:00
|
|
|
removeFromPersistence(true, uuid, remove);
|
2015-11-30 12:25:05 +01:00
|
|
|
return;
|
2015-07-28 15:41:35 +02:00
|
|
|
}
|
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
JobKey jobKey = new JobKey(uuid.toString());
|
|
|
|
boolean exist = scheduler.checkExists(jobKey);
|
|
|
|
if(!exist){
|
2016-09-22 11:31:05 +02:00
|
|
|
logger.trace("SmartExecutor Task {} does not have any instaces associated. Cleaning the environment. That's all folk.", uuid);
|
2015-11-30 12:25:05 +01:00
|
|
|
activeSchedulers.remove(uuid);
|
|
|
|
return;
|
|
|
|
}else{
|
2016-09-22 11:31:05 +02:00
|
|
|
logger.trace("SmartExecutor Task {} to stop exist", uuid);
|
2015-07-28 15:41:35 +02:00
|
|
|
}
|
|
|
|
|
2015-11-30 12:25:05 +01:00
|
|
|
LaunchParameter launchParameter = getLaunchParameter(scheduler, jobKey);
|
|
|
|
Scheduling scheduling = launchParameter.getScheduling();
|
|
|
|
boolean scheduled = launchParameter.getScheduling() != null ? true : false;
|
|
|
|
|
2016-09-22 11:31:05 +02:00
|
|
|
stopLastcurrentExecution(scheduler, uuid);
|
2015-10-08 12:10:55 +02:00
|
|
|
|
|
|
|
try {
|
2016-09-22 11:31:05 +02:00
|
|
|
if(stopOnly ^ scheduled){
|
|
|
|
deleteScheduler(scheduler, uuid);
|
2015-10-08 12:10:55 +02:00
|
|
|
}
|
|
|
|
}catch(Exception e){
|
2016-09-22 11:31:05 +02:00
|
|
|
throw e;
|
2015-11-30 12:25:05 +01:00
|
|
|
} finally {
|
2016-09-22 11:31:05 +02:00
|
|
|
if(!stopOnly && scheduled){
|
|
|
|
/* Removing scheduling from persistence */
|
|
|
|
removeFromPersistence(scheduling.getGlobal(), uuid, remove);
|
|
|
|
}
|
2015-10-08 12:10:55 +02:00
|
|
|
}
|
|
|
|
|
2015-07-28 15:41:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public void stopAll() {
|
|
|
|
List<UUID> set = new ArrayList<UUID>(activeSchedulers.keySet());
|
|
|
|
for (UUID uuid : set) {
|
|
|
|
try {
|
2015-11-30 12:25:05 +01:00
|
|
|
stop(uuid, true, false);
|
2015-07-28 15:41:35 +02:00
|
|
|
} catch (Exception e) {
|
|
|
|
logger.error("Error stopping plugin instace with UUID {}",
|
|
|
|
uuid, e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|