refs #111: Add Recurrent and scheduled Task support

https://support.d4science.org/issues/111
Merging from private branch

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@117512 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-07-28 13:41:35 +00:00
parent f01c6360b5
commit 7d7ab372d8
24 changed files with 1626 additions and 380 deletions

View File

@ -8,7 +8,7 @@
<dependent-module archiveName="common-smartgears-app-1.0.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/common-smartgears-app/common-smartgears-app">
<dependency-type>uses</dependency-type>
</dependent-module>
<dependent-module archiveName="smart-executor-api-1.0.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/smart-executor-api/smart-executor-api">
<dependent-module archiveName="smart-executor-api-1.2.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/smart-executor-api/smart-executor-api">
<dependency-type>uses</dependency-type>
</dependent-module>
<property name="java-output-path" value="/smart-executor/target/classes"/>

View File

@ -10,7 +10,7 @@
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<name>SmartExecutor</name>
<description>Smart Executor Service</description>
<packaging>war</packaging>

View File

@ -1,7 +1,5 @@
package org.gcube.vremanagement.executor;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.UUID;
import javax.jws.WebService;
@ -13,14 +11,9 @@ import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.persistence.JDBCPersistence;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.Persistence;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.pluginmanager.PluginThread;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,68 +38,11 @@ public class SmartExecutorImpl implements SmartExecutor {
public String launch(LaunchParameter parameter) throws InputsNullException,
PluginNotFoundException, LaunchException, ExecutorException {
Map<String, Object> inputs = parameter.getInputs();
if(inputs==null){
throw new InputsNullException();
}
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorInitalizator.getSmartExecutorScheduler();
UUID uuid = smartExecutorScheduler.schedule(parameter);
logger.debug(String.format("The Plugin named %s with UUID %s has been launched with the provided inputs", parameter.getName(), uuid));
String name = parameter.getName();
// Retrieve the PluginDeclaration class representing the plugin which
// have the name provided as input
logger.debug(String.format("Trying to instatiate a Plugin named %s", name));
PluginDeclaration pluginDeclaration = PluginManager.getInstance().getPlugin(name);
if(pluginDeclaration == null){
throw new PluginNotFoundException();
}
// Creating the UUID to associate to plugin instance to be run
UUID executionIdentifier = UUID.randomUUID();
// Retrieving the plugin instance class to be run from PluginDeclaration
Class<? extends Plugin<? extends PluginDeclaration>> plugin = pluginDeclaration.getPluginImplementation();
logger.debug(String.format("The class wich will run the execution will be %s", plugin.getName()));
// Retrieve the Constructor of Plugin to instantiate it
@SuppressWarnings("rawtypes")
Class[] argTypes = { pluginDeclaration.getClass() , Persistence.class };
logger.debug(String.format("Plugin named %s once instatiated will be identified by the UUID %s", name, executionIdentifier));
Constructor<? extends Plugin<? extends PluginDeclaration>> executorPluginConstructor;
try {
executorPluginConstructor = plugin.getDeclaredConstructor(argTypes);
} catch (Exception e) {
throw new LaunchException();
}
JDBCPersistenceConnector jdbcPersistenceConnector = SmartExecutorInitalizator.getJdbcPersistenceConnector();
// Create and instance of DB connection used to persist plugin evolution
JDBCPersistence jdbcEvolutionPersistence = new JDBCPersistence(jdbcPersistenceConnector, name, executionIdentifier);
// Create the Argument to pass to contructor
Object[] arguments = { pluginDeclaration, jdbcEvolutionPersistence};
// Instancing the plugin
Plugin<? extends PluginDeclaration> instantiatedPlugin;
try {
instantiatedPlugin = executorPluginConstructor.newInstance(arguments);
} catch(Exception e) {
throw new LaunchException();
}
logger.debug(String.format("Plugin named %s identified by the UUID %s has been instantiated", name, executionIdentifier));
// Creating the thread used to launch the plugin execution
PluginThread<Plugin<? extends PluginDeclaration>> pluginThread =
new PluginThread<Plugin<? extends PluginDeclaration>>(instantiatedPlugin, inputs, executionIdentifier);
// Adding the thread to the pluginInstances
SmartExecutorInitalizator.getPluginInstances().put(executionIdentifier, pluginThread);
// Launching Thread from initially created pool
SmartExecutorInitalizator.getPool().execute(pluginThread);
logger.debug(String.format("The Plugin named %s with UUID %s has been launched with the provided inputs", name, executionIdentifier));
// TODO join the thread
return executionIdentifier.toString();
return uuid.toString();
}
/**{@inheritDoc}*/
@ -115,11 +51,22 @@ public class SmartExecutorImpl implements SmartExecutor {
throws PluginInstanceNotFoundException, ExecutorException {
try {
JDBCPersistenceConnector jdbcPersistenceConnector = SmartExecutorInitalizator.getJdbcPersistenceConnector();
return jdbcPersistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier));
return jdbcPersistenceConnector.getLastPluginInstanceState(UUID.fromString(executionIdentifier));
} catch (Exception e) {
throw new PluginInstanceNotFoundException();
}
}
/**{@inheritDoc}*/
@Override
public PluginState getIterationState(String executionIdentifier, int iterationNumber)
throws PluginInstanceNotFoundException, ExecutorException {
try {
JDBCPersistenceConnector jdbcPersistenceConnector = SmartExecutorInitalizator.getJdbcPersistenceConnector();
return jdbcPersistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), iterationNumber);
} catch (Exception e) {
throw new PluginInstanceNotFoundException();
}
}
}

View File

