/*
 * smart-executor/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator....
 *
 * 214 lines
 * 7.6 KiB
 * Java
 */

package org.gcube.vremanagement.executor;
import java.util.List;
import java.util.Map;
import org.gcube.smartgears.ApplicationManager;
import org.gcube.smartgears.ContextProvider;
import org.gcube.smartgears.context.application.ApplicationContext;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.ispublisher.ISPublisher;
import org.gcube.vremanagement.executor.ispublisher.RestISPublisher;
import org.gcube.vremanagement.executor.json.ExtendedSEMapper;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorSchedulerFactory;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link ApplicationManager} of the Smart Executor service.
 * <p>
 * On init it (re)publishes the plugins available on the classpath through
 * every configured {@link ISPublisher}, obtains the persistence connector
 * and relaunches the orphan scheduled tasks this instance can take in
 * charge. On shutdown it stops the scheduler, unpublishes the plugins and
 * closes the persistence connector.
 *
 * @author Luca Frosini (ISTI - CNR)
 */
public class SmartExecutorInitializator implements ApplicationManager {

	/**
	 * Logger
	 */
	private static final Logger logger = LoggerFactory.getLogger(SmartExecutorInitializator.class);

	/**
	 * Join timeout exposed to other classes; it is not referenced inside
	 * this class.
	 * NOTE(review): the unit is presumably milliseconds - confirm against
	 * the callers.
	 */
	public static final long JOIN_TIMEOUT = 1000;

	/**
	 * {@inheritDoc}
	 * The method discovers the plugins available on classpath and their own
	 * supported capabilities and publishes them through every configured
	 * {@link ISPublisher}.
	 * Furthermore it creates/connects to the DB and relaunches the orphan
	 * scheduled tasks.
	 *
	 * @throws RuntimeException if a publisher other than
	 *         {@link RestISPublisher} fails to publish the plugins, or if
	 *         the persistence connector cannot be obtained
	 */
	@Override
	public void onInit() {
		String context = ContextUtility.getCurrentContext();

		logger.trace(
				"\n-------------------------------------------------------\n"
				+ "Smart Executor is Starting on context {}\n"
				+ "-------------------------------------------------------",
				context);

		logger.debug("Getting Available Plugins and their own supported capabilities");
		PluginManager pluginManager = PluginManager.getInstance();
		Map<String, Class<? extends Plugin>> availablePlugins = pluginManager.getAvailablePlugins();

		republishPlugins(availablePlugins, context);

		final SmartExecutorPersistenceConnector smartExecutorPersistenceConnector;
		try {
			smartExecutorPersistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector();
		} catch (Exception e) {
			logger.error("Unable to instantiate {} for scope {}. The Service will be aborted",
					SmartExecutorPersistenceConnector.class.getSimpleName(), context, e);
			throw new RuntimeException(e);
		}

		// TODO set task that are still on running state to FAILED state on
		// Persistence to clean previous situation of a failure of HostingNode

		try {
			relaunchOrphanScheduledTasks(smartExecutorPersistenceConnector, pluginManager, context);
		} catch (Exception e) {
			logger.error("Unable to get Orphan Scheduled Tasks for scope {}.", context, e);
			// The "Started Successfully" trace below is intentionally skipped.
			return;
		}

		logger.trace(
				"\n-------------------------------------------------------\n"
				+ "Smart Executor Started Successfully on context {}\n"
				+ "-------------------------------------------------------", context);
	}

	/**
	 * Unpublishes and then publishes the available plugins through every
	 * {@link ISPublisher} obtained for the current application context.
	 * <p>
	 * A failure while unpublishing is logged and ignored. A failure while
	 * publishing is only logged as a warning for {@link RestISPublisher};
	 * for any other publisher it aborts the initialization.
	 *
	 * @param availablePlugins the plugins to publish, keyed by name
	 * @param context the current context, used only in log messages
	 * @throws RuntimeException wrapping the publishing failure of a
	 *         publisher which is not a {@link RestISPublisher}
	 */
	private void republishPlugins(Map<String, Class<? extends Plugin>> availablePlugins, String context) {
		ApplicationContext applicationContext = ContextProvider.get();
		List<ISPublisher> isPublishers = ISPublisher.getISPublishers(applicationContext);

		for (ISPublisher isPublisher : isPublishers) {
			try {
				isPublisher.unpublishPlugins(true);
			} catch (Exception e) {
				// Best effort: the cause is logged and the publishing is attempted anyway.
				logger.error("unable to unpublish plugins from IS using {}. Trying to continue.",
						isPublisher.getClass().getName(), e);
			}

			try {
				isPublisher.publishPlugins(availablePlugins);
			} catch (Exception e) {
				if (isPublisher instanceof RestISPublisher) {
					logger.warn("Unable to create RunningPlugin in context {}. {}", context, e.getMessage());
				} else {
					logger.error("Unable to create ServiceEndpoint in context {}. The Service will be aborted", context, e);
					throw new RuntimeException(e);
				}
			}
		}
	}

