Reintroduced ScopeProvider to avoid problem in the next release
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@124153 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
5982f63f8a
commit
c499f902b7
|
@ -0,0 +1,11 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||||
|
<handlers>
|
||||||
|
<lifecycle>
|
||||||
|
<profile-management />
|
||||||
|
<plugin-registration-handler />
|
||||||
|
</lifecycle>
|
||||||
|
<request>
|
||||||
|
<request-validation />
|
||||||
|
<request-accounting />
|
||||||
|
</request>
|
||||||
|
</handlers>
|
|
@ -4,7 +4,7 @@ import java.util.UUID;
|
||||||
|
|
||||||
import javax.jws.WebService;
|
import javax.jws.WebService;
|
||||||
|
|
||||||
import org.gcube.smartgears.annotations.ManagedBy;
|
import org.gcube.smartgears.context.application.ApplicationContext;
|
||||||
import org.gcube.vremanagement.executor.api.SmartExecutor;
|
import org.gcube.vremanagement.executor.api.SmartExecutor;
|
||||||
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
||||||
import org.gcube.vremanagement.executor.exception.ExecutorException;
|
import org.gcube.vremanagement.executor.exception.ExecutorException;
|
||||||
|
@ -31,7 +31,6 @@ portName = "SmartExecutorPort",
|
||||||
serviceName = SmartExecutor.WEB_SERVICE_SERVICE_NAME,
|
serviceName = SmartExecutor.WEB_SERVICE_SERVICE_NAME,
|
||||||
targetNamespace = SmartExecutor.TARGET_NAMESPACE,
|
targetNamespace = SmartExecutor.TARGET_NAMESPACE,
|
||||||
endpointInterface = "org.gcube.vremanagement.executor.api.SmartExecutor" )
|
endpointInterface = "org.gcube.vremanagement.executor.api.SmartExecutor" )
|
||||||
@ManagedBy(SmartExecutorInitializator.class)
|
|
||||||
public class SmartExecutorImpl implements SmartExecutor {
|
public class SmartExecutorImpl implements SmartExecutor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -39,6 +38,15 @@ public class SmartExecutorImpl implements SmartExecutor {
|
||||||
*/
|
*/
|
||||||
private static Logger logger = LoggerFactory.getLogger(SmartExecutorImpl.class);
|
private static Logger logger = LoggerFactory.getLogger(SmartExecutorImpl.class);
|
||||||
|
|
||||||
|
protected static ApplicationContext ctx;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the ctx
|
||||||
|
*/
|
||||||
|
public static ApplicationContext getCtx() {
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
/**{@inheritDoc}*/
|
/**{@inheritDoc}*/
|
||||||
@Override
|
@Override
|
||||||
public String launch(LaunchParameter parameter) throws InputsNullException,
|
public String launch(LaunchParameter parameter) throws InputsNullException,
|
||||||
|
|
|
@ -1,14 +1,22 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
package org.gcube.vremanagement.executor;
|
package org.gcube.vremanagement.executor;
|
||||||
|
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.gcube.common.authorization.client.Constants;
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
import org.gcube.common.authorization.library.AuthorizationEntry;
|
|
||||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
import org.gcube.common.resources.gcore.GCoreEndpoint;
|
||||||
import org.gcube.common.resources.gcore.Resource;
|
import org.gcube.common.resources.gcore.Resource;
|
||||||
import org.gcube.common.resources.gcore.Resources;
|
import org.gcube.common.resources.gcore.Resources;
|
||||||
|
import org.gcube.common.resources.gcore.ScopeGroup;
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint;
|
import org.gcube.common.resources.gcore.ServiceEndpoint;
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
|
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint.Profile;
|
import org.gcube.common.resources.gcore.ServiceEndpoint.Profile;
|
||||||
|
@ -16,15 +24,18 @@ import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint.Runtime;
|
import org.gcube.common.resources.gcore.ServiceEndpoint.Runtime;
|
||||||
import org.gcube.common.resources.gcore.common.Platform;
|
import org.gcube.common.resources.gcore.common.Platform;
|
||||||
import org.gcube.common.resources.gcore.utils.Group;
|
import org.gcube.common.resources.gcore.utils.Group;
|
||||||
import org.gcube.informationsystem.publisher.RegistryPublisher;
|
import org.gcube.common.scope.api.ScopeProvider;
|
||||||
|
import org.gcube.informationsystem.publisher.AdvancedScopedPublisher;
|
||||||
import org.gcube.informationsystem.publisher.RegistryPublisherFactory;
|
import org.gcube.informationsystem.publisher.RegistryPublisherFactory;
|
||||||
|
import org.gcube.informationsystem.publisher.ScopedPublisher;
|
||||||
import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException;
|
import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException;
|
||||||
import org.gcube.resources.discovery.client.api.DiscoveryClient;
|
import org.gcube.resources.discovery.client.api.DiscoveryClient;
|
||||||
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
|
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
|
||||||
import org.gcube.resources.discovery.icclient.ICFactory;
|
import org.gcube.resources.discovery.icclient.ICFactory;
|
||||||
import org.gcube.smartgears.ApplicationManager;
|
|
||||||
import org.gcube.smartgears.ContextProvider;
|
|
||||||
import org.gcube.smartgears.configuration.container.ContainerConfiguration;
|
import org.gcube.smartgears.configuration.container.ContainerConfiguration;
|
||||||
|
import org.gcube.smartgears.context.application.ApplicationContext;
|
||||||
|
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent;
|
||||||
|
import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler;
|
||||||
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
|
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
|
||||||
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
|
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
|
||||||
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
|
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
|
||||||
|
@ -37,7 +48,8 @@ import org.slf4j.LoggerFactory;
|
||||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class SmartExecutorInitializator implements ApplicationManager {
|
@XmlRootElement(name = "plugin-registration-handler")
|
||||||
|
public class SmartExecutorInitializator extends ApplicationLifecycleHandler {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Logger
|
* Logger
|
||||||
|
@ -46,16 +58,38 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
|
|
||||||
public static final long JOIN_TIMEOUT = 1000;
|
public static final long JOIN_TIMEOUT = 1000;
|
||||||
|
|
||||||
public static String getScopeFromToken(){
|
/* *
|
||||||
String token = SecurityTokenProvider.instance.get();
|
* Contains the ServiceEnpoint Resource to be published/unpublished on IS
|
||||||
AuthorizationEntry authorizationEntry;
|
* /
|
||||||
try {
|
private static ServiceEndpoint serviceEndpoint;
|
||||||
authorizationEntry = Constants.authorizationService().get(token);
|
*/
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
/* *
|
||||||
}
|
* The application context
|
||||||
String scope = authorizationEntry.getContext();
|
* /
|
||||||
return scope;
|
protected static ApplicationContext ctx;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the ctx
|
||||||
|
* /
|
||||||
|
public static ApplicationContext getCtx() {
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
protected static ScheduledTaskConfiguration launchConfiguration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the configuredTasks
|
||||||
|
* /
|
||||||
|
public static ScheduledTaskConfiguration getConfiguredTasks() {
|
||||||
|
return launchConfiguration;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
public static String getCurrentScope(){
|
||||||
|
return ScopeProvider.instance.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -65,16 +99,15 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
* @throws RegistryNotFoundException if the Registry is not found so the
|
* @throws RegistryNotFoundException if the Registry is not found so the
|
||||||
* resource has not be published
|
* resource has not be published
|
||||||
*/
|
*/
|
||||||
private static void publishResource(Resource resource) throws Exception {
|
private static void publishScopedResource(Resource resource, List<String> scopes) throws RegistryNotFoundException, Exception {
|
||||||
StringWriter stringWriter = new StringWriter();
|
StringWriter stringWriter = new StringWriter();
|
||||||
Resources.marshal(resource, stringWriter);
|
Resources.marshal(resource, stringWriter);
|
||||||
|
|
||||||
RegistryPublisher registryPublisher = RegistryPublisherFactory.create();
|
ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.debug("Trying to publish to {}:\n{}", getScopeFromToken(), stringWriter);
|
logger.debug("Trying to publish to {}:\n{}", scopes, stringWriter);
|
||||||
registryPublisher.create(resource);
|
scopedPublisher.create(resource, scopes);
|
||||||
} catch (Exception e) {
|
} catch (RegistryNotFoundException e) {
|
||||||
logger.error("The resource was not published", e);
|
logger.error("The resource was not published", e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -86,16 +119,18 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
* @throws RegistryNotFoundException if the Registry is not found so the
|
* @throws RegistryNotFoundException if the Registry is not found so the
|
||||||
* resource has not be published
|
* resource has not be published
|
||||||
*/
|
*/
|
||||||
private static void unPublishResource(Resource resource) throws Exception {
|
private static void unPublishScopedResource(Resource resource) throws RegistryNotFoundException, Exception {
|
||||||
//StringWriter stringWriter = new StringWriter();
|
//StringWriter stringWriter = new StringWriter();
|
||||||
//Resources.marshal(resource, stringWriter);
|
//Resources.marshal(resource, stringWriter);
|
||||||
|
|
||||||
RegistryPublisher registryPublisher = RegistryPublisherFactory.create();
|
ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher();
|
||||||
|
AdvancedScopedPublisher advancedScopedPublisher = new AdvancedScopedPublisher(scopedPublisher);
|
||||||
|
|
||||||
String id = resource.id();
|
String id = resource.id();
|
||||||
logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, getScopeFromToken());
|
logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, ScopeProvider.instance.get());
|
||||||
|
|
||||||
registryPublisher.remove(resource);
|
//scopedPublisher.remove(resource, scopes);
|
||||||
|
advancedScopedPublisher.forceRemove(resource);
|
||||||
|
|
||||||
logger.debug("{} with ID {} removed successfully", resource.getClass().getSimpleName(), id);
|
logger.debug("{} with ID {} removed successfully", resource.getClass().getSimpleName(), id);
|
||||||
}
|
}
|
||||||
|
@ -157,13 +192,13 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities");
|
logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities");
|
||||||
ServiceEndpoint serviceEndpoint = new ServiceEndpoint();
|
ServiceEndpoint serviceEndpoint = new ServiceEndpoint();
|
||||||
Profile profile = serviceEndpoint.newProfile();
|
Profile profile = serviceEndpoint.newProfile();
|
||||||
profile.category(ContextProvider.get().configuration().serviceClass());
|
profile.category(SmartExecutorImpl.ctx.configuration().serviceClass());
|
||||||
profile.name(ContextProvider.get().configuration().name());
|
profile.name(SmartExecutorImpl.ctx.configuration().name());
|
||||||
String version = ContextProvider.get().configuration().version();
|
String version = SmartExecutorImpl.ctx.configuration().version();
|
||||||
profile.version(version);
|
profile.version(version);
|
||||||
profile.description(ContextProvider.get().configuration().description());
|
profile.description(SmartExecutorImpl.ctx.configuration().description());
|
||||||
|
|
||||||
String runningOn = getRunningOn(ContextProvider.get().container().configuration());
|
String runningOn = getRunningOn(SmartExecutorImpl.ctx.container().configuration());
|
||||||
Platform platform = profile.newPlatform();
|
Platform platform = profile.newPlatform();
|
||||||
platform.name(runningOn);
|
platform.name(runningOn);
|
||||||
|
|
||||||
|
@ -175,7 +210,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
|
|
||||||
Runtime runtime = profile.newRuntime();
|
Runtime runtime = profile.newRuntime();
|
||||||
runtime.hostedOn(runningOn);
|
runtime.hostedOn(runningOn);
|
||||||
runtime.status(ContextProvider.get().configuration().mode().toString());
|
runtime.status(SmartExecutorImpl.ctx.configuration().mode().toString());
|
||||||
|
|
||||||
Group<AccessPoint> accessPoints = profile.accessPoints();
|
Group<AccessPoint> accessPoints = profile.accessPoints();
|
||||||
Map<String, PluginDeclaration> availablePlugins = pluginManager.getAvailablePlugins();
|
Map<String, PluginDeclaration> availablePlugins = pluginManager.getAvailablePlugins();
|
||||||
|
@ -210,12 +245,34 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
return serviceEndpoint;
|
return serviceEndpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanServiceEndpoints(){
|
public static List<String> getScopes(ApplicationContext applicationContext){
|
||||||
|
Collection<String> scopes;
|
||||||
|
|
||||||
|
ScopeGroup<String> scopeGroup = applicationContext.profile(GCoreEndpoint.class).scopes();
|
||||||
|
if(scopeGroup==null || scopeGroup.isEmpty()){
|
||||||
|
Set<String> applicationScopes = applicationContext.configuration().startScopes();
|
||||||
|
Set<String> containerScopes = applicationContext.container().configuration().startScopes();
|
||||||
|
|
||||||
|
if(applicationScopes==null || applicationScopes.isEmpty()){
|
||||||
|
scopes = containerScopes;
|
||||||
|
logger.debug("Application Scopes ({}). The Container Scopes ({}) will be used.", applicationScopes, scopes);
|
||||||
|
} else{
|
||||||
|
logger.debug("Container Scopes ({}). Application Scopes ({}) will be used.", containerScopes, applicationScopes);
|
||||||
|
scopes = new HashSet<String>(applicationScopes);
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
scopes = scopeGroup.asCollection();
|
||||||
|
}
|
||||||
|
|
||||||
|
return new ArrayList<String>(scopes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanServiceEndpoints(String scope){
|
||||||
try {
|
try {
|
||||||
SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class)
|
SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class)
|
||||||
.addCondition(String.format("$resource/Profile/Category/text() eq '%s'", ContextProvider.get().configuration().serviceClass()))
|
.addCondition(String.format("$resource/Profile/Category/text() eq '%s'", SmartExecutorImpl.ctx.configuration().serviceClass()))
|
||||||
.addCondition(String.format("$resource/Profile/Name/text() eq '%s'", ContextProvider.get().configuration().name()))
|
.addCondition(String.format("$resource/Profile/Name/text() eq '%s'", SmartExecutorImpl.ctx.configuration().name()))
|
||||||
.addCondition(String.format("$resource/Profile/RunTime/HostedOn/text() eq '%s'", getRunningOn(ContextProvider.get().container().configuration())))
|
.addCondition(String.format("$resource/Profile/RunTime/HostedOn/text() eq '%s'", getRunningOn(SmartExecutorImpl.ctx.container().configuration())))
|
||||||
.setResult("$resource");
|
.setResult("$resource");
|
||||||
|
|
||||||
DiscoveryClient<ServiceEndpoint> client = ICFactory.clientFor(ServiceEndpoint.class);
|
DiscoveryClient<ServiceEndpoint> client = ICFactory.clientFor(ServiceEndpoint.class);
|
||||||
|
@ -224,11 +281,11 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
for (ServiceEndpoint serviceEndpoint : serviceEndpoints) {
|
for (ServiceEndpoint serviceEndpoint : serviceEndpoints) {
|
||||||
try {
|
try {
|
||||||
logger.debug("Trying to unpublish the old ServiceEndpoint with ID {} from scope {}",
|
logger.debug("Trying to unpublish the old ServiceEndpoint with ID {} from scope {}",
|
||||||
serviceEndpoint.id(), getScopeFromToken());
|
serviceEndpoint.id(), scope);
|
||||||
unPublishResource(serviceEndpoint);
|
unPublishScopedResource(serviceEndpoint);
|
||||||
} catch(Exception e){
|
} catch(Exception e){
|
||||||
logger.debug("Exception tryng to unpublish the old ServiceEndpoint with ID {} from scope {}",
|
logger.debug("Exception tryng to unpublish the old ServiceEndpoint with ID {} from scope {}",
|
||||||
serviceEndpoint.id(), getScopeFromToken(), e);
|
serviceEndpoint.id(), scope, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}catch(Exception e){
|
}catch(Exception e){
|
||||||
|
@ -244,38 +301,37 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
* Furthermore create/connect to DB
|
* Furthermore create/connect to DB
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void onInit() {
|
public void onStart(ApplicationLifecycleEvent.Start applicationLifecycleEventStart) {
|
||||||
String token = SecurityTokenProvider.instance.get();
|
|
||||||
AuthorizationEntry authorizationEntry;
|
|
||||||
try {
|
|
||||||
authorizationEntry = Constants.authorizationService().get(token);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
String scope = authorizationEntry.getContext();
|
|
||||||
|
|
||||||
logger.trace(
|
logger.trace(
|
||||||
"\n-------------------------------------------------------\n"
|
"\n-------------------------------------------------------\n"
|
||||||
+ "Smart Executor is Starting on scope {}\n"
|
+ "Smart Executor is Starting\n"
|
||||||
+ "-------------------------------------------------------", scope);
|
+ "-------------------------------------------------------");
|
||||||
|
|
||||||
|
SmartExecutorImpl.ctx = applicationLifecycleEventStart.context();
|
||||||
|
|
||||||
ServiceEndpoint serviceEndpoint = createServiceEndpoint();
|
ServiceEndpoint serviceEndpoint = createServiceEndpoint();
|
||||||
|
|
||||||
cleanServiceEndpoints();
|
// Checking if there are old unpublished ServiceEndpoints related to
|
||||||
|
// this vHN and trying to unpublish them
|
||||||
|
List<String> scopes = getScopes(SmartExecutorImpl.ctx);
|
||||||
|
|
||||||
try {
|
for(String scope : scopes){
|
||||||
SmartExecutorPersistenceFactory.getPersistenceConnector();
|
ScopeProvider.instance.set(scope);
|
||||||
} catch (Exception e) {
|
cleanServiceEndpoints(scope);
|
||||||
logger.error("Unable to isntantiate {} for scope {}",
|
try {
|
||||||
SmartExecutorPersistenceConnector.class.getSimpleName(), scope, e);
|
SmartExecutorPersistenceFactory.getPersistenceConnector();
|
||||||
throw new RuntimeException(e);
|
} catch (Exception e) {
|
||||||
|
logger.error("Unable to isntantiate {} for scope {}",
|
||||||
|
SmartExecutorPersistenceConnector.class.getSimpleName(), scope, e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO set task that are still on running state to FAILED state on
|
// TODO set task that are still on running state on DB to have a clear
|
||||||
// Persistence to clean previous situation of a failure of HostingNode
|
// room
|
||||||
|
|
||||||
try {
|
try {
|
||||||
publishResource(serviceEndpoint);
|
publishScopedResource(serviceEndpoint, scopes);
|
||||||
} catch (RegistryNotFoundException e) {
|
} catch (RegistryNotFoundException e) {
|
||||||
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
|
logger.error("Unable to Create ServiceEndpoint. the Service will be aborted", e);
|
||||||
return;
|
return;
|
||||||
|
@ -286,10 +342,12 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
|
|
||||||
logger.trace(
|
logger.trace(
|
||||||
"\n-------------------------------------------------------\n"
|
"\n-------------------------------------------------------\n"
|
||||||
+ "Smart Executor Started Successfully on scope {}\n"
|
+ "Smart Executor Started Successfully\n"
|
||||||
+ "-------------------------------------------------------", scope);
|
+ "-------------------------------------------------------");
|
||||||
|
|
||||||
|
// TODO Launch initializer thread
|
||||||
|
|
||||||
|
|
||||||
// TODO Launch repetitive thread for global task take over
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -299,32 +357,31 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
* Furthermore close the connection to DB.
|
* Furthermore close the connection to DB.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void onShutdown(){
|
public void onStop(ApplicationLifecycleEvent.Stop applicationLifecycleEventStop) {
|
||||||
|
|
||||||
|
|
||||||
logger.trace(
|
logger.trace(
|
||||||
"\n-------------------------------------------------------\n"
|
"\n-------------------------------------------------------\n"
|
||||||
+ "Smart Executor is Stopping on scope {}\n"
|
+ "Smart Executor is Stopping\n"
|
||||||
+ "-------------------------------------------------------",
|
+ "-------------------------------------------------------");
|
||||||
getScopeFromToken());
|
|
||||||
|
|
||||||
SmartExecutorScheduler.getInstance().stopAll();
|
SmartExecutorScheduler.getInstance().stopAll();
|
||||||
|
|
||||||
|
|
||||||
cleanServiceEndpoints();
|
List<String> scopes = getScopes(SmartExecutorImpl.ctx);
|
||||||
try {
|
|
||||||
SmartExecutorPersistenceFactory.getPersistenceConnector().close();
|
for(String scope : scopes){
|
||||||
} catch (Exception e) {
|
ScopeProvider.instance.set(scope);
|
||||||
logger.error("Unable to correctly close {} for scope {}",
|
cleanServiceEndpoints(scope);
|
||||||
SmartExecutorPersistenceConnector.class.getSimpleName(),
|
try {
|
||||||
getScopeFromToken(), e);
|
SmartExecutorPersistenceFactory.getPersistenceConnector().close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Unable to correctly close {} for scope {}",
|
||||||
|
SmartExecutorPersistenceConnector.class.getSimpleName(), scope, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace(
|
logger.trace(
|
||||||
"\n-------------------------------------------------------\n"
|
"\n-------------------------------------------------------\n"
|
||||||
+ "Smart Executor Stopped Successfully on scope {}\n"
|
+ "Smart Executor Stopped Successfully\n"
|
||||||
+ "-------------------------------------------------------",
|
+ "-------------------------------------------------------");
|
||||||
getScopeFromToken());
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,9 @@ import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
||||||
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
|
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Every implementation MUST take in account to store/query the records
|
||||||
|
* on the current scope which is not passed as argument but MUST be retrieved
|
||||||
|
* from thread local
|
||||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
*/
|
*/
|
||||||
public interface ScheduledTaskConfiguration {
|
public interface ScheduledTaskConfiguration {
|
||||||
|
|
|
@ -13,7 +13,8 @@ import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.gcube.smartgears.ContextProvider;
|
//import org.gcube.smartgears.ContextProvider;
|
||||||
|
import org.gcube.vremanagement.executor.SmartExecutorImpl;
|
||||||
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
||||||
import org.gcube.vremanagement.executor.api.types.Scheduling;
|
import org.gcube.vremanagement.executor.api.types.Scheduling;
|
||||||
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
|
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
|
||||||
|
@ -44,7 +45,8 @@ public class FileScheduledTaskConfiguration implements ScheduledTaskConfiguratio
|
||||||
public static final String CONFIG_TASK_FILENAME = "definedTasks.json";
|
public static final String CONFIG_TASK_FILENAME = "definedTasks.json";
|
||||||
|
|
||||||
public FileScheduledTaskConfiguration() throws Exception {
|
public FileScheduledTaskConfiguration() throws Exception {
|
||||||
this(ContextProvider.get().persistence().location());
|
this(SmartExecutorImpl.getCtx().persistence().location());
|
||||||
|
//this(ContextProvider.get().persistence().location());
|
||||||
}
|
}
|
||||||
|
|
||||||
public FileScheduledTaskConfiguration(String location) throws IOException, JSONException {
|
public FileScheduledTaskConfiguration(String location) throws IOException, JSONException {
|
||||||
|
|
|
@ -48,23 +48,23 @@ public class JSONLaunchParameter extends LaunchParameter {
|
||||||
|
|
||||||
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs) {
|
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs) {
|
||||||
super(pluginName, pluginCapabilities, inputs);
|
super(pluginName, pluginCapabilities, inputs);
|
||||||
this.scope = SmartExecutorInitializator.getScopeFromToken();
|
this.scope = SmartExecutorInitializator.getCurrentScope();
|
||||||
}
|
}
|
||||||
|
|
||||||
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
|
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
|
||||||
super(pluginName, inputs, scheduling);
|
super(pluginName, inputs, scheduling);
|
||||||
this.scope = SmartExecutorInitializator.getScopeFromToken();
|
this.scope = SmartExecutorInitializator.getCurrentScope();
|
||||||
}
|
}
|
||||||
|
|
||||||
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
|
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
|
||||||
super(pluginName, pluginCapabilities, inputs, scheduling);
|
super(pluginName, pluginCapabilities, inputs, scheduling);
|
||||||
this.scope = SmartExecutorInitializator.getScopeFromToken();
|
this.scope = SmartExecutorInitializator.getCurrentScope();
|
||||||
}
|
}
|
||||||
|
|
||||||
public JSONLaunchParameter(LaunchParameter parameter) throws ParseException {
|
public JSONLaunchParameter(LaunchParameter parameter) throws ParseException {
|
||||||
super(parameter.getPluginName(), parameter.getPluginCapabilities(), parameter.getInputs(), parameter.getScheduling());
|
super(parameter.getPluginName(), parameter.getPluginCapabilities(), parameter.getInputs(), parameter.getScheduling());
|
||||||
this.scheduling = new JSONScheduling(parameter.getScheduling());
|
this.scheduling = new JSONScheduling(parameter.getScheduling());
|
||||||
this.scope = SmartExecutorInitializator.getScopeFromToken();
|
this.scope = SmartExecutorInitializator.getCurrentScope();
|
||||||
}
|
}
|
||||||
|
|
||||||
public JSONLaunchParameter(JSONObject jsonObject) throws JSONException, ParseException, ScopeNotMatchException {
|
public JSONLaunchParameter(JSONObject jsonObject) throws JSONException, ParseException, ScopeNotMatchException {
|
||||||
|
@ -100,7 +100,7 @@ public class JSONLaunchParameter extends LaunchParameter {
|
||||||
this.usedBy = jsonObject.getString(USED_BY);
|
this.usedBy = jsonObject.getString(USED_BY);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.scope = SmartExecutorInitializator.getScopeFromToken();
|
this.scope = SmartExecutorInitializator.getCurrentScope();
|
||||||
if(jsonObject.has(SCOPE)){
|
if(jsonObject.has(SCOPE)){
|
||||||
String jsonScope = jsonObject.getString(SCOPE);
|
String jsonScope = jsonObject.getString(SCOPE);
|
||||||
if(jsonScope.compareTo(scope)!=0){
|
if(jsonScope.compareTo(scope)!=0){
|
||||||
|
|
|
@ -126,7 +126,7 @@ public class SmartExecutorPersistenceConfiguration {
|
||||||
List<ServiceEndpoint> serviceEndpoints = client.submit(query);
|
List<ServiceEndpoint> serviceEndpoints = client.submit(query);
|
||||||
if(serviceEndpoints.size()>1){
|
if(serviceEndpoints.size()>1){
|
||||||
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", TARGET_SCOPE));
|
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", TARGET_SCOPE));
|
||||||
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", SmartExecutorInitializator.getScopeFromToken()));
|
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", SmartExecutorInitializator.getCurrentScope()));
|
||||||
serviceEndpoints = client.submit(query);
|
serviceEndpoints = client.submit(query);
|
||||||
}
|
}
|
||||||
return serviceEndpoints.get(0);
|
return serviceEndpoints.get(0);
|
||||||
|
|
|
@ -11,6 +11,9 @@ import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Model the connector which create or open the connection to DB.
|
* Model the connector which create or open the connection to DB.
|
||||||
|
* Every implementation MUST take in account to store/query the records
|
||||||
|
* on the current scope which is not passed as argument but MUSt be retrieved
|
||||||
|
* from thread local.
|
||||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
*/
|
*/
|
||||||
public abstract class SmartExecutorPersistenceConnector implements PluginStateNotification {
|
public abstract class SmartExecutorPersistenceConnector implements PluginStateNotification {
|
||||||
|
|
|
@ -43,12 +43,12 @@ public abstract class SmartExecutorPersistenceFactory {
|
||||||
*/
|
*/
|
||||||
public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception {
|
public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception {
|
||||||
SmartExecutorPersistenceConnector persistence =
|
SmartExecutorPersistenceConnector persistence =
|
||||||
getPersistenceConnector(SmartExecutorInitializator.getScopeFromToken());
|
getPersistenceConnector(SmartExecutorInitializator.getCurrentScope());
|
||||||
|
|
||||||
if(persistence==null){
|
if(persistence==null){
|
||||||
logger.trace("Retrieving {} for scope {} not found on internal {}. Intializing it.",
|
logger.trace("Retrieving {} for scope {} not found on internal {}. Intializing it.",
|
||||||
SmartExecutorPersistenceConnector.class.getSimpleName(),
|
SmartExecutorPersistenceConnector.class.getSimpleName(),
|
||||||
SmartExecutorInitializator.getScopeFromToken(),
|
SmartExecutorInitializator.getCurrentScope(),
|
||||||
Map.class.getSimpleName());
|
Map.class.getSimpleName());
|
||||||
|
|
||||||
String className = CouchDBPersistenceConnector.class.getSimpleName();
|
String className = CouchDBPersistenceConnector.class.getSimpleName();
|
||||||
|
@ -56,7 +56,7 @@ public abstract class SmartExecutorPersistenceFactory {
|
||||||
new SmartExecutorPersistenceConfiguration(className);
|
new SmartExecutorPersistenceConfiguration(className);
|
||||||
|
|
||||||
persistence = new CouchDBPersistenceConnector(configuration);
|
persistence = new CouchDBPersistenceConnector(configuration);
|
||||||
persistenceConnectors.put(SmartExecutorInitializator.getScopeFromToken(),
|
persistenceConnectors.put(SmartExecutorInitializator.getCurrentScope(),
|
||||||
persistence);
|
persistence);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,10 +65,10 @@ public abstract class SmartExecutorPersistenceFactory {
|
||||||
|
|
||||||
public static synchronized void closePersistenceConnector() throws Exception {
|
public static synchronized void closePersistenceConnector() throws Exception {
|
||||||
SmartExecutorPersistenceConnector persistence =
|
SmartExecutorPersistenceConnector persistence =
|
||||||
getPersistenceConnector(SmartExecutorInitializator.getScopeFromToken());
|
getPersistenceConnector(SmartExecutorInitializator.getCurrentScope());
|
||||||
if(persistence!=null){
|
if(persistence!=null){
|
||||||
persistence.close();
|
persistence.close();
|
||||||
persistenceConnectors.remove(SmartExecutorInitializator.getScopeFromToken());
|
persistenceConnectors.remove(SmartExecutorInitializator.getCurrentScope());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
||||||
|
|
||||||
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, PLUGIN_STATE_DOCUMENT));
|
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, PLUGIN_STATE_DOCUMENT));
|
||||||
|
|
||||||
String scope = SmartExecutorInitializator.getScopeFromToken();
|
String scope = SmartExecutorInitializator.getCurrentScope();
|
||||||
ArrayNode startKey = new ObjectMapper().createArrayNode();
|
ArrayNode startKey = new ObjectMapper().createArrayNode();
|
||||||
startKey.add(scope);
|
startKey.add(scope);
|
||||||
ArrayNode endKey = new ObjectMapper().createArrayNode();
|
ArrayNode endKey = new ObjectMapper().createArrayNode();
|
||||||
|
@ -273,7 +273,7 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
||||||
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, SCHEDULED_TASKS_DOCUMENT));
|
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, SCHEDULED_TASKS_DOCUMENT));
|
||||||
query = query.viewName(ORPHAN_VIEW);
|
query = query.viewName(ORPHAN_VIEW);
|
||||||
|
|
||||||
String scope = SmartExecutorInitializator.getScopeFromToken();
|
String scope = SmartExecutorInitializator.getCurrentScope();
|
||||||
ArrayNode startKey = new ObjectMapper().createArrayNode();
|
ArrayNode startKey = new ObjectMapper().createArrayNode();
|
||||||
startKey.add(scope);
|
startKey.add(scope);
|
||||||
ArrayNode endKey = new ObjectMapper().createArrayNode();
|
ArrayNode endKey = new ObjectMapper().createArrayNode();
|
||||||
|
@ -316,7 +316,7 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
|
||||||
JSONObject obj = jlp.toJSON();
|
JSONObject obj = jlp.toJSON();
|
||||||
obj.append(TYPE_JSON_FIELD, SCHEDULED_TASK_TYPE);
|
obj.append(TYPE_JSON_FIELD, SCHEDULED_TASK_TYPE);
|
||||||
obj.append(USED_BY_FIELD, consumerID);
|
obj.append(USED_BY_FIELD, consumerID);
|
||||||
obj.append(ScheduledTaskConfiguration.SCOPE, SmartExecutorInitializator.getScopeFromToken());
|
obj.append(ScheduledTaskConfiguration.SCOPE, SmartExecutorInitializator.getCurrentScope());
|
||||||
createItem(obj, uuid.toString());
|
createItem(obj, uuid.toString());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Error Adding Scheduled Task UUID : {}, Consumer : {}, LaunchParameter : {}",
|
logger.error("Error Adding Scheduled Task UUID : {}, Consumer : {}, LaunchParameter : {}",
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class PluginStateEvolutionObjectNode {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void addScope(ObjectNode objectNode){
|
public static void addScope(ObjectNode objectNode){
|
||||||
objectNode.put(SCOPE_FIELD, SmartExecutorInitializator.getScopeFromToken());
|
objectNode.put(SCOPE_FIELD, SmartExecutorInitializator.getCurrentScope());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ObjectNode getObjectMapper(PluginStateEvolution pluginStateEvolution){
|
public static ObjectNode getObjectMapper(PluginStateEvolution pluginStateEvolution){
|
||||||
|
|
|
@ -12,7 +12,8 @@ import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.gcube.common.resources.gcore.GCoreEndpoint;
|
import org.gcube.common.resources.gcore.GCoreEndpoint;
|
||||||
import org.gcube.smartgears.ContextProvider;
|
import org.gcube.vremanagement.executor.SmartExecutorImpl;
|
||||||
|
//import org.gcube.smartgears.ContextProvider;
|
||||||
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
||||||
import org.gcube.vremanagement.executor.api.types.Scheduling;
|
import org.gcube.vremanagement.executor.api.types.Scheduling;
|
||||||
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
|
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
|
||||||
|
@ -150,7 +151,8 @@ public class SmartExecutorScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String runningInstanceID = ContextProvider.get().profile(GCoreEndpoint.class).id();
|
//String runningInstanceID = ContextProvider.get().profile(GCoreEndpoint.class).id();
|
||||||
|
String runningInstanceID = SmartExecutorImpl.getCtx().profile(GCoreEndpoint.class).id();
|
||||||
logger.debug("Going to persist Scheduled Task {} which will be assigned to Running Instance {}. LaunchParameters : {} ",
|
logger.debug("Going to persist Scheduled Task {} which will be assigned to Running Instance {}. LaunchParameters : {} ",
|
||||||
uuid.toString(), runningInstanceID, parameter);
|
uuid.toString(), runningInstanceID, parameter);
|
||||||
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
|
||||||
|
|
Loading…
Reference in New Issue