@ -6,12 +6,8 @@ package org.gcube.vremanagement.executor;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.bind.annotation.XmlRootElement;
@ -39,11 +35,9 @@ import org.gcube.smartgears.context.application.ApplicationContext;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.pluginmanager.PluginThread;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,18 +53,7 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
*/
private static Logger logger = LoggerFactory.getLogger(SmartExecutorInitalizator.class);
/**
* Pool for thread execution
*/
private static ExecutorService pool;
/**
* Contains running plugin instances. The key is the associated random UUID.
* This is needed to correctly stop the running plugin execution if the
* container is stopped in the proper way
*/
private static Map<UUID,
PluginThread<Plugin<? extends PluginDeclaration>>> pluginInstances;
public static final long JOIN_TIMEOUT = 1000;
/**
* Contains the ServiceEnpoint Resource to be published/unpublished on IS
@ -80,25 +63,18 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
/**
* Represent the connector to DB
*/
private static JDBCPersistenceConnector jdbcPersistenceConnector;
private static ApplicationContext ctx;
protected static JDBCPersistenceConnector jdbcPersistenceConnector;
/**
* @return the pool
* The application context
*/
public static ExecutorService getPool() {
return pool;
}
protected static ApplicationContext ctx;
/**
* @return the pluginInstances
* the Smart executor Scheduler used for task execution
*/
public static Map<UUID, PluginThread<Plugin<? extends PluginDeclaration>>> getPluginInstances() {
return pluginInstances;
}
protected static SmartExecutorScheduler smartExecutorScheduler;
/**
* @return the jdbcPersistenceConnector
*/
@ -113,6 +89,13 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
return ctx;
}
/**
* @return the smartExecutorScheduler
*/
public static SmartExecutorScheduler getSmartExecutorScheduler() {
return smartExecutorScheduler;
}
/**
* Publish the provided resource on all Service Scopes retrieved from
* Context
@ -303,13 +286,12 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
ctx = applicationLifecycleEventStart.context();
pool = Executors.newCachedThreadPool();
serviceEndpoint = createServiceEndpoint();
pluginInstances = new HashMap<UUID, PluginThread<Plugin<? extends PluginDeclaration>>>();
// checking if there are old unpublished ServiceEndpoint related to this ghn
// and try to unpublish them
smartExecutorScheduler = SmartExecutorScheduler.getInstance();
// checking if there are old unpublished ServiceEndpoint related to this
// vHN and try to unpublish them
List<String> scopes = getScopes(ctx);
for(String scope : scopes){
@ -381,33 +363,7 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
+ "Smart Executor is Stopping\n"
+ "-------------------------------------------------------");
for(UUID uuid : pluginInstances.keySet()){
try {
PluginThread<Plugin<? extends PluginDeclaration>> pluginThread =
pluginInstances.get(uuid);
Plugin<? extends PluginDeclaration> pluginInstace =
pluginThread.getPlugin();
try {
logger.debug("Requesting Stop to plugin instance identified by the UUID %s of Plugin named {}",
uuid, pluginInstace.getPluginDeclaration().getName());
pluginInstace.stop();
logger.debug("Plugin instance identified by the UUID {} of Plugin named {} stopped coorectly itself",
uuid, pluginInstace.getPluginDeclaration().getName());
} catch (Exception e) {
logger.debug("Running plugin instance identified by the UUID {} of Plugin named {} failed to request of being stopped",
uuid, pluginInstace.getPluginDeclaration().getName());
} finally {
pluginInstace.setState(PluginState.SUSPENDED);
pluginInstances.remove(uuid);
}
} catch (Exception e) {
logger.error("Error stopping plugin instace with UUID {}", uuid, e);
}
}
// Trying to force shutdown of all threads
pool.shutdown();
smartExecutorScheduler.stopAll();
try {
List<String> scopes = getScopes(ctx);

View File

@ -0,0 +1,33 @@
/**
*
*/
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class AlreadyInFinalStateException extends Exception {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = -7730594422282391883L;
public AlreadyInFinalStateException() {
super();
}
public AlreadyInFinalStateException(String message) {
super(message);
}
public AlreadyInFinalStateException(Throwable throwable){
super(throwable);
}
public AlreadyInFinalStateException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,33 @@
/**
*
*/
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class MaxIterationRuntimeException extends RuntimeException {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = -7730594422282391883L;
public MaxIterationRuntimeException() {
super();
}
public MaxIterationRuntimeException(String message) {
super(message);
}
public MaxIterationRuntimeException(Throwable throwable){
super(throwable);
}
public MaxIterationRuntimeException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,34 @@
/**
*
*/
package org.gcube.vremanagement.executor.exception;
import org.quartz.SchedulerException;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class SchedulerNotFoundException extends SchedulerException {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = -7108678230246937588L;
public SchedulerNotFoundException() {
super();
}
public SchedulerNotFoundException(String message) {
super(message);
}
public SchedulerNotFoundException(Throwable throwable){
super(throwable);
}
public SchedulerNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,33 @@
/**
*
*/
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class StopRuntimeException extends RuntimeException {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = -7730594422282391883L;
public StopRuntimeException() {
super();
}
public StopRuntimeException(String message) {
super(message);
}
public StopRuntimeException(Throwable throwable){
super(throwable);
}
public StopRuntimeException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -1,63 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.persistence;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.UUID;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class JDBCPersistence extends Persistence<JDBCPersistenceConnector> {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(JDBCPersistence.class);
private final Connection connection;
public JDBCPersistence(JDBCPersistenceConnector jdbcPersistenceConnector,
String name, UUID uuid){
super(jdbcPersistenceConnector, name, uuid);
this.connection = jdbcPersistenceConnector.getConnection();
}
/**{@inheritDoc}*/
@Override
public void addEvolution(long timestamp, PluginState pluginState)
throws Exception {
connection.setAutoCommit(false); // transaction block start
String insertPluginEvolution = String.format(
"INSERT INTO `%s` (`%s`,`%s`,`%s`,`%s`) VALUES (?,?,?,?)",
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_UUID_FIELD,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_PLUGIN_NAME_FIELD,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_TIMESTAMP_FIELD,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_STATE_FIELD);
logger.info(String.format("Base Query : %s. Parameters : %s,%s%s,%s",
insertPluginEvolution, uuid.toString(), name, timestamp,
pluginState.name()));
PreparedStatement psInsertPluginEvolution = connection
.prepareStatement(insertPluginEvolution);
psInsertPluginEvolution.setString(1, uuid.toString());
psInsertPluginEvolution.setString(2, name);
psInsertPluginEvolution.setLong(3, timestamp);
psInsertPluginEvolution.setInt(4, pluginState.ordinal());
psInsertPluginEvolution.executeUpdate();
connection.commit(); //transaction block end
}
}

View File

@ -36,10 +36,12 @@ public class JDBCPersistenceConnector extends PersistenceConnector {
public final static String PLUGIN_INSTANCE_EVOLUTION_TABLE = "PluginInstanceEvolution";
public final static String PLUGIN_INSTANCE_EVOLUTION_TABLE_ID_FIELD = "id";
public final static String PLUGIN_INSTANCE_EVOLUTION_TABLE_UUID_FIELD = "uuid";
public final static String PLUGIN_INSTANCE_EVOLUTION_TABLE_ITERATION_FIELD = "iteration";
public final static String PLUGIN_INSTANCE_EVOLUTION_TABLE_PLUGIN_NAME_FIELD = "pluginName";
public final static String PLUGIN_INSTANCE_EVOLUTION_TABLE_TIMESTAMP_FIELD = "timestamp";
public final static String PLUGIN_INSTANCE_EVOLUTION_TABLE_STATE_FIELD = "state";
public final static String CATALINA_HOME = "CATALINA_HOME";
@SuppressWarnings("unused")
@ -82,12 +84,14 @@ public class JDBCPersistenceConnector extends PersistenceConnector {
"CREATE TABLE IF NOT EXISTS `%s` ("
+ "`%s` INT PRIMARY KEY AUTO_INCREMENT NOT NULL,"
+ "`%s` VARCHAR(36) NOT NULL,"
+ "`%s` INT NOT NULL,"
+ "`%s` VARCHAR(255) NOT NULL,"
+ "`%s` BIGINT NOT NULL,"
+ "`%s` INT NOT NULL);",
PLUGIN_INSTANCE_EVOLUTION_TABLE,
PLUGIN_INSTANCE_EVOLUTION_TABLE_ID_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_UUID_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_ITERATION_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_PLUGIN_NAME_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_TIMESTAMP_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_STATE_FIELD);
@ -115,7 +119,28 @@ public class JDBCPersistenceConnector extends PersistenceConnector {
/**{@inheritDoc} */
@Override
public PluginState getPluginInstanceState(UUID uuid) throws Exception {
public PluginState getPluginInstanceState(UUID uuid, int iterationNumber) throws Exception {
String query = String.format(
"SELECT `%s`,`%s` FROM `%s` WHERE `%s`=? AND `%s`=? ORDER BY `%s` DESC LIMIT 1",
PLUGIN_INSTANCE_EVOLUTION_TABLE_TIMESTAMP_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_STATE_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE,
PLUGIN_INSTANCE_EVOLUTION_TABLE_UUID_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_ITERATION_FIELD,
PLUGIN_INSTANCE_EVOLUTION_TABLE_TIMESTAMP_FIELD);
logger.debug(query);
PreparedStatement ps = connection.prepareStatement(query);
ps.setString(1, uuid.toString());
ps.setInt(2, iterationNumber);
ResultSet resultSet = ps.executeQuery();
resultSet.first();
int stateOrdinal = resultSet.getInt(PLUGIN_INSTANCE_EVOLUTION_TABLE_STATE_FIELD);
return PluginState.values()[stateOrdinal];
}
/**{@inheritDoc} */
@Override
public PluginState getLastPluginInstanceState(UUID uuid) throws Exception {
String query = String.format(
"SELECT `%s`,`%s` FROM `%s` WHERE `%s`=? ORDER BY `%s` DESC LIMIT 1",
PLUGIN_INSTANCE_EVOLUTION_TABLE_TIMESTAMP_FIELD,
@ -131,6 +156,36 @@ public class JDBCPersistenceConnector extends PersistenceConnector {
int stateOrdinal = resultSet.getInt(PLUGIN_INSTANCE_EVOLUTION_TABLE_STATE_FIELD);
return PluginState.values()[stateOrdinal];
}
@Override
public void addEvolution(UUID uuid, int iteration, long timestamp, String pluginName, PluginState pluginState)
throws Exception {
connection.setAutoCommit(false); // transaction block start
String insertPluginEvolution = String.format(
"INSERT INTO `%s` (`%s`,`%s`,`%s`,`%s`,`%s`) VALUES (?,?,?,?,?)",
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_UUID_FIELD,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_ITERATION_FIELD,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_PLUGIN_NAME_FIELD,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_TIMESTAMP_FIELD,
JDBCPersistenceConnector.PLUGIN_INSTANCE_EVOLUTION_TABLE_STATE_FIELD);
logger.info(String.format("Base Query : %s. Parameters : %s,%d,%s,%s,%s",
insertPluginEvolution, uuid, iteration, pluginName, timestamp, pluginState.name()));
PreparedStatement psInsertPluginEvolution = connection
.prepareStatement(insertPluginEvolution);
psInsertPluginEvolution.setString(1, uuid.toString());
psInsertPluginEvolution.setInt(2, iteration);
psInsertPluginEvolution.setString(3, pluginName);
psInsertPluginEvolution.setLong(4, timestamp);
psInsertPluginEvolution.setInt(5, pluginState.ordinal());
psInsertPluginEvolution.executeUpdate();
connection.commit(); //transaction block end
}
/**{@inheritDoc} */
@Override
@ -138,4 +193,11 @@ public class JDBCPersistenceConnector extends PersistenceConnector {
connection.close();
}
/**{@inheritDoc} */
@Override
public void pluginStateEvolution(UUID uuid, int iteration,
long timestamp, String pluginName, PluginState pluginState) throws Exception {
addEvolution(uuid, iteration, timestamp, pluginName, pluginState);
}
}

View File

@ -1,9 +1,13 @@
package org.gcube.vremanagement.executor.pluginmanager;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,6 +36,57 @@ public class PluginManager {
*/
private Map<String, PluginDeclaration> availablePlugins;
public static Plugin<? extends PluginDeclaration> instantiatePlugin(
String pluginName) throws InputsNullException,
PluginNotFoundException {
// Retrieve the PluginDeclaration class representing the plugin which
// have the name provided as input
logger.debug(String.format("Trying to instantiate a Plugin named %s",
pluginName));
PluginDeclaration pluginDeclaration = PluginManager.getInstance()
.getPlugin(pluginName);
if (pluginDeclaration == null) {
throw new PluginNotFoundException();
}
// Retrieving the plugin instance class to be run from PluginDeclaration
Class<? extends Plugin<? extends PluginDeclaration>> plugin = pluginDeclaration
.getPluginImplementation();
logger.debug(String.format(
"The class which will run the execution will be %s",
plugin.getName()));
// Retrieve the Constructor of Plugin to instantiate itPLUGIN
@SuppressWarnings("rawtypes")
Class[] argTypes = { pluginDeclaration.getClass() };
// logger.debug(String.format("Plugin named %s once instatiated will be identified by the UUID %s",
// name, executionIdentifier));
Constructor<? extends Plugin<? extends PluginDeclaration>> executorPluginConstructor;
try {
executorPluginConstructor = plugin.getDeclaredConstructor(argTypes);
} catch (Exception e) {
throw new PluginNotFoundException();
}
// Create the Argument to pass to constructor
Object[] arguments = { pluginDeclaration };
// Instancing the plugin
Plugin<? extends PluginDeclaration> instantiatedPlugin;
try {
instantiatedPlugin = executorPluginConstructor
.newInstance(arguments);
} catch (Exception e) {
throw new PluginNotFoundException();
}
logger.debug(String
.format("Plugin named %s has been instantiated", pluginName));
return instantiatedPlugin;
}
/**
* Get the singleton instance of {@link #PluginManager}.
* The first time this function is invoked the instance is null

View File

@ -1,76 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.pluginmanager;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.SmartExecutorInitalizator;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class PluginThread<T extends Plugin<? extends PluginDeclaration>> extends Thread {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(PluginThread.class);
protected static final String SEPARATOR = "---";
public static final String getThreadName(Plugin<? extends PluginDeclaration> plugin,
Map<String, Object> inputs, UUID uuid){
return String.format("%s%s%s%s%s",
plugin.getPluginDeclaration().getName(),SEPARATOR,
inputs.hashCode(), SEPARATOR, uuid);
}
protected final T plugin;
protected final UUID uuid;
protected final Map<String, Object> inputs;
public PluginThread(T plugin, Map<String, Object> inputs, UUID uuid){
super(getThreadName(plugin, inputs, uuid));
this.plugin = plugin;
this.uuid = uuid;
this.inputs = inputs;
this.plugin.setState(PluginState.CREATED);
}
@Override
public void run(){
try {
plugin.setState(PluginState.RUNNING);
plugin.launch(inputs);
plugin.setState(PluginState.DONE);
} catch(Exception e) {
logger.trace(String.format("Thread %s failed", getName()),e);
plugin.setState(PluginState.FAILED);
} finally{
SmartExecutorInitalizator.getPluginInstances().remove(uuid);
}
}
/**
* @return the plugin
*/
public T getPlugin() {
return plugin;
}
/**
* @return the launchInputs
*/
public Map<String, Object> getInputs() {
return inputs;
}
}

View File

@ -0,0 +1,146 @@
/**
*
*/
package org.gcube.vremanagement.executor.pluginmanager;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class RunnablePlugin<T extends Plugin<? extends PluginDeclaration>> implements Runnable {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(RunnablePlugin.class);
public static final String MAX_LAUNCH_TIMES = "___max_launch_times___";
protected static final String SEPARATOR = "---";
protected final T plugin;
protected final Map<String, Object> inputs;
protected final UUID uuid;
protected final int iterationNumber;
protected final List<PluginStateNotification> pluginStateNotifications;
protected PluginState actualState;
public RunnablePlugin(T plugin, Map<String, Object> inputs,
UUID uuid, int iterationNumber, List<PluginStateNotification> pluginStateNotifications){
this.plugin = plugin;
this.inputs = inputs;
this.uuid = uuid;
this.iterationNumber = iterationNumber;
this.pluginStateNotifications = pluginStateNotifications;
try {
setState(PluginState.CREATED);
} catch (AlreadyInFinalStateException e) {
logger.error(" --- You should not be here. Seem that the {} is suspended before the istance is created. This is really STRANGE.",
uuid);
throw new RuntimeException(e);
}
}
@Override
public void run(){
try {
setState(PluginState.RUNNING);
plugin.launch(inputs);
setState(PluginState.DONE);
} catch (AlreadyInFinalStateException e1) {
return;
} catch(Exception e) {
logger.trace(String.format("Thread %s failed", this.toString()),e);
try {
setState(PluginState.FAILED);
} catch (AlreadyInFinalStateException e1) {
return;
}
throw new RuntimeException(e);
}
}
/**
* @return the plugin
*/
public T getPlugin() {
return plugin;
}
/**
* @return the launchInputs
*/
public Map<String, Object> getInputs() {
return inputs;
}
/**
* It is up to the plugin update the State of the Running Plugin using
* this facilities function.
* @param pluginState
* @throws Exception
*/
public synchronized void setState(PluginState pluginState) throws AlreadyInFinalStateException {
long timestamp = new Date().getTime();
if(actualState!=null && actualState.isFinalState()){
logger.trace("At {} Trying to set {} in {} state, but it was already in the final state {}", timestamp,
uuid, pluginState.toString(), actualState.toString());
throw new AlreadyInFinalStateException();
}
actualState = pluginState;
String pluginName = plugin.getPluginDeclaration().getName();
for(PluginStateNotification pluginStateNotification : pluginStateNotifications){
String pluginStateNotificationName = pluginStateNotification.getClass().getSimpleName();
try {
logger.debug("Adding State Evolution with {} : {}, {}, {}, {}, {}",
pluginStateNotificationName, uuid, iterationNumber,
timestamp, pluginName, pluginState.name());
pluginStateNotification.pluginStateEvolution(uuid, iterationNumber, timestamp, pluginName, pluginState);
} catch(Exception e) {
logger.error("Unable to persist State with {} : {}, {}, {}, {}, {}",
pluginStateNotificationName, uuid, iterationNumber,
timestamp, pluginName, pluginState.name());
}
}
}
@Override
public String toString(){
return String.format("UUID : %s, Iteration : %d, Plugin : %s",
uuid.toString(), iterationNumber,
plugin.getPluginDeclaration().getName());
}
/**
* Stop the Plugin setting state to {@link PluginState#CANCELLED}
* @throws Exception
*/
public void stop() throws Exception {
try{
setState(PluginState.STOPPED);
plugin.stop();
}catch(AlreadyInFinalStateException e){}
Thread.currentThread().interrupt();
}
}

View File

@ -0,0 +1,31 @@
/**
*
*/
package org.gcube.vremanagement.executor.scheduler;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class JobCompletedNotification implements PluginStateNotification {
protected final Map<Integer, PluginState> executionsState;
public JobCompletedNotification(Map<Integer, PluginState> executionsState){
this.executionsState = executionsState;
}
@Override
public void pluginStateEvolution(UUID uuid, int iteration, long timestamp,
String pluginName, PluginState pluginState) throws Exception {
executionsState.put(iteration, pluginState);
}
}

View File

@ -0,0 +1,275 @@
/**
*
*/
package org.gcube.vremanagement.executor.scheduler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException;
import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.persistence.PersistenceConnector;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.pluginmanager.RunnablePlugin;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class SmartExecutorJob implements InterruptableJob {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(SmartExecutorJob.class);
public static final String UUID = "UUID";
public static final String LAUNCH_PARAMETER = "LAUNCH_PARAMETER";
protected static Map<UUID, Integer> executionsCount;
/**
* @return the executionsCount
*/
public static Map<UUID, Integer> getExecutionsCount() {
return executionsCount;
}
protected static Map<UUID, Map<Integer, PluginState>> executionsState;
static {
executionsCount = new HashMap<UUID, Integer>();
executionsState = new HashMap<UUID, Map<Integer, PluginState>>();
}
protected List<PluginStateNotification> pluginStateNotifications;
protected boolean initialized;
protected UUID uuid;
protected LaunchParameter launchParameter;
/* Derived from launchParameter*/
protected int executionCount;
protected String pluginName;
protected Plugin<? extends PluginDeclaration> plugin;
protected Map<String, Object> inputs;
protected RunnablePlugin<Plugin<? extends PluginDeclaration>> runnablePlugin;
protected boolean mustPreviousExecutionsCompleted;
protected int maxExecutionNumber;
/**/
protected void init(JobDataMap jobDataMap) throws JobExecutionException{
uuid = (UUID) jobDataMap.get(UUID);
launchParameter = (LaunchParameter) jobDataMap.get(LAUNCH_PARAMETER);
pluginName = launchParameter.getName();
try {
plugin = PluginManager.instantiatePlugin(pluginName);
} catch (InputsNullException | PluginNotFoundException e) {
throw new JobExecutionException(e);
}
inputs = launchParameter.getInputs();
Scheduling scheduling = launchParameter.getScheduling();
if(scheduling!=null){
mustPreviousExecutionsCompleted = scheduling.mustPreviousExecutionsCompleted();
if(mustPreviousExecutionsCompleted){
Map<Integer, PluginState> executionState;
if(executionsState.containsKey(uuid)){
executionState = executionsState.get(uuid);
}else{
executionState = new HashMap<Integer, PluginState>();
executionsState.put(uuid, executionState);
executionState.put(0, PluginState.DONE);
}
pluginStateNotifications.add(new JobCompletedNotification(executionState));
}
maxExecutionNumber = scheduling.getSchedulingTimes();
}
initialized = true;
}
protected Boolean interrupted;
public SmartExecutorJob(){
this.interrupted = false;
this.initialized = false;
pluginStateNotifications = new ArrayList<PluginStateNotification>();
pluginStateNotifications.add(PersistenceConnector.getPersistenceConnector());
}
/**
* @return the uuid
*/
public UUID getUUID() {
return uuid;
}
/**
* @return the parameter
*/
public LaunchParameter getLaunchParameter() {
return launchParameter;
}
/**
* {@inheritDoc}
*/
@Override
public void execute(JobExecutionContext context)
throws JobExecutionException {
logger.debug("Execute of {}", this);
synchronized(this){
if(interrupted){
logger.info("A job interruption has been called before that this {} has been executed for the first time", SmartExecutorJob.class.getSimpleName());
return;
}
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
logger.debug("Execute of {} with {}", this, jobDataMap);
if(!initialized){
init(jobDataMap);
}
if(executionsCount.containsKey(uuid)) {
executionCount = executionsCount.get(uuid);
} else{
executionCount = 0;
}
executionCount++;
executionsCount.put(uuid, executionCount);
if(isMaxExecutionNumberReached()){
logger.debug("The Scheduled Max Number of execution ({}) is reached. The Job {} will be descheduled", maxExecutionNumber, uuid);
try {
deschedule();
} catch (SchedulerException e) {
throw new JobExecutionException(e);
}
return;
}
runnablePlugin = new RunnablePlugin<Plugin<? extends PluginDeclaration>>(
plugin, inputs, uuid, executionCount, pluginStateNotifications);
logger.debug("Going to run Job with ID {} (iteration {})", uuid, executionCount);
}
if(mustPreviousExecutionsCompleted){
Map<Integer, PluginState> executionState = executionsState.get(uuid);
boolean previousExecutionCompleted = true;
int notTerminatedExecutionNumber = -1;
for(int i=executionCount-1; i>=0; i--){
PluginState previousState = executionState.get(i);
previousExecutionCompleted = previousState.isFinalState();
if(!previousExecutionCompleted){
notTerminatedExecutionNumber = i;
break;
}
}
if(previousExecutionCompleted){
runnablePlugin.run();
}else{
logger.info("A previuos execution ({}) is still not completed. The Launch Parameters require this. This execution ({}) is discarded.",
notTerminatedExecutionNumber, executionCount);
try {
runnablePlugin.setState(PluginState.DISCARDED);
} catch (AlreadyInFinalStateException e) { }
}
}else{
runnablePlugin.run();
}
}
protected synchronized void finished(JobExecutionContext context){
logger.debug("Job with ID {} (iteration {})terminated", uuid, executionCount);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void interrupt() throws UnableToInterruptJobException {
if(!initialized){
logger.info("{} does not need to be interrupted, because the execute method is not still called.", SmartExecutorJob.class.getSimpleName());
interrupted = true;
return;
}
logger.debug("Trying to interrupt {} iteration({})", uuid, executionCount);
try {
logger.debug("Requesting Stop to plugin instance ({}) identified by the UUID {} of Plugin named {}",
executionCount, uuid, pluginName);
runnablePlugin.stop();
logger.debug("Plugin instance ({}) identified by the UUID {} of Plugin named {} stopped itself correctly.",
executionCount, uuid, pluginName);
} catch (Exception e) {
logger.error("Running plugin instance ({}) identified by the UUID {} of Plugin named {} failed to request of being stopped.",
executionCount, uuid, pluginName);
}
}
protected boolean isMaxExecutionNumberReached(){
if(maxExecutionNumber==0){
return false;
}
if(executionCount>maxExecutionNumber){
return true;
}
return false;
}
protected void deschedule() throws SchedulerException{
SmartExecutorScheduler.getInstance().stop(uuid);
}
@Override
public String toString(){
if(!initialized){
return super.toString();
}
return String.format("JOb with ID %s (iteration %d). Parameters : %s", uuid, executionCount, launchParameter);
}
}

View File

@ -0,0 +1,51 @@
/**
*
*/
package org.gcube.vremanagement.executor.scheduler;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class SmartExecutorJobListener implements JobListener {
/**
* {@inheritDoc}
*/
@Override
public String getName() {
return this.getClass().getSimpleName();
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void jobToBeExecuted(JobExecutionContext context) {
//SmartExecutorJob smartExecutorJob = (SmartExecutorJob) context.getJobInstance();
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void jobExecutionVetoed(JobExecutionContext context) {
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException) {
SmartExecutorJob smartExecutorJob = (SmartExecutorJob) context.getJobInstance();
smartExecutorJob.finished(context);
}
}

View File

@ -0,0 +1,231 @@
/**
*
*/
package org.gcube.vremanagement.executor.scheduler;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.ScheduleBuilder;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class SmartExecutorScheduler {
private static Logger logger = LoggerFactory
.getLogger(SmartExecutorScheduler.class);
/**
* Contains running plugin instances. The key is the associated random UUID.
* This is needed to correctly stop the running plugin execution if the
* container is stopped in the proper way
*/
protected Map<UUID, Scheduler> activeSchedulers;
private static SmartExecutorScheduler smartExecutorScheduler;
public synchronized static SmartExecutorScheduler getInstance() {
if (smartExecutorScheduler == null) {
smartExecutorScheduler = new SmartExecutorScheduler();
}
return smartExecutorScheduler;
}
private SmartExecutorScheduler() {
activeSchedulers = new HashMap<UUID, Scheduler>();
}
protected TriggerBuilder<? extends Trigger> createTriggerBuilder(UUID uuid, ScheduleBuilder<? extends Trigger> sb){
return TriggerBuilder.newTrigger().withIdentity(uuid.toString())
.withSchedule(sb);
}
protected TriggerBuilder<? extends Trigger> getTriggerBuilderWithScheduling(UUID uuid, Scheduling scheduling) throws LaunchException{
final int times = scheduling.getSchedulingTimes();
if (scheduling.getCronExpression() != null) {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
.cronSchedule(scheduling.getCronExpression());
return createTriggerBuilder(uuid, cronScheduleBuilder);
// TODO Limit number of times
}
if (scheduling.getDelay() != null) {
SimpleScheduleBuilder simpleScheduleBuilder;
if (times != 0) {
simpleScheduleBuilder = SimpleScheduleBuilder
.repeatSecondlyForTotalCount(times, scheduling.getDelay());
}else{
simpleScheduleBuilder = SimpleScheduleBuilder.
repeatSecondlyForever(scheduling.getDelay());
}
return createTriggerBuilder(uuid, simpleScheduleBuilder);
}
throw new LaunchException("Invalid Scheduling");
}
protected Scheduler reallySchedule(final UUID uuid, Scheduler scheduler, LaunchParameter parameter) throws LaunchException,
InputsNullException, PluginNotFoundException {
JobKey jobKey = new JobKey(uuid.toString());
JobDetail jobDetail = JobBuilder.newJob(SmartExecutorJob.class).
withIdentity(jobKey).build();
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put(SmartExecutorJob.UUID, uuid);
jobDataMap.put(SmartExecutorJob.LAUNCH_PARAMETER, parameter);
@SuppressWarnings("rawtypes")
TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger()
.withIdentity(uuid.toString());
Scheduling scheduling = parameter.getScheduling();
if (scheduling != null) {
triggerBuilder = getTriggerBuilderWithScheduling(uuid, scheduling);
if (scheduling.getFirtStartTime() != null && scheduling.getFirtStartTime().longValue()!=0) {
Date triggerStartTime = new Date(scheduling.getFirtStartTime());
triggerBuilder.startAt(triggerStartTime);
} else {
triggerBuilder.startNow();
}
if (scheduling.getEndTime() != null && scheduling.getEndTime().longValue()!=0) {
Date triggerEndTime = new Date(scheduling.getEndTime());
triggerBuilder.endAt(triggerEndTime);
}
} else {
triggerBuilder.startNow();
}
try {
SmartExecutorJobListener sejl = new SmartExecutorJobListener();
scheduler.getListenerManager().addJobListener(sejl);
scheduler.scheduleJob(jobDetail, triggerBuilder.build());
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
return scheduler;
}
public synchronized UUID schedule(LaunchParameter parameter) throws LaunchException,
InputsNullException, PluginNotFoundException {
Map<String, Object> inputs = parameter.getInputs();
if (inputs == null) {
throw new RuntimeException();
}
final UUID uuid = UUID.randomUUID();
Scheduler scheduler;
try {
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
scheduler = schedulerFactory.getScheduler();
reallySchedule(uuid, scheduler, parameter);
activeSchedulers.put(uuid, scheduler);
scheduler.start();
} catch (SchedulerException e) {
throw new LaunchException(e);
}
return uuid;
}
public Scheduler getScheduler(UUID key){
return activeSchedulers.get(key);
}
public synchronized void stop(UUID uuid) throws SchedulerException {
Scheduler scheduler = activeSchedulers.get(uuid);
if(scheduler==null){
throw new SchedulerNotFoundException("Scheduler Not Found");
}
JobKey jobKey = new JobKey(uuid.toString());
boolean exist = scheduler.checkExists(jobKey);
if(!exist){
logger.trace("Job {} does not exist. Terminating the stop method", uuid);
return;
}else{
logger.trace("Job {} exist", uuid);
}
logger.trace("Getting {} list", JobExecutionContext.class.getSimpleName());
List<JobExecutionContext> cej = scheduler.getCurrentlyExecutingJobs();
while (cej.isEmpty()){
cej = scheduler.getCurrentlyExecutingJobs();
}
logger.trace("{} list got", JobExecutionContext.class.getSimpleName());
boolean interrupted = scheduler.interrupt(jobKey);
if (interrupted) {
logger.debug("Job {} interrupted successfully. Going to delete it.", uuid);
} else {
logger.debug("Job {} was not interrupted, going to delete it", uuid);
}
boolean deleted = scheduler.deleteJob(jobKey);
if (deleted) {
logger.debug("Job {} deleted successfully", uuid);
} else {
logger.debug("Job {} was not deleted", uuid);
}
activeSchedulers.remove(uuid);
scheduler.clear();
}
public void stopAll() {
List<UUID> set = new ArrayList<UUID>(activeSchedulers.keySet());
for (UUID uuid : set) {
try {
stop(uuid);
} catch (Exception e) {
logger.error("Error stopping plugin instace with UUID {}",
uuid, e);
}
}
}
}

View File

@ -25,7 +25,7 @@
</fileSets>
<files>
<file>
<source>target/smart-executor-1.0.0-SNAPSHOT.war</source>
<source>target/smart-executor-1.1.0-SNAPSHOT.war</source>
<outputDirectory>/smart-executor</outputDirectory>
</file>
<file>

View File

@ -6,20 +6,20 @@
<Class>VREManagement</Class>
<Name>SmartExecutor</Name>
<Description>Smart Executor Service</Description>
<Version>1.0.0-SNAPSHOT</Version>
<Version>1.1.0-SNAPSHOT</Version>
<Packages>
<Software>
<Description>Smart Executor Service</Description>
<Name>smart-executor</Name>
<Version>1.0.0-SNAPSHOT</Version>
<Version>1.1.0-SNAPSHOT</Version>
<MavenCoordinates>
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</MavenCoordinates>
<Type>library</Type>
<Files>
<File>smart-executor-1.0.0-SNAPSHOT.jar</File>
<File>smart-executor-1.1.0-SNAPSHOT.jar</File>
</Files>
</Software>
</Packages>

View File

@ -3,16 +3,16 @@
*/
package org.gcube.vremanagement.executor;
import java.util.HashMap;
import java.util.Map;
import org.acme.HelloWorldPlugin;
import org.acme.HelloWorldPluginDeclaration;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.Profile;
import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
import org.gcube.common.resources.gcore.utils.Group;
import org.gcube.vremanagement.executor.api.SmartExecutor;
import org.junit.Assert;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.junit.Test;
/**
@ -21,9 +21,10 @@ import org.junit.Test;
*/
public class ExecutorImplTest {
//@Test
/*
@Test
public void createServiceEndpointTest() {
/* TODO Redesign this test
TODO Redesign this test
ServiceEndpoint serviceEndpoint = SmartExecutorInitalizator.createServiceEndpoint();
Profile profile = serviceEndpoint.profile();
@ -43,14 +44,20 @@ public class ExecutorImplTest {
Assert.assertEquals(supportedCapabilities.get(propertyName), property.value());
}
}
*/
}
*/
@Test
public void myJavaTest(){
// Use this function to implements snippet of code to test before
// inserting in the service code.
public void myJavaTest() throws Exception{
SmartExecutorInitalizator.jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
Map<String, Object> inputs = new HashMap<String, Object>();
long sleepTime = 10000;
inputs.put(HelloWorldPlugin.SLEEP_TIME, sleepTime);
Plugin<? extends PluginDeclaration> runnablePlugin = PluginManager.instantiatePlugin(HelloWorldPluginDeclaration.NAME);
runnablePlugin.launch(inputs);
}
}

View File

@ -32,13 +32,12 @@ public class JDBCPersistenceConnectorTest {
UUID uuid = UUID.randomUUID();
JDBCPersistenceConnector jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
JDBCPersistence jdbcPersistence = new JDBCPersistence(jdbcPersistenceConnector, HelloWorldPluginDeclaration.NAME, uuid);
PluginState[] states = PluginState.values();
for(int i=0; i<states.length; i++){
long timestamp = new Date().getTime();
jdbcPersistence.addEvolution(timestamp, states[i]);
PluginState ps = jdbcPersistenceConnector.getPluginInstanceState(uuid);
jdbcPersistenceConnector.addEvolution(uuid, 1, timestamp, HelloWorldPluginDeclaration.NAME, states[i]);
PluginState ps = jdbcPersistenceConnector.getPluginInstanceState(uuid, 1);
Assert.assertEquals(states[i], ps);
}

View File

@ -1,81 +0,0 @@
package org.gcube.vremanagement.executor.pluginmanager;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.acme.HelloWorldPlugin;
import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.persistence.JDBCPersistence;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.junit.Assert;
import org.junit.Test;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class PluginThreadTest {
@Test
public void launchNullInputsTest() throws Exception {
System.out.println("Testing Null inputs");
UUID uuid = UUID.randomUUID();
HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration();
JDBCPersistenceConnector jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
JDBCPersistence jdbcPersistence = new JDBCPersistence(jdbcPersistenceConnector, hwpd.getName(), uuid);
HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd, jdbcPersistence);
try {
new PluginThread<HelloWorldPlugin>(helloWorldPlugin, null, uuid);
} catch(Exception e){
Assert.assertEquals(NullPointerException.class, e.getClass());
}
jdbcPersistenceConnector.close();
}
@Test
public void launchEmptyInputsTest() throws Exception {
System.out.println("Testing Empty inputs");
Map<String, Object> inputs = new HashMap<String, Object>();
UUID uuid = UUID.randomUUID();
HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration();
JDBCPersistenceConnector jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
JDBCPersistence jdbcPersistence = new JDBCPersistence(jdbcPersistenceConnector, hwpd.getName(), uuid);
HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd, jdbcPersistence);
PluginThread<HelloWorldPlugin> pt = new PluginThread<HelloWorldPlugin>(helloWorldPlugin, inputs, uuid);
pt.start();
Thread.sleep(1000);
Assert.assertEquals(PluginState.FAILED, jdbcPersistence.getState());
jdbcPersistenceConnector.close();
}
@Test
public void launchValidInputsTest() throws Exception {
System.out.println("Testing Some inputs");
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put("Test", "Test");
UUID uuid = UUID.randomUUID();
HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration();
JDBCPersistenceConnector jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
JDBCPersistence jdbcPersistence = new JDBCPersistence(jdbcPersistenceConnector, hwpd.getName(), uuid);
HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd, jdbcPersistence);
PluginThread<HelloWorldPlugin> pt = new PluginThread<HelloWorldPlugin>(helloWorldPlugin, inputs, uuid);
Assert.assertEquals(PluginState.CREATED, jdbcPersistence.getState());
pt.start();
Thread.sleep(1000);
Assert.assertEquals(PluginState.RUNNING, jdbcPersistence.getState());
Thread.sleep(4000);
Assert.assertEquals(PluginState.RUNNING, jdbcPersistence.getState());
Thread.sleep(6000);
Assert.assertEquals(PluginState.DONE, jdbcPersistence.getState());
jdbcPersistenceConnector.close();
}
}

View File

@ -0,0 +1,86 @@
package org.gcube.vremanagement.executor.pluginmanager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.acme.HelloWorldPlugin;
import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.InvalidInputsException;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
import org.junit.Assert;
import org.junit.Test;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class RunnablePluginTest {
@Test
public void launchNullInputsTest() throws Exception {
System.out.println("Testing Null inputs");
UUID uuid = UUID.randomUUID();
HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration();
JDBCPersistenceConnector jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
List<PluginStateNotification> pluginStateNotifications = new ArrayList<PluginStateNotification>();
pluginStateNotifications.add(jdbcPersistenceConnector);
HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd);
try {
RunnablePlugin<HelloWorldPlugin> runnablePlugin = new RunnablePlugin<HelloWorldPlugin>(helloWorldPlugin, null, uuid, 1, pluginStateNotifications);
runnablePlugin.run();
} catch(Exception e){
Assert.assertEquals(InputsNullException.class, e.getCause().getClass());
}
jdbcPersistenceConnector.close();
}
@Test
public void launchEmptyInputsTest() throws Exception {
System.out.println("Testing Empty inputs");
Map<String, Object> inputs = new HashMap<String, Object>();
UUID uuid = UUID.randomUUID();
HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration();
JDBCPersistenceConnector jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
List<PluginStateNotification> pluginStateNotifications = new ArrayList<PluginStateNotification>();
pluginStateNotifications.add(jdbcPersistenceConnector);
HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd);
RunnablePlugin<HelloWorldPlugin> pt = new RunnablePlugin<HelloWorldPlugin>(helloWorldPlugin, inputs, uuid, 1, pluginStateNotifications);
try {
pt.run();
} catch(RuntimeException e) {
Assert.assertEquals(InvalidInputsException.class, e.getCause().getClass());
}
}
@Test
public void launchValidInputsTest() throws Exception {
System.out.println("Testing Some inputs");
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put("Test", "Test");
long sleepTime = 10000;
inputs.put(HelloWorldPlugin.SLEEP_TIME, sleepTime);
UUID uuid = UUID.randomUUID();
HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration();
JDBCPersistenceConnector jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
List<PluginStateNotification> pluginStateNotifications = new ArrayList<PluginStateNotification>();
pluginStateNotifications.add(jdbcPersistenceConnector);
HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd);
RunnablePlugin<HelloWorldPlugin> rp = new RunnablePlugin<HelloWorldPlugin>(helloWorldPlugin, inputs, uuid, 1, pluginStateNotifications);
Assert.assertEquals(PluginState.CREATED, jdbcPersistenceConnector.getPluginInstanceState(uuid, 1));
rp.run();
Assert.assertEquals(PluginState.DONE, jdbcPersistenceConnector.getPluginInstanceState(uuid, 1));
jdbcPersistenceConnector.close();
}
}

View File

@ -0,0 +1,487 @@
/**
*
*/
package org.gcube.vremanagement.executor.pluginmanager;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.acme.HelloWorldPlugin;
import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.PersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
import org.h2.jdbc.JdbcSQLException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class SmartExecutorSchedulerTest {
private static Logger logger = LoggerFactory.getLogger(SmartExecutorSchedulerTest.class);
public static final String START = "START";
public static final String END = "END";
public static PersistenceConnector pc;
@BeforeClass
public static void prepare() throws Exception{
try {
pc = new JDBCPersistenceConnector(".");
PersistenceConnector.setPersistenceConnector(pc);
} catch (Exception e) {
logger.error("Unable to Create JDBCPersistenceConnector");
throw e;
}
}
@AfterClass
public static void close() throws Exception{
try {
pc.close();
} catch (Exception e) {
logger.error("Unable to Close JDBCPersistenceConnector");
throw e;
}
}
public UUID scheduleTest(Scheduling scheduling, Long sleepTime) throws Exception {
Map<String, Object> inputs = new HashMap<String, Object>();
if(sleepTime==null){
sleepTime = new Long(10*1000); // 10 sec = 10 * 1000 millisec
}
inputs.put(HelloWorldPlugin.SLEEP_TIME, sleepTime);
inputs.put("Test UUID", UUID.randomUUID());
logger.debug("Inputs : {}", inputs);
LaunchParameter parameter = new LaunchParameter(HelloWorldPluginDeclaration.NAME, inputs);
parameter.setScheduling(scheduling);
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
UUID uuid = smartExecutorScheduler.schedule(parameter);
logger.debug("Scheduled Job with ID {}", uuid);
return uuid;
}
@Test
public void schedulingTest() throws Exception {
UUID uuid = scheduleTest(null, null);
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 12000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState pluginState = pc.getLastPluginInstanceState(uuid);
Assert.assertEquals(PluginState.DONE, pluginState);
}
@Test
public void earlyStopTest() throws Exception {
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
UUID uuid = scheduleTest(null, null);
smartExecutorScheduler.stop(uuid);
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 12000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
try{
PluginState pluginState = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginState);
}catch(JdbcSQLException e){
// OK
}
}
@Test
public void middleStopTest() throws Exception {
UUID uuid = scheduleTest(null, null);
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 2000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 10000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState pluginState = pc.getLastPluginInstanceState(uuid);
Assert.assertEquals(PluginState.STOPPED, pluginState);
}
@Test
public void lateStopTest() throws Exception {
UUID uuid = scheduleTest(null, null);
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 12000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
PluginState pluginState = pc.getLastPluginInstanceState(uuid);
Assert.assertEquals(PluginState.DONE, pluginState);
}
@Test
public void doubleLaunchfirstStoppedSchedulingTest() throws Exception {
UUID first = scheduleTest(null, null);
logger.debug("First scheduled id {}", first);
UUID second = scheduleTest(null, null);
logger.debug("Second scheduled id {}", second);
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 2000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(first);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 12000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState pluginState = pc.getLastPluginInstanceState(first);
Assert.assertEquals(PluginState.STOPPED, pluginState);
pluginState = pc.getLastPluginInstanceState(second);
Assert.assertEquals(PluginState.DONE, pluginState);
}
@Test
public void delayed() throws Exception {
Scheduling scheduling = new Scheduling(20);
UUID uuid = scheduleTest(scheduling, new Long(10 * 1000));
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 80 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 30 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
for(int i=1; i<5; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i);
Assert.assertEquals(PluginState.DONE, pluginState);
}
PluginState pluginState = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginState);
}
@Test
public void delayedPreviousMustBeTerminated() throws Exception {
Scheduling scheduling = new Scheduling(20, true);
UUID uuid = scheduleTest(scheduling, new Long(22 * 1000));
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 80 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 30 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
for(int i=1; i<5; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i);
if(i%2==1){
Assert.assertEquals(PluginState.DONE, pluginState);
}else{
Assert.assertEquals(PluginState.DISCARDED, pluginState);
}
}
PluginState pluginState = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginState);
}
@Test
public void delayedAllPreviousMustBeTerminated() throws Exception {
Scheduling scheduling = new Scheduling(20, true);
UUID uuid = scheduleTest(scheduling, new Long(45 * 1000));
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 1.5 * 60 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 30 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState[] expectedStates = new PluginState[]{
PluginState.DONE,
PluginState.DISCARDED,
PluginState.DISCARDED,
PluginState.STOPPED
};
for(int i=0; i<expectedStates.length; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i+1);
Assert.assertEquals(expectedStates[i], pluginState);
}
PluginState pluginState = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.DISCARDED, pluginState);
}
@Test(expected=JdbcSQLException.class)
public void delayedExpMaxtimes() throws Exception {
Scheduling scheduling = new Scheduling(20, 3);
UUID uuid = scheduleTest(scheduling, new Long(10 * 1000));
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 1.5 * 60 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
for(int i=0; i<3; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i+1);
Assert.assertEquals(PluginState.DONE, pluginState);
}
pc.getPluginInstanceState(uuid, 4);
}
@Test(expected=JdbcSQLException.class)
public void delayedExpTimeLimits() throws Exception {
Calendar firstStartTime = Calendar.getInstance();
Calendar stopTime = Calendar.getInstance();
stopTime.setTimeInMillis(firstStartTime.getTimeInMillis());
stopTime.add(Calendar.SECOND, 10);
firstStartTime.add(Calendar.SECOND, 5);
Scheduling scheduling = new Scheduling(20, 0, firstStartTime, stopTime);
logger.debug("Scheduling : {}", scheduling);
UUID uuid = scheduleTest(scheduling, new Long(10 * 1000));
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 30 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState pluginState = pc.getPluginInstanceState(uuid, 1);
Assert.assertEquals(PluginState.DONE, pluginState);
pc.getPluginInstanceState(uuid, 2);
}
@Test
public void cronExp() throws Exception {
CronExpression cronExpression = new CronExpression("0/20 * * ? * *");
Scheduling scheduling = new Scheduling(cronExpression);
UUID uuid = scheduleTest(scheduling, new Long(10 * 1000));
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 1.5 * 60 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 30 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
for(int i=1; i<5; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i);
Assert.assertEquals(PluginState.DONE, pluginState);
}
try{
PluginState pluginState = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginState);
}catch(JdbcSQLException e){
// OK
}
}
@Test
public void cronExpPreviousMustBeTerminated() throws Exception {
CronExpression cronExpression = new CronExpression("0/20 * * ? * *");
Scheduling scheduling = new Scheduling(cronExpression, true);
UUID uuid = scheduleTest(scheduling, new Long(30 * 1000));
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 1.5 * 60 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 30 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
for(int i=1; i<5; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i);
if(i%2==1){
Assert.assertEquals(PluginState.DONE, pluginState);
}else{
Assert.assertEquals(PluginState.DISCARDED, pluginState);
}
}
try{
PluginState pluginState = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginState);
}catch(JdbcSQLException e){
// OK
}
}
@Test
public void cronExpAllPreviousTerminated() throws Exception {
CronExpression cronExpression = new CronExpression("0/20 * * ? * *");
Scheduling scheduling = new Scheduling(cronExpression, true);
UUID uuid = scheduleTest(scheduling, new Long(45 * 1000));
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 1.5 * 60 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
smartExecutorScheduler.stop(uuid);
startTime = Calendar.getInstance().getTimeInMillis();
endTime = startTime;
while(endTime <= (startTime + 20 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState[] expectedStates = new PluginState[]{
PluginState.DONE,
PluginState.DISCARDED,
PluginState.DISCARDED,
PluginState.STOPPED
};
for(int i=0; i<expectedStates.length; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i+1);
Assert.assertEquals(expectedStates[i], pluginState);
}
try{
PluginState pluginState = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.DISCARDED, pluginState);
}catch(JdbcSQLException e){
// OK
}
}
@Test(expected=JdbcSQLException.class)
public void cronExpMaxtimes() throws Exception {
CronExpression cronExpression = new CronExpression("0/20 * * ? * *");
Scheduling scheduling = new Scheduling(cronExpression, 3);
UUID uuid = scheduleTest(scheduling, new Long(10 * 1000));
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 1.5 * 60 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
for(int i=0; i<3; i++){
PluginState pluginState = pc.getPluginInstanceState(uuid, i+1);
Assert.assertEquals(PluginState.DONE, pluginState);
}
pc.getPluginInstanceState(uuid, 4);
}
@Test(expected=JdbcSQLException.class)
public void cronExpTimeLimits() throws Exception {
CronExpression cronExpression = new CronExpression("0/20 * * ? * *");
Calendar firstStartTime = Calendar.getInstance();
Calendar stopTime = Calendar.getInstance();
stopTime.setTimeInMillis(firstStartTime.getTimeInMillis());
stopTime.add(Calendar.SECOND, 20);
Scheduling scheduling = new Scheduling(cronExpression, 0, firstStartTime, stopTime);
logger.debug("Scheduling : {}", scheduling);
UUID uuid = scheduleTest(scheduling, new Long(10 * 1000));
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = startTime;
while(endTime <= (startTime + 30 * 1000)){
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState pluginState = pc.getPluginInstanceState(uuid, 1);
Assert.assertEquals(PluginState.DONE, pluginState);
pc.getPluginInstanceState(uuid, 2);
}
}