Refs #5109: Provide REST interface for Smart Executor

Task-Url: https://support.d4science.org/issues/5109

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@162071 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2018-01-10 16:33:26 +00:00
parent 679f37cd3a
commit 549edd6a30
16 changed files with 350 additions and 175 deletions

View File

@ -1,20 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<listener>
<listener-class>com.sun.xml.ws.transport.http.servlet.WSServletContextListener</listener-class>
</listener>
<servlet>
<servlet-name>smart-executor</servlet-name>
<servlet-class>com.sun.xml.ws.transport.http.servlet.WSServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>smart-executor</servlet-name>
<url-pattern>/gcube/vremanagement/smart-executor</url-pattern>
</servlet-mapping>
version="3.0">
<listener>
<listener-class>com.sun.xml.ws.transport.http.servlet.WSServletContextListener</listener-class>
</listener>
<servlet>
<servlet-name>smart-executor</servlet-name>
<servlet-class>com.sun.xml.ws.transport.http.servlet.WSServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>smart-executor</servlet-name>
<url-pattern>/gcube/vremanagement/smart-executor</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>org.gcube.vremanagement.executor.ResourceInitializer</servlet-name>
</servlet>
<servlet-mapping>
<servlet-name>org.gcube.vremanagement.executor.ResourceInitializer</servlet-name>
<url-pattern>/rest/*</url-pattern>
</servlet-mapping>
</web-app>

22
pom.xml
View File

@ -107,7 +107,7 @@
<dependency>
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor-client</artifactId>
<version>[1.4.0-SNAPSHOT,2.0.0-SNAPSHOT]</version>
<version>[1.6.0-SNAPSHOT,2.0.0-SNAPSHOT]</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@ -142,18 +142,28 @@
<dependency>
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor-api</artifactId>
<version>[1.4.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
<version>[1.8.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
</dependency>
<!--
<!-- Jersey -->
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
<version>2.13</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>[3.0.1, 4.0.0)</version>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
-->
<!-- END Jersey -->
<dependency>
<groupId>com.sun.xml.ws</groupId>
<artifactId>jaxws-rt</artifactId>

View File

@ -0,0 +1,25 @@
package org.gcube.vremanagement.executor;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.MediaType;
import org.gcube.smartgears.annotations.ManagedBy;
import org.gcube.vremanagement.executor.api.rest.RestConstants;
import org.gcube.vremanagement.executor.rest.RestSmartExecutor;
import org.glassfish.jersey.server.ResourceConfig;
/**
* @author Luca Frosini (ISTI - CNR)
*/
@ApplicationPath(RestConstants.REST_PATH_PART)
// UnComment this when the SOAP API will be dismissed
@ManagedBy(SmartExecutorInitializator.class)
public class ResourceInitializer extends ResourceConfig {
public static final String APPLICATION_JSON_CHARSET_UTF_8 = MediaType.APPLICATION_JSON + ";charset=UTF-8";
public ResourceInitializer() {
packages(RestSmartExecutor.class.getPackage().toString());
}
}

View File

