refs #521: Support Unscheduling of repetitive task on SmartExecutor
https://support.d4science.org/issues/521 git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@120469 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
14a346e9c7
commit
81bea923f1
|
@ -5,9 +5,6 @@
|
|||
<wb-resource deploy-path="/" source-path="/src/main/webapp" tag="defaultRootSource"/>
|
||||
<wb-resource deploy-path="/WEB-INF/classes" source-path="/src/main/java"/>
|
||||
<wb-resource deploy-path="/WEB-INF/classes" source-path="/src/main/resources"/>
|
||||
<dependent-module archiveName="smart-executor-api-1.2.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/smart-executor-api/smart-executor-api">
|
||||
<dependency-type>uses</dependency-type>
|
||||
</dependent-module>
|
||||
<property name="java-output-path" value="/smart-executor/target/classes"/>
|
||||
<property name="context-root" value="smart-executor"/>
|
||||
</wb-module>
|
||||
|
|
|
@ -47,7 +47,6 @@ public class SmartExecutorImpl implements SmartExecutor {
|
|||
return ctx;
|
||||
}
|
||||
|
||||
|
||||
/**{@inheritDoc}*/
|
||||
@Override
|
||||
public String launch(LaunchParameter parameter) throws InputsNullException,
|
||||
|
@ -59,26 +58,43 @@ public class SmartExecutorImpl implements SmartExecutor {
|
|||
|
||||
return uuid.toString();
|
||||
}
|
||||
|
||||
|
||||
/**{@inheritDoc}*/
|
||||
@Override
|
||||
public boolean stop(String executionIdentifier) throws ExecutorException {
|
||||
return unSchedule(executionIdentifier, true, false);
|
||||
}
|
||||
|
||||
/**{@inheritDoc}*/
|
||||
@Override
|
||||
public boolean unSchedule(String executionIdentifier) throws ExecutorException {
|
||||
return unSchedule(executionIdentifier, false, false);
|
||||
}
|
||||
|
||||
/**{@inheritDoc}*/
|
||||
@Override
|
||||
public boolean unSchedule(String executionIdentifier, boolean globally)
|
||||
throws ExecutorException {
|
||||
return unSchedule(executionIdentifier, false, globally);
|
||||
}
|
||||
|
||||
// TODO Manage better exception to to advise the caller
|
||||
protected boolean unSchedule(String executionIdentifier, boolean stopOnly, boolean globally) throws ExecutorException {
|
||||
boolean currentStopped = true;
|
||||
try {
|
||||
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
|
||||
UUID uuid = UUID.fromString(executionIdentifier);
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, stopOnly, globally);
|
||||
} catch (SchedulerNotFoundException snfe) {
|
||||
currentStopped = true;
|
||||
logger.error(String.format("Error unscheduling task {}", executionIdentifier), snfe);
|
||||
logger.error("Error unscheduling task {}", executionIdentifier, snfe);
|
||||
} catch(SchedulerException e){
|
||||
currentStopped = false;
|
||||
logger.error(String.format("Error unscheduling task {}", executionIdentifier), e);
|
||||
logger.error("Error unscheduling task {}", executionIdentifier, e);
|
||||
} catch(SchedulePersistenceException ex){
|
||||
currentStopped = true;
|
||||
logger.error("Error removing scheduled task from persistence.", ex);
|
||||
}
|
||||
|
||||
return currentStopped;
|
||||
}
|
||||
|
||||
|
@ -106,29 +122,4 @@ public class SmartExecutorImpl implements SmartExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
/* *{@inheritDoc}* /
|
||||
@Override
|
||||
public PluginState getState(PluginDeclaration pluginDeclaration, String executionIdentifier)
|
||||
throws PluginInstanceNotFoundException, ExecutorException {
|
||||
try {
|
||||
SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceConnector.getPersistenceConnector();
|
||||
return persistenceConnector.getLastPluginInstanceState(pluginDeclaration, UUID.fromString(executionIdentifier));
|
||||
} catch (Exception e) {
|
||||
throw new PluginInstanceNotFoundException();
|
||||
}
|
||||
}
|
||||
|
||||
/**{@inheritDoc}* /
|
||||
@Override
|
||||
public PluginState getIterationState(PluginDeclaration pluginDeclaration, String executionIdentifier, int iterationNumber)
|
||||
throws PluginInstanceNotFoundException, ExecutorException {
|
||||
try {
|
||||
SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceConnector.getPersistenceConnector();
|
||||
return persistenceConnector.getPluginInstanceState(pluginDeclaration, UUID.fromString(executionIdentifier), iterationNumber);
|
||||
} catch (Exception e) {
|
||||
throw new PluginInstanceNotFoundException();
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
|
|
|
@ -11,13 +11,15 @@ import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
|
|||
|
||||
/**
|
||||
* Every implementation MUST take in account to store/query the records
|
||||
* on the current scope which is not passed as argument but MUSt be retrieved
|
||||
* on the current scope which is not passed as argument but MUST be retrieved
|
||||
* using {#org.gcube.common.scope.api.ScopeProvider} facilities
|
||||
* i.e. ScopeProvider.instance.get()
|
||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||
*/
|
||||
public interface ScheduledTaskConfiguration {
|
||||
|
||||
public static final String SCOPE = "scope";
|
||||
|
||||
/**
|
||||
* Retrieve from the #SmartExecutorPersistenceConnector the orphaned
|
||||
* Scheduled tasks
|
||||
|
@ -26,29 +28,43 @@ public interface ScheduledTaskConfiguration {
|
|||
*/
|
||||
public List<LaunchParameter> getAvailableScheduledTasks() throws SchedulePersistenceException;
|
||||
|
||||
|
||||
/**
|
||||
* @param uuid the uuid which identify the task on the SmartExecutor instance
|
||||
* Return the Scheduled Task if any, null otherwise
|
||||
* @param uuid which identify the Scheduled Task
|
||||
* @return LaunchParameter of the Scheduled task if any, null otherwise
|
||||
* @throws SchedulePersistenceException if fails
|
||||
*/
|
||||
public LaunchParameter getScheduledTask(UUID uuid) throws SchedulePersistenceException;
|
||||
|
||||
|
||||
/**
|
||||
* Create a Scheduled Task on persistence
|
||||
* @param uuid the uuid which (will) identify the task on the SmartExecutor instance
|
||||
* @param parameter
|
||||
* @throws SchedulePersistenceException
|
||||
* @throws SchedulePersistenceException if fails
|
||||
*/
|
||||
public void addScheduledTask(UUID uuid, String consumerID, LaunchParameter parameter) throws SchedulePersistenceException;
|
||||
|
||||
/**
|
||||
* @param uuid
|
||||
* @throws SchedulePersistenceException
|
||||
* Reserve an orphan Scheduled tasks
|
||||
* @param uuid the uuid which (will) identify the task on the SmartExecutor instance
|
||||
* @throws SchedulePersistenceException if fails
|
||||
*/
|
||||
public void reserveScheduledTask(UUID uuid, String consumerID) throws SchedulePersistenceException;
|
||||
|
||||
/**
|
||||
* @param uuid the uuid which identify the task on the SmartExecutor instance
|
||||
* Remove from persistence the Scheduled Task.
|
||||
* @param uuid the uuid which (will) identify the task on the SmartExecutor instance
|
||||
* @param parameter
|
||||
* @throws SchedulePersistenceException
|
||||
*/
|
||||
public void removeScheduledTask(UUID uuid)throws SchedulePersistenceException;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param uuid
|
||||
* Release the Scheduled Task leaving it as orphan on persistence
|
||||
* @param uuid the uuid which (will) identify the task on the SmartExecutor
|
||||
* instance
|
||||
* @throws SchedulePersistenceException
|
||||
*/
|
||||
public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException;
|
||||
|
|
|
@ -164,7 +164,6 @@ public class FileScheduledTaskConfiguration implements ScheduledTaskConfiguratio
|
|||
public void reserveScheduledTask(UUID uuid, String consumerID)
|
||||
throws SchedulePersistenceException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
|
@ -172,7 +171,6 @@ public class FileScheduledTaskConfiguration implements ScheduledTaskConfiguratio
|
|||
public void removeScheduledTask(UUID uuid)
|
||||
throws SchedulePersistenceException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
|
@ -180,7 +178,16 @@ public class FileScheduledTaskConfiguration implements ScheduledTaskConfiguratio
|
|||
public void releaseScheduledTask(UUID uuid)
|
||||
throws SchedulePersistenceException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration#getScheduledTask(java.util.UUID)
|
||||
*/
|
||||
@Override
|
||||
public LaunchParameter getScheduledTask(UUID uuid)
|
||||
throws SchedulePersistenceException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -14,6 +14,8 @@ import org.gcube.vremanagement.executor.exception.ScopeNotMatchException;
|
|||
import org.json.JSONArray;
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||
|
@ -21,14 +23,12 @@ import org.json.JSONObject;
|
|||
*/
|
||||
public class JSONLaunchParameter extends LaunchParameter {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(JSONLaunchParameter.class);
|
||||
|
||||
public static final String PLUGIN_NAME = "pluginName";
|
||||
public static final String PLUGIN_CAPABILITIES = "pluginCapabilites";
|
||||
|
||||
public static final String INPUTS = "inputs";
|
||||
|
||||
public static final String SCHEDULING = "scheduling";
|
||||
public static final String PERSIST = "persist";
|
||||
|
||||
public static final String USED_BY = "usedBy";
|
||||
public static final String SCOPE = "SCOPE";
|
||||
|
||||
|
@ -48,20 +48,26 @@ public class JSONLaunchParameter extends LaunchParameter {
|
|||
|
||||
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs) {
|
||||
super(pluginName, pluginCapabilities, inputs);
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
}
|
||||
|
||||
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
|
||||
super(pluginName, inputs, scheduling);
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
}
|
||||
|
||||
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
|
||||
super(pluginName, pluginCapabilities, inputs, scheduling);
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, Scheduling scheduling, boolean persist) throws ParseException {
|
||||
super(pluginName, inputs, scheduling, persist);
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, Scheduling scheduling, boolean persist) throws ParseException {
|
||||
super(pluginName, pluginCapabilities, inputs, scheduling, persist);
|
||||
this.scheduling = new JSONScheduling(scheduling);
|
||||
|
@ -69,7 +75,7 @@ public class JSONLaunchParameter extends LaunchParameter {
|
|||
}
|
||||
|
||||
public JSONLaunchParameter(LaunchParameter parameter) throws ParseException {
|
||||
super(parameter.getPluginName(), parameter.getPluginCapabilities(), parameter.getInputs(), parameter.getScheduling(), parameter.isPersist());
|
||||
super(parameter.getPluginName(), parameter.getPluginCapabilities(), parameter.getInputs(), parameter.getScheduling());
|
||||
this.scheduling = new JSONScheduling(parameter.getScheduling());
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
}
|
||||
|
@ -103,17 +109,19 @@ public class JSONLaunchParameter extends LaunchParameter {
|
|||
this.scheduling = new JSONScheduling(schedulingJsonObject);
|
||||
}
|
||||
|
||||
this.persist = true;
|
||||
if(jsonObject.has(PERSIST)){
|
||||
this.persist = jsonObject.getBoolean(PERSIST);
|
||||
}
|
||||
|
||||
if(jsonObject.has(USED_BY)){
|
||||
this.usedBy = jsonObject.getString(USED_BY);
|
||||
}
|
||||
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
if(jsonObject.has(SCOPE)){
|
||||
this.scope = jsonObject.getString(SCOPE);
|
||||
String jsonScope = jsonObject.getString(SCOPE);
|
||||
if(jsonScope.compareTo(scope)!=0){
|
||||
String message = String.format("The current scope %s differs from the one provide in %s provided as argument %s.",
|
||||
scope, JSONObject.class.getSimpleName(), jsonScope);
|
||||
logger.error(message);
|
||||
throw new ScopeNotMatchException(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -154,13 +162,51 @@ public class JSONLaunchParameter extends LaunchParameter {
|
|||
obj.put(SCHEDULING, getScheduling().toJSON());
|
||||
}
|
||||
|
||||
obj.put(PERSIST, true);
|
||||
|
||||
if(usedBy!=null){
|
||||
obj.put(USED_BY, usedBy);
|
||||
}
|
||||
|
||||
if(scope!=null){
|
||||
obj.put(SCOPE, scope);
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
|
||||
public String toString(){
|
||||
try {
|
||||
return toJSON().toString();
|
||||
} catch (JSONException e) {
|
||||
return String.format("{} : {}", this.getClass().getSimpleName(),
|
||||
super.toString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the usedBy
|
||||
*/
|
||||
public String getUsedBy() {
|
||||
return usedBy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param usedBy the usedBy to set
|
||||
*/
|
||||
public void setUsedBy(String usedBy) {
|
||||
this.usedBy = usedBy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the scope
|
||||
*/
|
||||
public String getScope() {
|
||||
return scope;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param scope the scope to set
|
||||
*/
|
||||
public void setScope(String scope) {
|
||||
this.scope = scope;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,9 +3,9 @@
|
|||
*/
|
||||
package org.gcube.vremanagement.executor.configuration.jsonbased;
|
||||
|
||||
import java.security.InvalidParameterException;
|
||||
import java.text.ParseException;
|
||||
|
||||
import org.gcube.common.scope.api.ScopeProvider;
|
||||
import org.gcube.vremanagement.executor.api.types.Scheduling;
|
||||
import org.gcube.vremanagement.executor.exception.ScopeNotMatchException;
|
||||
import org.json.JSONException;
|
||||
|
@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
class JSONScheduling extends Scheduling {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(JSONScheduling.class);
|
||||
protected static Logger logger = LoggerFactory.getLogger(JSONScheduling.class);
|
||||
|
||||
public static final String CRON_EXPRESSION = "cronExpression";
|
||||
public static final String DELAY = "delay";
|
||||
|
@ -28,16 +28,22 @@ class JSONScheduling extends Scheduling {
|
|||
public static final String FIRST_START_TIME = "firstStartTime";
|
||||
public static final String END_TIME = "endTime";
|
||||
public static final String PREVIUOS_EXECUTION_MUST_BE_COMPLETED = "previuosExecutionsMustBeCompleted";
|
||||
|
||||
public static final String SCOPE = "scope";
|
||||
protected final String scope;
|
||||
public static final String GLOBAL = "global";
|
||||
|
||||
public JSONScheduling(Scheduling scheduling) throws ParseException {
|
||||
super();
|
||||
init(new CronExpression(scheduling.getCronExpression()), scheduling.getDelay(), scheduling.getSchedulingTimes(),
|
||||
if(scheduling==null){
|
||||
throw new InvalidParameterException("Scheduling null");
|
||||
}
|
||||
CronExpression cronExpression = null;
|
||||
if(scheduling.getCronExpression()!=null){
|
||||
cronExpression = new CronExpression(scheduling.getCronExpression());
|
||||
}
|
||||
|
||||
init(cronExpression, scheduling.getDelay(),
|
||||
scheduling.getSchedulingTimes(),
|
||||
scheduling.getFirtStartTime(), scheduling.getEndTime(),
|
||||
scheduling.mustPreviousExecutionsCompleted());
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
scheduling.mustPreviousExecutionsCompleted(), scheduling.getGlobal());
|
||||
}
|
||||
|
||||
public JSONScheduling(JSONObject jsonObject) throws JSONException,
|
||||
|
@ -66,7 +72,7 @@ class JSONScheduling extends Scheduling {
|
|||
}
|
||||
|
||||
Long endTime = null;
|
||||
if (jsonObject.has(FIRST_START_TIME)) {
|
||||
if (jsonObject.has(END_TIME)) {
|
||||
endTime = jsonObject.getLong(END_TIME);
|
||||
}
|
||||
|
||||
|
@ -75,23 +81,18 @@ class JSONScheduling extends Scheduling {
|
|||
previuosExecutionsMustBeCompleted = jsonObject
|
||||
.getBoolean(PREVIUOS_EXECUTION_MUST_BE_COMPLETED);
|
||||
}
|
||||
|
||||
init(cronExpression, delay, schedulingTimes.intValue(), firstStartTime,
|
||||
endTime, previuosExecutionsMustBeCompleted);
|
||||
|
||||
this.scope = ScopeProvider.instance.get();
|
||||
if(jsonObject.has(SCOPE)){
|
||||
String jsonScope = jsonObject.getString(SCOPE);
|
||||
if(jsonScope.compareTo(scope)!=0){
|
||||
String message = String.format("The current scope %s differs from the one provide in %s provided as argument %s.",
|
||||
scope, JSONObject.class.getSimpleName(), jsonScope);
|
||||
logger.error(message);
|
||||
throw new ScopeNotMatchException(message);
|
||||
}
|
||||
Boolean global = false;
|
||||
if (jsonObject.has(GLOBAL)) {
|
||||
global = jsonObject
|
||||
.getBoolean(PREVIUOS_EXECUTION_MUST_BE_COMPLETED);
|
||||
}
|
||||
|
||||
}
|
||||
init(cronExpression, delay, schedulingTimes.intValue(), firstStartTime,
|
||||
endTime, previuosExecutionsMustBeCompleted, global);
|
||||
|
||||
}
|
||||
|
||||
public JSONObject toJSON() throws JSONException {
|
||||
JSONObject obj = new JSONObject();
|
||||
if (this.cronExpression != null) {
|
||||
|
@ -114,7 +115,7 @@ class JSONScheduling extends Scheduling {
|
|||
|
||||
obj.put(PREVIUOS_EXECUTION_MUST_BE_COMPLETED, this.previuosExecutionsMustBeCompleted);
|
||||
|
||||
obj.put(SCOPE, scope);
|
||||
obj.put(GLOBAL, global==null ? false : global.booleanValue());
|
||||
|
||||
return obj;
|
||||
}
|
||||
|
|
|
@ -25,30 +25,6 @@ public abstract class SmartExecutorPersistenceConnector implements PluginStateNo
|
|||
*/
|
||||
public abstract void close() throws Exception;
|
||||
|
||||
/* *
|
||||
* Retrieve the status of the iterationNumber (passed as parameter) of a
|
||||
* running/run plugin (using {@link PluginDeclaration}) which is/was
|
||||
* identified by the UUID passed as parameter.
|
||||
* @param pluginDeclaration The Plugin Declaration class
|
||||
* @param uuid the execution identifier of the running/run {@link Plugin}
|
||||
* @param iterationNumber the iteration number
|
||||
* @return the actual/last {@link PluginState} of the Plugin
|
||||
* @throws Exception if fails
|
||||
* /
|
||||
public abstract PluginState getPluginInstanceState(PluginDeclaration pluginDeclaration, UUID uuid, int iterationNumber)
|
||||
throws Exception;
|
||||
/* *
|
||||
* Retrieve the status of the iterationNumber of the last running/run {@link Plugin} which is/was identified
|
||||
* by the UUID passed as parameter
|
||||
* @param pluginDeclaration The Plugin Declaration class
|
||||
* @param uuid the execution identifier of the running/run {@link Plugin}
|
||||
* @return the actual/last {@link PluginState} of the Plugin
|
||||
* @throws Exception if fails
|
||||
* /
|
||||
public abstract PluginState getLastPluginInstanceState(PluginDeclaration pluginDeclaration, UUID uuid)
|
||||
throws Exception;
|
||||
*/
|
||||
|
||||
/**
|
||||
* Retrieve the status of the iterationNumber (passed as parameter) of a running/run {@link Plugin} which is/was identified
|
||||
* by the UUID passed as parameter
|
||||
|
|
|
@ -25,11 +25,7 @@ public abstract class SmartExecutorPersistenceFactory {
|
|||
persistenceConnectors = new HashMap<String, SmartExecutorPersistenceConnector>();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the persistenceConnector
|
||||
*/
|
||||
public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception {
|
||||
String scope = ScopeProvider.instance.get();
|
||||
private static SmartExecutorPersistenceConnector getPersistenceConnector(String scope){
|
||||
if(scope==null){
|
||||
String error = "No Scope available.";
|
||||
logger.error(error);
|
||||
|
@ -39,7 +35,15 @@ public abstract class SmartExecutorPersistenceFactory {
|
|||
logger.trace("Retrieving {} for scope {}",
|
||||
SmartExecutorPersistenceConnector.class.getSimpleName(), scope);
|
||||
|
||||
SmartExecutorPersistenceConnector persistence = persistenceConnectors.get(scope);
|
||||
return persistenceConnectors.get(scope);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the persistenceConnector
|
||||
*/
|
||||
public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception {
|
||||
String scope = ScopeProvider.instance.get();
|
||||
SmartExecutorPersistenceConnector persistence = getPersistenceConnector(scope);
|
||||
|
||||
if(persistence==null){
|
||||
logger.trace("Retrieving {} for scope {} not found on internal {}. Intializing it.",
|
||||
|
@ -53,5 +57,14 @@ public abstract class SmartExecutorPersistenceFactory {
|
|||
|
||||
return persistence;
|
||||
}
|
||||
|
||||
public static synchronized void closePersistenceConnector() throws Exception {
|
||||
String scope = ScopeProvider.instance.get();
|
||||
SmartExecutorPersistenceConnector persistence = getPersistenceConnector(scope);
|
||||
if(persistence!=null){
|
||||
persistence.close();
|
||||
persistenceConnectors.remove(scope);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.codehaus.jackson.node.ArrayNode;
|
|||
import org.codehaus.jackson.node.ObjectNode;
|
||||
import org.ektorp.CouchDbConnector;
|
||||
import org.ektorp.CouchDbInstance;
|
||||
import org.ektorp.DocumentNotFoundException;
|
||||
import org.ektorp.UpdateConflictException;
|
||||
import org.ektorp.ViewQuery;
|
||||
import org.ektorp.ViewResult;
|
||||
|
@ -104,7 +105,7 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
return obj;
|
||||
}
|
||||
|
||||
protected void createItem(String id, JSONObject obj) throws Exception {
|
||||
protected void createItem(JSONObject obj, String id) throws Exception {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
JsonNode node = mapper.readTree(obj.toString());
|
||||
createItem(node, id);
|
||||
|
@ -185,6 +186,7 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
protected static final String ORPHAN_VIEW = "orphan";
|
||||
|
||||
protected static final String USED_BY_FIELD = "usedBy";
|
||||
protected static final String STOPPED = "stopped";
|
||||
|
||||
protected static final String RESERVED_BY = "reservedBy";
|
||||
protected static final String PREVIOUSLY_USED_BY = "previouslyUsedBy";
|
||||
|
@ -271,6 +273,16 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, SCHEDULED_TASKS_DOCUMENT));
|
||||
query = query.viewName(ORPHAN_VIEW);
|
||||
|
||||
String scope = ScopeProvider.instance.get();
|
||||
ArrayNode startKey = new ObjectMapper().createArrayNode();
|
||||
startKey.add(scope);
|
||||
ArrayNode endKey = new ObjectMapper().createArrayNode();
|
||||
endKey.add(scope);
|
||||
endKey.add("{}");
|
||||
query.startKey(startKey);
|
||||
query.endKey(endKey);
|
||||
|
||||
|
||||
List<LaunchParameter> ret = new ArrayList<LaunchParameter>();
|
||||
|
||||
ViewResult viewResult = query(query);
|
||||
|
@ -292,6 +304,8 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
|
@ -302,8 +316,11 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
JSONObject obj = jlp.toJSON();
|
||||
obj.append(TYPE_JSON_FIELD, SCHEDULED_TASK_TYPE);
|
||||
obj.append(USED_BY_FIELD, consumerID);
|
||||
createItem(null, obj);
|
||||
obj.append(ScheduledTaskConfiguration.SCOPE, ScopeProvider.instance.get());
|
||||
createItem(obj, uuid.toString());
|
||||
} catch (Exception e) {
|
||||
logger.error("Error Adding Scheduled Task UUID : {}, Consumer : {}, LaunchParameter : {}",
|
||||
uuid, consumerID, parameter, e);
|
||||
throw new SchedulePersistenceException(e.getCause());
|
||||
}
|
||||
}
|
||||
|
@ -313,6 +330,7 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
public void reserveScheduledTask(UUID uuid, String consumerID) throws SchedulePersistenceException {
|
||||
try {
|
||||
JSONObject obj = getObjectByID(uuid.toString());
|
||||
// TODO change it
|
||||
String previousConsumerID = obj.getString(USED_BY_FIELD);
|
||||
obj.put(PREVIOUSLY_USED_BY, previousConsumerID);
|
||||
obj.remove(USED_BY_FIELD);
|
||||
|
@ -320,7 +338,9 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
obj.put(RESERVATION_TIMESTAMP, Calendar.getInstance().getTimeInMillis());
|
||||
updateItem(obj);
|
||||
} catch (Exception e) {
|
||||
logger.error("Error Releasing Scheduled Task", e.getCause());
|
||||
logger.error("Error Reserving Scheduled Task UUID : {} Consumer : {}",
|
||||
uuid, consumerID, e);
|
||||
throw new SchedulePersistenceException(e.getCause());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -329,8 +349,12 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
@Override
|
||||
public void removeScheduledTask(UUID uuid) throws SchedulePersistenceException {
|
||||
try {
|
||||
deleteItem(uuid.toString(), null);
|
||||
JSONObject obj = getObjectByID(uuid.toString());
|
||||
obj.remove(USED_BY_FIELD);
|
||||
obj.put(STOPPED, true);
|
||||
updateItem(obj);
|
||||
} catch (Exception e) {
|
||||
logger.error("Error Removing Scheduled Task UUID : {}", uuid, e);
|
||||
throw new SchedulePersistenceException(e.getCause());
|
||||
}
|
||||
}
|
||||
|
@ -343,7 +367,21 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
|||
obj.remove(USED_BY_FIELD);
|
||||
updateItem(obj);
|
||||
} catch (Exception e) {
|
||||
logger.error("Error Releasing Scheduled Task", e.getCause());
|
||||
logger.error("Error Releasing Scheduled Task UUID : {}", uuid, e);
|
||||
throw new SchedulePersistenceException(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public LaunchParameter getScheduledTask(UUID uuid) throws SchedulePersistenceException {
|
||||
try {
|
||||
JSONObject jsonObject = getObjectByID(uuid.toString());
|
||||
return new JSONLaunchParameter(jsonObject);
|
||||
} catch (DocumentNotFoundException e) {
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
throw new SchedulePersistenceException(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -38,13 +38,14 @@ public class PluginManager {
|
|||
*/
|
||||
private Map<String, PluginDeclaration> availablePlugins;
|
||||
|
||||
|
||||
public static Plugin<? extends PluginDeclaration> instantiatePlugin(
|
||||
String pluginName) throws InputsNullException,
|
||||
PluginNotFoundException {
|
||||
|
||||
// Retrieve the PluginDeclaration class representing the plugin which
|
||||
// have the name provided as input
|
||||
/**
|
||||
* Retrieve the PluginDeclaration class representing the plugin which
|
||||
* have the name provided as input
|
||||
* @param pluginName the name of the plugin
|
||||
* @return the PluginDeclaration
|
||||
* @throws PluginNotFoundException if the plugin is not available
|
||||
*/
|
||||
public static PluginDeclaration getPluginDeclaration(String pluginName) throws PluginNotFoundException {
|
||||
logger.debug(String.format("Trying to instantiate a Plugin named %s",
|
||||
pluginName));
|
||||
PluginDeclaration pluginDeclaration = PluginManager.getInstance()
|
||||
|
@ -52,6 +53,14 @@ public class PluginManager {
|
|||
if (pluginDeclaration == null) {
|
||||
throw new PluginNotFoundException();
|
||||
}
|
||||
return pluginDeclaration;
|
||||
}
|
||||
|
||||
public static Plugin<? extends PluginDeclaration> instantiatePlugin(
|
||||
String pluginName) throws InputsNullException,
|
||||
PluginNotFoundException {
|
||||
|
||||
PluginDeclaration pluginDeclaration = getPluginDeclaration(pluginName);
|
||||
|
||||
// Retrieving the plugin instance class to be run from PluginDeclaration
|
||||
Class<? extends Plugin<? extends PluginDeclaration>> plugin = pluginDeclaration
|
||||
|
|
|
@ -22,6 +22,9 @@ import org.gcube.vremanagement.executor.exception.LaunchException;
|
|||
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
|
||||
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
|
||||
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
|
||||
import org.gcube.vremanagement.executor.exception.SchedulerRemoveException;
|
||||
import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException;
|
||||
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
|
||||
import org.quartz.CronScheduleBuilder;
|
||||
import org.quartz.JobBuilder;
|
||||
import org.quartz.JobDataMap;
|
||||
|
@ -82,7 +85,6 @@ public class SmartExecutorScheduler {
|
|||
.cronSchedule(scheduling.getCronExpression());
|
||||
|
||||
return createTriggerBuilder(uuid, cronScheduleBuilder);
|
||||
// TODO Limit number of times
|
||||
}
|
||||
|
||||
if (scheduling.getDelay() != null) {
|
||||
|
@ -103,16 +105,26 @@ public class SmartExecutorScheduler {
|
|||
|
||||
}
|
||||
|
||||
protected Scheduler reallySchedule(final UUID uuid, Scheduler scheduler,
|
||||
LaunchParameter parameter) throws LaunchException, InputsNullException, PluginNotFoundException {
|
||||
/**
|
||||
* Create the Scheduler using the strategy provided by LaunchParameter
|
||||
* @param uuid the UUID will be used to identify the task
|
||||
* @param parameter LaunchParameter requested in service invocation
|
||||
* @return the created scheduler
|
||||
* @throws LaunchException if the LaunchParameter does not contains a valid
|
||||
* scheduling strategy
|
||||
* @throws SchedulerException if the scheduler cannot be created by the
|
||||
* scheduler factory
|
||||
*/
|
||||
protected Scheduler reallySchedule(final UUID uuid, LaunchParameter parameter) throws LaunchException, SchedulerException {
|
||||
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
|
||||
Scheduler scheduler = schedulerFactory.getScheduler();
|
||||
|
||||
JobKey jobKey = new JobKey(uuid.toString());
|
||||
JobDetail jobDetail = JobBuilder.newJob(SmartExecutorJob.class).
|
||||
JobDetail jobDetail = JobBuilder.newJob(SmartExecutorTask.class).
|
||||
withIdentity(jobKey).build();
|
||||
JobDataMap jobDataMap = jobDetail.getJobDataMap();
|
||||
jobDataMap.put(SmartExecutorJob.UUID, uuid);
|
||||
jobDataMap.put(SmartExecutorJob.LAUNCH_PARAMETER, parameter);
|
||||
|
||||
jobDataMap.put(SmartExecutorTask.UUID, uuid);
|
||||
jobDataMap.put(SmartExecutorTask.LAUNCH_PARAMETER, parameter);
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger()
|
||||
|
@ -138,11 +150,13 @@ public class SmartExecutorScheduler {
|
|||
}
|
||||
|
||||
try {
|
||||
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
||||
String runningInstanceID = SmartExecutorImpl.getCtx().profile(GCoreEndpoint.class).id();
|
||||
logger.debug("Going to persist Scheduled Task {} which will be assigned to Running Instance {}. LaunchParameters : {} ",
|
||||
uuid.toString(), runningInstanceID, parameter);
|
||||
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
||||
stc.addScheduledTask(uuid, runningInstanceID, parameter);
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to persist the scheduling", e.getCause());
|
||||
logger.error("Unable to persist Scheduled Task {}", uuid.toString(), e.getCause());
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -150,7 +164,7 @@ public class SmartExecutorScheduler {
|
|||
}
|
||||
|
||||
try {
|
||||
SmartExecutorJobListener sejl = new SmartExecutorJobListener();
|
||||
SmartExecutorTaskListener sejl = new SmartExecutorTaskListener();
|
||||
scheduler.getListenerManager().addJobListener(sejl);
|
||||
scheduler.scheduleJob(jobDetail, triggerBuilder.build());
|
||||
} catch (SchedulerException e) {
|
||||
|
@ -160,20 +174,33 @@ public class SmartExecutorScheduler {
|
|||
return scheduler;
|
||||
}
|
||||
|
||||
public synchronized UUID schedule(LaunchParameter parameter) throws LaunchException,
|
||||
InputsNullException, PluginNotFoundException {
|
||||
/**
|
||||
* Schedule a task execution
|
||||
* @param parameter LaunchParameter requested in service invocation
|
||||
* @return the UUID which will identify the task
|
||||
* @throws LaunchException if the LaunchParameter does not contains a valid
|
||||
* scheduling strategy
|
||||
* @throws InputsNullException if provided input map is null
|
||||
* @throws PluginNotFoundException if the request plugin is not available on
|
||||
* this smart executor instance
|
||||
*/
|
||||
public synchronized UUID schedule(LaunchParameter parameter)
|
||||
throws InputsNullException, PluginNotFoundException, LaunchException {
|
||||
Map<String, Object> inputs = parameter.getInputs();
|
||||
if (inputs == null) {
|
||||
throw new RuntimeException();
|
||||
throw new InputsNullException();
|
||||
}
|
||||
|
||||
/*
|
||||
* Checking if the requested plugin is available on this smart executor
|
||||
* instance
|
||||
*/
|
||||
PluginManager.getPluginDeclaration(parameter.getPluginName());
|
||||
|
||||
final UUID uuid = UUID.randomUUID();
|
||||
|
||||
Scheduler scheduler;
|
||||
try {
|
||||
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
|
||||
scheduler = schedulerFactory.getScheduler();
|
||||
reallySchedule(uuid, scheduler, parameter);
|
||||
Scheduler scheduler = reallySchedule(uuid, parameter);
|
||||
activeSchedulers.put(uuid, scheduler);
|
||||
scheduler.start();
|
||||
} catch (SchedulerException e) {
|
||||
|
@ -187,68 +214,166 @@ public class SmartExecutorScheduler {
|
|||
return activeSchedulers.get(key);
|
||||
}
|
||||
|
||||
public synchronized void stop(UUID uuid) throws SchedulerException, SchedulePersistenceException {
|
||||
Scheduler scheduler = activeSchedulers.get(uuid);
|
||||
if(scheduler==null){
|
||||
throw new SchedulerNotFoundException("Scheduler Not Found");
|
||||
}
|
||||
protected void stopLastcurrentExecution(Scheduler scheduler, UUID uuid)
|
||||
throws UnableToInterruptTaskException{
|
||||
|
||||
JobKey jobKey = new JobKey(uuid.toString());
|
||||
boolean exist = scheduler.checkExists(jobKey);
|
||||
if(!exist){
|
||||
logger.trace("Job {} does not exist. Terminating the stop method", uuid);
|
||||
return;
|
||||
}else{
|
||||
logger.trace("Job {} exist", uuid);
|
||||
}
|
||||
|
||||
try {
|
||||
logger.debug("Going to stop current SmartExecutor Task {} execution if any", uuid);
|
||||
if(!scheduler.checkExists(jobKey)){
|
||||
logger.debug("No SmartExecutor Task {} was found. That's all folk.", uuid);
|
||||
throw new SchedulerNotFoundException("Scheduler Not Found");
|
||||
}
|
||||
boolean interrupted = scheduler.interrupt(jobKey);
|
||||
if (interrupted) {
|
||||
logger.debug("SmartExecutor Task {} interrupted successfully.", uuid);
|
||||
} else {
|
||||
logger.debug("SmartExecutor Task {} was not interrupted.", uuid);
|
||||
throw new UnableToInterruptTaskException(uuid);
|
||||
}
|
||||
} catch (UnableToInterruptTaskException e) {
|
||||
throw e;
|
||||
} catch(Exception e1){
|
||||
throw new UnableToInterruptTaskException(uuid, e1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void deleteScheduler(Scheduler scheduler, UUID uuid) throws SchedulerRemoveException {
|
||||
|
||||
JobKey jobKey = new JobKey(uuid.toString());
|
||||
|
||||
try {
|
||||
logger.debug("Going to delete SmartExecutor Scheduled Task {}", uuid);
|
||||
boolean deleted = scheduler.deleteJob(jobKey);
|
||||
if (deleted) {
|
||||
logger.debug("SmartExecutor Task {} deleted successfully", uuid);
|
||||
} else {
|
||||
logger.debug("SmartExecutor Task {} was not deleted", uuid);
|
||||
throw new SchedulerRemoveException(uuid);
|
||||
}
|
||||
} catch(SchedulerRemoveException e){
|
||||
throw e;
|
||||
} catch(Exception e1){
|
||||
throw new SchedulerRemoveException(uuid, e1);
|
||||
} finally {
|
||||
activeSchedulers.remove(uuid);
|
||||
try {
|
||||
scheduler.clear();
|
||||
} catch(SchedulerException e){
|
||||
throw new SchedulerRemoveException(uuid, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected List<JobExecutionContext> getCurrentlyExecutingJobs(Scheduler scheduler) throws SchedulerException{
|
||||
logger.trace("Getting {} list", JobExecutionContext.class.getSimpleName());
|
||||
List<JobExecutionContext> cej = scheduler.getCurrentlyExecutingJobs();
|
||||
while (cej.isEmpty()){
|
||||
cej = scheduler.getCurrentlyExecutingJobs();
|
||||
}
|
||||
logger.trace("{} list got", JobExecutionContext.class.getSimpleName());
|
||||
logger.trace("{} list got {}", JobExecutionContext.class.getSimpleName(), cej);
|
||||
return cej;
|
||||
}
|
||||
|
||||
public LaunchParameter getLaunchParameter(Scheduler scheduler, JobKey jobKey) throws SchedulerException{
|
||||
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
|
||||
JobDataMap jobDataMap = jobDetail.getJobDataMap();
|
||||
return (LaunchParameter) jobDataMap.get(SmartExecutorTask.LAUNCH_PARAMETER);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Stop the execution of the Task identified by UUID
|
||||
* @param uuid which identify the Task
|
||||
* @param stopOnly
|
||||
* @param remove : when the Task is a Scheduled one indicate if the Task
|
||||
* has to be released or to be removed (the argument is set to true when
|
||||
* an explicit request arrive to remove the scheduled task)
|
||||
* @throws UnableToInterruptTaskException
|
||||
* @throws SchedulerRemoveException
|
||||
* @throws SchedulePersistenceException
|
||||
* @throws SchedulerNotFoundException
|
||||
* @throws SchedulerException
|
||||
*/
|
||||
public synchronized void stop(UUID uuid, boolean stopOnly, boolean remove)
|
||||
throws UnableToInterruptTaskException, SchedulerRemoveException,
|
||||
SchedulePersistenceException, SchedulerException {
|
||||
|
||||
Scheduler scheduler = activeSchedulers.get(uuid);
|
||||
if(scheduler==null){
|
||||
logger.debug("No SmartExecutor Task {} was found. That's all folk.", uuid);
|
||||
return;
|
||||
}
|
||||
|
||||
JobKey jobKey = new JobKey(uuid.toString());
|
||||
boolean exist = scheduler.checkExists(jobKey);
|
||||
if(!exist){
|
||||
logger.trace("SmartExecutor Task {} does not have any instaces associated. Cleaning the envoronment. That's all folk.", uuid);
|
||||
activeSchedulers.remove(uuid);
|
||||
return;
|
||||
}else{
|
||||
logger.trace("SmartExecutor Task {} exist", uuid);
|
||||
}
|
||||
|
||||
// TODO Check if this call is needed
|
||||
//getCurrentlyExecutingJobs(scheduler);
|
||||
|
||||
stopLastcurrentExecution(scheduler, uuid);
|
||||
|
||||
LaunchParameter launchParameter = getLaunchParameter(scheduler, jobKey);
|
||||
Scheduling scheduling = launchParameter.getScheduling();
|
||||
boolean scheduled = launchParameter.getScheduling() != null ? true : false;
|
||||
|
||||
|
||||
if(stopOnly){
|
||||
|
||||
|
||||
/*
|
||||
* When the Task was not Scheduled, also the quartz scheduler
|
||||
* must be removed.
|
||||
* If the Task was scheduled, the inputs argument request to stop
|
||||
* only the last running execution, so that the quartz scheduler
|
||||
* must be keep alive to run the next execution.
|
||||
*/
|
||||
if(scheduled){
|
||||
deleteScheduler(scheduler, uuid);
|
||||
}
|
||||
|
||||
|
||||
boolean interrupted = scheduler.interrupt(jobKey);
|
||||
if (interrupted) {
|
||||
logger.debug("Job {} interrupted successfully. Going to delete it.", uuid);
|
||||
} else {
|
||||
logger.debug("Job {} was not interrupted, going to delete it", uuid);
|
||||
logger.debug("The request was only to stop the last execution (if any). That's all folk.");
|
||||
return;
|
||||
}
|
||||
|
||||
boolean deleted = scheduler.deleteJob(jobKey);
|
||||
if (deleted) {
|
||||
logger.debug("Job {} deleted successfully", uuid);
|
||||
} else {
|
||||
logger.debug("Job {} was not deleted", uuid);
|
||||
}
|
||||
|
||||
activeSchedulers.remove(uuid);
|
||||
scheduler.clear();
|
||||
|
||||
|
||||
// TODO check if are
|
||||
boolean remove = true;
|
||||
try {
|
||||
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
||||
if(remove){
|
||||
stc.removeScheduledTask(uuid);
|
||||
}else{
|
||||
stc.releaseScheduledTask(uuid);
|
||||
if(scheduled){
|
||||
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
||||
if(remove){
|
||||
logger.debug("Going to remove the SmartExecutor Scheduled Task {} from global scheduling", uuid);
|
||||
stc.removeScheduledTask(uuid);
|
||||
}else{
|
||||
if(scheduling.getGlobal()){
|
||||
logger.debug("Going to release the SmartExecutor Scheduled Task {}. The Task can be take in charge from another SmartExecutor instance", uuid);
|
||||
stc.releaseScheduledTask(uuid);
|
||||
}else{
|
||||
logger.debug("Going to remove the SmartExecutor Scheduled Task {} from local scheduling", uuid);
|
||||
stc.removeScheduledTask(uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}catch(Exception e){
|
||||
throw new SchedulePersistenceException(e.getCause());
|
||||
} finally {
|
||||
deleteScheduler(scheduler, uuid);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void stopAll() {
|
||||
List<UUID> set = new ArrayList<UUID>(activeSchedulers.keySet());
|
||||
|
||||
for (UUID uuid : set) {
|
||||
try {
|
||||
stop(uuid);
|
||||
stop(uuid, true, false);
|
||||
} catch (Exception e) {
|
||||
logger.error("Error stopping plugin instace with UUID {}",
|
||||
uuid, e);
|
||||
|
|
|
@ -15,6 +15,8 @@ import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException;
|
|||
import org.gcube.vremanagement.executor.exception.InputsNullException;
|
||||
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
|
||||
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
|
||||
import org.gcube.vremanagement.executor.exception.SchedulerRemoveException;
|
||||
import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException;
|
||||
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
|
||||
import org.gcube.vremanagement.executor.plugin.Plugin;
|
||||
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
|
||||
|
@ -35,12 +37,12 @@ import org.slf4j.LoggerFactory;
|
|||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||
*
|
||||
*/
|
||||
public class SmartExecutorJob implements InterruptableJob {
|
||||
public class SmartExecutorTask implements InterruptableJob {
|
||||
|
||||
/**
|
||||
* Logger
|
||||
*/
|
||||
private static Logger logger = LoggerFactory.getLogger(SmartExecutorJob.class);
|
||||
private static Logger logger = LoggerFactory.getLogger(SmartExecutorTask.class);
|
||||
|
||||
public static final String UUID = "UUID";
|
||||
public static final String LAUNCH_PARAMETER = "LAUNCH_PARAMETER";
|
||||
|
@ -123,7 +125,7 @@ public class SmartExecutorJob implements InterruptableJob {
|
|||
protected Boolean interrupted;
|
||||
|
||||
|
||||
public SmartExecutorJob() throws Exception {
|
||||
public SmartExecutorTask() throws Exception {
|
||||
this.interrupted = false;
|
||||
this.initialized = false;
|
||||
pluginStateNotifications = new ArrayList<PluginStateNotification>();
|
||||
|
@ -156,7 +158,7 @@ public class SmartExecutorJob implements InterruptableJob {
|
|||
|
||||
synchronized(this){
|
||||
if(interrupted){
|
||||
logger.info("A job interruption has been called before that this {} has been executed for the first time", SmartExecutorJob.class.getSimpleName());
|
||||
logger.info("A job interruption has been called before that this {} has been executed for the first time", SmartExecutorTask.class.getSimpleName());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -177,10 +179,10 @@ public class SmartExecutorJob implements InterruptableJob {
|
|||
executionsCount.put(uuid, executionCount);
|
||||
|
||||
if(isMaxExecutionNumberReached()){
|
||||
logger.debug("The Scheduled Max Number of execution ({}) is reached. The Job {} will be descheduled", maxExecutionNumber, uuid);
|
||||
logger.debug("The Scheduled Max Number of execution ({}) is reached. The SmartExecutor Task {} will be descheduled", maxExecutionNumber, uuid);
|
||||
try {
|
||||
deschedule();
|
||||
} catch (SchedulerException | SchedulePersistenceException e) {
|
||||
deschedule(true);
|
||||
} catch (Exception e) {
|
||||
throw new JobExecutionException(e);
|
||||
}
|
||||
return;
|
||||
|
@ -234,7 +236,7 @@ public class SmartExecutorJob implements InterruptableJob {
|
|||
@Override
|
||||
public synchronized void interrupt() throws UnableToInterruptJobException {
|
||||
if(!initialized){
|
||||
logger.info("{} does not need to be interrupted, because the execute method is not still called.", SmartExecutorJob.class.getSimpleName());
|
||||
logger.info("{} does not need to be interrupted, because the execute method is not still called.", SmartExecutorTask.class.getSimpleName());
|
||||
interrupted = true;
|
||||
return;
|
||||
}
|
||||
|
@ -265,11 +267,12 @@ public class SmartExecutorJob implements InterruptableJob {
|
|||
return false;
|
||||
}
|
||||
|
||||
protected void deschedule() throws SchedulerException, SchedulePersistenceException{
|
||||
SmartExecutorScheduler.getInstance().stop(uuid);
|
||||
protected void deschedule(boolean globally)
|
||||
throws UnableToInterruptTaskException, SchedulerRemoveException,
|
||||
SchedulePersistenceException, SchedulerException {
|
||||
SmartExecutorScheduler.getInstance().stop(uuid, false, globally);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString(){
|
||||
if(!initialized){
|
|
@ -11,7 +11,7 @@ import org.quartz.JobListener;
|
|||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||
*
|
||||
*/
|
||||
public class SmartExecutorJobListener implements JobListener {
|
||||
public class SmartExecutorTaskListener implements JobListener {
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
|
@ -43,7 +43,7 @@ public class SmartExecutorJobListener implements JobListener {
|
|||
@Override
|
||||
public synchronized void jobWasExecuted(JobExecutionContext context,
|
||||
JobExecutionException jobException) {
|
||||
SmartExecutorJob smartExecutorJob = (SmartExecutorJob) context.getJobInstance();
|
||||
SmartExecutorTask smartExecutorJob = (SmartExecutorTask) context.getJobInstance();
|
||||
smartExecutorJob.finished(context);
|
||||
}
|
||||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package org.gcube.vremanagement.executor.persistence;
|
||||
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
@ -38,7 +39,7 @@ public class SmartExecutorPersistenceConnectorTest {
|
|||
SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector();
|
||||
Assert.assertNotNull(persistenceConnector);
|
||||
Assert.assertEquals(CouchDBPersistenceConnector.class, persistenceConnector.getClass());
|
||||
persistenceConnector.close();
|
||||
SmartExecutorPersistenceFactory.closePersistenceConnector();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -53,11 +54,18 @@ public class SmartExecutorPersistenceConnectorTest {
|
|||
long timestamp = new Date().getTime();
|
||||
PluginStateEvolution pluginStateEvolution = new PluginStateEvolution(uuid, 1, timestamp, HelloWorldPluginDeclaration.class.newInstance(), states[i]);
|
||||
persistenceConnector.pluginStateEvolution(pluginStateEvolution);
|
||||
|
||||
long startTime = Calendar.getInstance().getTimeInMillis();
|
||||
long endTime = startTime;
|
||||
while(endTime <= (startTime + 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
|
||||
PluginState ps = persistenceConnector.getPluginInstanceState(uuid, 1);
|
||||
Assert.assertEquals(states[i], ps);
|
||||
}
|
||||
|
||||
persistenceConnector.close();
|
||||
|
||||
SmartExecutorPersistenceFactory.closePersistenceConnector();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.gcube.vremanagement.executor.pluginmanager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -93,10 +94,20 @@ public class RunnablePluginTest {
|
|||
pluginStateNotifications.add(persistenceConnector);
|
||||
HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd);
|
||||
RunnablePlugin<HelloWorldPlugin> rp = new RunnablePlugin<HelloWorldPlugin>(helloWorldPlugin, inputs, uuid, 1, pluginStateNotifications);
|
||||
long startTime = Calendar.getInstance().getTimeInMillis();
|
||||
long endTime = startTime;
|
||||
while(endTime <= (startTime + 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
Assert.assertEquals(PluginState.CREATED, persistenceConnector.getPluginInstanceState(uuid, 1));
|
||||
|
||||
rp.run();
|
||||
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
while(endTime <= (startTime + 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
Assert.assertEquals(PluginState.DONE, persistenceConnector.getPluginInstanceState(uuid, 1));
|
||||
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.gcube.common.scope.api.ScopeProvider;
|
|||
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
||||
import org.gcube.vremanagement.executor.api.types.Scheduling;
|
||||
import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException;
|
||||
import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException;
|
||||
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
|
||||
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnectorTest;
|
||||
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
|
||||
|
@ -57,7 +58,7 @@ public class SmartExecutorSchedulerTest {
|
|||
@AfterClass
|
||||
public static void close() throws Exception{
|
||||
try {
|
||||
pc.close();
|
||||
SmartExecutorPersistenceFactory.closePersistenceConnector();
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to Close JDBCPersistenceConnector");
|
||||
throw e;
|
||||
|
@ -125,7 +126,13 @@ public class SmartExecutorSchedulerTest {
|
|||
public void earlyStopTest() throws Exception {
|
||||
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
|
||||
UUID uuid = scheduleTest(null, null);
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
try {
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
} catch(UnableToInterruptTaskException e){
|
||||
logger.error("UnableToInterruptTaskException this is the normal behaviour.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
long startTime = Calendar.getInstance().getTimeInMillis();
|
||||
long endTime = startTime;
|
||||
while(endTime <= (startTime + 12000)){
|
||||
|
@ -153,7 +160,7 @@ public class SmartExecutorSchedulerTest {
|
|||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
@ -173,7 +180,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 12000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
PluginState pluginState = pc.getLastPluginInstanceState(uuid);
|
||||
Assert.assertEquals(PluginState.DONE, pluginState);
|
||||
|
@ -192,7 +199,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 2000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(first);
|
||||
smartExecutorScheduler.stop(first, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
@ -218,7 +225,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 83 * 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
@ -247,7 +254,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 80 * 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
@ -280,7 +287,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 1.5 * 60 * 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
@ -294,7 +301,6 @@ public class SmartExecutorSchedulerTest {
|
|||
PluginState.DISCARDED,
|
||||
PluginState.STOPPED
|
||||
};
|
||||
|
||||
|
||||
for(int i=0; i<expectedStates.length; i++){
|
||||
PluginState pluginState = pc.getPluginInstanceState(uuid, i+1);
|
||||
|
@ -368,7 +374,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 1.5 * 60 * 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
@ -402,7 +408,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 1.5 * 60 * 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
@ -439,7 +445,7 @@ public class SmartExecutorSchedulerTest {
|
|||
while(endTime <= (startTime + 1.5 * 60 * 1000)){
|
||||
endTime = Calendar.getInstance().getTimeInMillis();
|
||||
}
|
||||
smartExecutorScheduler.stop(uuid);
|
||||
smartExecutorScheduler.stop(uuid, true, false);
|
||||
|
||||
startTime = Calendar.getInstance().getTimeInMillis();
|
||||
endTime = startTime;
|
||||
|
|
Loading…
Reference in New Issue