Moved initialization funciton in a separated class

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@111761 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-02-06 09:35:18 +00:00
parent b801e9f155
commit 83468087e3
3 changed files with 292 additions and 240 deletions

View File

@ -1,30 +1,11 @@
package org.gcube.vremanagement.executor;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jws.WebService;
import javax.xml.bind.annotation.XmlRootElement;
import org.gcube.common.resources.gcore.Resource;
import org.gcube.common.resources.gcore.Resources;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.Profile;
import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
import org.gcube.common.resources.gcore.utils.Group;
import org.gcube.informationsystem.publisher.RegistryPublisherFactory;
import org.gcube.informationsystem.publisher.ScopedPublisher;
import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException;
import org.gcube.smartgears.context.container.ContainerContext;
import org.gcube.smartgears.handlers.container.ContainerHandler;
import org.gcube.smartgears.handlers.container.ContainerLifecycleEvent;
import org.gcube.vremanagement.executor.api.Executor;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.ExecutorException;
@ -51,230 +32,13 @@ import org.slf4j.LoggerFactory;
serviceName = Executor.SERVICE_NAME,
targetNamespace = Executor.TNS,
endpointInterface = "org.gcube.vremanagement.executor.api.Executor" )
@XmlRootElement(name = "plugin-registration-handler")
public class ExecutorImpl extends ContainerHandler implements Executor {
public class ExecutorImpl implements Executor {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(ExecutorImpl.class);
/**
* Pool for thread execution
*/
private ExecutorService pool;
/**
* Contains running plugin instances. The key is the associated random UUID.
* This is needed to correctly stop the running plugin execution if the
* container is stopped in the proper way
*/
private Map<UUID,
PluginThread<Plugin<? extends PluginDeclaration>>> pluginInstances;
/**
* Contains the ServiceEnpoint Resource to be published/unpublished on IS
*/
private ServiceEndpoint serviceEndpoint;
/**
* Represent the connector to DB
*/
private JDBCPersistenceConnector jdbcPersistenceConnector;
private ContainerContext ctx;
/**
* Publish the provided resource on all Service Scopes retrieved from
* Context
* @param resource to be published
* @throws RegistryNotFoundException if the Registry is not found so the
* resource has not be published
*/
private static void publishScopedResource(Resource resource, List<String> scopes) throws RegistryNotFoundException, Exception {
StringWriter stringWriter = new StringWriter();
Resources.marshal(resource, stringWriter);
ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
try {
logger.debug(String.format("Trying to publish %s", stringWriter.toString()));
scopedPublisher.create(resource, scopes);
} catch (RegistryNotFoundException e) {
logger.error("The resource was not published", e);
throw e;
}
}
/**
* Remove the resource from IS
* @param resource to be unpublished
* @throws RegistryNotFoundException if the Registry is not found so the
* resource has not be published
*/
private static void unPublishScopedResource(Resource resource, List<String> scopes) throws RegistryNotFoundException, Exception {
StringWriter stringWriter = new StringWriter();
Resources.marshal(resource, stringWriter);
ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
try {
logger.debug(String.format("Trying to publish %s", stringWriter.toString()));
scopedPublisher.remove(resource, scopes);
} catch (RegistryNotFoundException e) {
logger.error("The resource was not unpublished", e);
throw e;
}
}
/**
* Create the Service Endpoint using information related to discovered
* available plugins and their own discoverd capabilities
* @return the created {@link ServiceEndpoint}
*/
protected static ServiceEndpoint createServiceEndpoint(){
logger.debug("Getting Available Plugins and their own supported capabilities");
PluginManager pluginManager = PluginManager.getInstance();
logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities");
ServiceEndpoint serviceEndpoint = new ServiceEndpoint();
Profile profile = serviceEndpoint.newProfile();
profile.category(Executor.SERVICE_NAME);
profile.name(Executor.class.getSimpleName());
Group<AccessPoint> accessPoints = profile.accessPoints();
Map<String, PluginDeclaration> availablePlugins = pluginManager.getAvailablePlugins();
for(String pluginName : availablePlugins.keySet()){
AccessPoint accessPointElement = new AccessPoint();
accessPointElement.name(pluginName);
Group<Property> properties = accessPointElement.properties();
Map<String, String> pluginCapablities = availablePlugins.get(pluginName).getSupportedCapabilities();
for(String capabilityName : pluginCapablities.keySet()){
Property propertyElement = new Property();
propertyElement.nameAndValue(capabilityName, pluginCapablities.get(capabilityName));
properties.add(propertyElement);
}
accessPoints.add(accessPointElement);
}
StringWriter stringWriter = new StringWriter();
Resources.marshal(serviceEndpoint, stringWriter);
logger.debug(String.format("The created ServiceEndpoint profile is \n%s", stringWriter.toString()));
return serviceEndpoint;
}
/** {@inheritDoc} */
/*
* The method discover the plugins available on classpath and their own
* supported capabilities and publish a ServiceEndpoint with the
* discovered information.
* Furthermore create/connect to DB
*/
@Override
public void onStart(ContainerLifecycleEvent.Start containerLifecycleEventStart) {
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor is Starting");
logger.debug("-------------------------------------------------------");
pool = Executors.newCachedThreadPool();
serviceEndpoint = ExecutorImpl.createServiceEndpoint();
pluginInstances = new HashMap<UUID, PluginThread<Plugin<? extends PluginDeclaration>>>();
ctx = containerLifecycleEventStart.context();
// TODO Before publishing the new Resource check if on IS there is
// an old published ServiceEndpoint resource that where not unpublished
// correctly
// TODO set task that are still on running state on DB to have a clear
// room
try {
ExecutorImpl.publishScopedResource(serviceEndpoint, ctx.configuration().startScopes());
} catch (RegistryNotFoundException e) {
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
return;
} catch (Exception e) {
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
return;
}
try {
jdbcPersistenceConnector = new JDBCPersistenceConnector(ctx.persistence().location());
} catch (Exception e) {
logger.error("Unable to initialize PersistenceConnector. The Service will be aborted", e);
return;
}
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor Started Successfully");
logger.debug("-------------------------------------------------------");
}
/** {@inheritDoc} */
/*
* This function is invoked before the service will stop and unpublish the
* resource from the IS to maintain the infrastructure integrity.
* Furthermore close the connection to DB.
*/
@Override
public void onStop(ContainerLifecycleEvent.Stop containerLifecycleEventStop) {
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor is Stoppin");
logger.debug("-------------------------------------------------------");
for(UUID uuid : pluginInstances.keySet()){
PluginThread<Plugin<? extends PluginDeclaration>> pluginThread =
pluginInstances.get(uuid);
Plugin<? extends PluginDeclaration> pluginInstace =
pluginThread.getPlugin();
try {
logger.debug(String.format("Requesting Stop to plugin instance "
+ "identified by the UUID %s of Plugin named %s", uuid,
pluginInstace.getPluginDeclaration().getName()));
pluginInstace.stop();
logger.debug(String.format("Plugin instance identified by the"
+ "UUID %s of Plugin named %s stopped coorectly itself",
uuid, pluginInstace.getPluginDeclaration().getName()));
} catch (Exception e) {
logger.debug(String.format("Running plugin instance identified "
+ "by the UUID %s of Plugin named %s failed to request "
+ "of being stopped", uuid,
pluginInstace.getPluginDeclaration().getName()));
} finally {
pluginInstace.setState(PluginState.SUSPENDED);
pluginInstances.remove(uuid);
}
}
// Forcing shutdown of all threads
pool.shutdown();
try {
ExecutorImpl.unPublishScopedResource(serviceEndpoint, ctx.configuration().startScopes());
} catch (RegistryNotFoundException e) {
logger.error("Unable to unpublish Service Endpoint.", e);
} catch (Exception e) {
logger.error("Unable to unpublish Service Endpoint.", e);
}
try {
jdbcPersistenceConnector.close();
} catch (Exception e) {
logger.error("Unable to close Persistence", e);
}
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor Stopped Successfully");
logger.debug("-------------------------------------------------------");
}
/**{@inheritDoc}*/
@Override
public String launch(LaunchParameter parameter) throws InputsNullException,
@ -312,6 +76,7 @@ public class ExecutorImpl extends ContainerHandler implements Executor {
throw new LaunchException();
}
JDBCPersistenceConnector jdbcPersistenceConnector = ExecutorInitalizator.getJdbcPersistenceConnector();
// Create and instance of DB connection used to persist plugin evolution
JDBCPersistence jdbcEvolutionPersistence = new JDBCPersistence(jdbcPersistenceConnector, name, executionIdentifier);
@ -331,10 +96,10 @@ public class ExecutorImpl extends ContainerHandler implements Executor {
PluginThread<Plugin<? extends PluginDeclaration>> pluginThread =
new PluginThread<Plugin<? extends PluginDeclaration>>(instantiatedPlugin, inputs, executionIdentifier);
// Adding the thread to the pluginInstances
pluginInstances.put(executionIdentifier, pluginThread);
ExecutorInitalizator.getPluginInstances().put(executionIdentifier, pluginThread);
// Launching Thread from initially created pool
pool.execute(pluginThread);
ExecutorInitalizator.getPool().execute(pluginThread);
logger.debug(String.format("The Plugin named %s with UUID %s has been launched with the provided inputs", name, executionIdentifier));
@ -348,6 +113,7 @@ public class ExecutorImpl extends ContainerHandler implements Executor {
public PluginState getState(String executionIdentifier)
throws PluginInstanceNotFoundException, ExecutorException {
try {
JDBCPersistenceConnector jdbcPersistenceConnector = ExecutorInitalizator.getJdbcPersistenceConnector();
return jdbcPersistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier));
} catch (Exception e) {
throw new PluginInstanceNotFoundException();

View File

@ -0,0 +1,286 @@
/**
*
*/
package org.gcube.vremanagement.executor;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.bind.annotation.XmlRootElement;
import org.gcube.common.resources.gcore.Resource;
import org.gcube.common.resources.gcore.Resources;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.Profile;
import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
import org.gcube.common.resources.gcore.utils.Group;
import org.gcube.informationsystem.publisher.RegistryPublisherFactory;
import org.gcube.informationsystem.publisher.ScopedPublisher;
import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException;
import org.gcube.smartgears.context.container.ContainerContext;
import org.gcube.smartgears.handlers.container.ContainerHandler;
import org.gcube.smartgears.handlers.container.ContainerLifecycleEvent;
import org.gcube.vremanagement.executor.api.Executor;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.pluginmanager.PluginThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
@XmlRootElement(name = "plugin-registration-handler")
public class ExecutorInitalizator extends ContainerHandler {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(ExecutorInitalizator.class);
/**
* Pool for thread execution
*/
private static ExecutorService pool;
/**
* Contains running plugin instances. The key is the associated random UUID.
* This is needed to correctly stop the running plugin execution if the
* container is stopped in the proper way
*/
private static Map<UUID,
PluginThread<Plugin<? extends PluginDeclaration>>> pluginInstances;
/**
* Contains the ServiceEnpoint Resource to be published/unpublished on IS
*/
private static ServiceEndpoint serviceEndpoint;
/**
* Represent the connector to DB
*/
private static JDBCPersistenceConnector jdbcPersistenceConnector;
private static ContainerContext ctx;
/**
* @return the pool
*/
public static ExecutorService getPool() {
return pool;
}
/**
* @return the pluginInstances
*/
public static Map<UUID, PluginThread<Plugin<? extends PluginDeclaration>>> getPluginInstances() {
return pluginInstances;
}
/**
* @return the jdbcPersistenceConnector
*/
public static JDBCPersistenceConnector getJdbcPersistenceConnector() {
return jdbcPersistenceConnector;
}
/**
* Publish the provided resource on all Service Scopes retrieved from
* Context
* @param resource to be published
* @throws RegistryNotFoundException if the Registry is not found so the
* resource has not be published
*/
private static void publishScopedResource(Resource resource, List<String> scopes) throws RegistryNotFoundException, Exception {
StringWriter stringWriter = new StringWriter();
Resources.marshal(resource, stringWriter);
ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
try {
logger.debug(String.format("Trying to publish %s", stringWriter.toString()));
scopedPublisher.create(resource, scopes);
} catch (RegistryNotFoundException e) {
logger.error("The resource was not published", e);
throw e;
}
}
/**
* Remove the resource from IS
* @param resource to be unpublished
* @throws RegistryNotFoundException if the Registry is not found so the
* resource has not be published
*/
private static void unPublishScopedResource(Resource resource, List<String> scopes) throws RegistryNotFoundException, Exception {
StringWriter stringWriter = new StringWriter();
Resources.marshal(resource, stringWriter);
ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
try {
logger.debug(String.format("Trying to publish %s", stringWriter.toString()));
scopedPublisher.remove(resource, scopes);
} catch (RegistryNotFoundException e) {
logger.error("The resource was not unpublished", e);
throw e;
}
}
/**
* Create the Service Endpoint using information related to discovered
* available plugins and their own discoverd capabilities
* @return the created {@link ServiceEndpoint}
*/
protected static ServiceEndpoint createServiceEndpoint(){
logger.debug("Getting Available Plugins and their own supported capabilities");
PluginManager pluginManager = PluginManager.getInstance();
logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities");
ServiceEndpoint serviceEndpoint = new ServiceEndpoint();
Profile profile = serviceEndpoint.newProfile();
profile.category(Executor.SERVICE_NAME);
profile.name(Executor.class.getSimpleName());
Group<AccessPoint> accessPoints = profile.accessPoints();
Map<String, PluginDeclaration> availablePlugins = pluginManager.getAvailablePlugins();
for(String pluginName : availablePlugins.keySet()){
AccessPoint accessPointElement = new AccessPoint();
accessPointElement.name(pluginName);
Group<Property> properties = accessPointElement.properties();
Map<String, String> pluginCapablities = availablePlugins.get(pluginName).getSupportedCapabilities();
for(String capabilityName : pluginCapablities.keySet()){
Property propertyElement = new Property();
propertyElement.nameAndValue(capabilityName, pluginCapablities.get(capabilityName));
properties.add(propertyElement);
}
accessPoints.add(accessPointElement);
}
StringWriter stringWriter = new StringWriter();
Resources.marshal(serviceEndpoint, stringWriter);
logger.debug(String.format("The created ServiceEndpoint profile is \n%s", stringWriter.toString()));
return serviceEndpoint;
}
/** {@inheritDoc} */
/*
* The method discover the plugins available on classpath and their own
* supported capabilities and publish a ServiceEndpoint with the
* discovered information.
* Furthermore create/connect to DB
*/
@Override
public void onStart(ContainerLifecycleEvent.Start containerLifecycleEventStart) {
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor is Starting");
logger.debug("-------------------------------------------------------");
pool = Executors.newCachedThreadPool();
serviceEndpoint = createServiceEndpoint();
pluginInstances = new HashMap<UUID, PluginThread<Plugin<? extends PluginDeclaration>>>();
ctx = containerLifecycleEventStart.context();
// TODO Before publishing the new Resource check if on IS there is
// an old published ServiceEndpoint resource that where not unpublished
// correctly
// TODO set task that are still on running state on DB to have a clear
// room
try {
publishScopedResource(serviceEndpoint, ctx.configuration().startScopes());
} catch (RegistryNotFoundException e) {
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
return;
} catch (Exception e) {
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
return;
}
try {
jdbcPersistenceConnector = new JDBCPersistenceConnector(ctx.persistence().location());
} catch (Exception e) {
logger.error("Unable to initialize PersistenceConnector. The Service will be aborted", e);
return;
}
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor Started Successfully");
logger.debug("-------------------------------------------------------");
}
/** {@inheritDoc} */
/*
* This function is invoked before the service will stop and unpublish the
* resource from the IS to maintain the infrastructure integrity.
* Furthermore close the connection to DB.
*/
@Override
public void onStop(ContainerLifecycleEvent.Stop containerLifecycleEventStop) {
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor is Stopping");
logger.debug("-------------------------------------------------------");
for(UUID uuid : pluginInstances.keySet()){
PluginThread<Plugin<? extends PluginDeclaration>> pluginThread =
pluginInstances.get(uuid);
Plugin<? extends PluginDeclaration> pluginInstace =
pluginThread.getPlugin();
try {
logger.debug(String.format("Requesting Stop to plugin instance "
+ "identified by the UUID %s of Plugin named %s", uuid,
pluginInstace.getPluginDeclaration().getName()));
pluginInstace.stop();
logger.debug(String.format("Plugin instance identified by the"
+ "UUID %s of Plugin named %s stopped coorectly itself",
uuid, pluginInstace.getPluginDeclaration().getName()));
} catch (Exception e) {
logger.debug(String.format("Running plugin instance identified "
+ "by the UUID %s of Plugin named %s failed to request "
+ "of being stopped", uuid,
pluginInstace.getPluginDeclaration().getName()));
} finally {
pluginInstace.setState(PluginState.SUSPENDED);
pluginInstances.remove(uuid);
}
}
// Forcing shutdown of all threads
pool.shutdown();
try {
unPublishScopedResource(serviceEndpoint, ctx.configuration().startScopes());
} catch (RegistryNotFoundException e) {
logger.error("Unable to unpublish Service Endpoint.", e);
} catch (Exception e) {
logger.error("Unable to unpublish Service Endpoint.", e);
}
try {
jdbcPersistenceConnector.close();
} catch (Exception e) {
logger.error("Unable to close Persistence", e);
}
logger.debug("-------------------------------------------------------");
logger.debug("Smart Executor Stopped Successfully");
logger.debug("-------------------------------------------------------");
}
}

View File

@ -23,7 +23,7 @@ public class ExecutorImplTest {
@Test
public void createServiceEndpointTest() {
ServiceEndpoint serviceEndpoint = ExecutorImpl.createServiceEndpoint();
ServiceEndpoint serviceEndpoint = ExecutorInitalizator.createServiceEndpoint();
Profile profile = serviceEndpoint.profile();
Assert.assertEquals(Executor.SERVICE_NAME, profile.category());