	/**
	 * Retrieves the orphan scheduled tasks for the available plugins, tries
	 * to reserve each of them and relaunches the reserved ones, each on its
	 * own thread.
	 * <p>
	 * A task whose reservation fails is skipped. Any other exception thrown
	 * while retrieving or marshalling the tasks propagates to the caller,
	 * so the remaining tasks are not processed.
	 *
	 * @param persistenceConnector the connector used to read, reserve,
	 *        remove and add back scheduled tasks
	 * @param pluginManager the manager providing the available plugins
	 * @param context the current context, used only in log messages
	 * @throws Exception if the orphan tasks cannot be retrieved or a task
	 *         cannot be marshalled
	 */
	private void relaunchOrphanScheduledTasks(SmartExecutorPersistenceConnector persistenceConnector,
			PluginManager pluginManager, String context) throws Exception {
		logger.debug("Going to get Orphan Scheduled Tasks in scope {}", context);

		List<ScheduledTask> scheduledTasks = persistenceConnector
				.getOrphanScheduledTasks(pluginManager.getAvailablePlugins().keySet());

		if (scheduledTasks.isEmpty()) {
			logger.debug("No Orphan Scheduled Tasks this instance can take in charge in scope {}", context);
		}

		for (ScheduledTask scheduledTask : scheduledTasks) {
			String taskAsString = ExtendedSEMapper.getInstance().marshal(scheduledTask);

			try {
				// Reserving the task.
				persistenceConnector.reserveScheduledTask(scheduledTask);
			} catch (Exception e) {
				// A failed reservation is treated as "someone else got it": not an error.
				logger.debug("({}) Someone else is going to take in charge the scheduled task {}. Skipping.", context, taskAsString);
				continue;
			}

			relaunchInBackground(persistenceConnector, scheduledTask, taskAsString, context);
		}
	}

	/**
	 * Starts a new thread which relaunches the given, already reserved,
	 * scheduled task. If the relaunch fails the task is added back to the
	 * persistence.
	 *
	 * @param persistenceConnector the connector used to remove the task
	 *        and, on failure, to add it back
	 * @param scheduledTask the reserved task to relaunch
	 * @param taskAsString the marshalled task, used only in log messages
	 * @param context the current context, used only in log messages
	 */
	private void relaunchInBackground(final SmartExecutorPersistenceConnector persistenceConnector,
			final ScheduledTask scheduledTask, final String taskAsString, final String context) {
		Thread thread = new Thread() {

			@Override
			public void run() {
				LaunchParameter launchParameter = scheduledTask.getLaunchParameter();
				try {
					logger.info("({}) Going to schedule an already scheduled task with the following parameters {}", context,
							ExtendedSEMapper.getInstance().marshal(launchParameter));
				} catch (Exception e1) {
					// Marshalling is needed only for the log line above: report it and go on.
					logger.warn("({}) Unable to marshal the launch parameter of scheduled task {}", context, taskAsString, e1);
				}

				String scheduledTasktoken = scheduledTask.getToken();
				try {
					ContextUtility.setContext(scheduledTasktoken);
					SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler();
					// A new Scheduled Task will be persisted due to launch. Removing it
					persistenceConnector.removeScheduledTask(scheduledTask);
					smartExecutorScheduler.schedule(launchParameter, scheduledTask.getUUID());
				} catch (Exception e) {
					logger.error("({}) Error while trying to relaunch scheduled task.", context, e);
					try {
						persistenceConnector.addScheduledTask(scheduledTask);
					} catch (Exception ex) {
						logger.error("({}) Unable to add back scheduled task {}", context, taskAsString, ex);
					}
				}
			}

		};
		thread.start();
	}

	/**
	 * {@inheritDoc}
	 * This function is invoked before the service will stop and unpublish the
	 * resource from the IS to maintain the infrastructure integrity.
	 * Furthermore close the connection to DB.
	 * <p>
	 * Failures of the single steps are logged and do not prevent the
	 * following steps from being attempted.
	 */
	@Override
	public void onShutdown() {
		logger.trace(
				"\n-------------------------------------------------------\n"
				+ "Smart Executor is Stopping on context {}\n"
				+ "-------------------------------------------------------",
				ContextUtility.getCurrentContext());

		try {
			SmartExecutorScheduler scheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler();
			scheduler.stopAll();
			SmartExecutorSchedulerFactory.removeCurrentSmartExecutorScheduler();
		} catch (SchedulerException e) {
			logger.error("Error while stopping the Smart Executor scheduler", e);
		}

		ApplicationContext applicationContext = ContextProvider.get();
		List<ISPublisher> isPublishers = ISPublisher.getISPublishers(applicationContext);
		for (ISPublisher isPublisher : isPublishers) {
			try {
				isPublisher.unpublishPlugins(false);
			} catch (Exception e) {
				logger.error("unable to unpublish plugins from IS using {}", isPublisher.getClass().getName(), e);
			}
		}

		try {
			SmartExecutorPersistenceFactory.closeCurrentPersistenceConnector();
		} catch (Throwable e) {
			logger.error("Unable to correctly close {} for context {}",
					SmartExecutorPersistenceConnector.class.getSimpleName(),
					ContextUtility.getCurrentContext(), e);
		}

		logger.trace(
				"\n-------------------------------------------------------\n"
				+ "Smart Executor Stopped Successfully on context {}\n"
				+ "-------------------------------------------------------",
				ContextUtility.getCurrentContext());
	}

}