Support Scheduled Task take in charge from Orphaned RIs


git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@142457 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-02-10 15:57:10 +00:00
parent fcca317da7
commit 24a9b093cc
8 changed files with 181 additions and 62 deletions

View File

@ -48,7 +48,7 @@ public class SmartExecutorImpl implements SmartExecutor {
logger.info("Launch requested {}", parameter);
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
UUID uuid = smartExecutorScheduler.schedule(parameter);
UUID uuid = smartExecutorScheduler.schedule(parameter, null);
logger.info(
String.format(
"The Plugin named %s with UUID %s has been launched %s",

View File

@ -29,10 +29,12 @@ import org.gcube.resources.discovery.icclient.ICFactory;
import org.gcube.smartgears.ApplicationManager;
import org.gcube.smartgears.ContextProvider;
import org.gcube.smartgears.configuration.container.ContainerConfiguration;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -186,10 +188,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
* available plugins and their own discovered capabilities
* @return the created {@link ServiceEndpoint}
*/
protected static ServiceEndpoint createServiceEndpoint(){
logger.debug("Getting Available Plugins and their own supported capabilities");
PluginManager pluginManager = PluginManager.getInstance();
protected static ServiceEndpoint createServiceEndpoint(Map<String, PluginDeclaration> availablePlugins){
logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities");
ServiceEndpoint serviceEndpoint = new ServiceEndpoint();
Profile profile = serviceEndpoint.newProfile();
@ -214,7 +213,6 @@ public class SmartExecutorInitializator implements ApplicationManager {
runtime.status(ContextProvider.get().configuration().mode().toString());
Group<AccessPoint> accessPoints = profile.accessPoints();
Map<String, PluginDeclaration> availablePlugins = pluginManager.getAvailablePlugins();
for(String pluginName : availablePlugins.keySet()){
AccessPoint accessPointElement = new AccessPoint();
@ -288,14 +286,25 @@ public class SmartExecutorInitializator implements ApplicationManager {
+ "-------------------------------------------------------",
scope);
ServiceEndpoint serviceEndpoint = createServiceEndpoint();
logger.debug("Getting Available Plugins and their own supported capabilities");
PluginManager pluginManager = PluginManager.getInstance();
Map<String, PluginDeclaration> availablePlugins = pluginManager.getAvailablePlugins();
ServiceEndpoint serviceEndpoint = createServiceEndpoint(availablePlugins);
cleanServiceEndpoints();
try {
SmartExecutorPersistenceFactory.getPersistenceConnector(scope);
publishResource(serviceEndpoint);
} catch (Exception e) {
logger.error("Unable to instantiate {} for scope {}",
logger.error("Unable to Create ServiceEndpoint for scope {}. The Service will be aborted", scope, e);
throw new RuntimeException(e);
}
final SmartExecutorPersistenceConnector smartExecutorPersistenceConnector;
try {
smartExecutorPersistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector();
} catch (Exception e) {
logger.error("Unable to instantiate {} for scope {}. The Service will be aborted",
SmartExecutorPersistenceConnector.class.getSimpleName(), scope, e);
throw new RuntimeException(e);
}
@ -304,12 +313,51 @@ public class SmartExecutorInitializator implements ApplicationManager {
// Persistence to clean previous situation of a failure of HostingNode
try {
publishResource(serviceEndpoint);
} catch (RegistryNotFoundException e) {
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
return;
logger.debug("Going to get Orphan Scheduled Tasks");
List<ScheduledTask> scheduledTasks = smartExecutorPersistenceConnector.getOrphanScheduledTasks(availablePlugins.values());
for(final ScheduledTask scheduledTask : scheduledTasks){
try {
// Reserving the task.
smartExecutorPersistenceConnector.reserveScheduledTask(scheduledTask);
}catch (Exception e) {
logger.debug("someone else is going to take in charge the scheduled task. Skipping.");
continue;
}
Thread thread = new Thread(){
@Override
public void run(){
LaunchParameter launchParameter = scheduledTask.getLaunchParameter();
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
String scheduledTasktoken = scheduledTask.getToken();
SecurityTokenProvider.instance.set(scheduledTasktoken);
try {
// A new Scheduled Task will be persisted due to launch. Removing it
smartExecutorPersistenceConnector.removeScheduledTask(scheduledTask);
smartExecutorScheduler.schedule(launchParameter, scheduledTask.getUUID());
} catch (Exception e) {
logger.error("Error while trying to relaunch scheduled task.", e);
try {
smartExecutorPersistenceConnector.addScheduledTask(scheduledTask);
} catch (Exception ex) {
logger.error("Unable to ");
}
}
}
};
thread.start();
}
} catch (Exception e) {
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
logger.error("Unable to get Orphan Scheduled Tasksfor scope {}.", scope, e);
return;
}
@ -318,7 +366,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
+ "Smart Executor Started Successfully on scope {}\n"
+ "-------------------------------------------------------", scope);
// TODO Launch repetitive thread for global task take over
}
/**
@ -337,6 +385,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
+ "-------------------------------------------------------",
getCurrentScope());
// TODO release scheduled tasks
SmartExecutorScheduler.getInstance().stopAll();

View File

@ -6,11 +6,21 @@ package org.gcube.vremanagement.executor.persistence;
import java.util.HashMap;
import java.util.UUID;
import org.gcube.common.resources.gcore.HostingNode;
import org.gcube.smartgears.ContextProvider;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
import org.gcube.vremanagement.executor.plugin.RunOn;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Model the connector which create or open the connection to DB.
@ -18,6 +28,9 @@ import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence;
*/
public abstract class SmartExecutorPersistenceConnector extends PluginStateNotification implements ScheduledTaskPersistence {
private static final Logger logger = LoggerFactory
.getLogger(SmartExecutorPersistenceConnector.class);
public SmartExecutorPersistenceConnector() {
super(new HashMap<String, String>());
}
@ -47,4 +60,57 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif
*/
public abstract PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception;
protected boolean isOrphan(ScheduledTask scheduledTask) throws Exception {
try {
UUID uuid = scheduledTask.getUUID();
RunOn runOn = scheduledTask.getRunOn();
if(runOn==null){
return true;
}
try {
HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class);
String hnAddress = hostingNode.profile().description().name();
if(runOn.getHostingNode().getAddress().compareTo(hnAddress)==0){
return true;
}
}catch (Exception e) {
logger.error("Unable to chack if current hosting node is the same of the one in ScheduledTask", e);
}
String address = runOn.getEService().getAddress();
SpecificEndpointDiscoveryFilter specificEndpointDiscoveryFilter = new SpecificEndpointDiscoveryFilter(
address);
String pluginName = scheduledTask.getLaunchParameter()
.getPluginName();
try {
SmartExecutorProxy proxy = ExecutorPlugin
.getExecutorProxy(pluginName, null, null,
specificEndpointDiscoveryFilter).build();
proxy.getStateEvolution(uuid.toString());
} catch (Exception e) {
// The instance was not found or the request failed.
// The scheduledTask is considered orphan
logger.trace("{} is considered orphan.", ObjectMapperManager
.getObjectMapper().writeValueAsString(scheduledTask));
return true;
}
} catch (Exception e) {
String string = ObjectMapperManager.getObjectMapper()
.writeValueAsString(scheduledTask);
logger.error("Error while checking orphanity of " + string
+ ". Considering as not orphan.");
}
return false;
}
}

View File

@ -5,15 +5,14 @@ package org.gcube.vremanagement.executor.persistence.orientdb;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
@ -215,52 +214,40 @@ public class OrientDBPersistenceConnector extends
}
protected boolean isOrphan(ScheduledTask scheduledTask) throws Exception {
try {
UUID uuid = scheduledTask.getUUID();
RunOn runOn = scheduledTask.getRunOn();
String address = runOn.getEService().getAddress();
SpecificEndpointDiscoveryFilter specificEndpointDiscoveryFilter = new SpecificEndpointDiscoveryFilter(
address);
String pluginName = scheduledTask.getLaunchParameter()
.getPluginName();
try {
SmartExecutorProxy proxy = ExecutorPlugin
.getExecutorProxy(pluginName, null, null,
specificEndpointDiscoveryFilter).build();
proxy.getStateEvolution(uuid.toString());
} catch (Exception e) {
// The instance was not found or the request failed.
// The scheduledTask is considered orphan
logger.trace("{} is considered orphan.", ObjectMapperManager
.getObjectMapper().writeValueAsString(scheduledTask));
return true;
}
} catch (Exception e) {
String string = ObjectMapperManager.getObjectMapper()
.writeValueAsString(scheduledTask);
logger.error("Error while checking orphanity of " + string
+ ". Considering as not orphan.");
}
return false;
}
@Override
public List<ScheduledTask> getOrphanScheduledTasks(
List<? extends PluginDeclaration> pluginDeclarations)
Collection<? extends PluginDeclaration> pluginDeclarations)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
String type = ScheduledTask.class.getSimpleName();
String queryString = String.format("SELECT * FROM %s", type);
if(pluginDeclarations!=null && pluginDeclarations.size()!=0){
boolean first = true;
for(PluginDeclaration pluginDeclaration : pluginDeclarations){
if(first){
first = false;
queryString = String.format("%s WHERE ( (%s = '%s') ",
queryString,
ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME,
pluginDeclaration.getName());
}else{
queryString = String.format("%s OR (%s = '%s') ",
queryString,
ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME,
pluginDeclaration.getName());
}
}
queryString = queryString + ")";
}
OSQLSynchQuery<ODocument> query = new OSQLSynchQuery<ODocument>(
String.format("SELECT * FROM %s", type)
// TODO filter for task the instance can run
queryString
);
List<ODocument> result = query.execute();

View File

@ -6,6 +6,7 @@ package org.gcube.vremanagement.executor.scheduledtask;
import java.util.UUID;
import org.gcube.common.authorization.library.provider.ClientInfo;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.common.resources.gcore.GCoreEndpoint.Profile.Endpoint;
import org.gcube.common.resources.gcore.HostingNode;
@ -18,6 +19,7 @@ import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.plugin.Ref;
import org.gcube.vremanagement.executor.plugin.RunOn;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
@ -26,12 +28,16 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=Scheduling.CLASS_PROPERTY)
public class ScheduledTask {
public static final String LAUNCH_PARAMETER = "launchParameter";
protected Long timestamp;
protected UUID uuid;
@JsonProperty(value=LAUNCH_PARAMETER)
protected LaunchParameter launchParameter;
protected String scope;
protected String token;
protected ClientInfo clientInfo;
protected RunOn runOn;
@ -45,6 +51,7 @@ public class ScheduledTask {
public ScheduledTask(UUID uuid, LaunchParameter launchParameter, RunOn runOn) {
this.uuid = uuid;
this.launchParameter = launchParameter;
this.token = SecurityTokenProvider.instance.get();
this.scope = SmartExecutorInitializator.getCurrentScope();
this.clientInfo = SmartExecutorInitializator.getClientInfo();
this.runOn = runOn;
@ -71,6 +78,13 @@ public class ScheduledTask {
return scope;
}
/**
* @return the token
*/
public String getToken() {
return token;
}
/**
* @return the clientInfo
*/
@ -86,7 +100,7 @@ public class ScheduledTask {
}
private static final String LOCALHOST = "localhost";
public static final String LOCALHOST = "localhost";
/**
* @param runOn the runOn to set
@ -94,7 +108,7 @@ public class ScheduledTask {
public static RunOn generateRunOn() {
Ref hostingNodeRef = null;
try {
HostingNode hostingNode = ContextProvider.get().profile(HostingNode.class);
HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class);
hostingNodeRef = new Ref(hostingNode.id(), hostingNode.profile().description().name());
}catch (Exception e) {
//

View File

@ -3,6 +3,7 @@
*/
package org.gcube.vremanagement.executor.scheduledtask;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
@ -25,7 +26,7 @@ public interface ScheduledTaskPersistence {
* if fails
*/
public List<ScheduledTask> getOrphanScheduledTasks(
List<? extends PluginDeclaration> pluginDeclarations)
Collection<? extends PluginDeclaration> pluginDeclarations)
throws SchedulePersistenceException;
/**

View File

@ -183,7 +183,7 @@ public class SmartExecutorScheduler {
* @throws PluginNotFoundException if the request plugin is not available on
* this smart executor instance
*/
public synchronized UUID schedule(LaunchParameter parameter)
public synchronized UUID schedule(LaunchParameter parameter, UUID uuid)
throws InputsNullException, PluginNotFoundException, LaunchException {
Map<String, Object> inputs = parameter.getInputs();
if (inputs == null) {
@ -196,7 +196,9 @@ public class SmartExecutorScheduler {
*/
PluginManager.getPluginDeclaration(parameter.getPluginName());
final UUID uuid = UUID.randomUUID();
if(uuid==null){
uuid = UUID.randomUUID();
}
try {
Scheduler scheduler = reallySchedule(uuid, parameter);

View File

@ -51,7 +51,7 @@ public class SmartExecutorSchedulerTest extends ScopedTest {
parameter.setScheduling(scheduling);
SmartExecutorScheduler smartExecutorScheduler = SmartExecutorScheduler.getInstance();
UUID uuid = smartExecutorScheduler.schedule(parameter);
UUID uuid = smartExecutorScheduler.schedule(parameter, null);
logger.debug("Scheduled Job with ID {}", uuid);
pc = SmartExecutorPersistenceFactory.getPersistenceConnector();