@ -4,7 +4,6 @@ import java.util.UUID;
import javax.jws.WebService;
import org.gcube.smartgears.annotations.ManagedBy;
import org.gcube.vremanagement.executor.api.SmartExecutor;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.ExecutorException;
@ -33,7 +32,8 @@ portName = "SmartExecutorPort",
serviceName = SmartExecutor.WEB_SERVICE_SERVICE_NAME,
targetNamespace = SmartExecutor.TARGET_NAMESPACE,
endpointInterface = "org.gcube.vremanagement.executor.api.SmartExecutor" )
@ManagedBy(SmartExecutorInitializator.class)
//@ManagedBy(SmartExecutorInitializator.class)
@Deprecated
public class SmartExecutorImpl implements SmartExecutor {
/**
@ -101,6 +101,8 @@ public class SmartExecutorImpl implements SmartExecutor {
} catch(SchedulePersistenceException e){
// currentStopped = true;
logger.error("Error removing scheduled task from persistence.", e);
} catch (ExecutorException e) {
throw e;
} catch (Exception e) {
// currentStopped = false;
logger.error("Error unscheduling task {}", executionIdentifier, e);
@ -124,11 +126,13 @@ public class SmartExecutorImpl implements SmartExecutor {
logger.info("getStateEvolution() requested for {}", executionIdentifier);
try {
SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector();
PluginStateEvolution pluginStateEvolution = persistenceConnector.getLastPluginInstanceState(UUID.fromString(executionIdentifier));
PluginStateEvolution pluginStateEvolution = persistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), null);
logger.info("getState() for {} is : {}", executionIdentifier, pluginStateEvolution);
return pluginStateEvolution;
} catch (ExecutorException e) {
throw e;
} catch (Exception e) {
throw new PluginInstanceNotFoundException(e);
throw new ExecutorException(e);
}
}
@ -150,10 +154,11 @@ public class SmartExecutorImpl implements SmartExecutor {
PluginStateEvolution pluginStateEvolution = persistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), iterationNumber);
logger.info("getIterationState() for {} (iteration n. {}) is : {}", executionIdentifier, iterationNumber, pluginStateEvolution);
return pluginStateEvolution;
} catch (ExecutorException e) {
throw e;
} catch (Exception e) {
throw new PluginInstanceNotFoundException(e);
throw new ExecutorException(e);
}
}
}

View File

@ -31,7 +31,7 @@ import org.gcube.smartgears.ApplicationManager;
import org.gcube.smartgears.ContextProvider;
import org.gcube.smartgears.configuration.container.ContainerConfiguration;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.json.SEMapper;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
@ -345,7 +345,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
}
for(final ScheduledTask scheduledTask : scheduledTasks){
final ObjectMapper mapper = ObjectMapperManager.getObjectMapper();
final ObjectMapper mapper = SEMapper.getObjectMapper();
String taskAsString = mapper.writeValueAsString(scheduledTask);

View File

@ -1,33 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class PluginStateNotRetrievedException extends Exception {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = -5828105692475375250L;
public PluginStateNotRetrievedException() {
super();
}
public PluginStateNotRetrievedException(String message) {
super(message);
}
public PluginStateNotRetrievedException(Throwable throwable){
super(throwable);
}
public PluginStateNotRetrievedException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -8,15 +8,9 @@ import org.gcube.common.authorization.library.provider.ContainerInfo;
import org.gcube.common.authorization.library.provider.ExternalServiceInfo;
import org.gcube.common.authorization.library.provider.ServiceInfo;
import org.gcube.common.authorization.library.provider.UserInfo;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.Ref;
import org.gcube.vremanagement.executor.plugin.RunOn;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
@ -25,35 +19,23 @@ import com.fasterxml.jackson.databind.ObjectMapper;
*/
public class ObjectMapperManager {
protected static final ObjectMapper mapper;
static{
mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.registerSubtypes(ScheduledTask.class);
mapper.registerSubtypes(RunOn.class);
mapper.registerSubtypes(Ref.class);
mapper.registerSubtypes(PluginDeclaration.class);
mapper.registerSubtypes(PluginStateEvolution.class);
mapper.addMixIn(ClientInfo.class, ClientInfoMixIn.class);
//mapper.registerSubtypes(ClientInfo.class);
mapper.registerSubtypes(UserInfo.class);
mapper.registerSubtypes(ServiceInfo.class);
mapper.registerSubtypes(ExternalServiceInfo.class);
mapper.registerSubtypes(ContainerInfo.class);
SEMapper.getObjectMapper().registerSubtypes(ScheduledTask.class);
SEMapper.getObjectMapper().addMixIn(ClientInfo.class, ClientInfoMixIn.class);
SEMapper.getObjectMapper().registerSubtypes(UserInfo.class);
SEMapper.getObjectMapper().registerSubtypes(ServiceInfo.class);
SEMapper.getObjectMapper().registerSubtypes(ExternalServiceInfo.class);
SEMapper.getObjectMapper().registerSubtypes(ContainerInfo.class);
}
public static ObjectMapper getObjectMapper() {
return mapper;
return SEMapper.getObjectMapper();
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=Scheduling.CLASS_PROPERTY)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=SEMapper.CLASS_PROPERTY)
class ClientInfoMixIn {
}

View File

@ -9,11 +9,12 @@ import java.util.UUID;
import org.gcube.common.clients.exceptions.DiscoveryException;
import org.gcube.common.resources.gcore.HostingNode;
import org.gcube.smartgears.ContextProvider;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import org.gcube.vremanagement.executor.api.rest.SmartExecutor;
import org.gcube.vremanagement.executor.client.SmartExecutorClientFactory;
import org.gcube.vremanagement.executor.client.query.filter.impl.SpecificGCoreEndpointQueryFilter;
import org.gcube.vremanagement.executor.exception.ExecutorException;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException;
import org.gcube.vremanagement.executor.json.SEMapper;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
@ -47,23 +48,13 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif
* Retrieve the status of the iterationNumber (passed as parameter) of a running/run {@link Plugin} which is/was identified
* by the UUID passed as parameter
* @param uuid the execution identifier of the running/run {@link Plugin}
* @param iterationNumber the
* @param iterationNumber the iterationNumber (null to get the last)
* @return the actual/last {@link PluginState} of the Plugin
* @throws Exception if fails
*/
public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber) throws Exception;
public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, Integer iterationNumber) throws PluginInstanceNotFoundException, ExecutorException;
/**
* Retrieve the status of the iterationNumber of the last running/run {@link Plugin} which is/was identified
* by the UUID passed as parameter
* @param uuid the execution identifier of the running/run {@link Plugin}
* @return the actual/last {@link PluginState} of the Plugin
* @throws Exception if fails
*/
public abstract PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception;
protected boolean isOrphan(ScheduledTask scheduledTask) throws Exception {
protected boolean isOrphan(ScheduledTask scheduledTask) throws ExecutorException {
try {
UUID uuid = scheduledTask.getUUID();
@ -85,37 +76,38 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif
String address = runOn.getEService().getAddress();
SpecificEndpointDiscoveryFilter specificEndpointDiscoveryFilter = new SpecificEndpointDiscoveryFilter(
address);
SpecificGCoreEndpointQueryFilter specificGCoreEndpointDiscoveryFilter = new SpecificGCoreEndpointQueryFilter(address);
String pluginName = scheduledTask.getLaunchParameter()
.getPluginName();
try {
SmartExecutorProxy proxy = ExecutorPlugin
.getExecutorProxy(pluginName, null, null,
specificEndpointDiscoveryFilter).build();
proxy.getStateEvolution(uuid.toString());
logger.trace("{} is not orphan.", ObjectMapperManager
SmartExecutor smartExecutor = SmartExecutorClientFactory.create(pluginName, null, null, specificGCoreEndpointDiscoveryFilter);
smartExecutor.getPluginStateEvolution(uuid, null);
logger.trace("{} is not orphan.", SEMapper
.getObjectMapper().writeValueAsString(scheduledTask));
return false;
} catch (DiscoveryException | ExecutorException e) {
// The instance was not found or the request failed.
// The scheduledTask is considered orphan
logger.trace("{} is considered orphan.", ObjectMapperManager
logger.trace("{} is considered orphan.", SEMapper
.getObjectMapper().writeValueAsString(scheduledTask), e);
return true;
} catch (Throwable e) {
// The scheduledTask is NOT considered orphan
logger.trace("{} is NOT considered orphan.", ObjectMapperManager
logger.trace("{} is NOT considered orphan.", SEMapper
.getObjectMapper().writeValueAsString(scheduledTask), e);
return false;
}
} catch (Exception e) {
String string = ObjectMapperManager.getObjectMapper()
.writeValueAsString(scheduledTask);
logger.error("Error while checking orphanity of " + string
+ ". Considering as not orphan.", e);
try {
String string = SEMapper.getObjectMapper()
.writeValueAsString(scheduledTask);
logger.error("Error while checking orphanity of " + string
+ ". Considering as not orphan.", e);
}catch (Exception ex) {
logger.error("", e, ex);
}
}
return false;

View File

@ -13,7 +13,8 @@ import java.util.UUID;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException;
import org.gcube.vremanagement.executor.exception.ExecutorException;
import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration;
@ -24,7 +25,6 @@ import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.orientechnologies.orient.core.db.OPartitionedDatabasePool;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.record.impl.ODocument;
@ -40,8 +40,6 @@ public class OrientDBPersistenceConnector extends
private static final Logger logger = LoggerFactory
.getLogger(OrientDBPersistenceConnector.class);
protected final static int LAST = -1;
protected final String SCOPE = "scope";
protected final String UUID = "uuid";
protected final String ITERATION = "iteration";
@ -50,13 +48,11 @@ public class OrientDBPersistenceConnector extends
protected final String RUN_ON = "runOn";
protected OPartitionedDatabasePool oPartitionedDatabasePool;
protected ObjectMapper mapper;
public OrientDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration)
throws Exception {
super();
prepareConnection(configuration);
this.mapper = ObjectMapperManager.getObjectMapper();
}
protected void prepareConnection(
@ -71,18 +67,13 @@ public class OrientDBPersistenceConnector extends
username, password);
}
protected void prepareObjectMapper() {
this.mapper = new ObjectMapper();
}
@Override
public void close() throws Exception {
oPartitionedDatabasePool.close();
}
protected PluginStateEvolution getPluginStateEvolution(UUID uuid,
int iterationNumber) throws Exception {
public PluginStateEvolution getPluginInstanceState(UUID uuid,
Integer iterationNumber) throws PluginInstanceNotFoundException, ExecutorException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
@ -92,7 +83,7 @@ public class OrientDBPersistenceConnector extends
params.put(SCOPE, SmartExecutorInitializator.getCurrentScope());
OSQLSynchQuery<ODocument> query = null;
if (iterationNumber != LAST) {
if (iterationNumber != null) {
query = new OSQLSynchQuery<ODocument>(
String.format(
"SELECT FROM %s WHERE %s = :%s AND %s = :%s AND %s = :%s ORDER BY %s DESC LIMIT 1",
@ -110,7 +101,7 @@ public class OrientDBPersistenceConnector extends
ODocument resDoc = null;
if (iterationNumber != LAST) {
if (iterationNumber != null) {
resDoc = result.get(0);
} else {
// TODO manage better
@ -125,29 +116,17 @@ public class OrientDBPersistenceConnector extends
}
String json = resDoc.toJSON("class");
PluginStateEvolution pluginStateEvolution = mapper.readValue(json,
PluginStateEvolution pluginStateEvolution = ObjectMapperManager.getObjectMapper().readValue(json,
PluginStateEvolution.class);
return pluginStateEvolution;
} catch (Exception e) {
throw new PluginStateNotRetrievedException(e);
throw new PluginInstanceNotFoundException();
} finally {
db.close();
}
}
@Override
public PluginStateEvolution getPluginInstanceState(UUID uuid,
int iterationNumber) throws Exception {
return getPluginStateEvolution(uuid, iterationNumber);
}
@Override
public PluginStateEvolution getLastPluginInstanceState(UUID uuid)
throws Exception {
return getPluginStateEvolution(uuid, LAST);
}
@Override
public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution,
Exception exception) throws Exception {
@ -157,7 +136,7 @@ public class OrientDBPersistenceConnector extends
ODocument doc = new ODocument(
PluginStateEvolution.class.getSimpleName());
String json = mapper.writeValueAsString(pluginStateEvolution);
String json = ObjectMapperManager.getObjectMapper().writeValueAsString(pluginStateEvolution);
doc.fromJSON(json);
doc.field(SCOPE, SmartExecutorInitializator.getCurrentScope());
@ -187,7 +166,7 @@ public class OrientDBPersistenceConnector extends
long timestamp = Calendar.getInstance().getTimeInMillis();
doc.field(TIMESTAMP, timestamp);
String json = mapper.writeValueAsString(scheduledTask);
String json = ObjectMapperManager.getObjectMapper().writeValueAsString(scheduledTask);
doc.fromJSON(json);
doc.save();
@ -244,7 +223,7 @@ public class OrientDBPersistenceConnector extends
for (ODocument doc : result) {
String json = doc.toJSON("class");
ScheduledTask scheduledTask = mapper.readValue(json,
ScheduledTask scheduledTask = ObjectMapperManager.getObjectMapper().readValue(json,
ScheduledTask.class);
try {
if (isOrphan(scheduledTask)) {
@ -309,7 +288,7 @@ public class OrientDBPersistenceConnector extends
db = oPartitionedDatabasePool.acquire();
ODocument doc = getScheduledTaskDocument(db, uuid);
String json = doc.toJSON("class");
return mapper.readValue(json, ScheduledTask.class);
return ObjectMapperManager.getObjectMapper().readValue(json, ScheduledTask.class);
} catch (Exception e) {
throw new SchedulePersistenceException(e);
} finally {

View File

@ -0,0 +1,43 @@
package org.gcube.vremanagement.executor.rest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.gcube.vremanagement.executor.exception.ExecutorException;
import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.exception.SmartExecutorExceptionMapper;
/**
* @author Luca Frosini (ISTI - CNR)
*/
@Provider
public class ExecutorExceptionMapper implements ExceptionMapper<ExecutorException> {
@Override
public Response toResponse(ExecutorException exception) {
Status status = Status.BAD_REQUEST;
if(PluginInstanceNotFoundException.class.isAssignableFrom(exception.getClass()) || PluginNotFoundException.class.isAssignableFrom(exception.getClass())) {
status = Status.NOT_FOUND;
} else if(exception.getClass() == ExecutorException.class) {
status = Status.INTERNAL_SERVER_ERROR;
}
try {
String entity = SmartExecutorExceptionMapper.marshal(exception);
MediaType mediaType = MediaType.APPLICATION_JSON_TYPE;
return Response.status(status).entity(entity).type(mediaType).build();
} catch(Exception e) {
String entity = exception.getMessage();
MediaType mediaType = MediaType.TEXT_PLAIN_TYPE;
return Response.status(status).entity(entity).type(mediaType).build();
}
}
}

View File

@ -0,0 +1,163 @@
package org.gcube.vremanagement.executor.rest;
import java.util.UUID;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import org.gcube.vremanagement.executor.ResourceInitializer;
import org.gcube.vremanagement.executor.api.rest.RestConstants;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.ExecutorException;
import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.InvalidInputsException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.gcube.vremanagement.executor.json.SEMapper;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorSchedulerFactory;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
@Path(RestConstants.PLUGINS_PATH_PART)
public class RestSmartExecutor {
private static Logger logger = LoggerFactory.getLogger(RestSmartExecutor.class);
private static final String UUID_PATH_PARAM = "uuid";
private static final String PLUGIN_NAME_PATH_PARAM = "pluginName";
@POST
@Path("/{" + PLUGIN_NAME_PATH_PARAM + "}")
@Consumes({MediaType.TEXT_PLAIN, ResourceInitializer.APPLICATION_JSON_CHARSET_UTF_8})
@Produces(MediaType.TEXT_PLAIN)
public String launch(@PathParam(PLUGIN_NAME_PATH_PARAM) String pluginName, String launchParameterString)
throws ExecutorException {
try {
logger.info("Requested to launch {} ({})", pluginName, launchParameterString);
LaunchParameter launchParameter = SEMapper.unmarshal(LaunchParameter.class, launchParameterString);
if(pluginName == null) {
String error = String.format("Plugin Name provided in the URL (%s) cannot be null", pluginName);
logger.error(error);
throw new InputsNullException(error);
}
if(pluginName.compareTo(launchParameter.getPluginName()) != 0) {
String error = String.format(
"Plugin Name provided in the URL (%s) does not match with the one provided in %s (%s)",
pluginName, LaunchParameter.class.getSimpleName(), launchParameter.getPluginName());
logger.error(error);
throw new InvalidInputsException(error);
}
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler();
UUID uuid = smartExecutorScheduler.schedule(launchParameter, null);
logger.info("{} ({}) has been lauched with uuid {}", pluginName, launchParameterString, uuid);
return uuid.toString();
} catch(ExecutorException e) {
throw e;
} catch(Exception e) {
throw new ExecutorException(e);
}
}
@GET
@Path("/{" + PLUGIN_NAME_PATH_PARAM + "}" + "/" + "{" + UUID_PATH_PARAM + "}")
@Produces(ResourceInitializer.APPLICATION_JSON_CHARSET_UTF_8)
public String getPluginStateEvolution(@PathParam(PLUGIN_NAME_PATH_PARAM) String pluginName,
@PathParam(UUID_PATH_PARAM) String executionIdentifier,
@QueryParam(RestConstants.ITERATION_NUMBER_PARAM) Integer iterationNumber) throws ExecutorException {
PluginStateEvolution pluginStateEvolution = null;
try {
SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector();
pluginStateEvolution = persistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), iterationNumber);
logger.info("{} for {} (iteration n. {}) is {}", PluginStateEvolution.class.getSimpleName(), executionIdentifier, iterationNumber, pluginStateEvolution);
} catch(ExecutorException e) {
throw e;
} catch(Exception e) {
throw new ExecutorException(e);
}
if(pluginName.compareTo(pluginStateEvolution.getPluginDeclaration().getName()) != 0) {
String error = String.format(
"Plugin Name provided in the URL (%s) does not match with the one got from %s (%s)", pluginName,
PluginStateEvolution.class.getSimpleName(), pluginStateEvolution.getPluginDeclaration().getName());
throw new InvalidInputsException(error);
}
try {
return SEMapper.marshal(pluginStateEvolution);
} catch(JsonProcessingException e) {
throw new ExecutorException(e);
}
}
@DELETE
@Path("/{" + PLUGIN_NAME_PATH_PARAM + "}" + "/" + "{" + UUID_PATH_PARAM + "}")
public boolean delete(@PathParam(PLUGIN_NAME_PATH_PARAM) String pluginName,
@PathParam(UUID_PATH_PARAM) String executionIdentifier,
@QueryParam(RestConstants.GLOBALLY_PARAM) Boolean globally) throws ExecutorException {
try {
if(globally == null) {
globally = false;
}
logger.info("Requested to delete for {} with UUID {} {}", pluginName, executionIdentifier,
globally ? RestConstants.GLOBALLY_PARAM : "");
boolean currentStopped = true;
try {
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler();
UUID uuid = UUID.fromString(executionIdentifier);
smartExecutorScheduler.stop(uuid, globally);
} catch (SchedulerNotFoundException e) {
// currentStopped = true;
logger.error("Error unscheduling task {}", executionIdentifier, e);
throw new ExecutorException(e);
} catch(SchedulerException e){
// currentStopped = false;
logger.error("Error unscheduling task {}", executionIdentifier, e);
throw new ExecutorException(e);
} catch(SchedulePersistenceException e){
// currentStopped = true;
logger.error("Error removing scheduled task from persistence.", e);
} catch (ExecutorException e) {
throw e;
} catch (Exception e) {
// currentStopped = false;
logger.error("Error unscheduling task {}", executionIdentifier, e);
throw new ExecutorException(e);
}
logger.info("{} with UUID {} was{} stopped successfully", pluginName, executionIdentifier, currentStopped? "" : " NOT");
return currentStopped;
} catch(ExecutorException e) {
throw e;
} catch(Exception e) {
throw new ExecutorException(e);
}
}
@GET
@Path(RestConstants.SCHEDULED_PATH_PART)
@Produces(ResourceInitializer.APPLICATION_JSON_CHARSET_UTF_8)
public String all(@QueryParam(RestConstants.GLOBALLY_PARAM) Boolean globally) throws ExecutorException {
return "[]";
}
}

View File

@ -15,7 +15,7 @@ import org.gcube.smartgears.Constants;
import org.gcube.smartgears.ContextProvider;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.json.SEMapper;
import org.gcube.vremanagement.executor.plugin.Ref;
import org.gcube.vremanagement.executor.plugin.RunOn;
@ -25,12 +25,13 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* @author Luca Frosini (ISTI - CNR)
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=Scheduling.CLASS_PROPERTY)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=SEMapper.CLASS_PROPERTY)
public class ScheduledTask {
public static final String LAUNCH_PARAMETER = "launchParameter";
protected UUID uuid;
@JsonProperty(value=LAUNCH_PARAMETER)
protected LaunchParameter launchParameter;

View File

@ -20,7 +20,7 @@ import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.json.SEMapper;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence;
@ -122,7 +122,7 @@ public class SmartExecutorScheduler {
if (scheduling != null) {
try {
logger.info("Going to schedule Taks with UUID {} with the following {} : {}", uuid, LaunchParameter.class.getSimpleName(), ObjectMapperManager.getObjectMapper().writeValueAsString(parameter));
logger.info("Going to schedule Taks with UUID {} with the following {} : {}", uuid, LaunchParameter.class.getSimpleName(), SEMapper.getObjectMapper().writeValueAsString(parameter));
} catch (Exception e) {
}
@ -158,7 +158,7 @@ public class SmartExecutorScheduler {
} else {
try {
logger.info("Starting Taks with UUID {} immediately with the following {} : {}", uuid, LaunchParameter.class.getSimpleName(), ObjectMapperManager.getObjectMapper().writeValueAsString(parameter));
logger.info("Starting Taks with UUID {} immediately with the following {} : {}", uuid, LaunchParameter.class.getSimpleName(), SEMapper.getObjectMapper().writeValueAsString(parameter));
} catch (Exception e) {
}

View File

@ -13,7 +13,7 @@ import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.json.SEMapper;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.Ref;
@ -75,7 +75,7 @@ public class SerializationTest extends ScopedTest {
logger.debug("{} to be Marshalled : {}", scheduledTask.getClass().getSimpleName(), launchParameter);
ObjectMapper mapper = ObjectMapperManager.getObjectMapper();
ObjectMapper mapper = SEMapper.getObjectMapper();
String scheduledTaskJSONString = mapper.writeValueAsString(scheduledTask);
logger.debug("Marshalled : {}", scheduledTaskJSONString);

View File

@ -60,7 +60,7 @@ public class SmartExecutorPersistenceConnectorTest extends ScopedTest {
Assert.assertEquals(states[i], ps);
}
PluginStateEvolution pse = persistenceConnector.getLastPluginInstanceState(uuid);
PluginStateEvolution pse = persistenceConnector.getPluginInstanceState(uuid, null);
PluginState ps = pse.getPluginState();
Assert.assertEquals(states[states.length-1], ps);

View File

@ -13,7 +13,7 @@ import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.ScopedTest;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException;
import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException;
import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
@ -94,7 +94,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
while (endTime <= (startTime + 12000)) {
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(uuid);
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, null);
Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState());
}
@ -118,9 +118,9 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
try {
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState());
} catch (PluginStateNotRetrievedException e) {
} catch (PluginInstanceNotFoundException e) {
// OK
logger.error("PluginStateNotRetrievedException this can be acceptable in some tests", e);
logger.error("PluginInstanceNotFoundException this can be acceptable in some tests", e);
}
}
@ -144,7 +144,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(uuid);
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, null);
Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState());
}
@ -159,7 +159,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
}
smartExecutorScheduler.stop(uuid, false);
PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(uuid);
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, null);
Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState());
}
@ -183,10 +183,10 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginStateEvolution pluginStateEvolution = pc.getLastPluginInstanceState(first);
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(first, null);
Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState());
pluginStateEvolution = pc.getLastPluginInstanceState(second);
pluginStateEvolution = pc.getPluginInstanceState(second, null);
Assert.assertEquals(PluginState.DONE, pluginStateEvolution.getPluginState());
}
@ -284,7 +284,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
}
@Test(expected = PluginStateNotRetrievedException.class)
@Test(expected = PluginInstanceNotFoundException.class)
public void delayedExpMaxtimes() throws Exception {
Scheduling scheduling = new Scheduling(20, 3);
UUID uuid = scheduleTest(scheduling, new Long(10 * 1000));
@ -304,7 +304,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
}
@Test(expected = PluginStateNotRetrievedException.class)
@Test(expected = PluginInstanceNotFoundException.class)
public void delayedExpTimeLimits() throws Exception {
Calendar firstStartTime = Calendar.getInstance();
@ -360,7 +360,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
try {
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState());
} catch (PluginStateNotRetrievedException e) {
} catch (PluginInstanceNotFoundException e) {
// OK
}
}
@ -396,7 +396,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
try {
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.STOPPED, pluginStateEvolution.getPluginState());
} catch (PluginStateNotRetrievedException e) {
} catch (PluginInstanceNotFoundException e) {
// OK
}
@ -433,12 +433,12 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
try {
PluginStateEvolution pluginStateEvolution = pc.getPluginInstanceState(uuid, 5);
Assert.assertEquals(PluginState.DISCARDED, pluginStateEvolution.getPluginState());
} catch (PluginStateNotRetrievedException e) {
} catch (PluginInstanceNotFoundException e) {
// OK
}
}
@Test(expected = PluginStateNotRetrievedException.class)
@Test(expected = PluginInstanceNotFoundException.class)
public void cronExpMaxtimes() throws Exception {
CronExpression cronExpression = new CronExpression("0/20 * * ? * *");
Scheduling scheduling = new Scheduling(cronExpression, 3);
@ -459,7 +459,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
}
@Test(expected = PluginStateNotRetrievedException.class)
@Test(expected = PluginInstanceNotFoundException.class)
public void cronExpTimeLimits() throws Exception {
CronExpression cronExpression = new CronExpression("0/20 * * ? * *");
Calendar firstStartTime = Calendar.getInstance();