smart-executor/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java

232 lines
6.8 KiB
Java
Raw Normal View History

/**
*
*/
package org.gcube.vremanagement.executor.scheduler;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.ScheduleBuilder;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Singleton that schedules {@link SmartExecutorJob} executions on Quartz and
 * keeps track, per launch UUID, of the {@link Scheduler} the job was
 * registered on.
 * <p>
 * Mutations of the UUID-to-scheduler map are performed while holding the
 * monitor of the singleton ({@code schedule}, {@code stop}, {@code stopAll}).
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 */
public class SmartExecutorScheduler {

    private static final Logger logger = LoggerFactory
            .getLogger(SmartExecutorScheduler.class);

    /**
     * Schedulers of the launched tasks. The key is the random UUID generated
     * by {@link #schedule(LaunchParameter)}. This is needed to correctly stop
     * the running plugin execution if the container is stopped in the proper
     * way.
     */
    protected Map<UUID, Scheduler> activeSchedulers;

    /** Lazily created unique instance, see {@link #getInstance()}. */
    private static SmartExecutorScheduler smartExecutorScheduler;

    /**
     * Returns the unique instance of this class, creating it on first use.
     *
     * @return the singleton {@link SmartExecutorScheduler}
     */
    public synchronized static SmartExecutorScheduler getInstance() {
        if (smartExecutorScheduler == null) {
            smartExecutorScheduler = new SmartExecutorScheduler();
        }
        return smartExecutorScheduler;
    }

    /** Private: instances are obtained through {@link #getInstance()}. */
    private SmartExecutorScheduler() {
        activeSchedulers = new HashMap<UUID, Scheduler>();
    }

    /**
     * Creates a trigger builder identified by the given UUID and driven by
     * the given schedule.
     *
     * @param uuid the task identifier, used as trigger identity
     * @param sb the schedule to attach to the trigger
     * @return the configured trigger builder
     */
    protected TriggerBuilder<? extends Trigger> createTriggerBuilder(UUID uuid,
            ScheduleBuilder<? extends Trigger> sb) {
        return TriggerBuilder.newTrigger().withIdentity(uuid.toString())
                .withSchedule(sb);
    }

    /**
     * Translates a {@link Scheduling} into a trigger builder. A cron
     * expression, when present, takes precedence over the delay.
     *
     * @param uuid the task identifier, used as trigger identity
     * @param scheduling the requested scheduling
     * @return the trigger builder implementing the scheduling
     * @throws LaunchException if the scheduling has neither a cron expression
     *             nor a delay
     */
    protected TriggerBuilder<? extends Trigger> getTriggerBuilderWithScheduling(
            UUID uuid, Scheduling scheduling) throws LaunchException {

        final int times = scheduling.getSchedulingTimes();

        if (scheduling.getCronExpression() != null) {
            // TODO Limit number of times: 'times' is currently ignored for
            // cron based scheduling
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
                    .cronSchedule(scheduling.getCronExpression());
            return createTriggerBuilder(uuid, cronScheduleBuilder);
        }

        if (scheduling.getDelay() != null) {
            SimpleScheduleBuilder simpleScheduleBuilder;
            if (times != 0) {
                simpleScheduleBuilder = SimpleScheduleBuilder
                        .repeatSecondlyForTotalCount(times, scheduling.getDelay());
            } else {
                // 0 times means "repeat forever"
                simpleScheduleBuilder = SimpleScheduleBuilder
                        .repeatSecondlyForever(scheduling.getDelay());
            }
            return createTriggerBuilder(uuid, simpleScheduleBuilder);
        }

        throw new LaunchException("Invalid Scheduling");
    }

    /**
     * Registers on the given scheduler a {@link SmartExecutorJob} (and its
     * trigger) for the given launch request. Both the job and the trigger use
     * the UUID string as identity.
     *
     * @param uuid the identifier assigned to this launch
     * @param scheduler the scheduler the job is registered on
     * @param parameter the launch request; when it carries no scheduling the
     *            job is triggered once, immediately
     * @return the scheduler received as argument
     * @throws LaunchException if the scheduling is invalid or Quartz refuses
     *             to schedule the job
     * @throws InputsNullException declared for callers; not thrown directly
     *             by this method
     * @throws PluginNotFoundException declared for callers; not thrown
     *             directly by this method
     */
    protected Scheduler reallySchedule(final UUID uuid, Scheduler scheduler,
            LaunchParameter parameter) throws LaunchException,
            InputsNullException, PluginNotFoundException {

        JobKey jobKey = new JobKey(uuid.toString());
        JobDetail jobDetail = JobBuilder.newJob(SmartExecutorJob.class)
                .withIdentity(jobKey).build();

        // The job reads back its identifier and launch request from here
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        jobDataMap.put(SmartExecutorJob.UUID, uuid);
        jobDataMap.put(SmartExecutorJob.LAUNCH_PARAMETER, parameter);

        TriggerBuilder<? extends Trigger> triggerBuilder;
        Scheduling scheduling = parameter.getScheduling();
        if (scheduling != null) {
            triggerBuilder = getTriggerBuilderWithScheduling(uuid, scheduling);

            // A null or 0 first start time means "start immediately"
            if (scheduling.getFirtStartTime() != null
                    && scheduling.getFirtStartTime().longValue() != 0) {
                Date triggerStartTime = new Date(scheduling.getFirtStartTime());
                triggerBuilder.startAt(triggerStartTime);
            } else {
                triggerBuilder.startNow();
            }

            // A null or 0 end time means "no end"
            if (scheduling.getEndTime() != null
                    && scheduling.getEndTime().longValue() != 0) {
                Date triggerEndTime = new Date(scheduling.getEndTime());
                triggerBuilder.endAt(triggerEndTime);
            }
        } else {
            // One-shot execution
            triggerBuilder = TriggerBuilder.newTrigger()
                    .withIdentity(uuid.toString());
            triggerBuilder.startNow();
        }

        try {
            SmartExecutorJobListener sejl = new SmartExecutorJobListener();
            scheduler.getListenerManager().addJobListener(sejl);
            scheduler.scheduleJob(jobDetail, triggerBuilder.build());
        } catch (SchedulerException e) {
            // Same wrapping used by schedule(): callers handle one exception
            // type for every Quartz failure
            throw new LaunchException(e);
        }

        return scheduler;
    }

