diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java index c1001bb..b3f1250 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java @@ -48,7 +48,7 @@ public class SmartExecutorImpl implements SmartExecutor { logger.info("Launch requested {}", parameter); SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); - UUID uuid = smartExecutorScheduler.schedule(parameter); + UUID uuid = smartExecutorScheduler.schedule(parameter, null); logger.info( String.format( "The Plugin named %s with UUID %s has been launched %s", diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java index f40ca37..c85cd7b 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java @@ -29,10 +29,12 @@ import org.gcube.resources.discovery.icclient.ICFactory; import org.gcube.smartgears.ApplicationManager; import org.gcube.smartgears.ContextProvider; import org.gcube.smartgears.configuration.container.ContainerConfiguration; +import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,10 +188,7 @@ public class SmartExecutorInitializator implements ApplicationManager { * available plugins and their own discovered capabilities * @return the created {@link ServiceEndpoint} */ - protected static ServiceEndpoint createServiceEndpoint(){ - logger.debug("Getting Available Plugins and their own supported capabilities"); - PluginManager pluginManager = PluginManager.getInstance(); - + protected static ServiceEndpoint createServiceEndpoint(Map availablePlugins){ logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities"); ServiceEndpoint serviceEndpoint = new ServiceEndpoint(); Profile profile = serviceEndpoint.newProfile(); @@ -214,7 +213,6 @@ public class SmartExecutorInitializator implements ApplicationManager { runtime.status(ContextProvider.get().configuration().mode().toString()); Group accessPoints = profile.accessPoints(); - Map availablePlugins = pluginManager.getAvailablePlugins(); for(String pluginName : availablePlugins.keySet()){ AccessPoint accessPointElement = new AccessPoint(); @@ -288,14 +286,25 @@ public class SmartExecutorInitializator implements ApplicationManager { + "-------------------------------------------------------", scope); - ServiceEndpoint serviceEndpoint = createServiceEndpoint(); + logger.debug("Getting Available Plugins and their own supported capabilities"); + PluginManager pluginManager = PluginManager.getInstance(); + Map availablePlugins = pluginManager.getAvailablePlugins(); + ServiceEndpoint serviceEndpoint = createServiceEndpoint(availablePlugins); cleanServiceEndpoints(); try { - SmartExecutorPersistenceFactory.getPersistenceConnector(scope); + publishResource(serviceEndpoint); } catch (Exception e) { - logger.error("Unable to instantiate {} for scope {}", + logger.error("Unable to Create ServiceEndpoint for scope {}. The Service will be aborted", scope, e); + throw new RuntimeException(e); + } + + final SmartExecutorPersistenceConnector smartExecutorPersistenceConnector; + try { + smartExecutorPersistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); + } catch (Exception e) { + logger.error("Unable to instantiate {} for scope {}. The Service will be aborted", SmartExecutorPersistenceConnector.class.getSimpleName(), scope, e); throw new RuntimeException(e); } @@ -304,12 +313,51 @@ public class SmartExecutorInitializator implements ApplicationManager { // Persistence to clean previous situation of a failure of HostingNode try { - publishResource(serviceEndpoint); - } catch (RegistryNotFoundException e) { - logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e); - return; + logger.debug("Going to get Orphan Scheduled Tasks"); + + List scheduledTasks = smartExecutorPersistenceConnector.getOrphanScheduledTasks(availablePlugins.values()); + for(final ScheduledTask scheduledTask : scheduledTasks){ + try { + // Reserving the task. + smartExecutorPersistenceConnector.reserveScheduledTask(scheduledTask); + }catch (Exception e) { + logger.debug("someone else is going to take in charge the scheduled task. Skipping."); + continue; + } + + Thread thread = new Thread(){ + + @Override + public void run(){ + LaunchParameter launchParameter = scheduledTask.getLaunchParameter(); + + SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); + + String scheduledTasktoken = scheduledTask.getToken(); + SecurityTokenProvider.instance.set(scheduledTasktoken); + try { + // A new Scheduled Task will be persisted due to launch. Removing it + smartExecutorPersistenceConnector.removeScheduledTask(scheduledTask); + smartExecutorScheduler.schedule(launchParameter, scheduledTask.getUUID()); + } catch (Exception e) { + logger.error("Error while trying to relaunch scheduled task.", e); + try { + smartExecutorPersistenceConnector.addScheduledTask(scheduledTask); + } catch (Exception ex) { + logger.error("Unable to "); + } + } + + } + + }; + + thread.start(); + + } + } catch (Exception e) { - logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e); + logger.error("Unable to get Orphan Scheduled Tasksfor scope {}.", scope, e); return; } @@ -318,7 +366,7 @@ public class SmartExecutorInitializator implements ApplicationManager { + "Smart Executor Started Successfully on scope {}\n" + "-------------------------------------------------------", scope); - // TODO Launch repetitive thread for global task take over + } /** @@ -337,6 +385,7 @@ public class SmartExecutorInitializator implements ApplicationManager { + "-------------------------------------------------------", getCurrentScope()); + // TODO release scheduled tasks SmartExecutorScheduler.getInstance().stopAll(); diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java index 2f9854c..195d0ee 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java @@ -6,11 +6,21 @@ package org.gcube.vremanagement.executor.persistence; import java.util.HashMap; import java.util.UUID; +import org.gcube.common.resources.gcore.HostingNode; +import org.gcube.smartgears.ContextProvider; +import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; +import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; +import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; +import org.gcube.vremanagement.executor.json.ObjectMapperManager; import org.gcube.vremanagement.executor.plugin.Plugin; import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.plugin.PluginStateNotification; +import org.gcube.vremanagement.executor.plugin.RunOn; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Model the connector which create or open the connection to DB. @@ -18,6 +28,9 @@ import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence; */ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotification implements ScheduledTaskPersistence { + private static final Logger logger = LoggerFactory + .getLogger(SmartExecutorPersistenceConnector.class); + public SmartExecutorPersistenceConnector() { super(new HashMap()); } @@ -47,4 +60,57 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif */ public abstract PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception; + + protected boolean isOrphan(ScheduledTask scheduledTask) throws Exception { + try { + UUID uuid = scheduledTask.getUUID(); + + + RunOn runOn = scheduledTask.getRunOn(); + if(runOn==null){ + return true; + } + + try { + HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class); + String hnAddress = hostingNode.profile().description().name(); + + if(runOn.getHostingNode().getAddress().compareTo(hnAddress)==0){ + return true; + } + }catch (Exception e) { + logger.error("Unable to chack if current hosting node is the same of the one in ScheduledTask", e); + + } + + String address = runOn.getEService().getAddress(); + + SpecificEndpointDiscoveryFilter specificEndpointDiscoveryFilter = new SpecificEndpointDiscoveryFilter( + address); + + String pluginName = scheduledTask.getLaunchParameter() + .getPluginName(); + + try { + SmartExecutorProxy proxy = ExecutorPlugin + .getExecutorProxy(pluginName, null, null, + specificEndpointDiscoveryFilter).build(); + proxy.getStateEvolution(uuid.toString()); + } catch (Exception e) { + // The instance was not found or the request failed. + // The scheduledTask is considered orphan + logger.trace("{} is considered orphan.", ObjectMapperManager + .getObjectMapper().writeValueAsString(scheduledTask)); + return true; + } + } catch (Exception e) { + String string = ObjectMapperManager.getObjectMapper() + .writeValueAsString(scheduledTask); + logger.error("Error while checking orphanity of " + string + + ". Considering as not orphan."); + } + + return false; + } + } diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java index c146871..8aa8054 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java @@ -5,15 +5,14 @@ package org.gcube.vremanagement.executor.persistence.orientdb; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.gcube.vremanagement.executor.SmartExecutorInitializator; -import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; -import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; -import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; +import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.json.ObjectMapperManager; @@ -215,52 +214,40 @@ public class OrientDBPersistenceConnector extends } - protected boolean isOrphan(ScheduledTask scheduledTask) throws Exception { - try { - UUID uuid = scheduledTask.getUUID(); - - RunOn runOn = scheduledTask.getRunOn(); - String address = runOn.getEService().getAddress(); - SpecificEndpointDiscoveryFilter specificEndpointDiscoveryFilter = new SpecificEndpointDiscoveryFilter( - address); - - String pluginName = scheduledTask.getLaunchParameter() - .getPluginName(); - - try { - SmartExecutorProxy proxy = ExecutorPlugin - .getExecutorProxy(pluginName, null, null, - specificEndpointDiscoveryFilter).build(); - proxy.getStateEvolution(uuid.toString()); - } catch (Exception e) { - // The instance was not found or the request failed. - // The scheduledTask is considered orphan - logger.trace("{} is considered orphan.", ObjectMapperManager - .getObjectMapper().writeValueAsString(scheduledTask)); - return true; - } - } catch (Exception e) { - String string = ObjectMapperManager.getObjectMapper() - .writeValueAsString(scheduledTask); - logger.error("Error while checking orphanity of " + string - + ". Considering as not orphan."); - } - - return false; - } - @Override public List getOrphanScheduledTasks( - List pluginDeclarations) + Collection pluginDeclarations) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); String type = ScheduledTask.class.getSimpleName(); - + + + + String queryString = String.format("SELECT * FROM %s", type); + if(pluginDeclarations!=null && pluginDeclarations.size()!=0){ + boolean first = true; + for(PluginDeclaration pluginDeclaration : pluginDeclarations){ + if(first){ + first = false; + queryString = String.format("%s WHERE ( (%s = '%s') ", + queryString, + ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, + pluginDeclaration.getName()); + }else{ + queryString = String.format("%s OR (%s = '%s') ", + queryString, + ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, + pluginDeclaration.getName()); + } + } + queryString = queryString + ")"; + } + + OSQLSynchQuery query = new OSQLSynchQuery( - String.format("SELECT * FROM %s", type) - // TODO filter for task the instance can run + queryString ); List result = query.execute(); diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java index f8d7662..e78a45b 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java @@ -6,6 +6,7 @@ package org.gcube.vremanagement.executor.scheduledtask; import java.util.UUID; import org.gcube.common.authorization.library.provider.ClientInfo; +import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.resources.gcore.GCoreEndpoint; import org.gcube.common.resources.gcore.GCoreEndpoint.Profile.Endpoint; import org.gcube.common.resources.gcore.HostingNode; @@ -18,6 +19,7 @@ import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.plugin.Ref; import org.gcube.vremanagement.executor.plugin.RunOn; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; /** @@ -26,12 +28,16 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=Scheduling.CLASS_PROPERTY) public class ScheduledTask { + public static final String LAUNCH_PARAMETER = "launchParameter"; + protected Long timestamp; protected UUID uuid; + @JsonProperty(value=LAUNCH_PARAMETER) protected LaunchParameter launchParameter; protected String scope; + protected String token; protected ClientInfo clientInfo; protected RunOn runOn; @@ -45,6 +51,7 @@ public class ScheduledTask { public ScheduledTask(UUID uuid, LaunchParameter launchParameter, RunOn runOn) { this.uuid = uuid; this.launchParameter = launchParameter; + this.token = SecurityTokenProvider.instance.get(); this.scope = SmartExecutorInitializator.getCurrentScope(); this.clientInfo = SmartExecutorInitializator.getClientInfo(); this.runOn = runOn; @@ -71,6 +78,13 @@ public class ScheduledTask { return scope; } + /** + * @return the token + */ + public String getToken() { + return token; + } + /** * @return the clientInfo */ @@ -86,7 +100,7 @@ public class ScheduledTask { } - private static final String LOCALHOST = "localhost"; + public static final String LOCALHOST = "localhost"; /** * @param runOn the runOn to set @@ -94,7 +108,7 @@ public class ScheduledTask { public static RunOn generateRunOn() { Ref hostingNodeRef = null; try { - HostingNode hostingNode = ContextProvider.get().profile(HostingNode.class); + HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class); hostingNodeRef = new Ref(hostingNode.id(), hostingNode.profile().description().name()); }catch (Exception e) { // diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java index dd2b0eb..dc97056 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java @@ -3,6 +3,7 @@ */ package org.gcube.vremanagement.executor.scheduledtask; +import java.util.Collection; import java.util.List; import java.util.UUID; @@ -25,7 +26,7 @@ public interface ScheduledTaskPersistence { * if fails */ public List getOrphanScheduledTasks( - List pluginDeclarations) + Collection pluginDeclarations) throws SchedulePersistenceException; /** diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java index 6f4c66c..a644719 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java @@ -183,7 +183,7 @@ public class SmartExecutorScheduler { * @throws PluginNotFoundException if the request plugin is not available on * this smart executor instance */ - public synchronized UUID schedule(LaunchParameter parameter) + public synchronized UUID schedule(LaunchParameter parameter, UUID uuid) throws InputsNullException, PluginNotFoundException, LaunchException { Map inputs = parameter.getInputs(); if (inputs == null) { @@ -196,7 +196,9 @@ public class SmartExecutorScheduler { */ PluginManager.getPluginDeclaration(parameter.getPluginName()); - final UUID uuid = UUID.randomUUID(); + if(uuid==null){ + uuid = UUID.randomUUID(); + } try { Scheduler scheduler = reallySchedule(uuid, parameter); diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java index 7482234..2e22284 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java @@ -51,7 +51,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest { parameter.setScheduling(scheduling); SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance(); - UUID uuid = smartExecutorScheduler.schedule(parameter); + UUID uuid = smartExecutorScheduler.schedule(parameter, null); logger.debug("Scheduled Job with ID {}", uuid); pc = SmartExecutorPersistenceFactory.getPersistenceConnector();