From 59dcdec5f6d1b552cbe1373e2a25aa1fa3d0390b Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Wed, 1 Feb 2017 16:01:15 +0000 Subject: [PATCH] Merged from private branch before release 4.3.0 git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@141987 82a268e6-3cf1-43bd-a215-b396298e98cf --- .classpath | 1 + .settings/org.eclipse.wst.common.component | 8 +- distro/changelog.xml | 13 +- distro/descriptor.xml | 6 +- distro/gcube-app.xml | 2 + distro/profile.xml | 1 + distro/sun-jaxws.xml | 1 + pom.xml | 86 ++-- .../executor/SmartExecutorImpl.java | 2 +- .../executor/SmartExecutorInitializator.java | 56 +- .../ScheduledTaskConfiguration.java | 68 --- .../ScheduledTaskConfigurationFactory.java | 18 - .../FileScheduledTaskConfiguration.java | 193 ------- .../jsonbased/JSONLaunchParameter.java | 199 ------- .../jsonbased/JSONScheduling.java | 123 ----- .../AlreadyInFinalStateException.java | 2 +- .../InvalidPluginStateEvolutionException.java | 2 +- .../MaxIterationRuntimeException.java | 2 +- .../PluginStateNotRetrievedException.java | 2 +- .../SchedulePersistenceException.java | 2 +- .../exception/SchedulerNotFoundException.java | 2 +- .../exception/ScopeNotMatchException.java | 2 +- .../exception/StopRuntimeException.java | 2 +- .../executor/json/ObjectMapperManager.java | 64 +++ ...SmartExecutorPersistenceConfiguration.java | 40 +- .../SmartExecutorPersistenceConnector.java | 12 +- .../SmartExecutorPersistenceFactory.java | 14 +- .../couchdb/CouchDBPersistenceConnector.java | 389 -------------- .../PluginStateEvolutionObjectNode.java | 169 ------ .../OrientDBPersistenceConnector.java | 487 ++++++++++++++++++ .../pluginmanager/PercentageSetterImpl.java | 2 +- .../executor/pluginmanager/PluginManager.java | 2 +- .../pluginmanager/RunnablePlugin.java | 7 +- .../executor/scheduledtask/ScheduledTask.java | 121 +++++ .../ScheduledTaskDurationInfo.java | 30 +- .../ScheduledTaskPersistence.java | 107 ++++ .../ScheduledTaskPersistenceFactory.java | 18 + .../scheduler/JobCompletedNotification.java | 2 +- .../scheduler/SmartExecutorScheduler.java | 25 +- .../executor/scheduler/SmartExecutorTask.java | 2 +- .../scheduler/SmartExecutorTaskListener.java | 2 +- .../vremanagement/executor/ScopedTest.java | 90 ++++ .../executor/SerializationTest.java | 104 ++++ .../executor/SmartExecutorImplTest.java | 2 +- .../executor/TokenBasedTests.java | 22 - .../configuration/ConfiguredTasksTest.java | 106 ---- ...SmartExecutorPersistenceConnectorTest.java | 31 +- .../pluginmanager/PluginManagerTest.java | 2 +- .../pluginmanager/RunnablePluginTest.java | 2 +- .../SmartExecutorSchedulerTest.java | 2 +- src/test/resources/logback-test.xml | 2 + 51 files changed, 1205 insertions(+), 1444 deletions(-) delete mode 100644 src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfiguration.java delete mode 100644 src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfigurationFactory.java delete mode 100644 src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/FileScheduledTaskConfiguration.java delete mode 100644 src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/JSONLaunchParameter.java delete mode 100644 src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/JSONScheduling.java create mode 100644 src/main/java/org/gcube/vremanagement/executor/json/ObjectMapperManager.java delete mode 100644 src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/CouchDBPersistenceConnector.java delete mode 100644 src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/PluginStateEvolutionObjectNode.java create mode 100644 src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java create mode 100644 src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java rename src/main/java/org/gcube/vremanagement/executor/{configuration/jsonbased => scheduledtask}/ScheduledTaskDurationInfo.java (55%) create mode 100644 src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java create mode 100644 src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistenceFactory.java create mode 100644 src/test/java/org/gcube/vremanagement/executor/ScopedTest.java create mode 100644 src/test/java/org/gcube/vremanagement/executor/SerializationTest.java delete mode 100644 src/test/java/org/gcube/vremanagement/executor/TokenBasedTests.java delete mode 100644 src/test/java/org/gcube/vremanagement/executor/configuration/ConfiguredTasksTest.java diff --git a/.classpath b/.classpath index 953de0b..d3c5af8 100644 --- a/.classpath +++ b/.classpath @@ -25,6 +25,7 @@ + diff --git a/.settings/org.eclipse.wst.common.component b/.settings/org.eclipse.wst.common.component index 3de269f..73d3b85 100644 --- a/.settings/org.eclipse.wst.common.component +++ b/.settings/org.eclipse.wst.common.component @@ -5,7 +5,13 @@ - + + uses + + + uses + + uses diff --git a/distro/changelog.xml b/distro/changelog.xml index 5c004cc..8ba4fe5 100644 --- a/distro/changelog.xml +++ b/distro/changelog.xml @@ -1,9 +1,16 @@ + + - - + + + + + SmartExecutor has been passed to Authorization 2.0 (refs #4944 #2112) + Provided to plugins the possibility to specify progress percentage (refs #440) + >Provided to plugins the possibility to define a custom notifier (refs #5089) - Using Persistence (CouchDB) to save Scheduled Task configuration (refs #579) + Using Persistence (CouchDB) to save Scheduled Task configuration (refs #579) Added Unscheduling feature for repetitive task (refs #521) diff --git a/distro/descriptor.xml b/distro/descriptor.xml index 11561cb..4e9ae23 100644 --- a/distro/descriptor.xml +++ b/distro/descriptor.xml @@ -1,7 +1,5 @@ - + servicearchive tar.gz diff --git a/distro/gcube-app.xml b/distro/gcube-app.xml index 0938e60..28f89c7 100644 --- a/distro/gcube-app.xml +++ b/distro/gcube-app.xml @@ -1,3 +1,5 @@ + + ${serviceClass} ${name} diff --git a/distro/profile.xml b/distro/profile.xml index d9d46df..e7a30cf 100644 --- a/distro/profile.xml +++ b/distro/profile.xml @@ -1,4 +1,5 @@ + Service diff --git a/distro/sun-jaxws.xml b/distro/sun-jaxws.xml index ec39cd6..574e0af 100644 --- a/distro/sun-jaxws.xml +++ b/distro/sun-jaxws.xml @@ -1,4 +1,5 @@ + diff --git a/pom.xml b/pom.xml index 0006ab0..6d04a0e 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.gcube.vremanagement smart-executor - 1.4.0-SNAPSHOT + 1.5.0-SNAPSHOT SmartExecutor Smart Executor Service war @@ -21,8 +21,9 @@ ${project.basedir}/distro VREManagement https://wiki.gcube-system.org/gcube/SmartExecutor + 2.2.3 - + scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/${project.artifactId} scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/${project.artifactId} @@ -40,11 +41,19 @@ org.gcube.distribution - maven-smartgears-bom + gcube-smartgears-bom LATEST pom import + + org.gcube.information-system + information-system-bom + LATEST + pom + import + + @@ -59,7 +68,7 @@ discovery-client provided - + org.gcube.core common-smartgears @@ -74,7 +83,7 @@ org.gcube.core common-smartgears-app - + org.gcube.common authorization-client @@ -85,13 +94,44 @@ common-authorization provided - + + org.gcube.vremanagement + smart-executor-client + [1.4.0-SNAPSHOT,2.0.0-SNAPSHOT] + + + org.gcube.information-system + resource-registry-publisher + provided + + + org.gcube.information-system + information-system-model + + + + com.fasterxml.jackson.module + jackson-module-jaxb-annotations + 2.4.0 + + + + + com.orientechnologies + orientdb-graphdb + + + + com.tinkerpop + frames + + org.gcube.vremanagement smart-executor-api [1.4.0-SNAPSHOT, 2.0.0-SNAPSHOT) - + javax.servlet javax.servlet-api @@ -108,45 +148,23 @@ slf4j-api provided - - - org.json - json - 20090211 - jar - - - - org.ektorp - org.ektorp - 1.3.0 - jar - - - org.codehaus.jackson - jackson-core-asl - 1.9.7 - jar - - - - - + + junit junit 4.11 test - + org.acme HelloWorldPlugin [1.2.0-SNAPSHOT, 2.0.0-SNAPSHOT) test - + @@ -180,7 +198,7 @@ smart-executor false - + diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java index 760da13..c1001bb 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory; /** * Effective implementation of Executor - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) */ @WebService( portName = "SmartExecutorPort", diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java index dcef38d..f40ca37 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitializator.java @@ -1,11 +1,14 @@ package org.gcube.vremanagement.executor; import java.io.StringWriter; +import java.util.ArrayList; import java.util.List; import java.util.Map; import org.gcube.common.authorization.client.Constants; import org.gcube.common.authorization.library.AuthorizationEntry; +import org.gcube.common.authorization.library.ClientType; +import org.gcube.common.authorization.library.provider.ClientInfo; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.resources.gcore.Resource; import org.gcube.common.resources.gcore.Resources; @@ -35,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public class SmartExecutorInitializator implements ApplicationManager { @@ -47,7 +50,7 @@ public class SmartExecutorInitializator implements ApplicationManager { public static final long JOIN_TIMEOUT = 1000; - public static String getScopeFromToken(){ + public static String getCurrentScope(){ String token = SecurityTokenProvider.instance.get(); AuthorizationEntry authorizationEntry; try { @@ -58,6 +61,39 @@ public class SmartExecutorInitializator implements ApplicationManager { return authorizationEntry.getContext(); } + public static ClientInfo getClientInfo() { + String token = SecurityTokenProvider.instance.get(); + AuthorizationEntry authorizationEntry; + try { + authorizationEntry = Constants.authorizationService().get(token); + } catch (Exception e) { + return new ClientInfo() { + + /** + * Generated Serial Version UID + */ + private static final long serialVersionUID = 8311873203596762883L; + + @Override + public ClientType getType() { + return ClientType.USER; + } + + @Override + public List getRoles() { + return new ArrayList<>(); + } + + @Override + public String getId() { + return "UNKNOWN"; + } + }; + } + return authorizationEntry.getClientInfo(); + } + + /** * Publish the provided resource on all Service Scopes retrieved from * Context @@ -72,7 +108,7 @@ public class SmartExecutorInitializator implements ApplicationManager { RegistryPublisher registryPublisher = RegistryPublisherFactory.create(); try { - logger.debug("Trying to publish to {}:\n{}", getScopeFromToken(), stringWriter); + logger.debug("Trying to publish to {}:\n{}", getCurrentScope(), stringWriter); registryPublisher.create(resource); } catch (Exception e) { logger.error("The resource was not published", e); @@ -93,7 +129,7 @@ public class SmartExecutorInitializator implements ApplicationManager { RegistryPublisher registryPublisher = RegistryPublisherFactory.create(); String id = resource.id(); - logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, getScopeFromToken()); + logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, getCurrentScope()); registryPublisher.remove(resource); @@ -224,11 +260,11 @@ public class SmartExecutorInitializator implements ApplicationManager { for (ServiceEndpoint serviceEndpoint : serviceEndpoints) { try { logger.debug("Trying to unpublish the old ServiceEndpoint with ID {} from scope {}", - serviceEndpoint.id(), getScopeFromToken()); + serviceEndpoint.id(), getCurrentScope()); unPublishResource(serviceEndpoint); } catch(Exception e){ logger.debug("Exception tryng to unpublish the old ServiceEndpoint with ID {} from scope {}", - serviceEndpoint.id(), getScopeFromToken(), e); + serviceEndpoint.id(), getCurrentScope(), e); } } }catch(Exception e){ @@ -245,7 +281,7 @@ public class SmartExecutorInitializator implements ApplicationManager { */ @Override public void onInit() { - String scope = getScopeFromToken(); + String scope = getCurrentScope(); logger.trace( "\n-------------------------------------------------------\n" + "Smart Executor is Starting on scope {}\n" @@ -299,7 +335,7 @@ public class SmartExecutorInitializator implements ApplicationManager { "\n-------------------------------------------------------\n" + "Smart Executor is Stopping on scope {}\n" + "-------------------------------------------------------", - getScopeFromToken()); + getCurrentScope()); SmartExecutorScheduler.getInstance().stopAll(); @@ -310,14 +346,14 @@ public class SmartExecutorInitializator implements ApplicationManager { } catch (Exception e) { logger.error("Unable to correctly close {} for scope {}", SmartExecutorPersistenceConnector.class.getSimpleName(), - getScopeFromToken(), e); + getCurrentScope(), e); } logger.trace( "\n-------------------------------------------------------\n" + "Smart Executor Stopped Successfully on scope {}\n" + "-------------------------------------------------------", - getScopeFromToken()); + getCurrentScope()); } } diff --git a/src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfiguration.java b/src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfiguration.java deleted file mode 100644 index 5b654f1..0000000 --- a/src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfiguration.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor.configuration; - -import java.util.List; -import java.util.UUID; - -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - */ -public interface ScheduledTaskConfiguration { - - public static final String SCOPE = "scope"; - - /** - * Retrieve from the #SmartExecutorPersistenceConnector the orphaned - * Scheduled tasks - * @return the list of orphaned Scheduled - * @throws SchedulePersistenceException if fails - */ - public List getAvailableScheduledTasks() throws SchedulePersistenceException; - - - /** - * Return the Scheduled Task if any, null otherwise - * @param uuid which identify the Scheduled Task - * @return LaunchParameter of the Scheduled task if any, null otherwise - * @throws SchedulePersistenceException if fails - */ - public LaunchParameter getScheduledTask(UUID uuid) throws SchedulePersistenceException; - - - /** - * Create a Scheduled Task on persistence - * @param uuid the uuid which (will) identify the task on the SmartExecutor instance - * @param parameter - * @throws SchedulePersistenceException if fails - */ - public void addScheduledTask(UUID uuid, String consumerID, LaunchParameter parameter) throws SchedulePersistenceException; - - /** - * Reserve an orphan Scheduled tasks - * @param uuid the uuid which (will) identify the task on the SmartExecutor instance - * @throws SchedulePersistenceException if fails - */ - public void reserveScheduledTask(UUID uuid, String consumerID) throws SchedulePersistenceException; - - /** - * Remove from persistence the Scheduled Task. - * @param uuid the uuid which (will) identify the task on the SmartExecutor instance - * @param parameter - * @throws SchedulePersistenceException - */ - public void removeScheduledTask(UUID uuid)throws SchedulePersistenceException; - - /** - * Release the Scheduled Task leaving it as orphan on persistence - * @param uuid the uuid which (will) identify the task on the SmartExecutor - * instance - * @throws SchedulePersistenceException - */ - public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException; - -} diff --git a/src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfigurationFactory.java b/src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfigurationFactory.java deleted file mode 100644 index 32b0805..0000000 --- a/src/main/java/org/gcube/vremanagement/executor/configuration/ScheduledTaskConfigurationFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor.configuration; - -import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * - */ -public class ScheduledTaskConfigurationFactory { - - public static ScheduledTaskConfiguration getLaunchConfiguration() throws Exception { - return (ScheduledTaskConfiguration) SmartExecutorPersistenceFactory.getPersistenceConnector(); - } - -} diff --git a/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/FileScheduledTaskConfiguration.java b/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/FileScheduledTaskConfiguration.java deleted file mode 100644 index 5173f1e..0000000 --- a/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/FileScheduledTaskConfiguration.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor.configuration.jsonbased; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.PrintWriter; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.apache.commons.io.FileUtils; -import org.gcube.smartgears.ContextProvider; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.api.types.Scheduling; -import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration; -import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; -import org.gcube.vremanagement.executor.exception.ScopeNotMatchException; -import org.gcube.vremanagement.executor.utils.IOUtility; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * - */ -public class FileScheduledTaskConfiguration implements ScheduledTaskConfiguration { - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(FileScheduledTaskConfiguration.class); - - protected String configurationFileLocation; - protected List configuredTasks; - - - public static final String CONFIG_TASK_FILENAME = "definedTasks.json"; - - public FileScheduledTaskConfiguration() throws Exception { - this(ContextProvider.get().persistence().location()); - } - - public FileScheduledTaskConfiguration(String location) throws IOException, JSONException { - this.configurationFileLocation = location; - this.configuredTasks = new ArrayList(); - this.configuredTasks = retriveConfiguredTask(); - } - - protected Scheduling getScheduling(JSONObject jsonObject) - throws JSONException, ParseException, ScopeNotMatchException { - return new JSONScheduling(jsonObject); - } - - - protected static String configurationFileName(String configurationFileLocation){ - return configurationFileLocation + "/" + CONFIG_TASK_FILENAME; - } - - public List retriveConfiguredTask() - throws IOException, JSONException { - - String configuredTasksDefinition = IOUtility.readFile(configurationFileName(configurationFileLocation)); - List tasks = new ArrayList(); - - JSONArray jsonArray = new JSONArray(configuredTasksDefinition); - for(int i=0; i getConfiguredTasks() { - return configuredTasks; - } - - /** - * @param configuredTasks the configuredTasks to set - */ - public void setConfiguredTasks(List configuredTasks) { - this.configuredTasks = configuredTasks; - } - - /** {@inheritDoc} */ - @Override - public List getAvailableScheduledTasks() - throws SchedulePersistenceException { - // TODO Auto-generated method stub - return null; - } - - /** {@inheritDoc} */ - @Override - public void reserveScheduledTask(UUID uuid, String consumerID) - throws SchedulePersistenceException { - // TODO Auto-generated method stub - } - - /** {@inheritDoc} */ - @Override - public void removeScheduledTask(UUID uuid) - throws SchedulePersistenceException { - // TODO Auto-generated method stub - } - - /** {@inheritDoc} */ - @Override - public void releaseScheduledTask(UUID uuid) - throws SchedulePersistenceException { - // TODO Auto-generated method stub - } - - /* (non-Javadoc) - * @see org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration#getScheduledTask(java.util.UUID) - */ - @Override - public LaunchParameter getScheduledTask(UUID uuid) - throws SchedulePersistenceException { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/JSONLaunchParameter.java b/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/JSONLaunchParameter.java deleted file mode 100644 index fe253f3..0000000 --- a/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/JSONLaunchParameter.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor.configuration.jsonbased; - -import java.text.ParseException; -import java.util.HashMap; -import java.util.Map; - -import org.gcube.vremanagement.executor.SmartExecutorInitializator; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.api.types.Scheduling; -import org.gcube.vremanagement.executor.exception.ScopeNotMatchException; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * - */ -public class JSONLaunchParameter extends LaunchParameter { - - private static Logger logger = LoggerFactory.getLogger(JSONLaunchParameter.class); - - public static final String PLUGIN_NAME = "pluginName"; - public static final String PLUGIN_CAPABILITIES = "pluginCapabilites"; - public static final String INPUTS = "inputs"; - public static final String SCHEDULING = "scheduling"; - public static final String USED_BY = "usedBy"; - public static final String SCOPE = "SCOPE"; - - /** - * Contains the GCOREEndpoint (aka Running Instance) ID - */ - protected String usedBy; - - protected String scope; - - @SuppressWarnings("unused") - private JSONLaunchParameter(){} - - public JSONLaunchParameter(String pluginName, Map inputs) { - super(pluginName, inputs); - } - - public JSONLaunchParameter(String pluginName, Map pluginCapabilities, Map inputs) { - super(pluginName, pluginCapabilities, inputs); - this.scope = SmartExecutorInitializator.getScopeFromToken(); - } - - public JSONLaunchParameter(String pluginName, Map inputs, Scheduling scheduling) throws ParseException { - super(pluginName, inputs, scheduling); - this.scope = SmartExecutorInitializator.getScopeFromToken(); - } - - public JSONLaunchParameter(String pluginName, Map pluginCapabilities, Map inputs, Scheduling scheduling) throws ParseException { - super(pluginName, pluginCapabilities, inputs, scheduling); - this.scope = SmartExecutorInitializator.getScopeFromToken(); - } - - public JSONLaunchParameter(LaunchParameter parameter) throws ParseException { - super(parameter.getPluginName(), parameter.getPluginCapabilities(), parameter.getInputs(), parameter.getScheduling()); - this.scheduling = new JSONScheduling(parameter.getScheduling()); - this.scope = SmartExecutorInitializator.getScopeFromToken(); - } - - public JSONLaunchParameter(JSONObject jsonObject) throws JSONException, ParseException, ScopeNotMatchException { - super(); - - this.pluginName = jsonObject.getString(PLUGIN_NAME); - - this.pluginCapabilities = null; - if(jsonObject.has(PLUGIN_CAPABILITIES)){ - this.pluginCapabilities = new HashMap(); - JSONObject capabilities = jsonObject.getJSONObject(PLUGIN_CAPABILITIES); - JSONArray names = capabilities.names(); - for(int j=0; j(); - JSONObject inputsJsonObject = jsonObject.getJSONObject(INPUTS); - JSONArray names = inputsJsonObject.names(); - for(int j=0; j client = ICFactory.clientFor(ServiceEndpoint.class); List serviceEndpoints = client.submit(query); if(serviceEndpoints.size()>1){ query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", TARGET_SCOPE)); - query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", SmartExecutorInitializator.getScopeFromToken())); + query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", SmartExecutorInitializator.getCurrentScope())); serviceEndpoints = client.submit(query); } return serviceEndpoints.get(0); @@ -139,10 +143,8 @@ public class SmartExecutorPersistenceConfiguration { protected void setValues(ServiceEndpoint serviceEndpoint, String persistenceClassName) throws Exception{ Group accessPoints = serviceEndpoint.profile().accessPoints(); for(AccessPoint accessPoint : accessPoints){ - Collection properties = accessPoint.propertyMap().values(); - - if(properties.contains(new ServiceEndpoint.Property().nameAndValue(PERSISTENCE_CLASS_NAME, persistenceClassName))){ - this.uri = new URI(accessPoint.address()); + if(accessPoint.name().compareTo(persistenceClassName)==0){ + this.url = accessPoint.address(); this.username = accessPoint.username(); String encryptedPassword = accessPoint.password(); diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java index d4079dd..2f9854c 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnector.java @@ -10,12 +10,13 @@ import org.gcube.vremanagement.executor.plugin.Plugin; import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.plugin.PluginStateNotification; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence; /** * Model the connector which create or open the connection to DB. - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) */ -public abstract class SmartExecutorPersistenceConnector extends PluginStateNotification { +public abstract class SmartExecutorPersistenceConnector extends PluginStateNotification implements ScheduledTaskPersistence { public SmartExecutorPersistenceConnector() { super(new HashMap()); @@ -35,8 +36,8 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif * @return the actual/last {@link PluginState} of the Plugin * @throws Exception if fails */ - public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber) - throws Exception; + public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber) throws Exception; + /** * Retrieve the status of the iterationNumber of the last running/run {@link Plugin} which is/was identified * by the UUID passed as parameter @@ -44,7 +45,6 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif * @return the actual/last {@link PluginState} of the Plugin * @throws Exception if fails */ - public abstract PluginStateEvolution getLastPluginInstanceState(UUID uuid) - throws Exception; + public abstract PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception; } diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java index 237a1b3..3dca539 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceFactory.java @@ -7,12 +7,12 @@ import java.util.HashMap; import java.util.Map; import org.gcube.vremanagement.executor.SmartExecutorInitializator; -import org.gcube.vremanagement.executor.persistence.couchdb.CouchDBPersistenceConnector; +import org.gcube.vremanagement.executor.persistence.orientdb.OrientDBPersistenceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public abstract class SmartExecutorPersistenceFactory { @@ -42,7 +42,7 @@ public abstract class SmartExecutorPersistenceFactory { * @return the persistenceConnector */ public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception { - String scope = SmartExecutorInitializator.getScopeFromToken(); + String scope = SmartExecutorInitializator.getCurrentScope(); SmartExecutorPersistenceConnector persistence = getPersistenceConnector(scope); @@ -51,12 +51,12 @@ public abstract class SmartExecutorPersistenceFactory { SmartExecutorPersistenceConnector.class.getSimpleName(), scope, Map.class.getSimpleName()); - String className = CouchDBPersistenceConnector.class.getSimpleName(); + String className = OrientDBPersistenceConnector.class.getSimpleName(); SmartExecutorPersistenceConfiguration configuration = new SmartExecutorPersistenceConfiguration(className); - persistence = new CouchDBPersistenceConnector(configuration); - persistenceConnectors.put(SmartExecutorInitializator.getScopeFromToken(), + persistence = new OrientDBPersistenceConnector(scope, configuration); + persistenceConnectors.put(SmartExecutorInitializator.getCurrentScope(), persistence); } @@ -64,7 +64,7 @@ public abstract class SmartExecutorPersistenceFactory { } public static synchronized void closePersistenceConnector() throws Exception { - String scope = SmartExecutorInitializator.getScopeFromToken(); + String scope = SmartExecutorInitializator.getCurrentScope(); SmartExecutorPersistenceConnector persistence = getPersistenceConnector(scope); if(persistence!=null){ diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/CouchDBPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/CouchDBPersistenceConnector.java deleted file mode 100644 index 1a51edc..0000000 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/CouchDBPersistenceConnector.java +++ /dev/null @@ -1,389 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor.persistence.couchdb; - -import java.io.InputStream; -import java.io.StringWriter; -import java.net.URL; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import java.util.UUID; - -import org.apache.commons.io.IOUtils; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ArrayNode; -import org.codehaus.jackson.node.ObjectNode; -import org.ektorp.CouchDbConnector; -import org.ektorp.CouchDbInstance; -import org.ektorp.DocumentNotFoundException; -import org.ektorp.UpdateConflictException; -import org.ektorp.ViewQuery; -import org.ektorp.ViewResult; -import org.ektorp.http.HttpClient; -import org.ektorp.http.StdHttpClient; -import org.ektorp.http.StdHttpClient.Builder; -import org.ektorp.impl.StdCouchDbConnector; -import org.ektorp.impl.StdCouchDbInstance; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration; -import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter; -import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException; -import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; -import org.gcube.vremanagement.executor.exception.ScopeNotMatchException; -import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration; -import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; -import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; -import org.json.JSONException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * - */ -public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnector implements ScheduledTaskConfiguration { - - private static final Logger logger = LoggerFactory.getLogger(CouchDBPersistenceConnector.class); - - protected CouchDbInstance couchDbInstance; - protected CouchDbConnector couchDbConnector; - - protected static final String DB_NAME = "dbName"; - - protected static final String _ID_JSON_FIELD = "_id"; - protected static final String _REV_JSON_FIELD = "_rev"; - protected static final String TYPE_JSON_FIELD = "type"; - - public CouchDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception { - super(); - prepareConnection(configuration); - } - - protected HttpClient initHttpClient(URL url, String username, String password){ - Builder builder = new StdHttpClient.Builder().url(url); - builder.username(username).password(password); - HttpClient httpClient = builder.build(); - return httpClient; - } - - protected void prepareConnection(SmartExecutorPersistenceConfiguration configuration) throws Exception { - logger.debug("Preparing Connection for {}", this.getClass().getSimpleName()); - HttpClient httpClient = initHttpClient(configuration.getUri().toURL(), configuration.getUsername(), configuration.getPassword()); - couchDbInstance = new StdCouchDbInstance(httpClient); - couchDbConnector = new StdCouchDbConnector(configuration.getProperty(DB_NAME), couchDbInstance); - } - - protected ViewResult query(ViewQuery query){ - ViewResult result = couchDbConnector.queryView(query); - return result; - } - - @Override - public void close() throws Exception { - couchDbConnector.getConnection().shutdown(); - } - - protected void updateItem(JSONObject obj) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - JsonNode node = mapper.readTree(obj.toString()); - couchDbConnector.update(node); - } - - protected JSONObject getObjectByID(String id) throws Exception { - InputStream is = couchDbConnector.getAsStream(id); - StringWriter writer = new StringWriter(); - IOUtils.copy(is, writer, "UTF-8"); - JSONObject obj = new JSONObject(writer.toString()); - return obj; - } - - protected void createItem(JSONObject obj, String id) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - JsonNode node = mapper.readTree(obj.toString()); - createItem(node, id); - } - - protected void createItem(JsonNode node, String id) throws Exception { - if(id!=null && id.compareTo("")!=0){ - couchDbConnector.create(id, node); - }else{ - couchDbConnector.create(node); - } - } - - protected void deleteItem(String id, String revision) throws UpdateConflictException, Exception { - if(revision==null || revision.compareTo("")==0){ - JSONObject toDelete = getObjectByID(id); - revision = toDelete.getString(_REV_JSON_FIELD); - } - couchDbConnector.delete(id, revision); - } - - /** - * {@inheritDoc} - */ - @Override - public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution, Exception e) throws Exception { - ObjectNode objectNode = PluginStateEvolutionObjectNode.getObjectMapper(pluginStateEvolution); - createItem(objectNode, null); - } - - /** - * {@inheritDoc} - */ - @Override - @Deprecated - public PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber) - throws Exception { - return reallyQuery(null, uuid, iterationNumber); - } - - /** - * {@inheritDoc} - */ - @Override - @Deprecated - public PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception { - return reallyQuery(null, uuid, LAST); - } - - protected final static int LAST = -1; - - /* * - * {@inheritDoc} - * / - @Override - public PluginState getPluginInstanceState(PluginDeclaration pluginDeclaration, UUID uuid, int iterationNumber) - throws Exception { - return reallyQuery(pluginDeclaration, uuid, iterationNumber); - } - - /* * - * {@inheritDoc} - * / - @Override - public PluginState getLastPluginInstanceState(PluginDeclaration pluginDeclaration, UUID uuid) throws Exception { - return reallyQuery(pluginDeclaration, uuid, LAST); - } - */ - - protected static final String MAP_REDUCE__DESIGN = "_design/"; - - protected static final String PLUGIN_STATE_DOCUMENT = "pluginState"; - protected static final String PLUGIN_STATE = "pluginState"; - protected static final String PLUGIN_STATE_VIEW_ABANDONED = "pluginStateABANDONED"; - - protected static final String SCHEDULED_TASKS_DOCUMENT = "scheduledTasks"; - protected static final String ACTIVE_VIEW = "active"; - protected static final String ORPHAN_VIEW = "orphan"; - - protected static final String USED_BY_FIELD = "usedBy"; - protected static final String STOPPED = "stopped"; - - protected static final String RESERVED_BY = "reservedBy"; - protected static final String PREVIOUSLY_USED_BY = "previouslyUsedBy"; - protected static final String RESERVATION_TIMESTAMP = "reservationTimestamp"; - - protected static final String SCHEDULED_TASK_TYPE = "scheduledTask"; - - /** - * @param uuid - * @param iterationNumber -1 means LAST - * @return - * @throws Exception - */ - protected PluginStateEvolution reallyQuery(PluginDeclaration pluginDeclaration, UUID uuid, int iterationNumber) - throws Exception { - - ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, PLUGIN_STATE_DOCUMENT)); - - String scope = SmartExecutorInitializator.getScopeFromToken(); - ArrayNode startKey = new ObjectMapper().createArrayNode(); - startKey.add(scope); - ArrayNode endKey = new ObjectMapper().createArrayNode(); - endKey.add(scope); - if(pluginDeclaration!=null && pluginDeclaration.getName()!=null && pluginDeclaration.getName().compareTo("")!=0){ - startKey.add(pluginDeclaration.getName()); - endKey.add(pluginDeclaration.getName()); - query = query.viewName(PLUGIN_STATE_VIEW_ABANDONED); - }else{ - query = query.viewName(PLUGIN_STATE); - } - - startKey.add(uuid.toString()); - endKey.add(uuid.toString()); - - if(iterationNumber != LAST){ - startKey.add(iterationNumber); - endKey.add(iterationNumber); - - startKey.add(1); - endKey.add("{}"); - }else{ - // Adding time interval - startKey.add(1); - endKey.add("{}"); - } - - query.startKey(startKey); - query.endKey(endKey); - - query.reduce(false); - - PluginStateEvolution pluginStateEvolution = null; - ViewResult viewResult = query(query); - for (ViewResult.Row row : viewResult) { - //JsonNode key = row.getKeyAsNode(); - JsonNode value = row.getValueAsNode(); - - pluginStateEvolution = PluginStateEvolutionObjectNode.getPluginStateEvolution(value); - - - } - - if(pluginStateEvolution==null){ - throw new PluginStateNotRetrievedException(); - } - return pluginStateEvolution; - } - - - protected List findOrphanedScheduledTasks(){ - // TODO Implements after sweeper has been implemented - return null; - } - - protected void freeOrphanedScheduledTasks(){ - //List orphaned = findOrphanedScheduledTasks(); - // TODO - // TODO Implements after sweeper has been implemented - } - - - /** {@inheritDoc} */ - @Override - public List getAvailableScheduledTasks() - throws SchedulePersistenceException { - ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, SCHEDULED_TASKS_DOCUMENT)); - query = query.viewName(ORPHAN_VIEW); - - String scope = SmartExecutorInitializator.getScopeFromToken(); - ArrayNode startKey = new ObjectMapper().createArrayNode(); - startKey.add(scope); - ArrayNode endKey = new ObjectMapper().createArrayNode(); - endKey.add(scope); - endKey.add("{}"); - query.startKey(startKey); - query.endKey(endKey); - - - List ret = new ArrayList(); - - ViewResult viewResult = query(query); - for (ViewResult.Row row : viewResult) { - //JsonNode key = row.getKeyAsNode(); - JsonNode value = row.getValueAsNode(); - try { - JSONObject obj = new JSONObject(value.toString()); - JSONLaunchParameter jlp = new JSONLaunchParameter(obj); - ret.add(jlp); - } catch (ParseException | JSONException e) { - logger.error("Unable to parse result Row", e.getCause()); - continue; - } catch (ScopeNotMatchException ex){ - logger.error("The result row does not macth the current Scope. This should indicate a query error.", ex.getCause()); - continue; - } - } - - return ret; - } - - - - /** {@inheritDoc} */ - @Override - public void addScheduledTask(UUID uuid, String consumerID, LaunchParameter parameter) - throws SchedulePersistenceException { - try { - JSONLaunchParameter jlp = new JSONLaunchParameter(parameter); - JSONObject obj = jlp.toJSON(); - obj.append(TYPE_JSON_FIELD, SCHEDULED_TASK_TYPE); - obj.append(USED_BY_FIELD, consumerID); - obj.append(ScheduledTaskConfiguration.SCOPE, SmartExecutorInitializator.getScopeFromToken()); - createItem(obj, uuid.toString()); - } catch (Exception e) { - logger.error("Error Adding Scheduled Task UUID : {}, Consumer : {}, LaunchParameter : {}", - uuid, consumerID, parameter, e); - throw new SchedulePersistenceException(e.getCause()); - } - } - - /** {@inheritDoc} */ - @Override - public void reserveScheduledTask(UUID uuid, String consumerID) throws SchedulePersistenceException { - try { - JSONObject obj = getObjectByID(uuid.toString()); - // TODO change it - String previousConsumerID = obj.getString(USED_BY_FIELD); - obj.put(PREVIOUSLY_USED_BY, previousConsumerID); - obj.remove(USED_BY_FIELD); - obj.put(RESERVED_BY, consumerID); - obj.put(RESERVATION_TIMESTAMP, Calendar.getInstance().getTimeInMillis()); - updateItem(obj); - } catch (Exception e) { - logger.error("Error Reserving Scheduled Task UUID : {} Consumer : {}", - uuid, consumerID, e); - throw new SchedulePersistenceException(e.getCause()); - } - - } - - /** {@inheritDoc} */ - @Override - public void removeScheduledTask(UUID uuid) throws SchedulePersistenceException { - try { - JSONObject obj = getObjectByID(uuid.toString()); - obj.remove(USED_BY_FIELD); - obj.put(STOPPED, true); - updateItem(obj); - } catch (Exception e) { - logger.error("Error Removing Scheduled Task UUID : {}", uuid, e); - throw new SchedulePersistenceException(e.getCause()); - } - } - - /** {@inheritDoc} */ - @Override - public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException { - try { - JSONObject obj = getObjectByID(uuid.toString()); - obj.remove(USED_BY_FIELD); - updateItem(obj); - } catch (Exception e) { - logger.error("Error Releasing Scheduled Task UUID : {}", uuid, e); - throw new SchedulePersistenceException(e.getCause()); - } - } - - /** {@inheritDoc} */ - @Override - public LaunchParameter getScheduledTask(UUID uuid) throws SchedulePersistenceException { - try { - JSONObject jsonObject = getObjectByID(uuid.toString()); - return new JSONLaunchParameter(jsonObject); - } catch (DocumentNotFoundException e) { - return null; - } catch (Exception e) { - throw new SchedulePersistenceException(e.getCause()); - } - } - -} diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/PluginStateEvolutionObjectNode.java b/src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/PluginStateEvolutionObjectNode.java deleted file mode 100644 index 0b36cf1..0000000 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/couchdb/PluginStateEvolutionObjectNode.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor.persistence.couchdb; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; - -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ObjectNode; -import org.gcube.common.resources.gcore.GCoreEndpoint; -import org.gcube.smartgears.ContextProvider; -import org.gcube.vremanagement.executor.SmartExecutorInitializator; -import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; -import org.gcube.vremanagement.executor.plugin.Plugin; -import org.gcube.vremanagement.executor.plugin.PluginDeclaration; -import org.gcube.vremanagement.executor.plugin.PluginState; -import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * - */ -public class PluginStateEvolutionObjectNode { - - protected static final String EVOLUTION_TYPE = "evolution"; - - public final static String UUID_FIELD = "uuid"; - public final static String ITERATION_FIELD = "iteration"; - - public final static String PLUGIN_DECLARATION_FIELD = "pluginDeclaration"; - public final static String PLUGIN_DECLARATION_NAME_FIELD = "name"; - public final static String PLUGIN_DECLARATION_DESCRIPTION_FIELD = "description"; - public final static String PLUGIN_DECLARATION_VERSION_FIELD = "version"; - public final static String PLUGIN_DECLARATION_HOST_DISCOVERED_CAPABILITIES_FIELD = "hostDiscoveredCapabilities"; - - public final static String TIMESTAMP_FIELD = "timestamp"; - public final static String STATE_FIELD = "state"; - - protected static final String RUN_ON_FIELD = "runOn"; - public final static String GHN_HOSTNAME_FIELD = "ghnHostname"; - public final static String GHN_ID_FIELD = "ghnID"; - - public final static String SCOPE_FIELD = "scope"; - - public final static String LOCALHOST = "localhost"; - - public final static String PERCENTAGE = "percentage"; - - protected static ObjectNode getRunOn(){ - ObjectMapper objectMapper = new ObjectMapper(); - ObjectNode objectNode = objectMapper.createObjectNode(); - try { - GCoreEndpoint gCoreEndpoint = ContextProvider.get().profile(GCoreEndpoint.class); - objectNode.put(GHN_ID_FIELD, gCoreEndpoint.profile().ghnId()); - objectNode.put(GHN_HOSTNAME_FIELD, ContextProvider.get().container().configuration().hostname()); - }catch(Exception e){ - objectNode.put(GHN_ID_FIELD, LOCALHOST + "_" + UUID.randomUUID()); - objectNode.put(GHN_HOSTNAME_FIELD, LOCALHOST); - } - return objectNode; - } - - protected static ObjectNode getPluginInfo(PluginDeclaration pluginDeclaration){ - ObjectMapper objectMapper = new ObjectMapper(); - ObjectNode objectNode = objectMapper.createObjectNode(); - objectNode.put(PLUGIN_DECLARATION_NAME_FIELD, pluginDeclaration.getName()); - objectNode.put(PLUGIN_DECLARATION_DESCRIPTION_FIELD, pluginDeclaration.getDescription()); - objectNode.put(PLUGIN_DECLARATION_VERSION_FIELD, pluginDeclaration.getVersion()); - - Map capabilites = pluginDeclaration.getSupportedCapabilities(); - ObjectNode capabilitiesObjectNode = objectMapper.createObjectNode(); - if(capabilites!=null){ - for(String key : capabilites.keySet()){ - capabilitiesObjectNode.put(key, capabilites.get(key)); - } - } - objectNode.put(PLUGIN_DECLARATION_HOST_DISCOVERED_CAPABILITIES_FIELD, capabilitiesObjectNode); - - return objectNode; - } - - protected static PluginDeclaration getPluginDeclaration(final JsonNode jsonNode){ - PluginDeclaration pluginDeclaration = new PluginDeclaration() { - - @Override - public void init() throws Exception {} - - @Override - public String getVersion() { - return jsonNode.get(PLUGIN_DECLARATION_VERSION_FIELD).asText(); - } - - @Override - public Map getSupportedCapabilities() { - Map capabilities = new HashMap<>(); - JsonNode node = jsonNode.get(PLUGIN_DECLARATION_VERSION_FIELD); - Iterator iterator = node.getFieldNames(); - while(iterator.hasNext()) { - String key = iterator.next(); - capabilities.put(key, node.get(key).asText()); - } - return capabilities; - } - - @Override - public Class> getPluginImplementation() { - return null; - } - - @Override - public String getName() { - return jsonNode.get(PLUGIN_DECLARATION_NAME_FIELD).asText(); - } - - @Override - public String getDescription() { - return jsonNode.get(PLUGIN_DECLARATION_DESCRIPTION_FIELD).asText(); - } - }; - - return pluginDeclaration; - } - - - public static void addScope(ObjectNode objectNode){ - objectNode.put(SCOPE_FIELD, SmartExecutorInitializator.getScopeFromToken()); - } - - public static ObjectNode getObjectMapper(PluginStateEvolution pluginStateEvolution){ - ObjectMapper objectMapper = new ObjectMapper(); - ObjectNode objectNode = objectMapper.createObjectNode(); - objectNode.put(UUID_FIELD, pluginStateEvolution.getUuid().toString()); - objectNode.put(ITERATION_FIELD, pluginStateEvolution.getIteration()); - objectNode.put(TIMESTAMP_FIELD, pluginStateEvolution.getTimestamp()); - - objectNode.put(PLUGIN_DECLARATION_FIELD, getPluginInfo(pluginStateEvolution.getPluginDeclaration())); - - objectNode.put(STATE_FIELD, pluginStateEvolution.getPluginState().toString()); - - objectNode.put(PERCENTAGE, pluginStateEvolution.getPercentage()); - - addScope(objectNode); - - objectNode.put(CouchDBPersistenceConnector.TYPE_JSON_FIELD, EVOLUTION_TYPE); - try { - objectNode.put(RUN_ON_FIELD, getRunOn()); - }catch(Exception e){ - // TODO - } - - return objectNode; - } - - - public static PluginStateEvolution getPluginStateEvolution(JsonNode jsonNode) - throws InvalidPluginStateEvolutionException{ - UUID uuid = UUID.fromString(jsonNode.get(UUID_FIELD).asText()); - int iteration = jsonNode.get(ITERATION_FIELD).asInt(); - long timestamp = jsonNode.get(TIMESTAMP_FIELD).asInt(); - PluginDeclaration pluginDeclaration = getPluginDeclaration(jsonNode.get(PLUGIN_DECLARATION_FIELD)); - PluginState pluginState = PluginState.valueOf(jsonNode.get(STATE_FIELD).asText()); - int percentage = jsonNode.get(PERCENTAGE).asInt(); - return new PluginStateEvolution(uuid, iteration, timestamp, pluginDeclaration, pluginState, percentage); - } -} diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java new file mode 100644 index 0000000..1ad7b56 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/orientdb/OrientDBPersistenceConnector.java @@ -0,0 +1,487 @@ +/** + * + */ +package org.gcube.vremanagement.executor.persistence.orientdb; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; +import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; +import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; +import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException; +import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; +import org.gcube.vremanagement.executor.json.ObjectMapperManager; +import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration; +import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; +import org.gcube.vremanagement.executor.plugin.PluginDeclaration; +import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; +import org.gcube.vremanagement.executor.plugin.RunOn; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.orientechnologies.orient.core.db.OPartitionedDatabasePool; +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; + +/** + * @author Luca Frosini (ISTI - CNR) + * + */ +public class OrientDBPersistenceConnector extends + SmartExecutorPersistenceConnector { + + private static final Logger logger = LoggerFactory + .getLogger(OrientDBPersistenceConnector.class); + + protected final static int LAST = -1; + + protected final String scope; + + protected final String SCOPE = "scope"; + protected final String UUID = "uuid"; + protected final String ITERATION = "iteration"; + protected final String TIMESTAMP = "timestamp"; + + protected final String RUN_ON = "runOn"; + + protected OPartitionedDatabasePool oPartitionedDatabasePool; + protected ObjectMapper mapper; + + public OrientDBPersistenceConnector(String scope, + SmartExecutorPersistenceConfiguration configuration) + throws Exception { + super(); + this.scope = scope; + prepareConnection(configuration); + this.mapper = ObjectMapperManager.getObjectMapper(); + } + + public OrientDBPersistenceConnector( + SmartExecutorPersistenceConfiguration configuration) + throws Exception { + this(SmartExecutorInitializator.getCurrentScope(), configuration); + } + + protected void prepareConnection( + SmartExecutorPersistenceConfiguration configuration) + throws Exception { + logger.debug("Preparing Connection for {}", this.getClass() + .getSimpleName()); + String url = configuration.getURL(); + String username = configuration.getUsername(); + String password = configuration.getPassword(); + this.oPartitionedDatabasePool = new OPartitionedDatabasePool(url, + username, password); + } + + protected void prepareObjectMapper() { + this.mapper = new ObjectMapper(); + + } + + @Override + public void close() throws Exception { + oPartitionedDatabasePool.close(); + } + + protected PluginStateEvolution getPluginStateEvolution(UUID uuid, + int iterationNumber) throws Exception { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + String type = PluginStateEvolution.class.getSimpleName(); + Map params = new HashMap(); + params.put(UUID, uuid.toString()); + params.put(SCOPE, scope); + + OSQLSynchQuery query = null; + if (iterationNumber != LAST) { + query = new OSQLSynchQuery( + String.format( + "SELECT FROM %s WHERE %s = :%s AND %s = :%s AND %s = :%s ORDER BY %s DESC LIMIT 1", + type, SCOPE, SCOPE, UUID, UUID, ITERATION, + ITERATION, TIMESTAMP)); + params.put(ITERATION, iterationNumber); + } else { + query = new OSQLSynchQuery( + String.format( + "SELECT FROM %s WHERE %s = :%s AND %s = :%s ORDER BY %s DESC", + type, SCOPE, SCOPE, UUID, UUID, ITERATION)); + } + + List result = query.execute(params); + + ODocument resDoc = null; + + if (iterationNumber != LAST) { + resDoc = result.get(0); + } else { + // TODO manage better + long maxTimestamp = 0; + for (ODocument oDoc : result) { + long tm = (long) oDoc.field(TIMESTAMP); + if (maxTimestamp <= tm) { + maxTimestamp = tm; + resDoc = oDoc; + } + } + } + + String json = resDoc.toJSON("class"); + PluginStateEvolution pluginStateEvolution = mapper.readValue(json, + PluginStateEvolution.class); + + return pluginStateEvolution; + } catch (Exception e) { + throw new PluginStateNotRetrievedException(e); + } finally { + db.close(); + } + } + + @Override + public PluginStateEvolution getPluginInstanceState(UUID uuid, + int iterationNumber) throws Exception { + return getPluginStateEvolution(uuid, iterationNumber); + } + + @Override + public PluginStateEvolution getLastPluginInstanceState(UUID uuid) + throws Exception { + return getPluginStateEvolution(uuid, LAST); + } + + @Override + public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution, + Exception exception) throws Exception { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + + ODocument doc = new ODocument( + PluginStateEvolution.class.getSimpleName()); + String json = mapper.writeValueAsString(pluginStateEvolution); + doc.fromJSON(json); + doc.field(SCOPE, scope); + + doc.save(); + db.commit(); + } catch (Exception e) { + if (db != null) { + db.rollback(); + } + throw e; + } finally { + if (db != null) { + db.close(); + } + } + } + + @Override + public void addScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + + ODocument doc = new ODocument(ScheduledTask.class.getSimpleName()); + + long timestamp = Calendar.getInstance().getTimeInMillis(); + doc.field(TIMESTAMP, timestamp); + + String json = mapper.writeValueAsString(scheduledTask); + doc.fromJSON(json); + doc.save(); + + db.commit(); + } catch (Exception e) { + if (db != null) { + db.rollback(); + } + throw new SchedulePersistenceException(e); + } finally { + db.close(); + } + + } + + protected boolean isOrphan(ScheduledTask scheduledTask) throws Exception { + try { + UUID uuid = scheduledTask.getUUID(); + + RunOn runOn = scheduledTask.getRunOn(); + String address = runOn.getEService().getAddress(); + SpecificEndpointDiscoveryFilter specificEndpointDiscoveryFilter = new SpecificEndpointDiscoveryFilter( + address); + + String pluginName = scheduledTask.getLaunchParameter() + .getPluginName(); + + try { + SmartExecutorProxy proxy = ExecutorPlugin + .getExecutorProxy(pluginName, null, null, + specificEndpointDiscoveryFilter).build(); + proxy.getStateEvolution(uuid.toString()); + } catch (Exception e) { + // The instance was not found or the request failed. + // The scheduledTask is considered orphan + logger.trace("{} is considered orphan.", ObjectMapperManager + .getObjectMapper().writeValueAsString(scheduledTask)); + return true; + } + } catch (Exception e) { + String string = ObjectMapperManager.getObjectMapper() + .writeValueAsString(scheduledTask); + logger.error("Error while checking orphanity of " + string + + ". Considering as not orphan."); + } + + return false; + } + + @Override + public List getOrphanScheduledTasks( + List pluginDeclarations) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + String type = ScheduledTask.class.getSimpleName(); + + OSQLSynchQuery query = new OSQLSynchQuery( + String.format("SELECT * FROM %s WHERE %s = :%s", type) + // TODO filter for task the instance can run + ); + + List result = query.execute(); + + List scheduledTasks = new ArrayList<>(); + + for (ODocument doc : result) { + String json = doc.toJSON("class"); + + ScheduledTask scheduledTask = mapper.readValue(json, + ScheduledTask.class); + try { + if (isOrphan(scheduledTask)) { + scheduledTasks.add(scheduledTask); + } + } catch (Exception e) { + logger.error( + "An Exception occurred while evaluating if {} is orphan", + json); + } + + } + + return scheduledTasks; + } catch (Exception e) { + throw new SchedulePersistenceException(e); + } finally { + if (db != null) { + db.close(); + } + } + } + + protected ODocument getScheduledTaskDocument(ODatabaseDocumentTx db, + UUID uuid) throws SchedulePersistenceException { + try { + String type = ScheduledTask.class.getSimpleName(); + Map params = new HashMap(); + params.put(UUID, uuid.toString()); + + OSQLSynchQuery query = new OSQLSynchQuery( + String.format("SELECT FROM %s WHERE %s = :%s", type, UUID, + UUID)); + + List result = query.execute(params); + if (result.size() > 1) { + String error = String.format( + "Found more than one %s with UUID=%s. %s. %s.", type, + uuid.toString(), + "This is really strange and should not occur", + "Please contact the smart-executor administrator"); + logger.error(error); + throw new SchedulePersistenceException(error); + } else if (result.size() == 0) { + String error = String.format("No %s with UUID=%s found.", type, + uuid.toString()); + logger.error(error); + throw new SchedulePersistenceException(error); + } + + return result.get(0); + } catch (Exception e) { + throw new SchedulePersistenceException(e); + } + } + + @Override + public ScheduledTask getScheduledTask(UUID uuid) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + ODocument doc = getScheduledTaskDocument(db, uuid); + String json = doc.toJSON("class"); + return mapper.readValue(json, ScheduledTask.class); + } catch (Exception e) { + throw new SchedulePersistenceException(e); + } finally { + if (db != null) { + db.close(); + } + } + } + + @Override + public void reserveScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + UUID uuid = scheduledTask.getUUID(); + + ODocument doc = getScheduledTaskDocument(db, uuid); + + long timestamp = doc.field(TIMESTAMP); + if (timestamp != scheduledTask.getTimestamp()) { + throw new SchedulePersistenceException( + "The ScheduledTask has been already reserved."); + } + + RunOn runOn = ScheduledTask.generateRunOn(); + String json = mapper.writeValueAsString(runOn); + doc.field(RUN_ON, json); + + timestamp = Calendar.getInstance().getTimeInMillis(); + doc.field(TIMESTAMP, timestamp); + + doc.save(); + db.commit(); + + } catch (Exception e) { + if (db != null) { + db.rollback(); + } + throw new SchedulePersistenceException(e); + } finally { + if (db != null) { + db.close(); + } + } + } + + @Override + public void removeScheduledTask(UUID uuid) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + ODocument doc = getScheduledTaskDocument(db, uuid); + doc.delete(); + db.commit(); + } catch (Exception e) { + if (db != null) { + db.rollback(); + } + throw new SchedulePersistenceException(e); + } finally { + if (db != null) { + db.close(); + } + } + } + + + @Override + public void removeScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + UUID uuid = scheduledTask.getUUID(); + + ODocument doc = getScheduledTaskDocument(db, uuid); + + long timestamp = doc.field(TIMESTAMP); + if (timestamp != scheduledTask.getTimestamp()) { + throw new SchedulePersistenceException( + "The ScheduledTask has been changed."); + } + + doc.delete(); + db.commit(); + } catch (Exception e) { + if (db != null) { + db.rollback(); + } + throw new SchedulePersistenceException(e); + } finally { + if (db != null) { + db.close(); + } + } + } + + @Override + public void releaseScheduledTask(UUID uuid) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + ODocument doc = getScheduledTaskDocument(db, uuid); + doc.removeField(RUN_ON); + doc.save(); + } catch (Exception e) { + if (db != null) { + db.rollback(); + } + throw new SchedulePersistenceException(e); + } finally { + if (db != null) { + db.close(); + } + } + } + + @Override + public void releaseScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException { + ODatabaseDocumentTx db = null; + try { + db = oPartitionedDatabasePool.acquire(); + UUID uuid = scheduledTask.getUUID(); + ODocument doc = getScheduledTaskDocument(db, uuid); + + long timestamp = doc.field(TIMESTAMP); + if (timestamp != scheduledTask.getTimestamp()) { + throw new SchedulePersistenceException( + "The ScheduledTask has been changed."); + } + + doc.removeField(RUN_ON); + doc.save(); + } catch (Exception e) { + if (db != null) { + db.rollback(); + } + throw new SchedulePersistenceException(e); + } finally { + if (db != null) { + db.close(); + } + } + } +} diff --git a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java index ca46aad..ecb8789 100644 --- a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java +++ b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PercentageSetterImpl.java @@ -9,7 +9,7 @@ import org.gcube.vremanagement.executor.plugin.Plugin; import org.gcube.vremanagement.executor.plugin.PluginDeclaration; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public class PercentageSetterImpl> implements PercentageSetter { diff --git a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java index 9fe8a87..372d78a 100644 --- a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java +++ b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/PluginManager.java @@ -17,7 +17,7 @@ import org.slf4j.LoggerFactory; * This is a singleton class which discover on classpath the available plugins * and map the plugin name to its implementation class. * The plugin implementation class can be retrieved using its name. - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) */ @SuppressWarnings("deprecation") public class PluginManager { diff --git a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java index a45ace1..940fa8e 100644 --- a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java +++ b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java @@ -19,7 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public class RunnablePlugin> implements Runnable { @@ -66,8 +66,9 @@ public class RunnablePlugin> imple public void run(){ try { setState(PluginState.RUNNING); - - plugin.launch(inputs); + this.plugin.setUUID(uuid); + this.plugin.setIterationNumber(iterationNumber); + this.plugin.launch(inputs); setState(PluginState.DONE); } catch (AlreadyInFinalStateException e1) { return; diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java new file mode 100644 index 0000000..51d5160 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTask.java @@ -0,0 +1,121 @@ +/** + * + */ +package org.gcube.vremanagement.executor.scheduledtask; + +import java.util.UUID; + +import org.gcube.common.authorization.library.provider.ClientInfo; +import org.gcube.common.resources.gcore.GCoreEndpoint; +import org.gcube.common.resources.gcore.GCoreEndpoint.Profile.Endpoint; +import org.gcube.common.resources.gcore.HostingNode; +import org.gcube.common.resources.gcore.utils.Group; +import org.gcube.smartgears.Constants; +import org.gcube.smartgears.ContextProvider; +import org.gcube.vremanagement.executor.SmartExecutorInitializator; +import org.gcube.vremanagement.executor.api.types.LaunchParameter; +import org.gcube.vremanagement.executor.api.types.Scheduling; +import org.gcube.vremanagement.executor.plugin.Ref; +import org.gcube.vremanagement.executor.plugin.RunOn; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * @author Luca Frosini (ISTI - CNR) + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=Scheduling.CLASS_PROPERTY) +public class ScheduledTask { + + protected Long timestamp; + + protected UUID uuid; + protected LaunchParameter launchParameter; + + protected String scope; + protected ClientInfo clientInfo; + + protected RunOn runOn; + + protected ScheduledTask(){} + + public ScheduledTask(UUID uuid, LaunchParameter launchParameter) { + this(uuid, launchParameter, generateRunOn()); + } + + public ScheduledTask(UUID uuid, LaunchParameter launchParameter, RunOn runOn) { + this.uuid = uuid; + this.launchParameter = launchParameter; + this.scope = SmartExecutorInitializator.getCurrentScope(); + this.clientInfo = SmartExecutorInitializator.getClientInfo(); + this.runOn = runOn; + } + + /** + * @return the timestamp + */ + public Long getTimestamp() { + return timestamp; + } + + /** + * @return the uuid + */ + public UUID getUUID() { + return uuid; + } + + /** + * @return the scope + */ + public String getScope() { + return scope; + } + + /** + * @return the clientInfo + */ + public ClientInfo getClientInfo() { + return clientInfo; + } + + /** + * @return the runOn + */ + public RunOn getRunOn() { + return runOn; + } + + /** + * @param runOn the runOn to set + */ + public static RunOn generateRunOn() { + HostingNode hostingNode = ContextProvider.get().profile(HostingNode.class); + Ref hostingNodeRef = new Ref(hostingNode.id(), hostingNode.profile().description().name()); + + GCoreEndpoint gCoreEndpoint = ContextProvider.get().profile(GCoreEndpoint.class); + String address = ""; + Group endpoints = gCoreEndpoint.profile().endpoints(); + for(Endpoint endpoint : endpoints){ + if(endpoint.name().contains(Constants.remote_management)){ + continue; + }else{ + address = endpoint.uri().toString(); + break; + } + } + + Ref eServiceRef = new Ref(gCoreEndpoint.id(), address); + + RunOn runOn = new RunOn(hostingNodeRef, eServiceRef); + + return runOn; + } + + /** + * @return the launchParameter + */ + public LaunchParameter getLaunchParameter(){ + return launchParameter; + } + +} diff --git a/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/ScheduledTaskDurationInfo.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskDurationInfo.java similarity index 55% rename from src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/ScheduledTaskDurationInfo.java rename to src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskDurationInfo.java index 7cebd02..504a8f2 100644 --- a/src/main/java/org/gcube/vremanagement/executor/configuration/jsonbased/ScheduledTaskDurationInfo.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskDurationInfo.java @@ -1,27 +1,21 @@ /** * */ -package org.gcube.vremanagement.executor.configuration.jsonbased; +package org.gcube.vremanagement.executor.scheduledtask; -import org.json.JSONException; -import org.json.JSONObject; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * + * @author Luca Frosini (ISTI - CNR) */ public class ScheduledTaskDurationInfo { - public static final String LAST = "last"; - public static final String AVERAGE = "avg"; - public static final String MIN = "min"; - public static final String MAX = "max"; - protected long last; protected long avg; protected long min; protected long max; + protected ScheduledTaskDurationInfo(){} + public ScheduledTaskDurationInfo(long last, long avg, long min, long max){ this.last = last; this.avg = avg; @@ -29,13 +23,6 @@ public class ScheduledTaskDurationInfo { this.max = max; } - public ScheduledTaskDurationInfo(JSONObject jsonObject) throws JSONException{ - this.last = jsonObject.getLong(LAST); - this.avg = jsonObject.getLong(AVERAGE); - this.min = jsonObject.getLong(MIN); - this.max = jsonObject.getLong(MAX); - } - /** * @return the last */ @@ -92,13 +79,4 @@ public class ScheduledTaskDurationInfo { this.max = max; } - public JSONObject toJSON() throws JSONException { - JSONObject obj = new JSONObject(); - obj.put(LAST, last); - obj.put(AVERAGE, avg); - obj.put(MIN, min); - obj.put(MAX, max); - return obj; - } - } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java new file mode 100644 index 0000000..dd2b0eb --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistence.java @@ -0,0 +1,107 @@ +/** + * + */ +package org.gcube.vremanagement.executor.scheduledtask; + +import java.util.List; +import java.util.UUID; + +import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; +import org.gcube.vremanagement.executor.plugin.PluginDeclaration; + +/** + * @author Luca Frosini (ISTI - CNR) + */ +public interface ScheduledTaskPersistence { + + public static final String SCOPE = "scope"; + + /** + * Retrieve from the #SmartExecutorPersistenceConnector the orphaned + * Scheduled tasks + * @param pluginDeclarations + * @return the list of orphaned Scheduled + * @throws SchedulePersistenceException + * if fails + */ + public List getOrphanScheduledTasks( + List pluginDeclarations) + throws SchedulePersistenceException; + + /** + * Return the Scheduled Task if any, null otherwise + * + * @param uuid + * which identify the Scheduled Task + * @return LaunchParameter of the Scheduled task if any, null otherwise + * @throws SchedulePersistenceException + * if fails + */ + public ScheduledTask getScheduledTask(UUID uuid) + throws SchedulePersistenceException; + + /** + * Create a Scheduled Task on persistence + * + * @param uuid + * the uuid which (will) identify the task on the SmartExecutor + * instance + * @param parameter + * @throws SchedulePersistenceException + * if fails + */ + public void addScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException; + + /** + * Release the Scheduled Task leaving it as orphan on persistence + * + * @param uuid + * the uuid which (will) identify the task on the SmartExecutor + * instance + * @throws SchedulePersistenceException + */ + public void releaseScheduledTask(UUID uuid) + throws SchedulePersistenceException; + + /** + * Remove from persistence the Scheduled Task. + * + * @param scheduledTask + * @param parameter + * @throws SchedulePersistenceException + */ + public void removeScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException; + + /** + * Remove from persistence the Scheduled Task. + * + * @param uuid + * the uuid which (will) identify the task on the SmartExecutor + * instance + * @param parameter + * @throws SchedulePersistenceException + */ + public void removeScheduledTask(UUID uuid) + throws SchedulePersistenceException; + + /** + * Release the Scheduled Task leaving it as orphan on persistence + * + * @param scheduledTask + * @throws SchedulePersistenceException + */ + public void releaseScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException; + + /** + * Reserve an orphan Scheduled tasks + * + * @param scheduledTask + * @throws SchedulePersistenceException + * if fails + */ + public void reserveScheduledTask(ScheduledTask scheduledTask) + throws SchedulePersistenceException; +} diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistenceFactory.java b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistenceFactory.java new file mode 100644 index 0000000..e38b9f4 --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/scheduledtask/ScheduledTaskPersistenceFactory.java @@ -0,0 +1,18 @@ +/** + * + */ +package org.gcube.vremanagement.executor.scheduledtask; + +import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory; + +/** + * @author Luca Frosini (ISTI - CNR) + * + */ +public class ScheduledTaskPersistenceFactory { + + public static ScheduledTaskPersistence getLaunchConfiguration() throws Exception { + return (ScheduledTaskPersistence) SmartExecutorPersistenceFactory.getPersistenceConnector(); + } + +} diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/JobCompletedNotification.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/JobCompletedNotification.java index 5ed9449..ec9f3f0 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/JobCompletedNotification.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/JobCompletedNotification.java @@ -16,7 +16,7 @@ import org.gcube.vremanagement.executor.plugin.PluginStateNotification; * in the running plugin evolution. * Future use of this possibility are possibility to send an email to * the job owner, notify a registered process. Send a tweet and so on. - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) */ @Deprecated public class JobCompletedNotification extends PluginStateNotification { diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java index 425bb95..45ecfd4 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java @@ -11,12 +11,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.gcube.common.resources.gcore.GCoreEndpoint; -import org.gcube.smartgears.ContextProvider; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.Scheduling; -import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration; -import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfigurationFactory; import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.LaunchException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException; @@ -25,6 +21,9 @@ import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException; import org.gcube.vremanagement.executor.exception.SchedulerRemoveException; import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistenceFactory; import org.quartz.CronScheduleBuilder; import org.quartz.JobBuilder; import org.quartz.JobDataMap; @@ -43,7 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) */ public class SmartExecutorScheduler { @@ -136,8 +135,8 @@ public class SmartExecutorScheduler { triggerBuilder = getTriggerBuilderWithScheduling(uuid, scheduling); - if (scheduling.getFirtStartTime() != null && scheduling.getFirtStartTime().longValue()!=0) { - Date triggerStartTime = new Date(scheduling.getFirtStartTime()); + if (scheduling.getFirstStartTime() != null && scheduling.getFirstStartTime().longValue()!=0) { + Date triggerStartTime = new Date(scheduling.getFirstStartTime()); triggerBuilder.startAt(triggerStartTime); } else { triggerBuilder.startNow(); @@ -150,11 +149,11 @@ public class SmartExecutorScheduler { } try { - String runningInstanceID = ContextProvider.get().profile(GCoreEndpoint.class).id(); - logger.debug("Going to persist Scheduled Task {} which will be assigned to Running Instance {}. LaunchParameters : {} ", - uuid.toString(), runningInstanceID, parameter); - ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration(); - stc.addScheduledTask(uuid, runningInstanceID, parameter); + ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getLaunchConfiguration(); + ScheduledTask scheduledTask = new ScheduledTask(uuid, parameter); + logger.debug("Going to persist Scheduled Task {} : {} ", + scheduledTask); + stc.addScheduledTask(scheduledTask); } catch (Exception e) { logger.error("Unable to persist Scheduled Task {}", uuid.toString(), e.getCause()); } @@ -287,7 +286,7 @@ public class SmartExecutorScheduler { protected void removeFromPersistence(boolean global, UUID uuid, boolean remove) throws SchedulePersistenceException{ try { - ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration(); + ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getLaunchConfiguration(); if(remove){ logger.debug("Going to remove the SmartExecutor Scheduled Task {} from global scheduling", uuid); stc.removeScheduledTask(uuid); diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java index 0e70fd0..a846de5 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java @@ -37,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public class SmartExecutorTask implements InterruptableJob { diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTaskListener.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTaskListener.java index afe578e..6d93d58 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTaskListener.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTaskListener.java @@ -8,7 +8,7 @@ import org.quartz.JobExecutionException; import org.quartz.JobListener; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public class SmartExecutorTaskListener implements JobListener { diff --git a/src/test/java/org/gcube/vremanagement/executor/ScopedTest.java b/src/test/java/org/gcube/vremanagement/executor/ScopedTest.java new file mode 100644 index 0000000..4e9f50f --- /dev/null +++ b/src/test/java/org/gcube/vremanagement/executor/ScopedTest.java @@ -0,0 +1,90 @@ +/** + * + */ +package org.gcube.vremanagement.executor; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.gcube.common.authorization.client.Constants; +import org.gcube.common.authorization.client.exceptions.ObjectNotFound; +import org.gcube.common.authorization.library.AuthorizationEntry; +import org.gcube.common.authorization.library.provider.SecurityTokenProvider; +import org.gcube.common.scope.api.ScopeProvider; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) + * + */ +public class ScopedTest { + + private static final Logger logger = LoggerFactory.getLogger(ScopedTest.class); + + protected static final String PROPERTIES_FILENAME = "token.properties"; + + private static final String GCUBE_DEVNEXT_VARNAME = "GCUBE_DEVNEXT"; + public static final String GCUBE_DEVNEXT; + + private static final String GCUBE_DEVNEXT_NEXTNEXT_VARNAME = "GCUBE_DEVNEXT_NEXTNEXT"; + public static final String GCUBE_DEVNEXT_NEXTNEXT; + + public static final String GCUBE_DEVSEC_VARNAME = "GCUBE_DEVSEC"; + public static final String GCUBE_DEVSEC; + + public static final String GCUBE_DEVSEC_DEVVRE_VARNAME = "GCUBE_DEVSEC_DEVVRE"; + public static final String GCUBE_DEVSEC_DEVVRE; + + public static final String DEFAULT_TEST_SCOPE; + public static final String ALTERNATIVE_TEST_SCOPE; + + static { + Properties properties = new Properties(); + InputStream input = ScopedTest.class.getClassLoader().getResourceAsStream(PROPERTIES_FILENAME); + + try { + // load the properties file + properties.load(input); + } catch (IOException e) { + throw new RuntimeException(e); + } + + GCUBE_DEVNEXT = properties.getProperty(GCUBE_DEVNEXT_VARNAME); + GCUBE_DEVNEXT_NEXTNEXT = properties.getProperty(GCUBE_DEVNEXT_NEXTNEXT_VARNAME); + + GCUBE_DEVSEC = properties.getProperty(GCUBE_DEVSEC_VARNAME); + GCUBE_DEVSEC_DEVVRE = properties.getProperty(GCUBE_DEVSEC_DEVVRE_VARNAME); + + DEFAULT_TEST_SCOPE = GCUBE_DEVNEXT; + ALTERNATIVE_TEST_SCOPE = GCUBE_DEVSEC; + } + + public static String getCurrentScope(String token) throws ObjectNotFound, Exception{ + AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token); + String context = authorizationEntry.getContext(); + logger.info("Context of token {} is {}", token, context); + return context; + } + + + public static void setContext(String token) throws ObjectNotFound, Exception{ + SecurityTokenProvider.instance.set(token); + ScopeProvider.instance.set(getCurrentScope(token)); + } + + @BeforeClass + public static void beforeClass() throws Exception{ + setContext(DEFAULT_TEST_SCOPE); + } + + @AfterClass + public static void afterClass() throws Exception{ + SecurityTokenProvider.instance.reset(); + ScopeProvider.instance.reset(); + } + +} diff --git a/src/test/java/org/gcube/vremanagement/executor/SerializationTest.java b/src/test/java/org/gcube/vremanagement/executor/SerializationTest.java new file mode 100644 index 0000000..33325cb --- /dev/null +++ b/src/test/java/org/gcube/vremanagement/executor/SerializationTest.java @@ -0,0 +1,104 @@ +/** + * + */ +package org.gcube.vremanagement.executor; + +import java.io.IOException; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.acme.HelloWorldPluginDeclaration; +import org.gcube.vremanagement.executor.api.types.LaunchParameter; +import org.gcube.vremanagement.executor.api.types.Scheduling; +import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; +import org.gcube.vremanagement.executor.json.ObjectMapperManager; +import org.gcube.vremanagement.executor.plugin.PluginState; +import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; +import org.gcube.vremanagement.executor.plugin.Ref; +import org.gcube.vremanagement.executor.plugin.RunOn; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author Luca Frosini (ISTI - CNR) + * + */ +public class SerializationTest extends ScopedTest { + + private static Logger logger = LoggerFactory.getLogger(SerializationTest.class); + + @Test + public void testScheduling() throws JsonGenerationException, JsonMappingException, IOException { + Map inputs = new HashMap(); + inputs.put("Hello", "World"); + long sleepTime = 10000; + inputs.put("sleepTime", sleepTime); + + Scheduling scheduling = new Scheduling(20); + scheduling.setGlobal(true); + + LaunchParameter launchParameter = new LaunchParameter("HelloWorld", inputs, scheduling); + logger.debug("{} to be Marshalled : {}", launchParameter.getClass().getSimpleName(), launchParameter); + + ObjectMapper objectMapper = new ObjectMapper(); + String launchParameterJSONString = objectMapper.writeValueAsString(launchParameter); + logger.debug("Marshalled : {}", launchParameterJSONString); + + LaunchParameter launchParameterUnmarshalled = objectMapper.readValue(launchParameterJSONString, LaunchParameter.class); + logger.debug("UnMarshalled : {}", launchParameterUnmarshalled); + } + + @Test + public void testScheduledTask() throws JsonGenerationException, JsonMappingException, IOException { + Map inputs = new HashMap(); + inputs.put("Hello", "World"); + long sleepTime = 10000; + inputs.put("sleepTime", sleepTime); + + Scheduling scheduling = new Scheduling(20); + scheduling.setGlobal(true); + + LaunchParameter launchParameter = new LaunchParameter("HelloWorld", inputs, scheduling); + UUID uuid = UUID.randomUUID(); + Ref hostingNode = new Ref(UUID.randomUUID().toString(), "localhost"); + Ref eService = new Ref(UUID.randomUUID().toString(), "localhost"); + RunOn runOn = new RunOn(hostingNode, eService); + ScheduledTask scheduledTask = new ScheduledTask(uuid, launchParameter, runOn); + logger.debug("{} to be Marshalled : {}", scheduledTask.getClass().getSimpleName(), launchParameter); + + + ObjectMapper mapper = ObjectMapperManager.getObjectMapper(); + String scheduledTaskJSONString = mapper.writeValueAsString(scheduledTask); + logger.debug("Marshalled : {}", scheduledTaskJSONString); + + ScheduledTask scheduledTaskUnmarshalled = mapper.readValue(scheduledTaskJSONString, ScheduledTask.class); + logger.debug("UnMarshalled : {}", scheduledTaskUnmarshalled); + + + + + } + + @Test + public void testPluginEvolutionState() throws JsonGenerationException, JsonMappingException, IOException, InvalidPluginStateEvolutionException { + + PluginStateEvolution pes = new PluginStateEvolution(UUID.randomUUID(), 1, Calendar.getInstance().getTimeInMillis(), new HelloWorldPluginDeclaration(), PluginState.RUNNING, 10); + logger.debug("{} to be Marshalled : {}", pes.getClass().getSimpleName(), pes); + + ObjectMapper objectMapper = new ObjectMapper(); + String scheduledTaskJSONString = objectMapper.writeValueAsString(pes); + logger.debug("Marshalled : {}", scheduledTaskJSONString); + + PluginStateEvolution pesUnmarshalled = objectMapper.readValue(scheduledTaskJSONString, PluginStateEvolution.class); + logger.debug("UnMarshalled : {}", pesUnmarshalled); + } + +} diff --git a/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java b/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java index aff9c7e..ca89df6 100644 --- a/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/SmartExecutorImplTest.java @@ -15,7 +15,7 @@ import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.junit.Test; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public class SmartExecutorImplTest { diff --git a/src/test/java/org/gcube/vremanagement/executor/TokenBasedTests.java b/src/test/java/org/gcube/vremanagement/executor/TokenBasedTests.java deleted file mode 100644 index 4b60f6c..0000000 --- a/src/test/java/org/gcube/vremanagement/executor/TokenBasedTests.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor; - -import org.gcube.common.authorization.library.provider.SecurityTokenProvider; -import org.gcube.common.scope.api.ScopeProvider; -import org.junit.Before; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * - */ -public class TokenBasedTests { - - @Before - public void before(){ - SecurityTokenProvider.instance.set("7c66c94c-7f6e-49cd-9a34-909cd3832f3e-98187548"); - ScopeProvider.instance.set("/gcube/devNext/NextNext"); - } - -} diff --git a/src/test/java/org/gcube/vremanagement/executor/configuration/ConfiguredTasksTest.java b/src/test/java/org/gcube/vremanagement/executor/configuration/ConfiguredTasksTest.java deleted file mode 100644 index 0e0f423..0000000 --- a/src/test/java/org/gcube/vremanagement/executor/configuration/ConfiguredTasksTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * - */ -package org.gcube.vremanagement.executor.configuration; - -import java.io.File; -import java.io.IOException; -import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.acme.HelloWorldPlugin; -import org.acme.HelloWorldPluginDeclaration; -import org.gcube.vremanagement.executor.TokenBasedTests; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.api.types.Scheduling; -import org.gcube.vremanagement.executor.configuration.jsonbased.FileScheduledTaskConfiguration; -import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter; -import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; -import org.json.JSONException; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * - */ -public class ConfiguredTasksTest extends TokenBasedTests { - - private static Logger logger = LoggerFactory.getLogger(ConfiguredTasksTest.class); - - public static final String TEST = "test"; - - - public void checkOriginal(FileScheduledTaskConfiguration parser, int size){ - List configuredTasks = parser.getConfiguredTasks(); - Assert.assertEquals(size, configuredTasks.size()); - - JSONLaunchParameter parameter = (JSONLaunchParameter) configuredTasks.get(0); - Assert.assertEquals(HelloWorldPluginDeclaration.NAME, parameter.getPluginName()); - Map inputs = parameter.getInputs(); - Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); - Assert.assertEquals(1, inputs.get(TEST)); - Assert.assertEquals(null, parameter.getScheduling()); - - parameter = (JSONLaunchParameter) configuredTasks.get(1); - Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME); - inputs = parameter.getInputs(); - Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); - Assert.assertEquals(2, inputs.get(TEST)); - Scheduling scheduling = parameter.getScheduling(); - Assert.assertEquals(null, scheduling.getCronExpression()); - Assert.assertEquals(new Integer(2000), scheduling.getDelay()); - Assert.assertEquals(2, scheduling.getSchedulingTimes()); - Assert.assertEquals(null, scheduling.getFirtStartTime()); - Assert.assertEquals(null, scheduling.getEndTime()); - Assert.assertEquals(false, scheduling.mustPreviousExecutionsCompleted()); - Assert.assertEquals(true, scheduling.getGlobal()); - - parameter = (JSONLaunchParameter) configuredTasks.get(2); - Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME); - inputs = parameter.getInputs(); - Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); - Assert.assertEquals(3, inputs.get(TEST)); - Assert.assertEquals(null, parameter.getScheduling()); - Assert.assertEquals(true, scheduling.getGlobal()); - } - - public static final String TASK_FILE_PATH = "/src/test/resources/"; - - @Test - public void testLaunchConfiguredTask() throws SchedulePersistenceException, IOException, JSONException, ParseException { - File file = new File(".", TASK_FILE_PATH); - String location = file.getAbsolutePath(); - logger.trace("File location : {}", location); - FileScheduledTaskConfiguration parser = new FileScheduledTaskConfiguration(location); - - checkOriginal(parser, 3); - - Map inputs = new HashMap(); - inputs.put(HelloWorldPlugin.SLEEP_TIME, 1000); - inputs.put(TEST, 4); - JSONLaunchParameter added = new JSONLaunchParameter(HelloWorldPluginDeclaration.NAME, inputs); - parser.addLaunch(added); - - parser = new FileScheduledTaskConfiguration(location); - checkOriginal(parser, 4); - - List configuredTasks = parser.getConfiguredTasks(); - JSONLaunchParameter parameter = (JSONLaunchParameter) configuredTasks.get(3); - Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME); - inputs = parameter.getInputs(); - Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME)); - Assert.assertEquals(4, inputs.get(TEST)); - Assert.assertEquals(null, parameter.getScheduling()); - - parser.releaseLaunch(parameter); - - parser = new FileScheduledTaskConfiguration(location); - checkOriginal(parser, 3); - - } -} diff --git a/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java b/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java index 1d2fe57..eaeaf92 100644 --- a/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConnectorTest.java @@ -9,23 +9,23 @@ import java.util.List; import java.util.UUID; import org.acme.HelloWorldPluginDeclaration; -import org.gcube.vremanagement.executor.TokenBasedTests; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration; -import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfigurationFactory; -import org.gcube.vremanagement.executor.persistence.couchdb.CouchDBPersistenceConnector; +import org.gcube.vremanagement.executor.ScopedTest; +import org.gcube.vremanagement.executor.persistence.orientdb.OrientDBPersistenceConnector; import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence; +import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistenceFactory; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ -public class SmartExecutorPersistenceConnectorTest extends TokenBasedTests { +public class SmartExecutorPersistenceConnectorTest extends ScopedTest { private static Logger logger = LoggerFactory.getLogger(SmartExecutorPersistenceConnectorTest.class); @@ -33,7 +33,7 @@ public class SmartExecutorPersistenceConnectorTest extends TokenBasedTests { public void getConnectionTest() throws Exception { SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector(); Assert.assertNotNull(persistenceConnector); - Assert.assertEquals(CouchDBPersistenceConnector.class, persistenceConnector.getClass()); + Assert.assertEquals(OrientDBPersistenceConnector.class, persistenceConnector.getClass()); SmartExecutorPersistenceFactory.closePersistenceConnector(); } @@ -55,20 +55,25 @@ public class SmartExecutorPersistenceConnectorTest extends TokenBasedTests { endTime = Calendar.getInstance().getTimeInMillis(); } - PluginState ps = persistenceConnector.getPluginInstanceState(uuid, 1).getPluginState(); + PluginStateEvolution pse = persistenceConnector.getPluginInstanceState(uuid, 1); + PluginState ps = pse.getPluginState(); Assert.assertEquals(states[i], ps); } - + + PluginStateEvolution pse = persistenceConnector.getLastPluginInstanceState(uuid); + PluginState ps = pse.getPluginState(); + Assert.assertEquals(states[states.length-1], ps); + SmartExecutorPersistenceFactory.closePersistenceConnector(); } @Test public void getAvailableScheduledTasksTest() throws Exception { - ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration(); + ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getLaunchConfiguration(); Assert.assertNotNull(stc); - Assert.assertEquals(CouchDBPersistenceConnector.class, stc.getClass()); + Assert.assertEquals(OrientDBPersistenceConnector.class, stc.getClass()); - List lc = stc.getAvailableScheduledTasks(); + List lc = stc.getOrphanScheduledTasks(null); logger.debug("Available Scheduled Tasks : {}", lc); } diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java index c56421c..f690260 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/PluginManagerTest.java @@ -7,7 +7,7 @@ import org.junit.Assert; import org.junit.Test; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) */ public class PluginManagerTest { diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java index bde07e3..787d552 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePluginTest.java @@ -21,7 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) * */ public class RunnablePluginTest { diff --git a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java index 1f8bac2..5a12b55 100644 --- a/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/pluginmanager/SmartExecutorSchedulerTest.java @@ -25,7 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * @author Luca Frosini (ISTI - CNR) */ public class SmartExecutorSchedulerTest { diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 4f36cc8..4fa8510 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -1,3 +1,5 @@ + +