    /**
     * Schedules the execution described by the given launch request and
     * starts the scheduler.
     *
     * @param parameter the launch request; its inputs must not be null
     * @return the random UUID identifying the scheduled task, to be used with
     *         {@link #getScheduler(UUID)} and {@link #stop(UUID)}
     * @throws LaunchException if the scheduling is invalid or Quartz fails
     * @throws InputsNullException declared for callers
     * @throws PluginNotFoundException declared for callers
     * @throws RuntimeException if the inputs of the request are null
     */
    public synchronized UUID schedule(LaunchParameter parameter)
            throws LaunchException, InputsNullException,
            PluginNotFoundException {

        Map<String, Object> inputs = parameter.getInputs();
        if (inputs == null) {
            // NOTE(review): InputsNullException looks like the intended type
            // here; kept as RuntimeException because its constructors are not
            // visible from this file - confirm and switch.
            throw new RuntimeException("Inputs cannot be null");
        }

        final UUID uuid = UUID.randomUUID();
        try {
            SchedulerFactory schedulerFactory = new StdSchedulerFactory();
            Scheduler scheduler = schedulerFactory.getScheduler();
            reallySchedule(uuid, scheduler, parameter);
            // Registered before start() so the entry is already there when
            // the job fires
            activeSchedulers.put(uuid, scheduler);
            scheduler.start();
        } catch (SchedulerException e) {
            // The UUID is never returned to the caller: do not keep the entry
            activeSchedulers.remove(uuid);
            throw new LaunchException(e);
        }
        return uuid;
    }

    /**
     * Returns the scheduler associated to the given task.
     * <p>
     * NOTE(review): this read is not synchronized while the map is a plain
     * HashMap mutated under the instance lock; left as is because taking the
     * lock here could block job threads while stop() is interrupting them -
     * confirm against the callers.
     *
     * @param key the UUID returned by {@link #schedule(LaunchParameter)}
     * @return the scheduler, or {@code null} if the UUID is not tracked
     */
    public Scheduler getScheduler(UUID key) {
        return activeSchedulers.get(key);
    }

    /**
     * Stops the task identified by the given UUID: interrupts the job if it
     * is executing, deletes it (together with its triggers) from the
     * scheduler and stops tracking it.
     *
     * @param uuid the UUID returned by {@link #schedule(LaunchParameter)}
     * @throws SchedulerNotFoundException if the UUID is not tracked
     * @throws SchedulerException if Quartz fails to interrupt or delete the
     *             job; in this case the task is still tracked
     */
    public synchronized void stop(UUID uuid) throws SchedulerException {
        Scheduler scheduler = activeSchedulers.get(uuid);
        if (scheduler == null) {
            throw new SchedulerNotFoundException("Scheduler Not Found");
        }

        JobKey jobKey = new JobKey(uuid.toString());
        if (!scheduler.checkExists(jobKey)) {
            logger.trace("Job {} does not exist. Terminating the stop method", uuid);
            // Nothing left to stop: drop the stale entry instead of leaking it
            activeSchedulers.remove(uuid);
            return;
        }
        logger.trace("Job {} exist", uuid);

        // No waiting for the job to be executing: interrupt() simply returns
        // false when no instance is running (e.g. the trigger did not fire
        // yet), and deleteJob() below prevents any future firing. Spinning
        // here would hold the instance lock, possibly forever.
        boolean interrupted = scheduler.interrupt(jobKey);
        if (interrupted) {
            logger.debug("Job {} interrupted successfully. Going to delete it.", uuid);
        } else {
            logger.debug("Job {} was not interrupted, going to delete it", uuid);
        }

        boolean deleted = scheduler.deleteJob(jobKey);
        if (deleted) {
            logger.debug("Job {} deleted successfully", uuid);
        } else {
            logger.debug("Job {} was not deleted", uuid);
        }

        activeSchedulers.remove(uuid);
        // scheduler.clear() is intentionally not invoked: StdSchedulerFactory
        // hands back the already registered scheduler with the same name, so
        // the Scheduler can be shared with other tasks and clear() would wipe
        // their jobs and triggers too. deleteJob() already removed this job
        // together with its triggers.
    }

    /**
     * Stops every tracked task. Failures are logged and do not prevent the
     * remaining tasks from being stopped.
     */
    public synchronized void stopAll() {
        // Iterate over a copy: stop() removes entries from the map
        List<UUID> uuids = new ArrayList<UUID>(activeSchedulers.keySet());
        for (UUID uuid : uuids) {
            try {
                stop(uuid);
            } catch (Exception e) {
                logger.error("Error stopping plugin instace with UUID {}",
                        uuid, e);
            }
        }
    }
}