diff --git a/CHANGELOG.md b/CHANGELOG.md index fa471c6..98861e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,73 +1,75 @@ -This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html]. +This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -# Changelog for Smart Executor +# Changelog for Smart Executor Service -## [v2.0.0-SNAPSHOT] +## [v3.0.0-SNAPSHOT] [r5.0.0] - +- Switched smart-executor JSON management to gcube-jackson [#19647] - Migrated Code from OrientDB 2.2.X APIs OrientDB 3.0.X APIs [#16123] -- Removed SOAP APIs - Redesigned HTTP APIs to comply with RESTful architectural style [#12997] - Added API to retrieve scheduled tasks [#10780] +## [v2.0.0] -## [v1.9.0] - 2017-02-15 +- Removed SOAP APIs + + +## [v1.9.0] [r4.10.0] - 2018-02-15 - Added REST interface to Smart Executor [#5109] -## [v1.8.0] - 2017-11-29 +## [v1.8.0] [r4.8.0] - 2017-11-29 - Fixed exception on stop() method [#10064] -## [v1.7.0] - 2017-10-09 +## [v1.7.0] [r4.7.0] - 2017-10-09 -- Changed TaskUsageRecord use due to changes in model [#9646] [#9647] +- Changed JobUsageRecord use due to changes in model [#9646] +- Removed TaskUsageRecord use due to changes in model [#9647] -## [v1.6.0] - 2017-07-25 +## [v1.6.0] [r4.6.0] - 2017-07-25 -- Setting explictly the context before running the plugin to avoid wrong context made from quartz thread reuse +- Setting explicitly the context before running the plugin to avoid wrong context made from quartz thread reuse -## [v1.5.0] - 2017-03-16 +## [v1.5.0] [r4.3.0] - 2017-03-16 -- Provided access to UUID and iteration number fro a plugin [#6733] +- Provided access to UUID and iteration number for a plugin [#6733] - Added support to implements Reminiscence for a Scheduled Task [#772] -- Removed config file support added in release 1.2.0 now available via Reminiscence [#772] -- Migrated from CouchDB to OrientDB [#6565] -- Added accountign by using TaskUsageRecord [#6116] +- Removed configuration file support added in release 1.2.0 now available via Reminiscence [#772] +- Migrated from CouchDB® to OrientDB® [#6565] +- Added accounting by using TaskUsageRecord [#6116] -## [v1.4.0] - 2016-11-07 +## [v1.4.0] [r4.1.0] - 2016-11-07 -- SmartExecutor has been passed to Authorization 2.0 [#4944 #2112] +- SmartExecutor has been migrated to Authorization 2.0 [#4944] [#2112] - Provided to plugins the possibility to specify progress percentage [#440] -- >Provided to plugins the possibility to define a custom notifier [#5089] +- Provided to plugins the possibility to define a custom notifier [#5089] +## [v1.3.0] [r3.10.0] - 2016-02-08 -## [v1.3.0] - 2016-02-08 - -- Using Persistence (CouchDB] to save Scheduled Task configuration [#579] +- Using CouchDB® to save Scheduled Task configuration [#579] - Added Unscheduling feature for repetitive task [#521] -## [v1.2.0] - 2015-12-31 +## [v1.2.0] [r3.9.1] - 2015-12-31 -- Removed Support for Config File to run task at service startup +- Removed Support for configuration File to run task at service startup -## [v1.1.0] - 2015-12-09 +## [v1.1.0] [r3.9.0] - 2015-12-09 -- Added Support for Config File to run task at service startup [#508] +- Added Support for configuration File to run task at service startup [#508] - Added Recurrent and Scheduled Task support [#111] -- Saving Task Evolution on Persistence (CouchDB) [#89] +- Saving Task Evolution on Persistence CouchDB® [#89] - -## [v1.0.0] - 2015-02-05 +## [v1.0.0] - 2015-02-05 - First Release - diff --git a/gcube/extra-resources/WEB-INF/gcube-app.xml b/gcube/extra-resources/WEB-INF/gcube-app.xml index 44b06dc..f77abc3 100644 --- a/gcube/extra-resources/WEB-INF/gcube-app.xml +++ b/gcube/extra-resources/WEB-INF/gcube-app.xml @@ -2,7 +2,7 @@ ${project.artifactId} - ${serviceClass} + ${project.groupId} ${project.version} ${project.description} \ No newline at end of file diff --git a/pom.xml b/pom.xml index f92e099..00e2dc5 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ org.gcube.vremanagement smart-executor - 2.0.0-SNAPSHOT + 3.0.0-SNAPSHOT SmartExecutor Smart Executor Service allows to launch recurrent tasks such as task for infrastructure management, healthy monitoring etc. war @@ -19,7 +19,7 @@ UTF-8 ${project.basedir}${file.separator}src${file.separator}main${file.separator}webapp${file.separator}WEB-INF - VREManagement + 3.1.12 @@ -66,7 +66,28 @@ com.orientechnologies orientdb-client - 3.0.35 + ${orientdb.version} + + + + org.gcube.information-system + information-system-model + provided + + + org.gcube.resource-management + gcube-model + provided + + + org.gcube.information-system + resource-registry-client + provided + + + org.gcube.information-system + resource-registry-publisher + provided @@ -77,10 +98,9 @@ org.gcube.vremanagement smart-executor-client - [2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT) + [3.0.0-SNAPSHOT, 4.0.0-SNAPSHOT) - - + @@ -103,8 +123,6 @@ org.slf4j slf4j-api - - junit @@ -112,58 +130,11 @@ 4.11 test - org.gcube.vremanagement hello-world-se-plugin - [1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT) + [2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT) test - - - - - org.apache.maven.plugins - maven-resources-plugin - - - copy-profile - - copy-resources - - process-resources - - ${webappDirectory} - - - ${project.basedir} - true - - src - - - LICENSE.md - README.md - gcube-app.xml - changelog.xml - - - - - - - - - org.apache.maven.plugins - maven-assembly-plugin - - - make-servicearchive - package - - - - - - + \ No newline at end of file diff --git a/src/main/java/org/gcube/vremanagement/executor/ContextUtility.java b/src/main/java/org/gcube/vremanagement/executor/ContextUtility.java new file mode 100644 index 0000000..f09b547 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/ContextUtility.java @@ -0,0 +1,82 @@ +package org.gcube.vremanagement.executor; + +import java.util.ArrayList; +import java.util.List; + +import org.gcube.common.authorization.client.Constants; +import org.gcube.common.authorization.client.exceptions.ObjectNotFound; +import org.gcube.common.authorization.library.AuthorizationEntry; +import org.gcube.common.authorization.library.ClientType; +import org.gcube.common.authorization.library.provider.ClientInfo; +import org.gcube.common.authorization.library.provider.SecurityTokenProvider; +import org.gcube.common.scope.api.ScopeProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) + */ +public class ContextUtility { + + private static Logger logger = LoggerFactory.getLogger(ContextUtility.class); + + public static String getCurrentContext() { + String token = SecurityTokenProvider.instance.get(); + AuthorizationEntry authorizationEntry; + try { + authorizationEntry = Constants.authorizationService().get(token); + } catch(Exception e) { + logger.trace("Context was not retrieved from token. Going to get it from {}", + ScopeProvider.class.getSimpleName()); + return ScopeProvider.instance.get(); + } + String context = authorizationEntry.getContext(); + logger.trace("Context retrieved from token is {}. Context in {} is {}", context, + ScopeProvider.class.getSimpleName(), ScopeProvider.instance.get()); + return context; + } + + public static String getCurrentContext(String token) throws ObjectNotFound, Exception { + AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token); + String context = authorizationEntry.getContext(); + logger.info("Context of token {} is {}", token, context); + return context; + } + + public static void setContext(String token) throws ObjectNotFound, Exception { + SecurityTokenProvider.instance.set(token); + ScopeProvider.instance.set(getCurrentContext(token)); + } + + public static ClientInfo getClientInfo() { + String token = SecurityTokenProvider.instance.get(); + AuthorizationEntry authorizationEntry; + try { + authorizationEntry = Constants.authorizationService().get(token); + } catch(Exception e) { + return new ClientInfo() { + + /** + * Generated Serial Version UID + */ + private static final long serialVersionUID = 8311873203596762883L; + + @Override + public ClientType getType() { + return ClientType.USER; + } + + @Override + public List getRoles() { + return new ArrayList<>(); + } + + @Override + public String getId() { + return "UNKNOWN"; + } + }; + } + return authorizationEntry.getClientInfo(); + } +} diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java index bb368d5..396d0a0 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java @@ -1,40 +1,19 @@ package org.gcube.vremanagement.executor; -import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.gcube.common.authorization.client.Constants; -import org.gcube.common.authorization.client.exceptions.ObjectNotFound; -import org.gcube.common.authorization.library.AuthorizationEntry; -import org.gcube.common.authorization.library.ClientType; -import org.gcube.common.authorization.library.provider.ClientInfo; -import org.gcube.common.authorization.library.provider.SecurityTokenProvider; -import org.gcube.common.resources.gcore.Resource; -import org.gcube.common.resources.gcore.Resources; -import org.gcube.common.resources.gcore.ServiceEndpoint; -import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; -import org.gcube.common.resources.gcore.ServiceEndpoint.Profile; -import org.gcube.common.resources.gcore.ServiceEndpoint.Property; -import org.gcube.common.resources.gcore.ServiceEndpoint.Runtime; -import org.gcube.common.resources.gcore.common.Platform; -import org.gcube.common.resources.gcore.utils.Group; -import org.gcube.common.scope.api.ScopeProvider; -import org.gcube.informationsystem.publisher.RegistryPublisher; -import org.gcube.informationsystem.publisher.RegistryPublisherFactory; -import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException; -import org.gcube.resources.discovery.client.api.DiscoveryClient; -import org.gcube.resources.discovery.client.queries.api.SimpleQuery; -import org.gcube.resources.discovery.icclient.ICFactory; import org.gcube.smartgears.ApplicationManager; import org.gcube.smartgears.ContextProvider; -import org.gcube.smartgears.configuration.container.ContainerConfiguration; +import org.gcube.smartgears.context.application.ApplicationContext; import org.gcube.vremanagement.executor.api.types.LaunchParameter; +import org.gcube.vremanagement.executor.ispublisher.ISPublisher; +import org.gcube.vremanagement.executor.ispublisher.RestISPublisher; import org.gcube.vremanagement.executor.json.ExtendedSEMapper; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; +import org.gcube.vremanagement.executor.plugin.Plugin; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; @@ -55,241 +34,6 @@ public class SmartExecutorInitializator implements ApplicationManager { public static final long JOIN_TIMEOUT = 1000; - public static String getCurrentScope(){ - String token = SecurityTokenProvider.instance.get(); - AuthorizationEntry authorizationEntry; - try { - authorizationEntry = Constants.authorizationService().get(token); - } catch (Exception e) { - logger.trace("Context was not retrieved from token. Going to get it from {}", ScopeProvider.class.getSimpleName()); - return ScopeProvider.instance.get(); - } - String context = authorizationEntry.getContext(); - logger.trace("Context retrieved from token is {}. Context in {} is {}", - context, ScopeProvider.class.getSimpleName(), ScopeProvider.instance.get()); - return context; - } - - public static String getCurrentScope(String token) throws ObjectNotFound, Exception{ - AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token); - String context = authorizationEntry.getContext(); - logger.info("Context of token {} is {}", token, context); - return context; - } - - - public static void setContext(String token) throws ObjectNotFound, Exception{ - SecurityTokenProvider.instance.set(token); - ScopeProvider.instance.set(getCurrentScope(token)); - } - - public static ClientInfo getClientInfo() { - String token = SecurityTokenProvider.instance.get(); - AuthorizationEntry authorizationEntry; - try { - authorizationEntry = Constants.authorizationService().get(token); - } catch (Exception e) { - return new ClientInfo() { - - /** - * Generated Serial Version UID - */ - private static final long serialVersionUID = 8311873203596762883L; - - @Override - public ClientType getType() { - return ClientType.USER; - } - - @Override - public List getRoles() { - return new ArrayList<>(); - } - - @Override - public String getId() { - return "UNKNOWN"; - } - }; - } - return authorizationEntry.getClientInfo(); - } - - - /** - * Publish the provided resource on all Service Scopes retrieved from - * Context - * @param resource to be published - * @throws RegistryNotFoundException if the Registry is not found so the - * resource has not be published - */ - private static void publishResource(Resource resource) throws Exception { - StringWriter stringWriter = new StringWriter(); - Resources.marshal(resource, stringWriter); - - RegistryPublisher registryPublisher = RegistryPublisherFactory.create(); - - try { - logger.debug("Trying to publish to {}:\n{}", getCurrentScope(), stringWriter); - registryPublisher.create(resource); - } catch (Exception e) { - logger.error("The resource was not published", e); - throw e; - } - } - - /** - * Remove the resource from IS - * @param resource to be unpublished - * @throws RegistryNotFoundException if the Registry is not found so the - * resource has not be published - */ - private static void unPublishResource(Resource resource) throws Exception { - //StringWriter stringWriter = new StringWriter(); - //Resources.marshal(resource, stringWriter); - - RegistryPublisher registryPublisher = RegistryPublisherFactory.create(); - - String id = resource.id(); - logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, getCurrentScope()); - - registryPublisher.remove(resource); - - logger.debug("{} with ID {} removed successfully", resource.getClass().getSimpleName(), id); - } - - /** - * Return the parsed version string as array of short. - * @param version the version as String - * @param wantedLenght if the length is equals to dot (.) separated - * number in the string. Otherwise the version is padded or truncated to - * the required version - * @return the parsed version as array of short. If on slicing some of the - * version cannot be parsed as short 1 is used for the first number, 0 is - * used instead or for padding - */ - private static short[] getVersionSlice(String version, int wantedLenght){ - logger.trace("Trying to parse {}", version); - - short[] versionSlices = new short[wantedLenght]; - for(int j=0; j=wantedLenght){ - break; - } - try { - short n = Short.parseShort(stringSlices[i]); - versionSlices[i] = n; - logger.trace("Version slice n. {} wich is '{}' parsed as short {}", i, stringSlices[i], n); - } catch(NumberFormatException nfe){ - logger.trace("Version slice n. {} wich is '{}' failed to parse. The default value {} will be used", i, stringSlices[i], versionSlices[i]); - } - } - } catch(Exception e){ - logger.trace("Error parsing the supplied version the default will be used", versionSlices); - } - - logger.trace("Version {} parsed as {}", version, versionSlices); - return versionSlices; - } - - private static String getRunningOn(ContainerConfiguration containerConfiguration){ - return String.format("%s:%s", containerConfiguration.hostname(), containerConfiguration.port()); - } - - /** - * Create the Service Endpoint using information related to discovered - * available plugins and their own discovered capabilities - * @return the created {@link ServiceEndpoint} - */ - protected static ServiceEndpoint createServiceEndpoint(Map availablePlugins){ - logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities"); - ServiceEndpoint serviceEndpoint = new ServiceEndpoint(); - Profile profile = serviceEndpoint.newProfile(); - profile.category(ContextProvider.get().configuration().serviceClass()); - profile.name(ContextProvider.get().configuration().name()); - String version = ContextProvider.get().configuration().version(); - profile.version(version); - profile.description(ContextProvider.get().configuration().description()); - - String runningOn = getRunningOn(ContextProvider.get().container().configuration()); - Platform platform = profile.newPlatform(); - platform.name(runningOn); - - short[] versionSlices = getVersionSlice(version, 4); - platform.version(versionSlices[0]); - platform.minorVersion(versionSlices[1]); - platform.buildVersion(versionSlices[2]); - platform.revisionVersion(versionSlices[3]); - - Runtime runtime = profile.newRuntime(); - runtime.hostedOn(runningOn); - runtime.status(ContextProvider.get().configuration().mode().toString()); - - Group accessPoints = profile.accessPoints(); - - for(String pluginName : availablePlugins.keySet()){ - AccessPoint accessPointElement = new AccessPoint(); - accessPointElement.name(pluginName); - - PluginDeclaration pluginDeclaration = availablePlugins.get(pluginName); - - accessPointElement.description(pluginDeclaration.getDescription()); - - Group properties = accessPointElement.properties(); - Property propertyVersionElement = new Property(); - propertyVersionElement.nameAndValue("Version", pluginDeclaration.getVersion()); - properties.add(propertyVersionElement); - - - Map pluginCapabilities = pluginDeclaration.getSupportedCapabilities(); - for(String capabilityName : pluginCapabilities.keySet()){ - Property propertyElement = new Property(); - propertyElement.nameAndValue(capabilityName, pluginCapabilities.get(capabilityName)); - properties.add(propertyElement); - } - accessPoints.add(accessPointElement); - } - - StringWriter stringWriter = new StringWriter(); - Resources.marshal(serviceEndpoint, stringWriter); - logger.debug("The created ServiceEndpoint profile is\n{}", stringWriter.toString()); - - return serviceEndpoint; - } - - private void cleanServiceEndpoints(){ - try { - SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class) - .addCondition(String.format("$resource/Profile/Category/text() eq '%s'", ContextProvider.get().configuration().serviceClass())) - .addCondition(String.format("$resource/Profile/Name/text() eq '%s'", ContextProvider.get().configuration().name())) - .addCondition(String.format("$resource/Profile/RunTime/HostedOn/text() eq '%s'", getRunningOn(ContextProvider.get().container().configuration()))) - .setResult("$resource"); - - DiscoveryClient client = ICFactory.clientFor(ServiceEndpoint.class); - List serviceEndpoints = client.submit(query); - - for (ServiceEndpoint serviceEndpoint : serviceEndpoints) { - try { - logger.debug("Trying to unpublish the old ServiceEndpoint with ID {} from scope {}", - serviceEndpoint.id(), getCurrentScope()); - unPublishResource(serviceEndpoint); - } catch(Exception e){ - logger.debug("Exception tryng to unpublish the old ServiceEndpoint with ID {} from scope {}", - serviceEndpoint.id(), getCurrentScope(), e); - } - } - }catch(Exception e){ - logger.debug("An Exception occur while checking and/or unpublishing old ServiceEndpoint", e); - } - } - /** * {@inheritDoc} * The method discover the plugins available on classpath and their own @@ -299,34 +43,48 @@ public class SmartExecutorInitializator implements ApplicationManager { */ @Override public void onInit() { - String scope = getCurrentScope(); + String context = ContextUtility.getCurrentContext(); logger.trace( "\n-------------------------------------------------------\n" - + "Smart Executor is Starting on scope {}\n" + + "Smart Executor is Starting on context {}\n" + "-------------------------------------------------------", - scope); + context); logger.debug("Getting Available Plugins and their own supported capabilities"); + PluginManager pluginManager = PluginManager.getInstance(); - Map availablePlugins = pluginManager.getAvailablePlugins(); - ServiceEndpoint serviceEndpoint = createServiceEndpoint(availablePlugins); - cleanServiceEndpoints(); + Map> availablePlugins = pluginManager.getAvailablePlugins(); + ApplicationContext applicationContext = ContextProvider.get(); - try { - publishResource(serviceEndpoint); - } catch (Exception e) { - logger.error("Unable to Create ServiceEndpoint for scope {}. The Service will be aborted", scope, e); - throw new RuntimeException(e); + List isPublishers = ISPublisher.getISPublishers(applicationContext); + for(ISPublisher isPublisher : isPublishers) { + try { + isPublisher.unpublishPlugins(true); + }catch (Exception e) { + logger.error("Unable to unpublish plugins from IS using {}. Trying to continue.", isPublisher.getClass().getName()); + } + try { + isPublisher.publishPlugins(availablePlugins); + }catch (Exception e) { + if(isPublisher instanceof RestISPublisher) { + logger.warn("Unable to create RunningPlugin in context {}. {}", context, e.getMessage()); + } else { + logger.error("Unable to create ServiceEndpoint in context {}. The Service will be aborted", context, e); + throw new RuntimeException(e); + } + } + } + final SmartExecutorPersistenceConnector smartExecutorPersistenceConnector; try { smartExecutorPersistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); } catch (Exception e) { logger.error("Unable to instantiate {} for scope {}. The Service will be aborted", - SmartExecutorPersistenceConnector.class.getSimpleName(), scope, e); + SmartExecutorPersistenceConnector.class.getSimpleName(), context, e); throw new RuntimeException(e); } @@ -334,11 +92,18 @@ public class SmartExecutorInitializator implements ApplicationManager { // Persistence to clean previous situation of a failure of HostingNode try { - logger.debug("Going to get Orphan Scheduled Tasks in scope {}", scope); - - List scheduledTasks = smartExecutorPersistenceConnector.getOrphanScheduledTasks(availablePlugins.values()); + logger.debug("Going to get Orphan Scheduled Tasks in scope {}", context); + + List gotScheduledTasks = smartExecutorPersistenceConnector.getScheduledTasks(pluginManager.getAvailablePlugins().keySet()); + List scheduledTasks = new ArrayList<>(); + for(ScheduledTask scheduledTask : gotScheduledTasks) { + if(smartExecutorPersistenceConnector.isOrphan(scheduledTask, true)) { + scheduledTasks.add(scheduledTask); + } + } + if(scheduledTasks.size()==0){ - logger.debug("No Orphan Scheduled Tasks this instance can take in charge in scope {}", scope); + logger.debug("No Orphan Scheduled Tasks this instance can take in charge in scope {}", context); } for(final ScheduledTask scheduledTask : scheduledTasks){ @@ -349,7 +114,7 @@ public class SmartExecutorInitializator implements ApplicationManager { // Reserving the task. smartExecutorPersistenceConnector.reserveScheduledTask(scheduledTask); }catch (Exception e) { - logger.debug("({}) Someone else is going to take in charge the scheduled task {}. Skipping.", scope, taskAsString); + logger.debug("({}) Someone else is going to take in charge the scheduled task {}. Skipping.", context, taskAsString); continue; } @@ -360,7 +125,7 @@ public class SmartExecutorInitializator implements ApplicationManager { LaunchParameter launchParameter = scheduledTask.getLaunchParameter(); try { - logger.info("({}) Going to schedule an already scheduled task with the following parameters {}", scope, + logger.info("({}) Going to schedule an already scheduled task with the following parameters {}", context, ExtendedSEMapper.getInstance().marshal(launchParameter)); } catch (Exception e1) { @@ -368,18 +133,18 @@ public class SmartExecutorInitializator implements ApplicationManager { String scheduledTasktoken = scheduledTask.getToken(); try { - setContext(scheduledTasktoken); + ContextUtility.setContext(scheduledTasktoken); SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); // A new Scheduled Task will be persisted due to launch. Removing it smartExecutorPersistenceConnector.removeScheduledTask(scheduledTask); smartExecutorScheduler.schedule(launchParameter, scheduledTask.getUUID()); } catch (Exception e) { - logger.error("({}) Error while trying to relaunch scheduled task.", scope, e); + logger.error("({}) Error while trying to relaunch scheduled task.", context, e); try { smartExecutorPersistenceConnector.addScheduledTask(scheduledTask); } catch (Exception ex) { - logger.error("({}) Unable to add back scheduled task {}", scope, taskAsString); + logger.error("({}) Unable to add back scheduled task {}", context, taskAsString); } } @@ -391,14 +156,14 @@ public class SmartExecutorInitializator implements ApplicationManager { } } catch (Exception e) { - logger.error("Unable to get Orphan Scheduled Tasksfor scope {}.", scope, e); + logger.error("Unable to get Orphan Scheduled Tasksfor scope {}.", context, e); return; } logger.trace( "\n-------------------------------------------------------\n" - + "Smart Executor Started Successfully on scope {}\n" - + "-------------------------------------------------------", scope); + + "Smart Executor Started Successfully on context {}\n" + + "-------------------------------------------------------", context); } @@ -415,34 +180,42 @@ public class SmartExecutorInitializator implements ApplicationManager { logger.trace( "\n-------------------------------------------------------\n" - + "Smart Executor is Stopping on scope {}\n" + + "Smart Executor is Stopping on context {}\n" + "-------------------------------------------------------", - getCurrentScope()); + ContextUtility.getCurrentContext()); SmartExecutorScheduler scheduler; try { scheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler(); scheduler.stopAll(); - SmartExecutorSchedulerFactory.remove(); + SmartExecutorSchedulerFactory.removeCurrentSmartExecutorScheduler(); } catch (SchedulerException e) { logger.error("", e); } - cleanServiceEndpoints(); + ApplicationContext applicationContext = ContextProvider.get(); + List isPublishers = ISPublisher.getISPublishers(applicationContext); + for(ISPublisher isPublisher : isPublishers) { + try { + isPublisher.unpublishPlugins(false); + }catch (Exception e) { + logger.error("unable to unpublish plugins from IS using {}", isPublisher.getClass().getName()); + } + } try { - SmartExecutorPersistenceFactory.closePersistenceConnector(); + SmartExecutorPersistenceFactory.closeCurrentPersistenceConnector(); } catch (Throwable e) { - logger.error("Unable to correctly close {} for scope {}", + logger.error("Unable to correctly close {} for context {}", SmartExecutorPersistenceConnector.class.getSimpleName(), - getCurrentScope(), e); + ContextUtility.getCurrentContext(), e); } logger.trace( "\n-------------------------------------------------------\n" - + "Smart Executor Stopped Successfully on scope {}\n" + + "Smart Executor Stopped Successfully on context {}\n" + "-------------------------------------------------------", - getCurrentScope()); + ContextUtility.getCurrentContext()); } } diff --git a/src/main/java/org/gcube/vremanagement/executor/ispublisher/GCoreISPublisher.java b/src/main/java/org/gcube/vremanagement/executor/ispublisher/GCoreISPublisher.java new file mode 100644 index 0000000..9ef9865 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/ispublisher/GCoreISPublisher.java @@ -0,0 +1,251 @@ +package org.gcube.vremanagement.executor.ispublisher; + +import java.io.StringWriter; +import java.util.List; +import java.util.Map; + +import org.gcube.common.resources.gcore.Resource; +import org.gcube.common.resources.gcore.Resources; +import org.gcube.common.resources.gcore.ServiceEndpoint; +import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; +import org.gcube.common.resources.gcore.ServiceEndpoint.Profile; +import org.gcube.common.resources.gcore.ServiceEndpoint.Property; +import org.gcube.common.resources.gcore.ServiceEndpoint.Runtime; +import org.gcube.common.resources.gcore.common.Platform; +import org.gcube.common.resources.gcore.utils.Group; +import org.gcube.informationsystem.publisher.RegistryPublisher; +import org.gcube.informationsystem.publisher.RegistryPublisherFactory; +import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException; +import org.gcube.resources.discovery.client.api.DiscoveryClient; +import org.gcube.resources.discovery.client.queries.api.SimpleQuery; +import org.gcube.resources.discovery.icclient.ICFactory; +import org.gcube.smartgears.configuration.application.ApplicationConfiguration; +import org.gcube.smartgears.configuration.container.ContainerConfiguration; +import org.gcube.smartgears.context.application.ApplicationContext; +import org.gcube.smartgears.context.container.ContainerContext; +import org.gcube.vremanagement.executor.ContextUtility; +import org.gcube.vremanagement.executor.exception.ExecutorException; +import org.gcube.vremanagement.executor.exception.PluginNotFoundException; +import org.gcube.vremanagement.executor.plugin.Plugin; +import org.gcube.vremanagement.executor.pluginmanager.PluginManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GCoreISPublisher extends ISPublisher { + + private static Logger logger = LoggerFactory.getLogger(GCoreISPublisher.class); + + public GCoreISPublisher(ApplicationContext applicationContext) { + super(applicationContext); + } + + /** + * Publish the provided resource on all Service Scopes retrieved from + * Context + * @param resource to be published + * @throws RegistryNotFoundException if the Registry is not found so the + * resource has not be published + */ + private void publishResource(Resource resource) throws Exception { + StringWriter stringWriter = new StringWriter(); + Resources.marshal(resource, stringWriter); + + RegistryPublisher registryPublisher = RegistryPublisherFactory.create(); + + try { + logger.debug("Trying to publish to {}:\n{}", ContextUtility.getCurrentContext(), stringWriter); + registryPublisher.create(resource); + } catch(Exception e) { + logger.error("The resource was not published", e); + throw e; + } + } + + /** + * Remove the resource from IS + * @param resource to be unpublished + * @throws RegistryNotFoundException if the Registry is not found so the + * resource has not be published + */ + private void unPublishResource(Resource resource) throws Exception { + //StringWriter stringWriter = new StringWriter(); + //Resources.marshal(resource, stringWriter); + + RegistryPublisher registryPublisher = RegistryPublisherFactory.create(); + + String id = resource.id(); + logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, + ContextUtility.getCurrentContext()); + + registryPublisher.remove(resource); + + logger.debug("{} with ID {} removed successfully", resource.getClass().getSimpleName(), id); + } + + /** + * Return the parsed version string as array of short. + * @param version the version as String + * @param wantedLenght if the length is equals to dot (.) separated + * number in the string. Otherwise the version is padded or truncated to + * the required version + * @return the parsed version as array of short. If on slicing some of the + * version cannot be parsed as short 1 is used for the first number, 0 is + * used instead or for padding + */ + private short[] getVersionSlice(String version, int wantedLenght) { + logger.trace("Trying to parse {}", version); + + short[] versionSlices = new short[wantedLenght]; + for(int j = 0; j < wantedLenght; j++) { + versionSlices[j] = (short) (j == 0 ? 1 : 0); + } + + try { + String[] stringSlices = version.split("[.-]"); + for(int i = 0; i < stringSlices.length; i++) { + logger.trace("Parsing version slice n. {} wich is '{}'", i, stringSlices[i]); + if(i >= wantedLenght) { + break; + } + try { + short n = Short.parseShort(stringSlices[i]); + versionSlices[i] = n; + logger.trace("Version slice n. {} wich is '{}' parsed as short {}", i, stringSlices[i], n); + } catch(NumberFormatException nfe) { + logger.trace("Version slice n. {} wich is '{}' failed to parse. The default value {} will be used", + i, stringSlices[i], versionSlices[i]); + } + } + } catch(Exception e) { + logger.trace("Error parsing the supplied version the default will be used", versionSlices); + } + + logger.trace("Version {} parsed as {}", version, versionSlices); + return versionSlices; + } + + private static String getRunningOn(ContainerConfiguration containerConfiguration) { + return String.format("%s:%s", containerConfiguration.hostname(), containerConfiguration.port()); + } + + /** + * Create the Service Endpoint using information related to discovered + * available plugins and their own discovered capabilities + * @return the created {@link ServiceEndpoint} + * @throws ExecutorException + * @throws PluginNotFoundException + */ + protected ServiceEndpoint createServiceEndpoint(Map> availablePlugins) throws PluginNotFoundException, ExecutorException { + logger.debug( + "Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities"); + + ApplicationConfiguration applicationConfiguration = applicationContext.configuration(); + + ServiceEndpoint serviceEndpoint = new ServiceEndpoint(); + Profile profile = serviceEndpoint.newProfile(); + profile.category(applicationConfiguration.serviceClass()); + profile.name(applicationConfiguration.name()); + String version = applicationConfiguration.version(); + profile.version(version); + profile.description(applicationConfiguration.description()); + + String runningOn = getRunningOn(applicationContext.container().configuration()); + Platform platform = profile.newPlatform(); + platform.name(runningOn); + + short[] versionSlices = getVersionSlice(version, 4); + platform.version(versionSlices[0]); + platform.minorVersion(versionSlices[1]); + platform.buildVersion(versionSlices[2]); + platform.revisionVersion(versionSlices[3]); + + Runtime runtime = profile.newRuntime(); + runtime.hostedOn(runningOn); + runtime.status(applicationConfiguration.mode().toString()); + + Group accessPoints = profile.accessPoints(); + + PluginManager pluginManager = PluginManager.getInstance(); + + for(String pluginName : availablePlugins.keySet()) { + AccessPoint accessPointElement = new AccessPoint(); + accessPointElement.name(pluginName); + + Plugin pluginDeclaration = pluginManager.getPlugin(pluginName); + + accessPointElement.description(pluginDeclaration.getDescription()); + + Group properties = accessPointElement.properties(); + Property propertyVersionElement = new Property(); + propertyVersionElement.nameAndValue("Version", pluginDeclaration.getVersion()); + properties.add(propertyVersionElement); + + Map pluginCapabilities = pluginDeclaration.getSupportedCapabilities(); + if(pluginCapabilities!=null) { + for(String capabilityName : pluginCapabilities.keySet()) { + Property propertyElement = new Property(); + propertyElement.nameAndValue(capabilityName, pluginCapabilities.get(capabilityName)); + properties.add(propertyElement); + } + } + accessPoints.add(accessPointElement); + } + + StringWriter stringWriter = new StringWriter(); + Resources.marshal(serviceEndpoint, stringWriter); + logger.debug("The created ServiceEndpoint profile is\n{}", stringWriter.toString()); + + return serviceEndpoint; + } + + protected void cleanServiceEndpoints() { + try { + ApplicationConfiguration applicationConfiguration = applicationContext.configuration(); + ContainerContext containerContext = applicationContext.container(); + + SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class) + .addCondition(String.format("$resource/Profile/Category/text() eq '%s'", + applicationConfiguration.serviceClass())) + .addCondition(String.format("$resource/Profile/Name/text() eq '%s'", + applicationConfiguration.name())) + .addCondition(String.format("$resource/Profile/RunTime/HostedOn/text() eq '%s'", + getRunningOn(containerContext.configuration()))) + .setResult("$resource"); + + DiscoveryClient client = ICFactory.clientFor(ServiceEndpoint.class); + List serviceEndpoints = client.submit(query); + + for(ServiceEndpoint serviceEndpoint : serviceEndpoints) { + try { + logger.debug("Trying to unpublish the old ServiceEndpoint with ID {} from scope {}", + serviceEndpoint.id(), ContextUtility.getCurrentContext()); + unPublishResource(serviceEndpoint); + } catch(Exception e) { + logger.debug("Exception tryng to unpublish the old ServiceEndpoint with ID {} from scope {}", + serviceEndpoint.id(), ContextUtility.getCurrentContext(), e); + } + } + } catch(Exception e) { + logger.debug("An Exception occur while checking and/or unpublishing old ServiceEndpoint", e); + } + } + + @Override + public void publishPlugins(Map> availablePlugins) { + try { + ServiceEndpoint serviceEndpoint = createServiceEndpoint(availablePlugins); + cleanServiceEndpoints(); + publishResource(serviceEndpoint); + } catch(Exception e) { + logger.error("Unable to Create ServiceEndpoint for scope {}. The Service will be aborted", + ContextUtility.getCurrentContext(), e); + throw new RuntimeException(e); + } + } + + @Override + public void unpublishPlugins(boolean force) throws Exception { + cleanServiceEndpoints(); + } + +} diff --git a/src/main/java/org/gcube/vremanagement/executor/ispublisher/ISPublisher.java b/src/main/java/org/gcube/vremanagement/executor/ispublisher/ISPublisher.java new file mode 100644 index 0000000..5478312 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/ispublisher/ISPublisher.java @@ -0,0 +1,33 @@ +package org.gcube.vremanagement.executor.ispublisher; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.gcube.smartgears.context.application.ApplicationContext; +import org.gcube.vremanagement.executor.plugin.Plugin; + +public abstract class ISPublisher { + + protected static List isPublishers; + + public synchronized static List getISPublishers(ApplicationContext applicationContext){ + if(isPublishers==null) { + isPublishers = new ArrayList<>(); + isPublishers.add(new GCoreISPublisher(applicationContext)); + isPublishers.add(new RestISPublisher(applicationContext)); + } + return isPublishers; + } + + protected ApplicationContext applicationContext; + + public ISPublisher(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + public abstract void publishPlugins(Map> availablePlugins) throws Exception; + + public abstract void unpublishPlugins(boolean force) throws Exception ; + +} diff --git a/src/main/java/org/gcube/vremanagement/executor/ispublisher/RestISPublisher.java b/src/main/java/org/gcube/vremanagement/executor/ispublisher/RestISPublisher.java new file mode 100644 index 0000000..8b47fc6 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/ispublisher/RestISPublisher.java @@ -0,0 +1,154 @@ +package org.gcube.vremanagement.executor.ispublisher; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.gcube.informationsystem.model.impl.properties.HeaderImpl; +import org.gcube.informationsystem.model.impl.properties.PropagationConstraintImpl; +import org.gcube.informationsystem.model.reference.entities.Facet; +import org.gcube.informationsystem.model.reference.entities.Resource; +import org.gcube.informationsystem.model.reference.properties.PropagationConstraint; +import org.gcube.informationsystem.model.reference.properties.PropagationConstraint.AddConstraint; +import org.gcube.informationsystem.model.reference.properties.PropagationConstraint.RemoveConstraint; +import org.gcube.informationsystem.resourceregistry.api.exceptions.AvailableInAnotherContextException; +import org.gcube.informationsystem.resourceregistry.api.exceptions.NotFoundException; +import org.gcube.informationsystem.resourceregistry.api.exceptions.ResourceRegistryException; +import org.gcube.informationsystem.resourceregistry.client.Direction; +import org.gcube.informationsystem.resourceregistry.client.ResourceRegistryClient; +import org.gcube.informationsystem.resourceregistry.client.ResourceRegistryClientFactory; +import org.gcube.informationsystem.resourceregistry.publisher.ResourceRegistryPublisher; +import org.gcube.informationsystem.resourceregistry.publisher.ResourceRegistryPublisherFactory; +import org.gcube.resourcemanagement.model.impl.entities.facets.SimplePropertyFacetImpl; +import org.gcube.resourcemanagement.model.impl.entities.facets.SoftwareFacetImpl; +import org.gcube.resourcemanagement.model.impl.entities.resources.RunningPluginImpl; +import org.gcube.resourcemanagement.model.impl.relations.consistsof.IsIdentifiedByImpl; +import org.gcube.resourcemanagement.model.impl.relations.isrelatedto.ActivatesImpl; +import org.gcube.resourcemanagement.model.impl.relations.isrelatedto.EnablesImpl; +import org.gcube.resourcemanagement.model.reference.entities.facets.SimplePropertyFacet; +import org.gcube.resourcemanagement.model.reference.entities.facets.SoftwareFacet; +import org.gcube.resourcemanagement.model.reference.entities.resources.EService; +import org.gcube.resourcemanagement.model.reference.entities.resources.RunningPlugin; +import org.gcube.resourcemanagement.model.reference.entities.resources.Service; +import org.gcube.resourcemanagement.model.reference.entities.resources.Software; +import org.gcube.resourcemanagement.model.reference.relations.consistsof.IsIdentifiedBy; +import org.gcube.resourcemanagement.model.reference.relations.isrelatedto.Activates; +import org.gcube.resourcemanagement.model.reference.relations.isrelatedto.Enables; +import org.gcube.smartgears.context.application.ApplicationContext; +import org.gcube.vremanagement.executor.plugin.Plugin; +import org.gcube.vremanagement.executor.pluginmanager.PluginManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RestISPublisher extends ISPublisher { + + private static Logger logger = LoggerFactory.getLogger(RestISPublisher.class); + + protected final UUID eServiceUUID; + protected ResourceRegistryClient resourceRegistryClient; + protected ResourceRegistryPublisher resourceRegistryPublisher; + + + + public RestISPublisher(ApplicationContext applicationContext) { + super(applicationContext); + this.eServiceUUID = UUID.fromString(applicationContext.id()); + this.resourceRegistryClient = ResourceRegistryClientFactory.create(); + this.resourceRegistryPublisher = ResourceRegistryPublisherFactory.create(); + } + + @SuppressWarnings("unused") + protected RunningPlugin publishRunningPluginWithRelations(Plugin plugin, UUID pluginUUID) throws Exception { + + RunningPlugin runningPlugin = new RunningPluginImpl(); + runningPlugin.setHeader(new HeaderImpl(pluginUUID)); + + SoftwareFacet softwareFacet = new SoftwareFacetImpl(); + softwareFacet.setGroup(plugin.getGroup()); + softwareFacet.setName(plugin.getName()); + softwareFacet.setVersion(plugin.getVersion()); + softwareFacet.setDescription(plugin.getDescription()); + + IsIdentifiedBy isIdentifiedBy = new IsIdentifiedByImpl(runningPlugin, softwareFacet); + runningPlugin.addFacet(isIdentifiedBy); + + Map pluginCapabilities = plugin.getSupportedCapabilities(); + if(pluginCapabilities!=null) { + for(String capabilityName : pluginCapabilities.keySet()) { + SimplePropertyFacet simplePropertyFacet = new SimplePropertyFacetImpl(); + simplePropertyFacet.setName(capabilityName); + simplePropertyFacet.setValue(pluginCapabilities.get(capabilityName)); + runningPlugin.addFacet(simplePropertyFacet); + } + } + + EService smartExecutorEService = resourceRegistryClient.getInstance(EService.class, eServiceUUID); + PropagationConstraint usesPropagationConstraint = new PropagationConstraintImpl(); + usesPropagationConstraint.setAddConstraint(AddConstraint.propagate); + usesPropagationConstraint.setRemoveConstraint(RemoveConstraint.cascade); + + Activates activates = new ActivatesImpl(smartExecutorEService, runningPlugin, usesPropagationConstraint); + try { + resourceRegistryPublisher.createIsRelatedTo(activates); + } catch (ResourceRegistryException e) { + logger.error("Unable to publish %s instace %s for plugin %s. I'm going to stop the service.", Resource.NAME, RunningPlugin.NAME, plugin.getName()); + throw e; + } + + org.gcube.resourcemanagement.model.reference.entities.resources.Plugin pluginResource = null; + if(pluginResource!=null) { // The if allows not commenting the following code in the meanwhile the pluginResource retrieving is properly coded + PropagationConstraint enablesPropagationConstraint = new PropagationConstraintImpl(); + enablesPropagationConstraint.setAddConstraint(AddConstraint.propagate); + enablesPropagationConstraint.setRemoveConstraint(RemoveConstraint.keep); + Enables enables = new EnablesImpl(runningPlugin, pluginResource, enablesPropagationConstraint); + try { + resourceRegistryPublisher.createIsRelatedTo(enables); + } catch (ResourceRegistryException e) { + logger.error("Unable to publish %s instace %s for plugin %s. I'm going to stop the service.", Resource.NAME, RunningPlugin.NAME, plugin.getName()); + throw e; + } + } + + return runningPlugin; + } + + @Override + public void publishPlugins(Map> availablePlugins) throws Exception { + PluginManager pluginManager = PluginManager.getInstance(); + + for(String pluginName : availablePlugins.keySet()) { + + Plugin plugin = pluginManager.getPlugin(pluginName); + UUID pluginUUID = pluginManager.getPluginUUID(pluginName); + + RunningPlugin runningPlugin; + + try { + runningPlugin = resourceRegistryClient.getInstance(RunningPlugin.class, pluginUUID); + } catch (NotFoundException e) { + runningPlugin = publishRunningPluginWithRelations(plugin, pluginUUID); + } catch (AvailableInAnotherContextException e) { + runningPlugin = new RunningPluginImpl(); + runningPlugin.setHeader(new HeaderImpl(pluginUUID)); + resourceRegistryPublisher.addToCurrentContext(runningPlugin); + } catch (ResourceRegistryException e) { + throw e; + } + + } + } + + @Override + public void unpublishPlugins(boolean force) throws Exception { + if(force) { + List runningPlugins = resourceRegistryClient.getRelatedResourcesFromReferenceResource(RunningPlugin.class, Activates.class, EService.class, this.eServiceUUID, Direction.IN, true); + for(RunningPlugin runningPlugin : runningPlugins) { + resourceRegistryPublisher.delete(runningPlugin); + } + }else { + logger.info("The Plugin will be removed when the Eservice will be removed thanks to propagation contraints. Nothing to do"); + } + + } + +} diff --git a/src/main/java/org/gcube/vremanagement/executor/json/ExtendedSEMapper.java b/src/main/java/org/gcube/vremanagement/executor/json/ExtendedSEMapper.java index 8ab8f10..8f7d20e 100644 --- a/src/main/java/org/gcube/vremanagement/executor/json/ExtendedSEMapper.java +++ b/src/main/java/org/gcube/vremanagement/executor/json/ExtendedSEMapper.java @@ -3,6 +3,7 @@ */ package org.gcube.vremanagement.executor.json; +import org.gcube.com.fasterxml.jackson.annotation.JsonTypeInfo; import org.gcube.common.authorization.library.provider.ClientInfo; import org.gcube.common.authorization.library.provider.ContainerInfo; import org.gcube.common.authorization.library.provider.ExternalServiceInfo; @@ -10,8 +11,6 @@ import org.gcube.common.authorization.library.provider.ServiceInfo; import org.gcube.common.authorization.library.provider.UserInfo; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; -import com.fasterxml.jackson.annotation.JsonTypeInfo; - /** * @author Luca Frosini (ISTI - CNR) * diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java index ccc0ec1..a63ce1c 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java @@ -16,7 +16,7 @@ import org.gcube.common.resources.gcore.utils.Group; import org.gcube.resources.discovery.client.api.DiscoveryClient; import org.gcube.resources.discovery.client.queries.api.SimpleQuery; import org.gcube.resources.discovery.icclient.ICFactory; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.ContextUtility; /** * @author Luca Frosini (ISTI - CNR) @@ -131,7 +131,7 @@ public class SmartExecutorPersistenceConfiguration { List serviceEndpoints = client.submit(query); if(serviceEndpoints.size()>1){ query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", TARGET_SCOPE)); - query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", SmartExecutorInitializator.getCurrentScope())); + query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", ContextUtility.getCurrentContext())); serviceEndpoints = client.submit(query); } return serviceEndpoints.get(0); diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java index 53cdbb2..f464a49 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java @@ -54,7 +54,7 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif */ public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, Integer iterationNumber) throws PluginInstanceNotFoundException, ExecutorException; - protected boolean isOrphan(ScheduledTask scheduledTask) throws ExecutorException { + public boolean isOrphan(ScheduledTask scheduledTask, boolean sameHost) throws ExecutorException { try { UUID uuid = scheduledTask.getUUID(); @@ -64,19 +64,20 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif } try { - HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class); - String hnAddress = hostingNode.profile().description().name(); - - if(runOn.getHostingNode().getAddress().compareTo(hnAddress)==0){ - return true; + if(sameHost) { + HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class); + String hnAddress = hostingNode.profile().description().name(); + + if(runOn.getHostingNode().getAddress().compareTo(hnAddress)==0){ + return true; + } } }catch (Exception e) { logger.error("Unable to check if current hosting node is the same of the one in ScheduledTask", e); } String address = runOn.getEService().getAddress(); - String pluginName = scheduledTask.getLaunchParameter() - .getPluginName(); + String pluginName = scheduledTask.getLaunchParameter().getPluginName(); try { SmartExecutorClientImpl smartExecutorClient = new SmartExecutorClientImpl(); diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java index 433eaf0..82a3a99 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java @@ -6,7 +6,7 @@ package org.gcube.vremanagement.executor.persistence; import java.util.HashMap; import java.util.Map; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.ContextUtility; import org.gcube.vremanagement.executor.persistence.orientdb.OrientDBPersistenceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,58 +18,75 @@ import org.slf4j.LoggerFactory; public abstract class SmartExecutorPersistenceFactory { private static final Logger logger = LoggerFactory.getLogger(SmartExecutorPersistenceFactory.class); - + private static Map persistenceConnectors; - + static { persistenceConnectors = new HashMap(); } - - private static SmartExecutorPersistenceConnector getPersistenceConnector(String scope) throws Exception { - if(scope==null){ - String error = "No Scope available."; + + private static synchronized SmartExecutorPersistenceConnector getPersistenceConnector(String context) throws Exception { + if (context == null) { + String error = "No Context available."; logger.error(error); - throw new RuntimeException(error); + throw new RuntimeException(error); } - logger.trace("Retrieving {} for scope {}", - SmartExecutorPersistenceConnector.class.getSimpleName(), scope); - - SmartExecutorPersistenceConnector persistence = persistenceConnectors.get(scope); - - if(persistence==null){ - logger.trace("Retrieving {} for scope {} not found on internal {}. Intializing it.", - SmartExecutorPersistenceConnector.class.getSimpleName(), - scope, Map.class.getSimpleName()); - + logger.trace("Retrieving {} for context {}", SmartExecutorPersistenceConnector.class.getSimpleName(), context); + + SmartExecutorPersistenceConnector persistence = persistenceConnectors.get(context); + + if (persistence == null) { + logger.trace("Retrieving {} for context {} not found on internal {}. Intializing it.", + SmartExecutorPersistenceConnector.class.getSimpleName(), context, Map.class.getSimpleName()); + String className = OrientDBPersistenceConnector.class.getSimpleName(); - SmartExecutorPersistenceConfiguration configuration = - new SmartExecutorPersistenceConfiguration(className); - + SmartExecutorPersistenceConfiguration configuration = new SmartExecutorPersistenceConfiguration(className); + persistence = new OrientDBPersistenceConnector(configuration); - persistenceConnectors.put(SmartExecutorInitializator.getCurrentScope(), - persistence); + persistenceConnectors.put(ContextUtility.getCurrentContext(), persistence); } - + return persistence; } - + /** * @return the persistenceConnector */ - public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception { - String scope = SmartExecutorInitializator.getCurrentScope(); - return getPersistenceConnector(scope); + public static SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception { + String context = ContextUtility.getCurrentContext(); + return getPersistenceConnector(context); } - - public static synchronized void closePersistenceConnector() throws Exception { - String scope = SmartExecutorInitializator.getCurrentScope(); - SmartExecutorPersistenceConnector persistence = - getPersistenceConnector(scope); - if(persistence!=null){ + + public static void closeCurrentPersistenceConnector() throws Exception { + String context = ContextUtility.getCurrentContext(); + closePersistenceConnector(context); + } + + private static synchronized void closePersistenceConnector(String context) throws Exception { + SmartExecutorPersistenceConnector persistence = getPersistenceConnector(context); + if (persistence != null) { persistence.close(); - persistenceConnectors.remove(scope); + persistenceConnectors.remove(context); + if(persistenceConnectors.isEmpty()) { + + } } } + public static void closeAll() { + for (String context : persistenceConnectors.keySet()) { + try { + closePersistenceConnector(context); + } catch (Exception e) { + logger.error("Unable to close {} for context {}", + SmartExecutorPersistenceConnector.class.getSimpleName(), context); + } + } + } + + public static void shutdown() { + OrientDBPersistenceConnector.shutdown(); + } + } diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java index bec3da8..7452858 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java @@ -11,7 +11,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.ContextUtility; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.exception.ExecutorException; import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException; @@ -19,12 +19,12 @@ import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.json.ExtendedSEMapper; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.db.ODatabasePool; import com.orientechnologies.orient.core.db.ODatabaseSession; import com.orientechnologies.orient.core.record.impl.ODocument; @@ -38,10 +38,10 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec private static final Logger logger = LoggerFactory.getLogger(OrientDBPersistenceConnector.class); - protected final String SCOPE = "scope"; - protected final String UUID = "uuid"; - protected final String ITERATION = "iteration"; - protected final String TIMESTAMP = "timestamp"; + protected final static String CONTEXT = "context"; + protected final static String UUID = "uuid"; + protected final static String ITERATION = "iteration"; + protected final static String TIMESTAMP = "timestamp"; protected final String RUN_ON = "runOn"; @@ -53,6 +53,8 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec } protected void prepareConnection(SmartExecutorPersistenceConfiguration configuration) throws Exception { + Orient.instance().removeShutdownHook(); + logger.debug("Preparing Connection for {}", this.getClass().getSimpleName()); String url = configuration.getURL(); String username = configuration.getUsername(); @@ -64,6 +66,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec @Override public void close() throws Exception { oDatabasePool.close(); + oDatabasePool = null; } @Override @@ -75,13 +78,13 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec String type = PluginStateEvolution.class.getSimpleName(); Map params = new HashMap(); params.put(UUID, uuid.toString()); - params.put(SCOPE, SmartExecutorInitializator.getCurrentScope()); + params.put(CONTEXT, ContextUtility.getCurrentContext()); OSQLSynchQuery query = null; if(iterationNumber != null && iterationNumber > 0) { query = new OSQLSynchQuery(String.format( - "SELECT FROM %s WHERE %s = :%s AND %s = :%s AND %s = :%s ORDER BY %s DESC LIMIT 1", type, SCOPE, - SCOPE, UUID, UUID, ITERATION, ITERATION, TIMESTAMP)); + "SELECT FROM %s WHERE %s = :%s AND %s = :%s AND %s = :%s ORDER BY %s DESC LIMIT 1", type, CONTEXT, + CONTEXT, UUID, UUID, ITERATION, ITERATION, TIMESTAMP)); params.put(ITERATION, iterationNumber); } else { /* @@ -92,7 +95,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec */ query = new OSQLSynchQuery( String.format("SELECT FROM %s WHERE %s = :%s AND %s = :%s ORDER BY %s DESC, %s DESC LIMIT 1", - type, SCOPE, SCOPE, UUID, UUID, ITERATION, TIMESTAMP)); + type, CONTEXT, CONTEXT, UUID, UUID, ITERATION, TIMESTAMP)); } List result = query.execute(params); @@ -124,9 +127,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec } catch(Exception e) { throw new PluginInstanceNotFoundException(uuid); } finally { - if(oDatabaseSession != null) { - oDatabaseSession.close(); - } + closeDatabaseSession(oDatabaseSession); } } @@ -138,7 +139,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec ODocument doc = new ODocument(PluginStateEvolution.class.getSimpleName()); String json = ExtendedSEMapper.getInstance().marshal(pluginStateEvolution); doc.fromJSON(json); - doc.field(SCOPE, SmartExecutorInitializator.getCurrentScope()); + doc.field(CONTEXT, ContextUtility.getCurrentContext()); doc.save(); oDatabaseSession.commit(); @@ -148,9 +149,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec } throw e; } finally { - if(oDatabaseSession != null) { - oDatabaseSession.close(); - } + closeDatabaseSession(oDatabaseSession); } } @@ -176,35 +175,33 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec } throw new SchedulePersistenceException(e); } finally { - if(oDatabaseSession != null) { - oDatabaseSession.close(); - } + closeDatabaseSession(oDatabaseSession); } } @Override - public List getOrphanScheduledTasks(Collection pluginDeclarations) + public List getScheduledTasks(Collection plugins) throws SchedulePersistenceException { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); String type = ScheduledTask.class.getSimpleName(); - String queryString = String.format("SELECT * FROM %s WHERE %s = '%s'", type, "scope", - SmartExecutorInitializator.getCurrentScope()); - if(pluginDeclarations != null && pluginDeclarations.size() != 0) { + String queryString = String.format("SELECT * FROM %s WHERE %s = '%s'", type, "context", + ContextUtility.getCurrentContext()); + if(plugins != null && plugins.size() != 0) { boolean first = true; - for(PluginDeclaration pluginDeclaration : pluginDeclarations) { + for(String pluginName : plugins) { if(first) { first = false; queryString = String.format("%s AND ( (%s = '%s') ", queryString, ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, - pluginDeclaration.getName()); + pluginName); } else { queryString = String.format("%s OR (%s = '%s') ", queryString, ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, - pluginDeclaration.getName()); + pluginName); } } queryString = queryString + ")"; @@ -218,25 +215,15 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec for(ODocument doc : result) { String json = doc.toJSON("class"); - ScheduledTask scheduledTask = ExtendedSEMapper.getInstance().unmarshal(ScheduledTask.class, json); - try { - if(isOrphan(scheduledTask)) { - scheduledTasks.add(scheduledTask); - } - } catch(Exception e) { - logger.error("An Exception occurred while evaluating if {} is orphan", json, e); - } - + scheduledTasks.add(scheduledTask); } return scheduledTasks; } catch(Exception e) { throw new SchedulePersistenceException(e); } finally { - if(oDatabaseSession != null) { - oDatabaseSession.close(); - } + closeDatabaseSession(oDatabaseSession); } } @@ -279,9 +266,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec } catch(Exception e) { throw new SchedulePersistenceException(e); } finally { - if(oDatabaseSession != null) { - oDatabaseSession.close(); - } + closeDatabaseSession(oDatabaseSession); } } @@ -304,9 +289,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec } throw new SchedulePersistenceException(e); } finally { - if(oDatabaseSession != null) { - oDatabaseSession.close(); - } + closeDatabaseSession(oDatabaseSession); } } @@ -329,9 +312,14 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec } throw new SchedulePersistenceException(e); } finally { - if(oDatabaseSession != null) { - oDatabaseSession.close(); - } + closeDatabaseSession(oDatabaseSession); + } + } + + private void closeDatabaseSession(ODatabaseSession oDatabaseSession) { + if(oDatabaseSession != null) { + oDatabaseSession.close(); + oDatabaseSession = null; } } @@ -339,4 +327,9 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec public void releaseScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { releaseScheduledTask(scheduledTask.getUUID()); } + + public static void shutdown(){ + //Orient.OShutdownOrientDBInstancesHandler. + Orient.instance().shutdown(); + } } diff --git a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java index ecb8789..4652c67 100644 --- a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java +++ b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java @@ -6,13 +6,12 @@ package org.gcube.vremanagement.executor.pluginmanager; import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; import org.gcube.vremanagement.executor.plugin.PercentageSetter; import org.gcube.vremanagement.executor.plugin.Plugin; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; /** * @author Luca Frosini (ISTI - CNR) * */ -public class PercentageSetterImpl> implements PercentageSetter { +public class PercentageSetterImpl implements PercentageSetter { private final RunnablePlugin runnablePlugin; diff --git a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java index 372d78a..8c957ed 100644 --- a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java +++ b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java @@ -1,15 +1,13 @@ package org.gcube.vremanagement.executor.pluginmanager; -import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; +import java.util.UUID; -import org.gcube.vremanagement.executor.exception.InputsNullException; +import org.gcube.vremanagement.executor.exception.ExecutorException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException; -import org.gcube.vremanagement.executor.persistence.Persistence; import org.gcube.vremanagement.executor.plugin.Plugin; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,7 +17,6 @@ import org.slf4j.LoggerFactory; * The plugin implementation class can be retrieved using its name. * @author Luca Frosini (ISTI - CNR) */ -@SuppressWarnings("deprecation") public class PluginManager { /** @@ -36,82 +33,9 @@ public class PluginManager { * Contains mapping between plugin name and the instance of its declaration * class */ - private Map availablePlugins; + private Map> availablePlugins; - /** - * Retrieve the PluginDeclaration class representing the plugin which - * have the name provided as input - * @param pluginName the name of the plugin - * @return the PluginDeclaration - * @throws PluginNotFoundException if the plugin is not available - */ - public static PluginDeclaration getPluginDeclaration(String pluginName) throws PluginNotFoundException { - logger.debug(String.format("Trying to instantiate a Plugin named %s", - pluginName)); - PluginDeclaration pluginDeclaration = PluginManager.getInstance() - .getPlugin(pluginName); - if (pluginDeclaration == null) { - throw new PluginNotFoundException(); - } - return pluginDeclaration; - } - - public static Plugin instantiatePlugin( - String pluginName) throws InputsNullException, - PluginNotFoundException { - - PluginDeclaration pluginDeclaration = getPluginDeclaration(pluginName); - - // Retrieving the plugin instance class to be run from PluginDeclaration - Class> plugin = pluginDeclaration - .getPluginImplementation(); - logger.debug(String.format( - "The class which will run the execution will be %s", - plugin.getName())); - - // Retrieve the Constructor of Plugin to instantiate it - @SuppressWarnings("rawtypes") - Class[] argTypes = { pluginDeclaration.getClass()}; - - // Creating the Argument to pass to constructor - Object[] arguments = { pluginDeclaration}; - - - // logger.debug(String.format("Plugin named %s once instantiated will be identified by the UUID %s", - // name, executionIdentifier)); - Constructor> executorPluginConstructor; - try { - executorPluginConstructor = plugin.getDeclaredConstructor(argTypes); - } catch (Exception e) { - - /* Maintaining backward compatibility */ - argTypes = new Class[2]; - argTypes[0] = pluginDeclaration.getClass(); - argTypes[1] = Persistence.class; - - try { - executorPluginConstructor = plugin.getDeclaredConstructor(argTypes); - arguments = new Object[2]; - arguments[0] = pluginDeclaration; - arguments[1] = null; - } catch (Exception e1) { - throw new PluginNotFoundException(); - } - } - - // Instancing the plugin - Plugin instantiatedPlugin; - try { - instantiatedPlugin = executorPluginConstructor - .newInstance(arguments); - } catch (Exception e) { - throw new PluginNotFoundException(); - } - logger.debug(String - .format("Plugin named %s has been instantiated", pluginName)); - - return instantiatedPlugin; - } + private Map uuids; /** * Get the singleton instance of {@link #PluginManager}. @@ -119,7 +43,7 @@ public class PluginManager { * so it is created. Otherwise the already created instance is returned * @return singleton instance of {@link #PluginManager} */ - public static PluginManager getInstance(){ + public synchronized static PluginManager getInstance(){ if(pluginManager== null){ pluginManager = new PluginManager(); } @@ -132,33 +56,50 @@ public class PluginManager { */ protected PluginManager(){ logger.debug("Loading plugins available on classpath"); - this.availablePlugins = new HashMap(); - ServiceLoader serviceLoader = ServiceLoader.load(PluginDeclaration.class); - for (PluginDeclaration pluginDeclaration : serviceLoader) { + this.availablePlugins = new HashMap<>(); + this.uuids = new HashMap<>(); + ServiceLoader serviceLoader = ServiceLoader.load(Plugin.class); + for (Plugin plugin : serviceLoader) { try { - logger.debug(String.format("%s plugin found", pluginDeclaration.getName())); - pluginDeclaration.init(); - String name = pluginDeclaration.getName(); - this.availablePlugins.put(name, pluginDeclaration); + logger.debug(String.format("%s plugin found", plugin.getName())); + String name = plugin.getName(); + this.availablePlugins.put(name, plugin.getClass()); + this.uuids.put(name, UUID.randomUUID()); } catch (Exception e) { - logger.debug(String.format("%s not initialized correctly. It will not be used", pluginDeclaration.getName())); + logger.debug(String.format("%s not initialized correctly. It will not be used", plugin.getName())); } } } /** * - * @param name The name of the plugin + * @param pluginName The name of the plugin * @return The plugin declaration if available, null otherwise + * @throws PluginNotFoundException */ - public PluginDeclaration getPlugin(String name){ - return this.availablePlugins.get(name); + public Plugin getPlugin(String pluginName) throws PluginNotFoundException, ExecutorException { + Class pluginClass = getAvailablePlugins().get(pluginName); + if (pluginClass== null) { + throw new PluginNotFoundException("Plugin " + pluginName + " not available in this smart-executor instance"); + } + try { + return pluginClass.getDeclaredConstructor().newInstance(); + }catch (Exception e) { + throw new ExecutorException("Unable to instatiate plugin " + pluginName, e); + } + } + + + public UUID getPluginUUID(String pluginName) { + return uuids.get(pluginName); } /** * @return the availablePlugins */ - public Map getAvailablePlugins() { + public Map> getAvailablePlugins() { return availablePlugins; } + + } diff --git a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java index 29b9438..c34b142 100644 --- a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java +++ b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java @@ -18,11 +18,10 @@ import org.gcube.common.authorization.library.provider.ClientInfo; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.documentstore.exception.InvalidValueException; import org.gcube.smartgears.ContextProvider; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.ContextUtility; import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException; import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; import org.gcube.vremanagement.executor.plugin.Plugin; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.plugin.PluginStateNotification; @@ -36,7 +35,7 @@ import org.slf4j.LoggerFactory; * @author Luca Frosini (ISTI - CNR) * */ -public class RunnablePlugin> implements Runnable { +public class RunnablePlugin implements Runnable { /** * Logger @@ -81,10 +80,10 @@ public class RunnablePlugin> imple @Override public void run(){ - String pluginName = plugin.getPluginDeclaration().getName(); + String pluginName = plugin.getName(); logger.info("{} : {} is going to be launched (UUID={}, iterationNumber={}) with the following inputs {}", - pluginName, plugin.getPluginDeclaration().getVersion(), + pluginName, plugin.getVersion(), uuid, iterationNumber, inputs); JobUsageRecord jobUsageRecord = new JobUsageRecord(); @@ -92,7 +91,7 @@ public class RunnablePlugin> imple long startTime = actualStateEvolution.getTimestamp(); try { - SmartExecutorInitializator.setContext(token); + ContextUtility.setContext(token); setState(PluginState.RUNNING); @@ -103,7 +102,7 @@ public class RunnablePlugin> imple jobUsageRecord.setHost(hnRef.getAddress()); - ClientInfo clientInfo = SmartExecutorInitializator.getClientInfo(); + ClientInfo clientInfo = ContextUtility.getClientInfo(); String consumerId = clientInfo.getId(); jobUsageRecord.setConsumerId(consumerId); @@ -222,7 +221,7 @@ public class RunnablePlugin> imple throw new AlreadyInFinalStateException(); } - PluginStateEvolution pluginStateEvolution = new PluginStateEvolution(uuid, iterationNumber, timestamp, plugin.getPluginDeclaration(), pluginState, percentage); + PluginStateEvolution pluginStateEvolution = new PluginStateEvolution(uuid, iterationNumber, timestamp, plugin, pluginState, percentage); for(PluginStateNotification pluginStateNotification : pluginStateNotifications){ String pluginStateNotificationName = pluginStateNotification.getClass().getSimpleName(); @@ -241,7 +240,7 @@ public class RunnablePlugin> imple public String toString(){ return String.format("UUID : %s, Iteration : %d, Plugin : %s", uuid.toString(), iterationNumber, - plugin.getPluginDeclaration().getName()); + plugin.getName()); } /** diff --git a/src/main/java/org/gcube/vremanagement/executor/rest/RestSmartExecutor.java b/src/main/java/org/gcube/vremanagement/executor/rest/RestSmartExecutor.java index 2d9531a..a70a7c2 100644 --- a/src/main/java/org/gcube/vremanagement/executor/rest/RestSmartExecutor.java +++ b/src/main/java/org/gcube/vremanagement/executor/rest/RestSmartExecutor.java @@ -20,6 +20,7 @@ import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriInfo; +import org.gcube.com.fasterxml.jackson.core.JsonProcessingException; import org.gcube.common.authorization.library.provider.CalledMethodProvider; import org.gcube.vremanagement.executor.ResourceInitializer; import org.gcube.vremanagement.executor.annotation.PURGE; @@ -34,7 +35,8 @@ import org.gcube.vremanagement.executor.json.ExtendedSEMapper; import org.gcube.vremanagement.executor.json.SEMapper; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; +import org.gcube.vremanagement.executor.plugin.Plugin; +import org.gcube.vremanagement.executor.plugin.PluginDefinition; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; @@ -44,8 +46,6 @@ import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; - @Path(RestConstants.PLUGINS_PATH_PART) public class RestSmartExecutor { @@ -78,9 +78,12 @@ public class RestSmartExecutor { setCalledMethod("getAvailablePlugins"); try { PluginManager pluginManager = PluginManager.getInstance(); - Map availablePlugins = pluginManager.getAvailablePlugins(); - List plugins = new ArrayList(availablePlugins.values()); - return ExtendedSEMapper.getInstance().marshal(PluginDeclaration.class, plugins); + Map> availablePlugins = pluginManager.getAvailablePlugins(); + List plugins = new ArrayList<>(); + for(String pluginName : availablePlugins.keySet()) { + plugins.add(pluginManager.getPlugin(pluginName)); + } + return ExtendedSEMapper.getInstance().marshal(PluginDefinition.class, plugins); }catch (Exception e) { throw new ExecutorException(e); } @@ -96,25 +99,32 @@ public class RestSmartExecutor { SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory .getPersistenceConnector(); - List pluginDeclarations = new ArrayList<>(); + PluginManager.getInstance(); + + List plugins = new ArrayList<>(); + boolean orphan = false; if(pluginName.compareTo(RestConstants.ORPHAN_PATH_PARAM)!=0) { - PluginManager pluginManager = PluginManager.getInstance(); - Map availablePlugins = pluginManager.getAvailablePlugins(); - PluginDeclaration pluginDeclaration = availablePlugins.get(pluginName); - if(pluginDeclaration==null) { - String error = String.format("This SmartExecutor instace does not manage any plugin with name %s", pluginName); - logger.error(error); - throw new ExecutorException(error); - }else { - pluginDeclarations.add(pluginDeclaration); - } + plugins.add(pluginName); }else { - // TODO check role + // plugins.addAll(pluginManager.getAvailablePlugins().keySet()); + orphan = true; } - List scheduledTasks = persistenceConnector.getOrphanScheduledTasks(pluginDeclarations); + List gotScheduledTasks = persistenceConnector.getScheduledTasks(plugins); + List scheduledTasks; + + if(orphan) { + scheduledTasks = new ArrayList<>(); + for(ScheduledTask scheduledTask : gotScheduledTasks) { + if(persistenceConnector.isOrphan(scheduledTask, false)) { + scheduledTasks.add(scheduledTask); + } + } + }else { + scheduledTasks = gotScheduledTasks; + } /* * Using SEMapper because the server must not return sensitive information like token @@ -203,10 +213,10 @@ public class RestSmartExecutor { throw new ExecutorException(e); } - if(pluginName.compareTo(pluginStateEvolution.getPluginDeclaration().getName()) != 0) { + if(pluginName.compareTo(pluginStateEvolution.getPluginDefinition().getName()) != 0) { String error = String.format( "Plugin Name provided in the URL (%s) does not match with the one got from %s (%s)", pluginName, - PluginStateEvolution.class.getSimpleName(), pluginStateEvolution.getPluginDeclaration().getName()); + PluginStateEvolution.class.getSimpleName(), pluginStateEvolution.getPluginDefinition().getName()); throw new InvalidInputsException(error); } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java index e266eea..ca28ea9 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java @@ -5,6 +5,7 @@ package org.gcube.vremanagement.executor.scheduledtask; import java.util.UUID; +import org.gcube.com.fasterxml.jackson.annotation.JsonTypeInfo; import org.gcube.common.authorization.library.provider.ClientInfo; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.resources.gcore.GCoreEndpoint; @@ -13,15 +14,13 @@ import org.gcube.common.resources.gcore.HostingNode; import org.gcube.common.resources.gcore.utils.Group; import org.gcube.smartgears.Constants; import org.gcube.smartgears.ContextProvider; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.ContextUtility; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.json.ExtendedSEMapper; import org.gcube.vremanagement.executor.json.SEMapper; import org.gcube.vremanagement.executor.plugin.Ref; import org.gcube.vremanagement.executor.plugin.RunOn; -import com.fasterxml.jackson.annotation.JsonTypeInfo; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -29,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; public class ScheduledTask extends org.gcube.vremanagement.executor.plugin.ScheduledTask { protected String token; - protected String scope; + protected String context; protected ClientInfo clientInfo; protected ScheduledTask() { @@ -43,8 +42,8 @@ public class ScheduledTask extends org.gcube.vremanagement.executor.plugin.Sched this.uuid = uuid; this.launchParameter = launchParameter; this.token = SecurityTokenProvider.instance.get(); - this.scope = SmartExecutorInitializator.getCurrentScope(); - this.clientInfo = SmartExecutorInitializator.getClientInfo(); + this.context = ContextUtility.getCurrentContext(); + this.clientInfo = ContextUtility.getClientInfo(); this.runOn = runOn; } @@ -58,8 +57,8 @@ public class ScheduledTask extends org.gcube.vremanagement.executor.plugin.Sched /** * @return the scope */ - public String getScope() { - return scope; + public String getContext() { + return context; } /** @@ -110,7 +109,7 @@ public class ScheduledTask extends org.gcube.vremanagement.executor.plugin.Sched try { return ExtendedSEMapper.getInstance().marshal(this); } catch(Exception e) { - return "ScheduledTask [token=" + token + ", scope=" + scope + ", clientInfo=" + clientInfo + ", uuid=" + return "ScheduledTask [token=" + token + ", context=" + context + ", clientInfo=" + clientInfo + ", uuid=" + uuid + ", launchParameter=" + launchParameter + ", runOn=" + runOn + "]"; } } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java index 25e6fa0..2b9f70a 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java @@ -8,7 +8,6 @@ import java.util.List; import java.util.UUID; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; /** * @author Luca Frosini (ISTI - CNR) @@ -18,13 +17,12 @@ public interface ScheduledTaskPersistence { /** * Retrieve from the #SmartExecutorPersistenceConnector the orphaned * Scheduled tasks - * @param pluginDeclarations + * @param plugins * @return the list of orphaned Scheduled * @throws SchedulePersistenceException * if fails */ - public List getOrphanScheduledTasks( - Collection pluginDeclarations) + public List getScheduledTasks(Collection plugins) throws SchedulePersistenceException; /** diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java index f35e66d..9a5e213 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java @@ -14,6 +14,7 @@ import java.util.UUID; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.Scheduling; +import org.gcube.vremanagement.executor.exception.ExecutorException; import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.LaunchException; import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException; @@ -48,7 +49,7 @@ public class SmartExecutorScheduler { private static Logger logger = LoggerFactory.getLogger(SmartExecutorScheduler.class); protected Set scheduledJobs; - protected final Scheduler scheduler; + protected Scheduler scheduler; SmartExecutorScheduler(Scheduler scheduler) throws SchedulerException { this.scheduler = scheduler; @@ -192,7 +193,15 @@ public class SmartExecutorScheduler { * Checking if the requested plugin is available on this smart executor * instance */ - PluginManager.getPluginDeclaration(parameter.getPluginName()); + try { + PluginManager pluginManager = PluginManager.getInstance(); + pluginManager.getPlugin(parameter.getPluginName()); + } catch (PluginNotFoundException e) { + throw e; + } catch (ExecutorException e) { + throw new LaunchException(e); + } + if(uuid == null) { uuid = UUID.randomUUID(); @@ -310,6 +319,7 @@ public class SmartExecutorScheduler { } scheduler.clear(); scheduler.shutdown(); + scheduler = null; } } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorSchedulerFactory.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorSchedulerFactory.java index 53dad23..914b829 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorSchedulerFactory.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorSchedulerFactory.java @@ -3,7 +3,7 @@ package org.gcube.vremanagement.executor.scheduler; import java.util.HashMap; import java.util.Map; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.ContextUtility; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.quartz.Scheduler; import org.quartz.SchedulerException; @@ -25,27 +25,27 @@ public class SmartExecutorSchedulerFactory { smartExecutorSchedulers = new HashMap<>(); } - private static SmartExecutorScheduler getSmartExecutorScheduler(String scope) throws SchedulerException { - if(scope==null){ - String error = "No Scope available."; + private static SmartExecutorScheduler getSmartExecutorScheduler(String context) throws SchedulerException { + if(context==null){ + String error = "No context available."; logger.error(error); throw new RuntimeException(error); } logger.trace("Retrieving {} for scope {}", - SmartExecutorPersistenceConnector.class.getSimpleName(), scope); + SmartExecutorPersistenceConnector.class.getSimpleName(), context); - SmartExecutorScheduler smartExecutorScheduler = smartExecutorSchedulers.get(scope); + SmartExecutorScheduler smartExecutorScheduler = smartExecutorSchedulers.get(context); if(smartExecutorScheduler==null){ logger.trace("Retrieving {} for scope {} not found on internal {}. Intializing it.", SmartExecutorScheduler.class.getSimpleName(), - scope, Map.class.getSimpleName()); + context, Map.class.getSimpleName()); Scheduler scheduler = schedulerFactory.getScheduler(); smartExecutorScheduler = new SmartExecutorScheduler(scheduler); - smartExecutorSchedulers.put(SmartExecutorInitializator.getCurrentScope(), + smartExecutorSchedulers.put(ContextUtility.getCurrentContext(), smartExecutorScheduler); } @@ -57,14 +57,13 @@ public class SmartExecutorSchedulerFactory { * @throws SchedulerException */ public static synchronized SmartExecutorScheduler getSmartExecutorScheduler() throws SchedulerException { - String scope = SmartExecutorInitializator.getCurrentScope(); - return getSmartExecutorScheduler(scope); + String context = ContextUtility.getCurrentContext(); + return getSmartExecutorScheduler(context); } - - public static void remove(){ - String scope = SmartExecutorInitializator.getCurrentScope(); - smartExecutorSchedulers.remove(scope); + public static void removeCurrentSmartExecutorScheduler(){ + String context = ContextUtility.getCurrentContext(); + smartExecutorSchedulers.remove(context); } } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java index ff48b81..480632d 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java @@ -13,13 +13,10 @@ import java.util.UUID; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException; -import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; -import org.gcube.vremanagement.executor.exception.PluginNotFoundException; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; import org.gcube.vremanagement.executor.plugin.Plugin; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateNotification; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; @@ -74,9 +71,9 @@ public class SmartExecutorTask implements InterruptableJob { /* Derived from launchParameter*/ protected int executionCount; protected String pluginName; - protected Plugin plugin; + protected Plugin plugin; protected Map inputs; - protected RunnablePlugin> runnablePlugin; + protected RunnablePlugin runnablePlugin; protected boolean mustPreviousExecutionsCompleted; protected int maxExecutionNumber; /**/ @@ -89,9 +86,10 @@ public class SmartExecutorTask implements InterruptableJob { pluginName = launchParameter.getPluginName(); + PluginManager pluginManager = PluginManager.getInstance(); try { - plugin = PluginManager.instantiatePlugin(pluginName); - } catch (InputsNullException | PluginNotFoundException e) { + plugin = pluginManager.getPlugin(pluginName); + } catch (Exception e) { throw new JobExecutionException(e); } @@ -216,7 +214,7 @@ public class SmartExecutorTask implements InterruptableJob { return; } - runnablePlugin = new RunnablePlugin>( + runnablePlugin = new RunnablePlugin( plugin, inputs, uuid, executionCount, pluginStateNotifications, token); logger.debug("Going to run Job with ID {} (iteration {})", uuid, executionCount); diff --git a/src/test/java/org/gcube/vremanagement/executor/ContextTest.java b/src/test/java/org/gcube/vremanagement/executor/ContextTest.java index ee6073e..8d2198e 100644 --- a/src/test/java/org/gcube/vremanagement/executor/ContextTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/ContextTest.java @@ -31,6 +31,8 @@ public class ContextTest { protected static Properties properties; protected static final String PROPERTIES_FILENAME = "token.properties"; + public static final String ROOT = "/d4science.research-infrastructures.eu"; + public static final String DEFAULT_TEST_SCOPE_NAME; static { @@ -48,7 +50,22 @@ public class ContextTest { DEFAULT_TEST_SCOPE_NAME = "/gcube/devNext/NextNext"; } - public static String getCurrentScope(String token) throws ObjectNotFound, Exception { + public static String getCurrentContext(){ + String token = SecurityTokenProvider.instance.get(); + AuthorizationEntry authorizationEntry; + try { + authorizationEntry = Constants.authorizationService().get(token); + } catch (Exception e) { + logger.trace("Context was not retrieved from token. Going to get it from {}", ScopeProvider.class.getSimpleName()); + return ScopeProvider.instance.get(); + } + String context = authorizationEntry.getContext(); + logger.trace("Context retrieved from token is {}. Context in {} is {}", + context, ScopeProvider.class.getSimpleName(), ScopeProvider.instance.get()); + return context; + } + + public static String getContext(String token) throws ObjectNotFound, Exception { AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token); String context = authorizationEntry.getContext(); logger.info("Context of token {} is {}", token, context); @@ -68,7 +85,7 @@ public class ContextTest { String qualifier = authorizationEntry.getQualifier(); Caller caller = new Caller(clientInfo, qualifier); AuthorizationProvider.instance.set(caller); - ScopeProvider.instance.set(getCurrentScope(token)); + ScopeProvider.instance.set(getContext(token)); } @BeforeClass diff --git a/src/test/java/org/gcube/vremanagement/executor/SerializationTest.java b/src/test/java/org/gcube/vremanagement/executor/SerializationTest.java index 846a37a..3d7e9d2 100644 --- a/src/test/java/org/gcube/vremanagement/executor/SerializationTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/SerializationTest.java @@ -11,27 +11,27 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.gcube.com.fasterxml.jackson.core.JsonGenerationException; +import org.gcube.com.fasterxml.jackson.databind.JsonMappingException; +import org.gcube.com.fasterxml.jackson.databind.ObjectMapper; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; import org.gcube.vremanagement.executor.json.ExtendedSEMapper; import org.gcube.vremanagement.executor.json.SEMapper; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; +import org.gcube.vremanagement.executor.plugin.Plugin; +import org.gcube.vremanagement.executor.plugin.PluginDefinition; import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.plugin.Ref; import org.gcube.vremanagement.executor.plugin.RunOn; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; -import org.gcube.vremanagement.helloworld.HelloWorldPluginDeclaration; +import org.gcube.vremanagement.helloworld.HelloWorldPlugin; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonGenerationException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.ObjectMapper; - /** * @author Luca Frosini (ISTI - CNR) * @@ -94,7 +94,7 @@ public class SerializationTest extends ContextTest { throws JsonGenerationException, JsonMappingException, IOException, InvalidPluginStateEvolutionException { PluginStateEvolution pes = new PluginStateEvolution(UUID.randomUUID(), 1, - Calendar.getInstance().getTimeInMillis(), new HelloWorldPluginDeclaration(), PluginState.RUNNING, 10); + Calendar.getInstance().getTimeInMillis(), new HelloWorldPlugin(), PluginState.RUNNING, 10); logger.debug("{} to be Marshalled : {}", pes.getClass().getSimpleName(), pes); ObjectMapper objectMapper = new ObjectMapper(); @@ -108,13 +108,13 @@ public class SerializationTest extends ContextTest { @Test public void testAvailablePluginMarshalling() throws Exception { - HelloWorldPluginDeclaration helloWorldPluginDeclaration = new HelloWorldPluginDeclaration(); - logger.debug("{}", ExtendedSEMapper.getInstance().marshal(helloWorldPluginDeclaration)); - PluginManager pluginManager = PluginManager.getInstance(); - Map availablePlugins = pluginManager.getAvailablePlugins(); - List plugins = new ArrayList(availablePlugins.values()); - String list = ExtendedSEMapper.getInstance().marshal(PluginDeclaration.class, plugins); + Map> availablePlugins = pluginManager.getAvailablePlugins(); + List plugins = new ArrayList<>(); + for(String pluginName : availablePlugins.keySet()) { + plugins.add(pluginManager.getPlugin(pluginName)); + } + String list = ExtendedSEMapper.getInstance().marshal(PluginDefinition.class, plugins); logger.debug("Plugins are :\n{}", list); } diff --git a/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java b/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java index 25d8312..40c2595 100644 --- a/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java @@ -8,10 +8,8 @@ import java.util.HashMap; import java.util.Map; import org.gcube.vremanagement.executor.plugin.Plugin; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.helloworld.HelloWorldPlugin; -import org.gcube.vremanagement.helloworld.HelloWorldPluginDeclaration; import org.junit.Test; /** @@ -25,17 +23,10 @@ public class SmartExecutorImplTest { Map inputs = new HashMap(); long sleepTime = 10000; inputs.put(HelloWorldPlugin.SLEEP_TIME, sleepTime); - Plugin runnablePlugin = PluginManager.instantiatePlugin(HelloWorldPluginDeclaration.NAME); - runnablePlugin.launch(inputs); - } - - @Test - public void helloWorldFullTest() throws Exception{ - Map inputs = new HashMap(); - long sleepTime = 10000; - inputs.put(HelloWorldPlugin.SLEEP_TIME, sleepTime); - Plugin runnablePlugin = PluginManager.instantiatePlugin(HelloWorldPluginDeclaration.NAME); - runnablePlugin.launch(inputs); + String name = (new HelloWorldPlugin()).getName(); + PluginManager pluginManager = PluginManager.getInstance(); + Plugin plugin = pluginManager.getPlugin(name); + plugin.launch(inputs); } } diff --git a/src/test/java/org/gcube/vremanagement/executor/SmartExecutorInizializatorTest.java b/src/test/java/org/gcube/vremanagement/executor/SmartExecutorInizializatorTest.java index 4c1514d..7357142 100644 --- a/src/test/java/org/gcube/vremanagement/executor/SmartExecutorInizializatorTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/SmartExecutorInizializatorTest.java @@ -5,19 +5,18 @@ package org.gcube.vremanagement.executor; import java.util.List; +import java.util.Map; -import org.gcube.common.authorization.client.Constants; -import org.gcube.common.authorization.client.exceptions.ObjectNotFound; -import org.gcube.common.authorization.library.AuthorizationEntry; -import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.resources.gcore.Resource; import org.gcube.common.resources.gcore.ServiceEndpoint; -import org.gcube.common.scope.api.ScopeProvider; import org.gcube.informationsystem.publisher.RegistryPublisher; import org.gcube.informationsystem.publisher.RegistryPublisherFactory; import org.gcube.resources.discovery.client.api.DiscoveryClient; import org.gcube.resources.discovery.client.queries.api.SimpleQuery; import org.gcube.resources.discovery.icclient.ICFactory; +import org.gcube.vremanagement.executor.plugin.Plugin; +import org.gcube.vremanagement.executor.pluginmanager.PluginManager; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,33 +28,13 @@ public class SmartExecutorInizializatorTest { private static Logger logger = LoggerFactory.getLogger(SmartExecutorInizializatorTest.class); - public static String getCurrentScope(String token) throws ObjectNotFound, Exception{ - AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token); - String context = authorizationEntry.getContext(); - logger.info("Context of token {} is {}", token, context); - return context; - } - - - public static void setContext(String token) throws ObjectNotFound, Exception{ - SecurityTokenProvider.instance.set(token); - ScopeProvider.instance.set(getCurrentScope(token)); - } - - - public static String getCurrentScope(){ - String token = SecurityTokenProvider.instance.get(); - AuthorizationEntry authorizationEntry; - try { - authorizationEntry = Constants.authorizationService().get(token); - } catch (Exception e) { - logger.trace("Context was not retrieved from token. Going to get it from {}", ScopeProvider.class.getSimpleName()); - return ScopeProvider.instance.get(); + @Test + public void getAvailablePlugin() { + PluginManager pluginManager = PluginManager.getInstance(); + Map> availablePlugins = pluginManager.getAvailablePlugins(); + for(String pluginName : availablePlugins.keySet()) { + logger.debug("Plugin {}, Class {}", pluginName, availablePlugins.get(pluginName).getSimpleName()); } - String context = authorizationEntry.getContext(); - logger.trace("Context retrieved from token is {}. Context in {} is {}", - context, ScopeProvider.class.getSimpleName(), ScopeProvider.instance.get()); - return context; } protected static void unPublishResource(Resource resource) throws Exception { @@ -65,7 +44,7 @@ public class SmartExecutorInizializatorTest { RegistryPublisher registryPublisher = RegistryPublisherFactory.create(); String id = resource.id(); - logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, getCurrentScope()); + logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, ContextTest.getCurrentContext()); registryPublisher.remove(resource); @@ -88,11 +67,11 @@ public class SmartExecutorInizializatorTest { for (ServiceEndpoint serviceEndpoint : serviceEndpoints) { try { logger.debug("Trying to unpublish the old ServiceEndpoint with ID {} ({}) from scope {}", - serviceEndpoint.id(), serviceEndpoint.profile().runtime().hostedOn(), SmartExecutorInitializator.getCurrentScope()); + serviceEndpoint.id(), serviceEndpoint.profile().runtime().hostedOn(), ContextUtility.getCurrentContext()); // unPublishResource(serviceEndpoint); } catch(Exception e){ logger.debug("Exception tryng to unpublish the old ServiceEndpoint with ID {} ({}) from scope {}", - serviceEndpoint.id(), serviceEndpoint.profile().runtime().hostedOn(), SmartExecutorInitializator.getCurrentScope(), e); + serviceEndpoint.id(), serviceEndpoint.profile().runtime().hostedOn(), ContextUtility.getCurrentContext(), e); } } }catch(Exception e){ diff --git a/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java b/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java index 58ebe8d..a84db16 100644 --- a/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java @@ -3,6 +3,7 @@ */ package org.gcube.vremanagement.executor.persistence; +import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.List; @@ -15,7 +16,7 @@ import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistenceFactory; -import org.gcube.vremanagement.helloworld.HelloWorldPluginDeclaration; +import org.gcube.vremanagement.helloworld.HelloWorldPlugin; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -31,10 +32,11 @@ public class SmartExecutorPersistenceConnectorTest extends ContextTest { @Test public void getConnectionTest() throws Exception { + // ContextTest.setContextByName(ROOT); SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); Assert.assertNotNull(persistenceConnector); Assert.assertEquals(OrientDBPersistenceConnector.class, persistenceConnector.getClass()); - SmartExecutorPersistenceFactory.closePersistenceConnector(); + SmartExecutorPersistenceFactory.closeCurrentPersistenceConnector(); } @Test @@ -46,7 +48,7 @@ public class SmartExecutorPersistenceConnectorTest extends ContextTest { for(int i=0; i lc = stc.getOrphanScheduledTasks(null); + List plugins = new ArrayList<>(); + plugins.add("hello-world-se-plugin"); + List lc = stc.getScheduledTasks(plugins); logger.debug("Available Scheduled Tasks : {}", lc); } diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java index 34d3148..b32c13d 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java @@ -3,7 +3,9 @@ package org.gcube.vremanagement.executor.pluginmanager; import java.util.UUID; import org.gcube.vremanagement.executor.ContextTest; -import org.gcube.vremanagement.helloworld.HelloWorldPluginDeclaration; +import org.gcube.vremanagement.executor.exception.ExecutorException; +import org.gcube.vremanagement.executor.exception.PluginNotFoundException; +import org.gcube.vremanagement.helloworld.HelloWorldPlugin; import org.junit.Assert; import org.junit.Test; @@ -13,17 +15,14 @@ import org.junit.Test; public class PluginManagerTest extends ContextTest { @Test - public void getInstance(){ + public void getHelloWorldPlugin() throws ExecutorException { PluginManager pluginManager = PluginManager.getInstance(); - Assert.assertNotNull(pluginManager); - } - - @Test - public void getHelloWorldPlugin(){ - PluginManager pluginManager = PluginManager.getInstance(); - Assert.assertNotNull(pluginManager); - Assert.assertEquals(HelloWorldPluginDeclaration.class, pluginManager.getPlugin(HelloWorldPluginDeclaration.NAME).getClass()); - Assert.assertNull(pluginManager.getPlugin(UUID.randomUUID().toString())); + Assert.assertEquals(HelloWorldPlugin.class, pluginManager.getPlugin(new HelloWorldPlugin().getName()).getClass()); + try { + pluginManager.getPlugin(UUID.randomUUID().toString()); + }catch (PluginNotFoundException e) { + // Ok. This is the expected behaviour + } } } diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java index dafc633..83e03dd 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java @@ -16,7 +16,6 @@ import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFact import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateNotification; import org.gcube.vremanagement.helloworld.HelloWorldPlugin; -import org.gcube.vremanagement.helloworld.HelloWorldPluginDeclaration; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -34,11 +33,11 @@ public class RunnablePluginTest extends ContextTest { public void launchNullInputsTest() throws Exception { logger.debug("Testing Null inputs"); UUID uuid = UUID.randomUUID(); - HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration(); + SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); List pluginStateNotifications = new ArrayList(); pluginStateNotifications.add(persistenceConnector); - HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd); + HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(); try { RunnablePlugin runnablePlugin = new RunnablePlugin(helloWorldPlugin, null, uuid, 1, pluginStateNotifications, SecurityTokenProvider.instance.get()); runnablePlugin.run(); @@ -52,11 +51,10 @@ public class RunnablePluginTest extends ContextTest { logger.debug("Testing Empty inputs"); Map inputs = new HashMap(); UUID uuid = UUID.randomUUID(); - HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration(); SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); List pluginStateNotifications = new ArrayList(); pluginStateNotifications.add(persistenceConnector); - HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd); + HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(); RunnablePlugin pt = new RunnablePlugin(helloWorldPlugin, inputs, uuid, 1, pluginStateNotifications,SecurityTokenProvider.instance.get()); try { @@ -76,11 +74,10 @@ public class RunnablePluginTest extends ContextTest { inputs.put(HelloWorldPlugin.SLEEP_TIME, sleepTime); UUID uuid = UUID.randomUUID(); - HelloWorldPluginDeclaration hwpd = new HelloWorldPluginDeclaration(); SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); List pluginStateNotifications = new ArrayList(); pluginStateNotifications.add(persistenceConnector); - HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(hwpd); + HelloWorldPlugin helloWorldPlugin = new HelloWorldPlugin(); RunnablePlugin rp = new RunnablePlugin(helloWorldPlugin, inputs, uuid, 1, pluginStateNotifications,SecurityTokenProvider.instance.get()); long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = startTime; diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java index a698a65..0e470be 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java @@ -20,7 +20,6 @@ import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; import org.gcube.vremanagement.executor.scheduler.SmartExecutorSchedulerFactory; import org.gcube.vremanagement.helloworld.HelloWorldPlugin; -import org.gcube.vremanagement.helloworld.HelloWorldPluginDeclaration; import org.junit.Assert; import org.junit.Test; import org.quartz.CronExpression; @@ -48,7 +47,7 @@ public class SmartExecutorSchedulerTest extends ContextTest { inputs.put("Test UUID", UUID.randomUUID()); logger.debug("Inputs : {}", inputs); - LaunchParameter parameter = new LaunchParameter(HelloWorldPluginDeclaration.NAME, inputs); + LaunchParameter parameter = new LaunchParameter(new HelloWorldPlugin().getName(), inputs); parameter.setScheduling(scheduling); SmartExecutorScheduler smartExecutorScheduler = SmartExecutorSchedulerFactory.getSmartExecutorScheduler();