refs #111: Add Recurrent and scheduled Task support

https://support.d4science.org/issues/111
Merging from private branch

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor-api@117513 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-07-28 13:41:49 +00:00
parent 6dbb7a7da0
commit a87b7d4e03
12 changed files with 459 additions and 48 deletions

22
pom.xml
View File

@ -9,7 +9,7 @@
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<description>Smart Executor Service API Library</description>
<properties>
@ -39,6 +39,26 @@
<artifactId>xstream</artifactId>
<version>1.4.4</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<!-- Test Dependency -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -60,4 +60,16 @@ public interface SmartExecutor {
public PluginState getState(String executionIdentifier)
throws PluginInstanceNotFoundException, ExecutorException;
/**
* The method use the provided UUID as String and the iteration number
* to retrieve the status of the associated execution
* @param executionIdentifier UUID as String which identify the execution
* @param iterationNumber iteration number
* @return {@link PluginState} which contains the state of the execution
* @throws Exception if there is no execution identified by the provided
* UUID execution identifier as String
*/
@SOAPBinding(parameterStyle=ParameterStyle.WRAPPED)
public PluginState getIterationState(String executionIdentifier, int iterationNumber)
throws PluginInstanceNotFoundException, ExecutorException;
}

View File

@ -15,7 +15,6 @@ import org.gcube.vremanagement.executor.api.types.adapter.MapAdapter;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
@XmlRootElement()
@XmlAccessorType(XmlAccessType.FIELD)
@ -26,6 +25,9 @@ public class LaunchParameter {
@XmlJavaTypeAdapter(MapAdapter.class)
private Map<String, Object> inputs;
private Scheduling scheduling;
@SuppressWarnings("unused")
private LaunchParameter(){}
@ -39,6 +41,17 @@ public class LaunchParameter {
this.inputs = inputs;
}
/**
* @param name
* @param inputs
* @param scheduling
*/
public LaunchParameter(String name, Map<String, Object> inputs, Scheduling scheduling) {
this.name = name;
this.inputs = inputs;
this.scheduling = scheduling;
}
/**
* @return the name
*/
@ -53,4 +66,25 @@ public class LaunchParameter {
return inputs;
}
/**
* @return the scheduling
*/
public Scheduling getScheduling() {
return scheduling;
}
/**
* @param scheduling the scheduling
*/
public void setScheduling(Scheduling scheduling) {
this.scheduling = scheduling;
}
@Override
public String toString(){
return String.format("%s : { Scheduling : %s, Inputs : %s}",
this.getClass().getSimpleName(), scheduling, inputs);
}
}

View File

