smart-executor/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitalizator.java

334 lines
12 KiB
Java
Raw Normal View History

/**
*
*/
package org.gcube.vremanagement.executor;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.bind.annotation.XmlRootElement;
import org.gcube.common.resources.gcore.Resource;
import org.gcube.common.resources.gcore.Resources;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.Profile;
import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
import org.gcube.common.resources.gcore.ServiceEndpoint.Runtime;
import org.gcube.common.resources.gcore.common.Platform;
import org.gcube.common.resources.gcore.utils.Group;
import org.gcube.informationsystem.publisher.RegistryPublisherFactory;
import org.gcube.informationsystem.publisher.ScopedPublisher;
import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException;
import org.gcube.smartgears.context.application.ApplicationContext;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler;
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.pluginmanager.PluginThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Application lifecycle handler of the Smart Executor.
 * <p>
 * On start it creates the thread pool used for plugin execution, builds a
 * {@link ServiceEndpoint} describing the plugins reported by the
 * {@link PluginManager}, publishes it on the Information System (IS) and
 * opens the {@link JDBCPersistenceConnector}. On stop it asks every
 * registered plugin instance to stop, shuts the pool down, unpublishes the
 * {@link ServiceEndpoint} and closes the persistence connector.
 * <p>
 * All the state is kept in static fields which are (re)assigned by
 * {@link #onStart(ApplicationLifecycleEvent.Start)}; the static getters
 * return {@code null} until then.
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 */
@XmlRootElement(name = "plugin-registration-handler")
public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {

    /**
     * Logger
     */
    private static final Logger logger = LoggerFactory.getLogger(SmartExecutorInitalizator.class);

    /**
     * Separator line written around the start/stop log banners
     */
    private static final String LOG_SEPARATOR = "-------------------------------------------------------";

    /**
     * Pool for thread execution
     */
    private static ExecutorService pool;

    /**
     * Contains running plugin instances. The key is the associated random UUID.
     * This is needed to correctly stop the running plugin execution if the
     * container is stopped in the proper way
     */
    private static Map<UUID, PluginThread<Plugin<? extends PluginDeclaration>>> pluginInstances;

    /**
     * Contains the ServiceEndpoint Resource to be published/unpublished on IS
     */
    private static ServiceEndpoint serviceEndpoint;

    /**
     * Represent the connector to DB
     */
    private static JDBCPersistenceConnector jdbcPersistenceConnector;

    /**
     * The application context received with the start event
     */
    private static ApplicationContext ctx;

    /**
     * @return the pool, {@code null} if the application was never started
     */
    public static ExecutorService getPool() {
        return pool;
    }

    /**
     * @return the live (not copied) map of the registered plugin instances,
     * {@code null} if the application was never started
     */
    public static Map<UUID, PluginThread<Plugin<? extends PluginDeclaration>>> getPluginInstances() {
        return pluginInstances;
    }

    /**
     * @return the jdbcPersistenceConnector, {@code null} if the application
     * was never started or the start did not reach the DB initialization
     */
    public static JDBCPersistenceConnector getJdbcPersistenceConnector() {
        return jdbcPersistenceConnector;
    }

    /**
     * @return the ctx, {@code null} if the application was never started
     */
    public static ApplicationContext getCtx() {
        return ctx;
    }

    /**
     * Marshal the provided resource to a String (used for logging purposes).
     * @param resource the resource to marshal
     * @return the marshalled representation of the resource
     */
    private static String marshalToString(Resource resource) {
        StringWriter stringWriter = new StringWriter();
        Resources.marshal(resource, stringWriter);
        return stringWriter.toString();
    }

    /**
     * Publish the provided resource on the provided scopes
     * @param resource to be published
     * @param scopes the scopes where the resource has to be published
     * @throws RegistryNotFoundException if the Registry is not found so the
     * resource has not been published
     * @throws Exception any other error raised by the publisher
     */
    private static void publishScopedResource(Resource resource, List<String> scopes)
            throws RegistryNotFoundException, Exception {
        // Marshalled eagerly (independently from the log level) as before
        String marshalled = marshalToString(resource);
        ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
        try {
            logger.debug("Trying to publish\n{}", marshalled);
            scopedPublisher.create(resource, scopes);
        } catch (RegistryNotFoundException e) {
            logger.error("The resource was not published", e);
            throw e;
        }
    }

    /**
     * Remove the resource from IS
     * @param resource to be unpublished
     * @param scopes the scopes where the resource has to be removed from
     * @throws RegistryNotFoundException if the Registry is not found so the
     * resource has not been unpublished
     * @throws Exception any other error raised by the publisher
     */
    private static void unPublishScopedResource(Resource resource, List<String> scopes)
            throws RegistryNotFoundException, Exception {
        // Marshalled eagerly (independently from the log level) as before
        String marshalled = marshalToString(resource);
        ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
        try {
            logger.debug("Trying to unpublish\n{}", marshalled);
            scopedPublisher.remove(resource, scopes);
        } catch (RegistryNotFoundException e) {
            logger.error("The resource was not unpublished", e);
            throw e;
        }
    }

    /**
     * Create the Service Endpoint using information related to discovered
     * available plugins and their own discovered capabilities.
     * <p>
     * The profile and the runtime are filled from the static application
     * context, hence this method must be invoked only after the context has
     * been set by {@link #onStart(ApplicationLifecycleEvent.Start)}. Every
     * available plugin becomes an {@link AccessPoint} having a "Version"
     * property plus one property for each supported capability.
     * @return the created {@link ServiceEndpoint}
     */
    protected static ServiceEndpoint createServiceEndpoint() {
        logger.debug("Getting Available Plugins and their own supported capabilities");
        PluginManager pluginManager = PluginManager.getInstance();

        logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities");
        ServiceEndpoint endpoint = new ServiceEndpoint();

        Profile profile = endpoint.newProfile();
        profile.category(ctx.configuration().serviceClass());
        profile.name(ctx.configuration().name());
        profile.version(ctx.configuration().version());
        profile.description(ctx.configuration().description());

        Platform platform = profile.newPlatform();
        platform.name(ctx.container().configuration().hostname());
        platform.version((short) 1);

        Runtime runtime = profile.newRuntime();
        runtime.hostedOn(ctx.container().configuration().hostname());
        runtime.status(ctx.configuration().mode().toString());

        Group<AccessPoint> accessPoints = profile.accessPoints();
        Map<String, PluginDeclaration> availablePlugins = pluginManager.getAvailablePlugins();
        for (Map.Entry<String, PluginDeclaration> pluginEntry : availablePlugins.entrySet()) {
            PluginDeclaration pluginDeclaration = pluginEntry.getValue();

            AccessPoint accessPointElement = new AccessPoint();
            accessPointElement.name(pluginEntry.getKey());
            accessPointElement.description(pluginDeclaration.getDescription());

            Group<Property> properties = accessPointElement.properties();
            Property propertyVersionElement = new Property();
            propertyVersionElement.nameAndValue("Version", pluginDeclaration.getVersion());
            properties.add(propertyVersionElement);

            Map<String, String> pluginCapabilities = pluginDeclaration.getSupportedCapabilities();
            // A plugin declaring no capabilities at all must not break the publication
            if (pluginCapabilities != null) {
                for (Map.Entry<String, String> capability : pluginCapabilities.entrySet()) {
                    Property propertyElement = new Property();
                    propertyElement.nameAndValue(capability.getKey(), capability.getValue());
                    properties.add(propertyElement);
                }
            }

            accessPoints.add(accessPointElement);
        }

        logger.debug("The created ServiceEndpoint profile is \n{}", marshalToString(endpoint));
        return endpoint;
    }

    /**
     * Return the scopes to be used to publish/unpublish resources.
     * The start scopes of the application are used when available, otherwise
     * the start scopes of the container are used.
     * @param applicationContext the context the scopes are read from
     * @return a new list containing the selected scopes; the list is empty
     * (never {@code null}) when no scope is available at all
     */
    public static List<String> getScopes(ApplicationContext applicationContext) {
        Set<String> applicationScopes = applicationContext.configuration().startScopes();
        Set<String> containerScopes = applicationContext.container().configuration().startScopes();

        Set<String> scopes;
        if (applicationScopes == null || applicationScopes.isEmpty()) {
            scopes = containerScopes;
            logger.debug("Application Scopes ({}). The Container Scopes ({}) will be used.", applicationScopes, scopes);
        } else {
            logger.debug("Container Scopes ({}). Application Scopes ({}) will be used.", containerScopes, applicationScopes);
            scopes = applicationScopes;
        }

        if (scopes == null) {
            // Neither the application nor the container provides scopes
            return new ArrayList<String>();
        }
        return new ArrayList<String>(scopes);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The method discovers the plugins available on classpath and their own
     * supported capabilities and publishes a ServiceEndpoint with the
     * discovered information.
     * Furthermore creates/connects to DB.
     * <p>
     * If the publication or the DB initialization fails the error is logged
     * and the method returns without completing the remaining steps.
     */
    @Override
    public void onStart(ApplicationLifecycleEvent.Start applicationLifecycleEventStart) {
        logger.debug(LOG_SEPARATOR);
        logger.debug("Smart Executor is Starting");
        logger.debug(LOG_SEPARATOR);

        ctx = applicationLifecycleEventStart.context();
        pool = Executors.newCachedThreadPool();
        // Initialized before creating the endpoint so that it is available
        // even if the endpoint creation fails
        pluginInstances = new HashMap<UUID, PluginThread<Plugin<? extends PluginDeclaration>>>();
        serviceEndpoint = createServiceEndpoint();

        // TODO Before publishing the new Resource check if on IS there is
        // an old published ServiceEndpoint resource that where not unpublished
        // correctly

        // TODO set task that are still on running state on DB to have a clear
        // room

        try {
            publishScopedResource(serviceEndpoint, getScopes(ctx));
        } catch (Exception e) {
            logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
            return;
        }

        try {
            jdbcPersistenceConnector = new JDBCPersistenceConnector(ctx.persistence().location());
        } catch (Exception e) {
            logger.error("Unable to initialize PersistenceConnector. The Service will be aborted", e);
            return;
        }

        logger.debug(LOG_SEPARATOR);
        logger.debug("Smart Executor Started Successfully");
        logger.debug(LOG_SEPARATOR);
    }

    /**
     * Request the provided plugin instance to stop and mark it as
     * {@link PluginState#SUSPENDED} whatever the outcome of the request is.
     * @param uuid the UUID the plugin instance is registered with
     * @param pluginInstance the plugin instance to stop
     */
    private static void stopPluginInstance(UUID uuid, Plugin<? extends PluginDeclaration> pluginInstance) {
        String pluginName = null;
        try {
            pluginName = pluginInstance.getPluginDeclaration().getName();
            logger.debug("Requesting Stop to plugin instance identified by the UUID {} of Plugin named {}",
                    uuid, pluginName);
            pluginInstance.stop();
            logger.debug("Plugin instance identified by the UUID {} of Plugin named {} stopped coorectly itself",
                    uuid, pluginName);
        } catch (Exception e) {
            // The failure is logged together with its cause; the remaining
            // plugin instances must be stopped anyway
            logger.error(String.format("Running plugin instance identified "
                    + "by the UUID %s of Plugin named %s failed to request "
                    + "of being stopped", uuid, pluginName), e);
        } finally {
            pluginInstance.setState(PluginState.SUSPENDED);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * This function is invoked before the service will stop and unpublish the
     * resource from the IS to maintain the infrastructure integrity.
     * Furthermore close the connection to DB.
     * <p>
     * Every step is attempted even if a previous one fails, and the steps
     * whose resources were never initialized by the start are skipped.
     */
    @Override
    public void onStop(ApplicationLifecycleEvent.Stop applicationLifecycleEventStop) {
        logger.debug(LOG_SEPARATOR);
        logger.debug("Smart Executor is Stopping");
        logger.debug(LOG_SEPARATOR);

        if (pluginInstances != null) {
            // Iterate over a snapshot of the keys: the entries are removed
            // from the map inside the loop and removing while iterating the
            // live key set raises ConcurrentModificationException
            List<UUID> uuids = new ArrayList<UUID>(pluginInstances.keySet());
            for (UUID uuid : uuids) {
                PluginThread<Plugin<? extends PluginDeclaration>> pluginThread = pluginInstances.get(uuid);
                try {
                    if (pluginThread != null && pluginThread.getPlugin() != null) {
                        stopPluginInstance(uuid, pluginThread.getPlugin());
                    }
                } finally {
                    pluginInstances.remove(uuid);
                }
            }
        }

        // Orderly shutdown: no new task is accepted, the tasks already
        // submitted are not interrupted
        if (pool != null) {
            pool.shutdown();
        }

        if (serviceEndpoint != null && ctx != null) {
            try {
                unPublishScopedResource(serviceEndpoint, getScopes(ctx));
            } catch (Exception e) {
                logger.error("Unable to unpublish Service Endpoint.", e);
            }
        }

        // The connector is null when the start was aborted before the DB
        // initialization
        if (jdbcPersistenceConnector != null) {
            try {
                jdbcPersistenceConnector.close();
            } catch (Exception e) {
                logger.error("Unable to close Persistence", e);
            }
        }

        logger.debug(LOG_SEPARATOR);
        logger.debug("Smart Executor Stopped Successfully");
        logger.debug(LOG_SEPARATOR);
    }
}