refs #521: Support Unscheduling of repetitive task on SmartExecutor

https://support.d4science.org/issues/521

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@119021 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-09-25 09:20:54 +00:00
parent 8e97a0b67f
commit c6df0384ca
9 changed files with 153 additions and 47 deletions

View File

@ -11,6 +11,7 @@ import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.LaunchException; import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException; import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
@ -48,10 +49,21 @@ public class SmartExecutorImpl implements SmartExecutor {
/**{@inheritDoc}*/ /**{@inheritDoc}*/
@Override @Override
public void unSchedule(String executionIdentifier) throws SchedulerException { public boolean unSchedule(String executionIdentifier) {
boolean currentStopped = true;
try {
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorInitalizator.getSmartExecutorScheduler(); SmartExecutorScheduler smartExecutorScheduler = SmartExecutorInitalizator.getSmartExecutorScheduler();
UUID uuid = UUID.fromString(executionIdentifier); UUID uuid = UUID.fromString(executionIdentifier);
smartExecutorScheduler.stop(uuid); smartExecutorScheduler.stop(uuid);
} catch (SchedulerNotFoundException snfe) {
currentStopped = true;
} catch(SchedulerException e){
currentStopped = false;
}
// TODO Remove from configuration
return currentStopped;
} }
/**{@inheritDoc}*/ /**{@inheritDoc}*/

View File

@ -35,8 +35,7 @@ import org.gcube.smartgears.context.application.ApplicationContext;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent; import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler; import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.configuration.JSONLaunchParameter; import org.gcube.vremanagement.executor.configuration.jsonbased.FileConfiguredTasks;
import org.gcube.vremanagement.executor.configuration.ConfiguredTasks;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
@ -74,12 +73,12 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
protected static SmartExecutorScheduler smartExecutorScheduler; protected static SmartExecutorScheduler smartExecutorScheduler;
protected static ConfiguredTasks configuredTasks; protected static FileConfiguredTasks configuredTasks;
/** /**
* @return the configuredTasks * @return the configuredTasks
*/ */
public static ConfiguredTasks getConfiguredTasks() { public static FileConfiguredTasks getConfiguredTasks() {
return configuredTasks; return configuredTasks;
} }
@ -361,8 +360,8 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
logger.trace("Going to Run Configured Tasks"); logger.trace("Going to Run Configured Tasks");
try { try {
configuredTasks = new ConfiguredTasks(ctx.persistence().location()); configuredTasks = new FileConfiguredTasks(ctx.persistence().location());
List<JSONLaunchParameter> configuredTaskList = configuredTasks.getConfiguredTasks(); List<LaunchParameter> configuredTaskList = configuredTasks.getConfiguredTasks();
SmartExecutorImpl smartExecutorImpl = new SmartExecutorImpl(); SmartExecutorImpl smartExecutorImpl = new SmartExecutorImpl();
for(LaunchParameter parameter : configuredTaskList){ for(LaunchParameter parameter : configuredTaskList){

View File

@ -0,0 +1,23 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration;
import java.util.List;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public interface LaunchConfiguration {
public List<LaunchParameter> getScheduledLaunch() throws SchedulePersistenceException;
public void addLaunch(LaunchParameter parameter) throws SchedulePersistenceException;
public void removeLaunch(LaunchParameter parameter)throws SchedulePersistenceException;
}

View File

@ -1,7 +1,7 @@
/** /**
* *
*/ */
package org.gcube.vremanagement.executor.configuration; package org.gcube.vremanagement.executor.configuration.jsonbased;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -12,7 +12,10 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.LaunchConfiguration;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.utils.IOUtility; import org.gcube.vremanagement.executor.utils.IOUtility;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
@ -24,22 +27,23 @@ import org.slf4j.LoggerFactory;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* *
*/ */
public class ConfiguredTasks { public class FileConfiguredTasks implements LaunchConfiguration {
/** /**
* Logger * Logger
*/ */
private static Logger logger = LoggerFactory.getLogger(ConfiguredTasks.class); private static Logger logger = LoggerFactory.getLogger(FileConfiguredTasks.class);
protected String configurationFileLocation; protected String configurationFileLocation;
protected List<JSONLaunchParameter> configuredTasks; protected List<LaunchParameter> configuredTasks;
public static final String CONFIG_TASK_FILENAME = "definedTasks.json"; public static final String CONFIG_TASK_FILENAME = "definedTasks.json";
public ConfiguredTasks(String location) throws IOException, JSONException { public FileConfiguredTasks(String location) throws IOException, JSONException {
this.configurationFileLocation = location; this.configurationFileLocation = location;
this.configuredTasks = new ArrayList<JSONLaunchParameter>(); this.configuredTasks = new ArrayList<LaunchParameter>();
this.configuredTasks = retriveConfiguredTask(); this.configuredTasks = retriveConfiguredTask();
} }
@ -53,11 +57,11 @@ public class ConfiguredTasks {
return configurationFileLocation + "/" + CONFIG_TASK_FILENAME; return configurationFileLocation + "/" + CONFIG_TASK_FILENAME;
} }
protected List<JSONLaunchParameter> retriveConfiguredTask() public List<LaunchParameter> retriveConfiguredTask()
throws IOException, JSONException { throws IOException, JSONException {
String configuredTasksDefinition = IOUtility.readFile(configurationFileName(configurationFileLocation)); String configuredTasksDefinition = IOUtility.readFile(configurationFileName(configurationFileLocation));
List<JSONLaunchParameter> tasks = new ArrayList<JSONLaunchParameter>(); List<LaunchParameter> tasks = new ArrayList<LaunchParameter>();
JSONArray jsonArray = new JSONArray(configuredTasksDefinition); JSONArray jsonArray = new JSONArray(configuredTasksDefinition);
for(int i=0; i<jsonArray.length(); i++){ for(int i=0; i<jsonArray.length(); i++){
@ -80,11 +84,12 @@ public class ConfiguredTasks {
writer.close(); writer.close();
} }
protected void writeOnConfigurationFile() throws JSONException, IOException{ protected void writeOnConfigurationFile() throws JSONException, IOException, ParseException{
String fileName = configurationFileName(configurationFileLocation); String fileName = configurationFileName(configurationFileLocation);
JSONArray jsonArray = new JSONArray(); JSONArray jsonArray = new JSONArray();
for(JSONLaunchParameter jsonLaunchParameter : configuredTasks){ for(LaunchParameter launchParameter : configuredTasks){
JSONLaunchParameter jsonLaunchParameter = new JSONLaunchParameter(launchParameter);
jsonArray.put(jsonLaunchParameter.toJSON()); jsonArray.put(jsonLaunchParameter.toJSON());
} }
String jsonArrayString = jsonArray.toString(); String jsonArrayString = jsonArray.toString();
@ -92,33 +97,58 @@ public class ConfiguredTasks {
FileUtils.writeStringToFile(new File(fileName), jsonArrayString); FileUtils.writeStringToFile(new File(fileName), jsonArrayString);
} }
public synchronized void addLaunch(JSONLaunchParameter parameter) @Override
throws ParseException, JSONException, IOException { public synchronized void addLaunch(LaunchParameter parameter) throws SchedulePersistenceException{
configuredTasks.add(parameter); try {
writeOnConfigurationFile(); addLaunch(new JSONLaunchParameter(parameter));
} catch (ParseException e) {
throw new SchedulePersistenceException(e.getCause());
}
} }
public synchronized void addLaunch(JSONLaunchParameter parameter) throws SchedulePersistenceException {
configuredTasks.add(parameter);
try {
writeOnConfigurationFile();
} catch (JSONException | IOException | ParseException e) {
throw new SchedulePersistenceException();
}
}
public synchronized void removeLaunch(JSONLaunchParameter parameter) @Override
public void removeLaunch(LaunchParameter parameter)
throws SchedulePersistenceException {
try {
removeLaunch(new JSONLaunchParameter(parameter));
} catch (JSONException | IOException | ParseException e) {
throw new SchedulePersistenceException(e.getCause());
}
}
protected synchronized void removeLaunch(JSONLaunchParameter parameter)
throws ParseException, JSONException, IOException { throws ParseException, JSONException, IOException {
configuredTasks.remove(parameter); configuredTasks.remove(parameter);
writeOnConfigurationFile(); writeOnConfigurationFile();
} }
@Override
public List<LaunchParameter> getScheduledLaunch()
throws SchedulePersistenceException {
return null;
}
/** /**
* @return the configuredTasks * @return the configuredTasks
*/ */
public List<JSONLaunchParameter> getConfiguredTasks() { public List<LaunchParameter> getConfiguredTasks() {
return configuredTasks; return configuredTasks;
} }
/** /**
* @param configuredTasks the configuredTasks to set * @param configuredTasks the configuredTasks to set
*/ */
public void setConfiguredTasks(List<JSONLaunchParameter> configuredTasks) { public void setConfiguredTasks(List<LaunchParameter> configuredTasks) {
this.configuredTasks = configuredTasks; this.configuredTasks = configuredTasks;
} }
} }

View File

@ -1,7 +1,7 @@
/** /**
* *
*/ */
package org.gcube.vremanagement.executor.configuration; package org.gcube.vremanagement.executor.configuration.jsonbased;
import java.text.ParseException; import java.text.ParseException;
import java.util.HashMap; import java.util.HashMap;

View File

@ -1,7 +1,7 @@
/** /**
* *
*/ */
package org.gcube.vremanagement.executor.configuration; package org.gcube.vremanagement.executor.configuration.jsonbased;
import java.text.ParseException; import java.text.ParseException;

View File

@ -0,0 +1,32 @@
/**
*
*/
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class SchedulePersistenceException extends Exception {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = -3261726979079756047L;
public SchedulePersistenceException() {
super();
}
public SchedulePersistenceException(String message) {
super(message);
}
public SchedulePersistenceException(Throwable throwable){
super(throwable);
}
public SchedulePersistenceException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -10,11 +10,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.gcube.vremanagement.executor.SmartExecutorInitalizator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.LaunchException; import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException; import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.quartz.CronScheduleBuilder; import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder; import org.quartz.JobBuilder;
@ -97,8 +99,8 @@ public class SmartExecutorScheduler {
} }
protected Scheduler reallySchedule(final UUID uuid, Scheduler scheduler, LaunchParameter parameter) throws LaunchException, protected Scheduler reallySchedule(final UUID uuid, Scheduler scheduler,
InputsNullException, PluginNotFoundException { LaunchParameter parameter) throws LaunchException, InputsNullException, PluginNotFoundException {
JobKey jobKey = new JobKey(uuid.toString()); JobKey jobKey = new JobKey(uuid.toString());
JobDetail jobDetail = JobBuilder.newJob(SmartExecutorJob.class). JobDetail jobDetail = JobBuilder.newJob(SmartExecutorJob.class).
@ -130,12 +132,16 @@ public class SmartExecutorScheduler {
triggerBuilder.endAt(triggerEndTime); triggerBuilder.endAt(triggerEndTime);
} }
try {
SmartExecutorInitalizator.getConfiguredTasks().addLaunch(parameter);
} catch (SchedulePersistenceException e) {
logger.error("Unable to persist the scheduling", e.getCause());
}
} else { } else {
triggerBuilder.startNow(); triggerBuilder.startNow();
} }
try { try {
SmartExecutorJobListener sejl = new SmartExecutorJobListener(); SmartExecutorJobListener sejl = new SmartExecutorJobListener();
scheduler.getListenerManager().addJobListener(sejl); scheduler.getListenerManager().addJobListener(sejl);

View File

@ -12,7 +12,11 @@ import java.util.Map;
import org.acme.HelloWorldPlugin; import org.acme.HelloWorldPlugin;
import org.acme.HelloWorldPluginDeclaration; import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.jsonbased.FileConfiguredTasks;
import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.json.JSONException; import org.json.JSONException;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -21,15 +25,15 @@ import org.junit.Test;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* *
*/ */
public class ParserTest { public class ConfiguredTasksTest {
public static final String TEST = "test"; public static final String TEST = "test";
public void checkOriginal(ConfiguredTasks parser, int size){ public void checkOriginal(FileConfiguredTasks parser, int size){
List<JSONLaunchParameter> configuredTasks = parser.getConfiguredTasks(); List<LaunchParameter> configuredTasks = parser.getConfiguredTasks();
Assert.assertEquals(size, configuredTasks.size()); Assert.assertEquals(size, configuredTasks.size());
JSONLaunchParameter parameter = configuredTasks.get(0); JSONLaunchParameter parameter = (JSONLaunchParameter) configuredTasks.get(0);
Assert.assertEquals(HelloWorldPluginDeclaration.NAME, parameter.getPluginName()); Assert.assertEquals(HelloWorldPluginDeclaration.NAME, parameter.getPluginName());
Map<String, Object> inputs = parameter.getInputs(); Map<String, Object> inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
@ -37,7 +41,7 @@ public class ParserTest {
Assert.assertEquals(null, parameter.getScheduling()); Assert.assertEquals(null, parameter.getScheduling());
Assert.assertEquals(true, parameter.isPersist()); Assert.assertEquals(true, parameter.isPersist());
parameter = configuredTasks.get(1); parameter = (JSONLaunchParameter) configuredTasks.get(1);
Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME); Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME);
inputs = parameter.getInputs(); inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
@ -51,7 +55,7 @@ public class ParserTest {
Assert.assertEquals(false, scheduling.mustPreviousExecutionsCompleted()); Assert.assertEquals(false, scheduling.mustPreviousExecutionsCompleted());
Assert.assertEquals(true, parameter.isPersist()); Assert.assertEquals(true, parameter.isPersist());
parameter = configuredTasks.get(2); parameter = (JSONLaunchParameter) configuredTasks.get(2);
Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME); Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME);
inputs = parameter.getInputs(); inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
@ -63,10 +67,10 @@ public class ParserTest {
@Test @Test
public void testLaunchConfiguredTask() throws IOException, JSONException, ParseException { public void testLaunchConfiguredTask() throws SchedulePersistenceException, IOException, JSONException, ParseException {
String location = new File(".").getAbsolutePath(); String location = new File(".").getAbsolutePath();
location = location + "/src/test/resources"; location = location + "/src/test/resources";
ConfiguredTasks parser = new ConfiguredTasks(location); FileConfiguredTasks parser = new FileConfiguredTasks(location);
checkOriginal(parser, 3); checkOriginal(parser, 3);
@ -76,11 +80,11 @@ public class ParserTest {
JSONLaunchParameter added = new JSONLaunchParameter(HelloWorldPluginDeclaration.NAME, inputs, true); JSONLaunchParameter added = new JSONLaunchParameter(HelloWorldPluginDeclaration.NAME, inputs, true);
parser.addLaunch(added); parser.addLaunch(added);
parser = new ConfiguredTasks(location); parser = new FileConfiguredTasks(location);
checkOriginal(parser, 4); checkOriginal(parser, 4);
List<JSONLaunchParameter> configuredTasks = parser.getConfiguredTasks(); List<LaunchParameter> configuredTasks = parser.getConfiguredTasks();
JSONLaunchParameter parameter = configuredTasks.get(3); JSONLaunchParameter parameter = (JSONLaunchParameter) configuredTasks.get(3);
Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME); Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME);
inputs = parameter.getInputs(); inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
@ -90,7 +94,7 @@ public class ParserTest {
parser.removeLaunch(parameter); parser.removeLaunch(parameter);
parser = new ConfiguredTasks(location); parser = new FileConfiguredTasks(location);
checkOriginal(parser, 3); checkOriginal(parser, 3);
} }