@ -0,0 +1,175 @@
/**
*
*/
package org.gcube.vremanagement.executor.api.types;
import java.util.Calendar;
import javax.xml.bind.annotation.XmlElement;
import org.quartz.CronExpression;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class Scheduling {
@XmlElement
/**
* CRON like expression for a repetitive task.
* This field is not valid when using delay
*/
private String cronExpression;
@XmlElement
/**
* Delay between subsequent execution in seconds.
* This field is not valid when using cronExpression
*/
private Integer delay;
@XmlElement
/**
* Indicates the number of times the scheduling pattern must be applied.
* 0 means indefinitely.
*/
private int schedulingTimes;
@XmlElement
/**
* When using cronExpression run the subsequent task only if the previous
* are terminated otherwise this execution is discarded and the subsequent
* execution will start when fired by the the next scheduling.
* The discarded execution is counted in the total number of executions
* happened.
*/
private boolean previuosExecutionsMustBeCompleted;
@XmlElement
/**
* The first instant when the scheduling can start
*/
private Long firstStartTime; // O or null means immediately
/**
* Time at which the Trigger will no longer fire even if it's schedule
* has remaining repeats.
*/
@XmlElement
private Long endTime; // O or null means never
private void init(CronExpression cronExpression, Integer delay, int schedulingTimes, Long firstStartTime, Long endTime, boolean previuosExecutionsMustBeCompleted){
if(cronExpression!=null){
this.cronExpression = cronExpression.getCronExpression();
}else{
this.cronExpression = null;
}
this.delay = delay;
this.schedulingTimes = schedulingTimes;
this.firstStartTime = firstStartTime;
this.endTime = endTime;
this.previuosExecutionsMustBeCompleted = previuosExecutionsMustBeCompleted;
}
public Scheduling(CronExpression cronExpression) {
init(cronExpression, null, 0, null, null, false);
}
public Scheduling(CronExpression cronExpression, boolean previuosExecutionsMustBeCompleted) {
init(cronExpression, null, 0, null, null, previuosExecutionsMustBeCompleted);
}
public Scheduling(CronExpression cronExpression, int schedulingTimes) {
init(cronExpression, null, schedulingTimes, null, null, false);
}
public Scheduling(CronExpression cronExpression, int schedulingTimes, boolean previuosExecutionsMustBeCompleted ) {
init(cronExpression, null, schedulingTimes, null, null, previuosExecutionsMustBeCompleted);
}
public Scheduling(CronExpression cronExpression, int schedulingTimes, Calendar firstStartTime, Calendar endTime) {
init(cronExpression, null, schedulingTimes, firstStartTime.getTimeInMillis(), endTime.getTimeInMillis(), false);
}
public Scheduling(CronExpression cronExpression, int schedulingTimes, Calendar firstStartTime, Calendar endTime, boolean previuosExecutionsMustBeCompleted) {
init(cronExpression, null, schedulingTimes, firstStartTime.getTimeInMillis(), endTime.getTimeInMillis(), previuosExecutionsMustBeCompleted);
}
public Scheduling(int delay) {
init(null, delay, 0, null, null, false);
}
public Scheduling(int delay, boolean previuosExecutionsMustBeCompleted) {
init(null, delay, 0, null, null, previuosExecutionsMustBeCompleted);
}
public Scheduling(int delay, int schedulingTimes) {
init(null, delay, schedulingTimes, null, null, false);
}
public Scheduling(int delay, int schedulingTimes, boolean previuosExecutionsMustBeCompleted ) {
init(null, delay, schedulingTimes, null, null, previuosExecutionsMustBeCompleted);
}
public Scheduling(int delay, int schedulingTimes, Calendar firstStartTime, Calendar endTime) {
init(null, delay, schedulingTimes, firstStartTime.getTimeInMillis(), endTime.getTimeInMillis(), false);
}
public Scheduling(int delay, int schedulingTimes, Calendar firstStartTime, Calendar endTime, boolean previuosExecutionsMustBeCompleted) {
init(null, delay, schedulingTimes, firstStartTime.getTimeInMillis(), endTime.getTimeInMillis(), previuosExecutionsMustBeCompleted);
}
/**
* @return the cronExpression
*/
public String getCronExpression() {
return cronExpression;
}
/**
* @return the delay
*/
public Integer getDelay() {
return delay;
}
/**
* @return the schedulingTimes
*/
public int getSchedulingTimes() {
return schedulingTimes;
}
/**
* @return the previuosExecutionMustBeCompleted
*/
public boolean mustPreviousExecutionsCompleted() {
return previuosExecutionsMustBeCompleted;
}
/**
* @return the firtStartTime
*/
public Long getFirtStartTime() {
return firstStartTime;
}
/**
* @return the endTime
*/
public Long getEndTime() {
return endTime;
}
public String toString(){
return String.format("CronExpression %s, Delay %d, SchedulingTimes %d, FirstStartTime %d, EndTime %d, PreviuosExecutionsMustBeCompleted %b",
cronExpression, delay, schedulingTimes, firstStartTime, endTime, previuosExecutionsMustBeCompleted);
}
}

View File

@ -0,0 +1,60 @@
/**
*
*/
package org.gcube.vremanagement.executor.exception;
import javax.xml.ws.WebFault;
import org.gcube.vremanagement.executor.exception.beans.ExceptionBean;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
@WebFault
public class InvalidInputsException extends ExecutorException {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = 7578814354528161119L;
private static final String DEFAULT_MESSAGE = "Inputs cannot be null. Use an Empty Map instead.";
public InvalidInputsException(){
super(DEFAULT_MESSAGE);
this.faultInfo = new ExceptionBean(DEFAULT_MESSAGE);
}
public InvalidInputsException(String message) {
super(message);
}
public InvalidInputsException(Throwable cause) {
this(DEFAULT_MESSAGE, cause);
}
public InvalidInputsException(ExceptionBean faultInfo){
super(faultInfo);
}
public InvalidInputsException(String message, Throwable cause){
super(message, cause);
}
public InvalidInputsException(String message, ExceptionBean faultInfo){
super(message, faultInfo);
}
public InvalidInputsException(String message, ExceptionBean faultInfo, Throwable cause){
super(message, faultInfo, cause);
}
@Override
public ExceptionBean getFaultInfo(){
return faultInfo;
}
}

