package org.gcube.vremanagement.executor; import java.util.List; import java.util.Map; import org.gcube.smartgears.ApplicationManager; import org.gcube.smartgears.ContextProvider; import org.gcube.smartgears.context.application.ApplicationContext; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.ispublisher.ISPublisher; import org.gcube.vremanagement.executor.ispublisher.RestISPublisher; import org.gcube.vremanagement.executor.json.ExtendedSEMapper; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; import org.gcube.vremanagement.executor.plugin.Plugin; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; import org.gcube.vremanagement.executor.scheduler.SmartExecutorSchedulerFactory; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Luca Frosini (ISTI - CNR) */ public class SmartExecutorInitializator implements ApplicationManager { /** * Logger */ private static Logger logger = LoggerFactory.getLogger(SmartExecutorInitializator.class); public static final long JOIN_TIMEOUT = 1000; /** * {@inheritDoc} * The method discover the plugins available on classpath and their own * supported capabilities and publish a ServiceEndpoint with the * discovered information. * Furthermore create/connect to DB */ @Override public void onInit() { String scope = ContextUtility.getCurrentScope(); logger.trace( "\n-------------------------------------------------------\n" + "Smart Executor is Starting on scope {}\n" + "-------------------------------------------------------", scope); logger.debug("Getting Available Plugins and their own supported capabilities"); PluginManager pluginManager = PluginManager.getInstance(); Map> availablePlugins = pluginManager.getAvailablePlugins(); ApplicationContext applicationContext = ContextProvider.get(); List isPublishers = ISPublisher.getISPublishers(applicationContext); for(ISPublisher isPublisher : isPublishers) { try { isPublisher.unpublishPlugins(true); }catch (Exception e) { logger.error("unable to unpublish plugind from IS using {}. Trying to continue.", isPublisher.getClass().getName()); } try { isPublisher.publishPlugins(availablePlugins); }catch (Exception e) { logger.error("Unable to create plugins is resources in context {}. The Service will be aborted", scope, e); if(!(isPublisher instanceof RestISPublisher)) { throw new RuntimeException(e); } } } final SmartExecutorPersistenceConnector smartExecutorPersistenceConnector; try { smartExecutorPersistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); } catch (Exception e) { logger.error("Unable to instantiate {} for scope {}. The Service will be aborted", SmartExecutorPersistenceConnector.class.getSimpleName(), scope, e); throw new RuntimeException(e); } // TODO set task that are still on running state to FAILED state on // Persistence to clean previous situation of a failure of HostingNode try { logger.debug("Going to get Orphan Scheduled Tasks in scope {}", scope); List scheduledTasks = smartExecutorPersistenceConnector.getOrphanScheduledTasks(pluginManager.getAvailablePlugins().keySet()); if(scheduledTasks.size()==0){ logger.debug("No Orphan Scheduled Tasks this instance can take in charge in scope {}", scope); } for(final ScheduledTask scheduledTask : scheduledTasks){ String taskAsString = ExtendedSEMapper.getInstance().marshal(scheduledTask); try { // Reserving the task. smartExecutorPersistenceConnector.reserveScheduledTask(scheduledTask); }catch (Exception e) { logger.debug("({}) Someone else is going to take in charge the scheduled task {}. Skipping.", scope, taskAsString); continue; } Thread thread = new Thread(){ @Override public void run(){ LaunchParameter launchParameter = scheduledTask.getLaunchParameter(); try { logger.info("({}) Going to schedule an already scheduled task with the following parameters {}", scope, ExtendedSEMapper.getInstance().marshal(launchParameter)); } catch (Exception e1) { } String scheduledTasktoken = scheduledTask.getToken(); try { ContextUtility.setContext(scheduledTasktoken); SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); // A new Scheduled Task will be persisted due to launch. Removing it smartExecutorPersistenceConnector.removeScheduledTask(scheduledTask); smartExecutorScheduler.schedule(launchParameter, scheduledTask.getUUID()); } catch (Exception e) { logger.error("({}) Error while trying to relaunch scheduled task.", scope, e); try { smartExecutorPersistenceConnector.addScheduledTask(scheduledTask); } catch (Exception ex) { logger.error("({}) Unable to add back scheduled task {}", scope, taskAsString); } } } }; thread.start(); } } catch (Exception e) { logger.error("Unable to get Orphan Scheduled Tasksfor scope {}.", scope, e); return; } logger.trace( "\n-------------------------------------------------------\n" + "Smart Executor Started Successfully on scope {}\n" + "-------------------------------------------------------", scope); } /** * {@inheritDoc} * This function is invoked before the service will stop and unpublish the * resource from the IS to maintain the infrastructure integrity. * Furthermore close the connection to DB. */ @Override public void onShutdown(){ logger.trace( "\n-------------------------------------------------------\n" + "Smart Executor is Stopping on scope {}\n" + "-------------------------------------------------------", ContextUtility.getCurrentScope()); SmartExecutorScheduler scheduler; try { scheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); scheduler.stopAll(); SmartExecutorSchedulerFactory.remove(); } catch (SchedulerException e) { logger.error("", e); } ApplicationContext applicationContext = ContextProvider.get(); List isPublishers = ISPublisher.getISPublishers(applicationContext); for(ISPublisher isPublisher : isPublishers) { try { isPublisher.unpublishPlugins(false); }catch (Exception e) { logger.error("unable to unpublish plugind from IS using {}", isPublisher.getClass().getName()); } } try { SmartExecutorPersistenceFactory.closePersistenceConnector(); } catch (Throwable e) { logger.error("Unable to correctly close {} for scope {}", SmartExecutorPersistenceConnector.class.getSimpleName(), ContextUtility.getCurrentScope(), e); } logger.trace( "\n-------------------------------------------------------\n" + "Smart Executor Stopped Successfully on scope {}\n" + "-------------------------------------------------------", ContextUtility.getCurrentScope()); } }