View File

@ -10,6 +10,7 @@ import org.gcube.vremanagement.executor.plugin.PluginState;
* This class is used to persist the execution state
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
@Deprecated
public abstract class Persistence<P extends PersistenceConnector> {
protected final String name;
@ -24,6 +25,7 @@ public abstract class Persistence<P extends PersistenceConnector> {
* running {@link Plugin}
* @param uuid the execution identifier
*/
@Deprecated
public Persistence(P persistenceConnector, String name, UUID uuid){
this.name = name;
this.persistenceConnector = persistenceConnector;
@ -37,6 +39,7 @@ public abstract class Persistence<P extends PersistenceConnector> {
* @param pluginState the {@link PluginState} value
* @throws Exception if fails
*/
@Deprecated
public abstract void addEvolution(long timestamp, PluginState pluginState)
throws Exception;
@ -45,8 +48,9 @@ public abstract class Persistence<P extends PersistenceConnector> {
* @return the actual (or the last temporal) {@link PluginState} value
* @throws Exception if fails to retrieve the {@link PluginState} from DB
*/
@Deprecated
public PluginState getState() throws Exception {
return persistenceConnector.getPluginInstanceState(uuid);
return persistenceConnector.getLastPluginInstanceState(uuid);
}
}

View File

@ -7,20 +7,38 @@ import java.util.UUID;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
/**
* Model the connector which create or open the connection to DB
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public abstract class PersistenceConnector {
public abstract class PersistenceConnector implements PluginStateNotification {
protected static PersistenceConnector persistenceConnector;
/**
* @return the persistenceConnector
*/
public static PersistenceConnector getPersistenceConnector() {
return persistenceConnector;
}
/**
* @param persistenceConnector the persistenceConnector to set
*/
public static void setPersistenceConnector(
PersistenceConnector persistenceConnector) {
PersistenceConnector.persistenceConnector = persistenceConnector;
}
/**
* Default Constructor
*/
public PersistenceConnector(){
}
/**
* This constructor is used to provide a location where creating persistence
@ -37,13 +55,35 @@ public abstract class PersistenceConnector {
public abstract void close() throws Exception;
/**
* Retrieve the status a running/run {@link Plugin} which is/was identified
* Retrieve the status of the iterationNumber (passed as parameter) of a running/run {@link Plugin} which is/was identified
* by the UUID passed as parameter
* @param uuid the execution identifier of the running/run {@link Plugin}
* @param iterationNumber the
* @return the actual/last {@link PluginState} of the Plugin
* @throws Exception if fails
*/
public abstract PluginState getPluginInstanceState(UUID uuid, int iterationNumber)
throws Exception;
/**
* Retrieve the status of the iterationNumber of the last running/run {@link Plugin} which is/was identified
* by the UUID passed as parameter
* @param uuid the execution identifier of the running/run {@link Plugin}
* @return the actual/last {@link PluginState} of the Plugin
* @throws Exception if fails
*/
public abstract PluginState getPluginInstanceState(UUID uuid)
public abstract PluginState getLastPluginInstanceState(UUID uuid)
throws Exception;
/**
* Persist the new state of plugin
* @param uuid the UUID which identify the current execution
* @param timestamp the time of the new {@link PluginState}
* @param pluginDeclaration the pluginDeclaration
* @param pluginState the {@link PluginState} value
* @throws Exception if fails
*/
public abstract void addEvolution(UUID uuid, int lauchedtimes,
long timestamp, String pluginName, PluginState pluginState)
throws Exception;
}

View File

@ -1,12 +1,9 @@
package org.gcube.vremanagement.executor.plugin;
import java.util.Date;
import java.util.Map;
import org.gcube.vremanagement.executor.persistence.Persistence;
import org.gcube.vremanagement.executor.persistence.PersistenceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This interface represent the contract for a plugin runnable by the executor.
@ -15,21 +12,15 @@ import org.slf4j.LoggerFactory;
*/
public abstract class Plugin<T extends PluginDeclaration> {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(Plugin.class);
/**
* Instance of the class which log the plugin state evolution
*/
private Persistence<? extends PersistenceConnector> persistence;
private T pluginDeclaration;
@Deprecated
public Plugin(T pluginDeclaration, Persistence<? extends PersistenceConnector> persistence){
this.pluginDeclaration = pluginDeclaration;
this.persistence = persistence;
}
public Plugin(T pluginDeclaration){
this.pluginDeclaration = pluginDeclaration;
}
/**
@ -39,21 +30,6 @@ public abstract class Plugin<T extends PluginDeclaration> {
return pluginDeclaration;
}
/**
* It is up to the plugin update the State of the Running Plugin using
* this facilities function.
* @param pluginState
* @throws Exception
*/
public void setState(PluginState pluginState) {
long timestamp = new Date().getTime();
try {
persistence.addEvolution(timestamp, pluginState);
} catch(Exception e) {
logger.error(String.format("Unable to persist State : %d,%s", timestamp, pluginState.name()));
}
}
/**
* Launch the plugin with the provided input.
* @param inputs
@ -68,21 +44,11 @@ public abstract class Plugin<T extends PluginDeclaration> {
protected abstract void onStop() throws Exception;
/**
* Stop the Plugin setting state to {@link PluginState#SUSPENDED}
* Invoke onStop() function to allow the plugin to safely stop the execution
* @throws Exception
*/
public void stop() throws Exception {
setState(PluginState.SUSPENDED);
onStop();
}
/**
* Used to retrieve the state of the execution
* @return the {@link PluginState}
* @throws Exception
*/
public PluginState getState() throws Exception {
return persistence.getState();
}
}

View File

@ -4,5 +4,45 @@ package org.gcube.vremanagement.executor.plugin;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public enum PluginState {
CREATED, RUNNING, SUSPENDED, DONE, FAILED
/**
* The Job is created
*/
CREATED(false),
/**
* The job is running
*/
RUNNING(false),
/**
* The job has been stopped
*/
STOPPED(true),
/**
* The job terminated successfully
*/
DONE(true),
/**
* The failed the execution
*/
FAILED(true),
/**
* the job has been discarded by the scheduler. this happen when the launch
* parameter require the previous completed for repetitive or recurrent jobs
*/
DISCARDED(true);
boolean finalState;
PluginState(boolean finalState){
this.finalState = finalState;
}
/**
* Return true when the state a is a final state and the job cannot move
* in any other state
* @return if is a Final State
*/
public boolean isFinalState() {
return finalState;
}
}

View File

@ -0,0 +1,18 @@
/**
*
*/
package org.gcube.vremanagement.executor.plugin;
import java.util.UUID;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public interface PluginStateNotification {
public void pluginStateEvolution(UUID uuid, int iteration,
long timestamp, String pluginName, PluginState pluginState)
throws Exception;
}

View File

@ -0,0 +1,26 @@
/**
*
*/
package org.gcube.vremanagement.executor.api.types;
import org.junit.Test;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class LaunchParameterTest {
public static final String EVERY_FIVE_MINUTE_PATTERN = "5 * * * *";
@Test
public void testSchedulingPattern(){
/*
SchedulingPattern schedulingPattern = new SchedulingPattern(EVERY_FIVE_MINUTE_PATTERN);
Scheduling scheduling = new Scheduling(schedulingPattern);
LaunchParameter launchParameter = new LaunchParameter("Test-Plugin", new HashMap<String, Object>(), scheduling);
Assert.assertTrue(EVERY_FIVE_MINUTE_PATTERN.compareTo(launchParameter.getScheduling().getSchedulingPattern().toString())==0);
Assert.assertTrue(launchParameter.getScheduling().getSchedulingTimes()==0);
*/
}
}

View File

@ -0,0 +1,16 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{0}: %msg%n</pattern>
</encoder>
</appender>
<logger name="org.gcube" level="TRACE" />
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
</configuration>