refs #579: Use IS to persist Scheduled Task configuration on smart-executor

https://support.d4science.org/issues/579

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@119097 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-09-28 15:44:12 +00:00
parent 5ca1a0b641
commit 68863d668d
14 changed files with 381 additions and 140 deletions

View File

@ -30,11 +30,6 @@
<arguments> <arguments>
</arguments> </arguments>
</buildCommand> </buildCommand>
<buildCommand>
<name>net.sf.eclipsecs.core.CheckstyleBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec> </buildSpec>
<natures> <natures>
<nature>org.eclipse.jem.workbench.JavaEMFNature</nature> <nature>org.eclipse.jem.workbench.JavaEMFNature</nature>
@ -43,6 +38,5 @@
<nature>org.eclipse.m2e.core.maven2Nature</nature> <nature>org.eclipse.m2e.core.maven2Nature</nature>
<nature>org.eclipse.wst.common.project.facet.core.nature</nature> <nature>org.eclipse.wst.common.project.facet.core.nature</nature>
<nature>org.eclipse.wst.jsdt.core.jsNature</nature> <nature>org.eclipse.wst.jsdt.core.jsNature</nature>
<nature>net.sf.eclipsecs.core.CheckstyleNature</nature>
</natures> </natures>
</projectDescription> </projectDescription>

View File

@ -84,6 +84,14 @@
<type>jar</type> <type>jar</type>
</dependency> </dependency>
<!-- This dependency will be used to implement Sweeper thread
<dependency>
<groupId>org.gcube.portlets.admin</groupId>
<artifactId>rmp-common-library</artifactId>
<version>[2.5.0-SNAPSHOT, 3.0.0-SNAPSHOT)</version>
</dependency>
-->
<!-- H2 libraries --> <!-- H2 libraries -->
<dependency> <dependency>
<groupId>com.h2database</groupId> <groupId>com.h2database</groupId>

View File

@ -34,9 +34,7 @@ import org.gcube.smartgears.configuration.container.ContainerConfiguration;
import org.gcube.smartgears.context.application.ApplicationContext; import org.gcube.smartgears.context.application.ApplicationContext;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent; import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler; import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.LaunchConfiguration;
import org.gcube.vremanagement.executor.configuration.jsonbased.FileLaunchConfiguration;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
@ -74,12 +72,12 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
protected static SmartExecutorScheduler smartExecutorScheduler; protected static SmartExecutorScheduler smartExecutorScheduler;
protected static LaunchConfiguration launchConfiguration; protected static ScheduledTaskConfiguration launchConfiguration;
/** /**
* @return the configuredTasks * @return the configuredTasks
*/ */
public static LaunchConfiguration getConfiguredTasks() { public static ScheduledTaskConfiguration getConfiguredTasks() {
return launchConfiguration; return launchConfiguration;
} }
@ -234,10 +232,10 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
properties.add(propertyVersionElement); properties.add(propertyVersionElement);
Map<String, String> pluginCapablities = pluginDeclaration.getSupportedCapabilities(); Map<String, String> pluginCapabilities = pluginDeclaration.getSupportedCapabilities();
for(String capabilityName : pluginCapablities.keySet()){ for(String capabilityName : pluginCapabilities.keySet()){
Property propertyElement = new Property(); Property propertyElement = new Property();
propertyElement.nameAndValue(capabilityName, pluginCapablities.get(capabilityName)); propertyElement.nameAndValue(capabilityName, pluginCapabilities.get(capabilityName));
properties.add(propertyElement); properties.add(propertyElement);
} }
accessPoints.add(accessPointElement); accessPoints.add(accessPointElement);
@ -349,28 +347,11 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
+ "Smart Executor Started Successfully\n" + "Smart Executor Started Successfully\n"
+ "-------------------------------------------------------"); + "-------------------------------------------------------");
// TODO
logger.trace("Going to Run Configured Tasks"); // TODO
try { // TODO Launch initializer thread
launchConfiguration = new FileLaunchConfiguration(); // TODO
List<LaunchParameter> configuredTaskList = launchConfiguration.getAvailableScheduledTasks(); // TODO
// TODO review this
// Get the launch that will start before the next time the scheduled
// internal thread will newly analyze the scheduling situation
SmartExecutorImpl smartExecutorImpl = new SmartExecutorImpl();
for(LaunchParameter parameter : configuredTaskList){
try {
smartExecutorImpl.launch(parameter);
} catch (Exception e) {
logger.error(String.format("Error launching %s", parameter), e);
}
}
} catch (Exception e) {
logger.error("Unable to parse Configured Tasks", e.getCause());
}
} }

View File

@ -1,18 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration;
import org.gcube.vremanagement.executor.configuration.jsonbased.FileLaunchConfiguration;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class LaunchConfigurationFactory {
public static LaunchConfiguration getLaunchConfiguration() throws Exception {
return new FileLaunchConfiguration();
}
}

View File

@ -4,6 +4,7 @@
package org.gcube.vremanagement.executor.configuration; package org.gcube.vremanagement.executor.configuration;
import java.util.List; import java.util.List;
import java.util.UUID;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
@ -12,7 +13,7 @@ import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* *
*/ */
public interface LaunchConfiguration { public interface ScheduledTaskConfiguration {
/** /**
* Retrieve from the #SmartExecutorPersistenceConnector the orphaned * Retrieve from the #SmartExecutorPersistenceConnector the orphaned
@ -23,31 +24,31 @@ public interface LaunchConfiguration {
public List<LaunchParameter> getAvailableScheduledTasks() throws SchedulePersistenceException; public List<LaunchParameter> getAvailableScheduledTasks() throws SchedulePersistenceException;
/** /**
* * @param uuid the uuid which identify the task on the SmartExecutor instance
* @param parameter * @param parameter
* @throws SchedulePersistenceException * @throws SchedulePersistenceException
*/ */
public void addScheduledTask(LaunchParameter parameter) throws SchedulePersistenceException; public void addScheduledTask(UUID uuid, String userID, LaunchParameter parameter) throws SchedulePersistenceException;
/** /**
* *
* @param uuid
* @throws SchedulePersistenceException
*/
public void reserveScheduledTask(UUID uuid) throws SchedulePersistenceException;
/**
* @param uuid the uuid which identify the task on the SmartExecutor instance
* @param parameter * @param parameter
* @throws SchedulePersistenceException * @throws SchedulePersistenceException
*/ */
public void reserveScheduledTask(LaunchParameter parameter) throws SchedulePersistenceException; public void removeScheduledTask(UUID uuid)throws SchedulePersistenceException;
/** /**
* *
* @param parameter * @param uuid
* @throws SchedulePersistenceException * @throws SchedulePersistenceException
*/ */
public void removeScheduledTask(LaunchParameter parameter)throws SchedulePersistenceException; public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException;
/**
*
* @param parameter
* @throws SchedulePersistenceException
*/
public void releaseScheduledTask(LaunchParameter parameter) throws SchedulePersistenceException;
} }

View File

@ -0,0 +1,18 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class ScheduledTaskConfigurationFactory {
public static ScheduledTaskConfiguration getLaunchConfiguration() throws Exception {
return (ScheduledTaskConfiguration) SmartExecutorPersistenceConnector.getPersistenceConnector();
}
}

View File

@ -10,12 +10,13 @@ import java.io.PrintWriter;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.gcube.vremanagement.executor.SmartExecutorInitalizator; import org.gcube.vremanagement.executor.SmartExecutorInitalizator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.LaunchConfiguration; import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.utils.IOUtility; import org.gcube.vremanagement.executor.utils.IOUtility;
import org.json.JSONArray; import org.json.JSONArray;
@ -28,12 +29,12 @@ import org.slf4j.LoggerFactory;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* *
*/ */
public class FileLaunchConfiguration implements LaunchConfiguration { public class FileScheduledTaskConfiguration implements ScheduledTaskConfiguration {
/** /**
* Logger * Logger
*/ */
private static Logger logger = LoggerFactory.getLogger(FileLaunchConfiguration.class); private static Logger logger = LoggerFactory.getLogger(FileScheduledTaskConfiguration.class);
protected String configurationFileLocation; protected String configurationFileLocation;
protected List<LaunchParameter> configuredTasks; protected List<LaunchParameter> configuredTasks;
@ -41,11 +42,11 @@ public class FileLaunchConfiguration implements LaunchConfiguration {
public static final String CONFIG_TASK_FILENAME = "definedTasks.json"; public static final String CONFIG_TASK_FILENAME = "definedTasks.json";
public FileLaunchConfiguration() throws Exception { public FileScheduledTaskConfiguration() throws Exception {
this(SmartExecutorInitalizator.ctx.persistence().location()); this(SmartExecutorInitalizator.ctx.persistence().location());
} }
public FileLaunchConfiguration(String location) throws IOException, JSONException { public FileScheduledTaskConfiguration(String location) throws IOException, JSONException {
this.configurationFileLocation = location; this.configurationFileLocation = location;
this.configuredTasks = new ArrayList<LaunchParameter>(); this.configuredTasks = new ArrayList<LaunchParameter>();
this.configuredTasks = retriveConfiguredTask(); this.configuredTasks = retriveConfiguredTask();
@ -102,7 +103,7 @@ public class FileLaunchConfiguration implements LaunchConfiguration {
} }
@Override @Override
public synchronized void addScheduledTask(LaunchParameter parameter) throws SchedulePersistenceException{ public synchronized void addScheduledTask(UUID uuid, String userID, LaunchParameter parameter) throws SchedulePersistenceException{
try { try {
addLaunch(new JSONLaunchParameter(parameter)); addLaunch(new JSONLaunchParameter(parameter));
} catch (ParseException e) { } catch (ParseException e) {
@ -149,9 +150,7 @@ public class FileLaunchConfiguration implements LaunchConfiguration {
this.configuredTasks = configuredTasks; this.configuredTasks = configuredTasks;
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#getAvailableScheduledTasks()
*/
@Override @Override
public List<LaunchParameter> getAvailableScheduledTasks() public List<LaunchParameter> getAvailableScheduledTasks()
throws SchedulePersistenceException { throws SchedulePersistenceException {
@ -159,31 +158,25 @@ public class FileLaunchConfiguration implements LaunchConfiguration {
return null; return null;
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#reserveScheduledTask(org.gcube.vremanagement.executor.api.types.LaunchParameter)
*/
@Override @Override
public void reserveScheduledTask(LaunchParameter parameter) public void reserveScheduledTask(UUID uuid)
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#removeScheduledTask(org.gcube.vremanagement.executor.api.types.LaunchParameter)
*/
@Override @Override
public void removeScheduledTask(LaunchParameter parameter) public void removeScheduledTask(UUID uuid)
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#releaseScheduledTask(org.gcube.vremanagement.executor.api.types.LaunchParameter)
*/
@Override @Override
public void releaseScheduledTask(LaunchParameter parameter) public void releaseScheduledTask(UUID uuid)
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub // TODO Auto-generated method stub

View File

@ -20,13 +20,22 @@ import org.json.JSONObject;
public class JSONLaunchParameter extends LaunchParameter { public class JSONLaunchParameter extends LaunchParameter {
public static final String PLUGIN_NAME = "pluginName"; public static final String PLUGIN_NAME = "pluginName";
public static final String PLUGIN_CAPABILITIES = "pluginCapabilites";
public static final String INPUTS = "inputs"; public static final String INPUTS = "inputs";
public static final String SCHEDULING = "scheduling"; public static final String SCHEDULING = "scheduling";
public static final String PERSIST = "persist"; public static final String PERSIST = "persist";
public static final String USED_BY = "usedBy";
protected JSONScheduling scheduling; protected JSONScheduling scheduling;
/**
* Contains the GCOREEndpoint (aka Running Instance) ID
*/
protected String usedBy;
@SuppressWarnings("unused") @SuppressWarnings("unused")
private JSONLaunchParameter(){} private JSONLaunchParameter(){}
@ -34,22 +43,37 @@ public class JSONLaunchParameter extends LaunchParameter {
super(pluginName, inputs); super(pluginName, inputs);
} }
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs) {
super(pluginName, pluginCapabilities, inputs, false, null);
}
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, boolean persist) { public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, boolean persist) {
super(pluginName, inputs, persist); super(pluginName, inputs, persist);
} }
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, boolean persist) {
super(pluginName, pluginCapabilities, inputs, persist, null);
}
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, Scheduling scheduling) throws ParseException { public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
super(pluginName, inputs, scheduling); this(pluginName, null, inputs, false, scheduling);
this.scheduling = new JSONScheduling(scheduling); }
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
this(pluginName, pluginCapabilities, inputs, false, scheduling);
} }
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, boolean persist, Scheduling scheduling) throws ParseException { public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, boolean persist, Scheduling scheduling) throws ParseException {
super(pluginName, inputs, persist, scheduling); this(pluginName, null, inputs, persist, scheduling);
}
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, boolean persist, Scheduling scheduling) throws ParseException {
super(pluginName, pluginCapabilities, inputs, persist, scheduling);
this.scheduling = new JSONScheduling(scheduling); this.scheduling = new JSONScheduling(scheduling);
} }
public JSONLaunchParameter(LaunchParameter parameter) throws ParseException { public JSONLaunchParameter(LaunchParameter parameter) throws ParseException {
super(parameter.getPluginName(), parameter.getInputs(), parameter.isPersist()); super(parameter.getPluginName(), parameter.getPluginCapabilities(), parameter.getInputs(), parameter.isPersist(), parameter.getScheduling());
this.scheduling = new JSONScheduling(parameter.getScheduling()); this.scheduling = new JSONScheduling(parameter.getScheduling());
} }
@ -58,6 +82,17 @@ public class JSONLaunchParameter extends LaunchParameter {
this.pluginName = jsonObject.getString(PLUGIN_NAME); this.pluginName = jsonObject.getString(PLUGIN_NAME);
this.pluginCapabilities = null;
if(jsonObject.has(PLUGIN_CAPABILITIES)){
this.pluginCapabilities = new HashMap<String, String>();
JSONObject capabilities = jsonObject.getJSONObject(PLUGIN_CAPABILITIES);
JSONArray names = capabilities.names();
for(int j=0; j<names.length(); j++){
String key = names.getString(j);
this.pluginCapabilities.put(key, capabilities.getString(key));
}
}
this.inputs = new HashMap<String, Object>(); this.inputs = new HashMap<String, Object>();
JSONObject inputsJsonObject = jsonObject.getJSONObject(INPUTS); JSONObject inputsJsonObject = jsonObject.getJSONObject(INPUTS);
JSONArray names = inputsJsonObject.names(); JSONArray names = inputsJsonObject.names();
@ -76,6 +111,10 @@ public class JSONLaunchParameter extends LaunchParameter {
this.persist = jsonObject.getBoolean(PERSIST); this.persist = jsonObject.getBoolean(PERSIST);
} }
if(jsonObject.has(USED_BY)){
this.usedBy = jsonObject.getString(USED_BY);
}
} }
/** /**
@ -96,6 +135,14 @@ public class JSONLaunchParameter extends LaunchParameter {
JSONObject obj = new JSONObject(); JSONObject obj = new JSONObject();
obj.put(PLUGIN_NAME, pluginName); obj.put(PLUGIN_NAME, pluginName);
if(pluginCapabilities!=null && !pluginCapabilities.isEmpty()){
JSONObject capabilities = new JSONObject();
for(String id : pluginCapabilities.keySet()){
capabilities.put(id, pluginCapabilities.get(id));
}
obj.put(PLUGIN_CAPABILITIES, capabilities);
}
JSONObject inputJsonObject = new JSONObject(); JSONObject inputJsonObject = new JSONObject();
for(String id : inputs.keySet()){ for(String id : inputs.keySet()){
inputJsonObject.put(id, inputs.get(id)); inputJsonObject.put(id, inputs.get(id));
@ -107,7 +154,11 @@ public class JSONLaunchParameter extends LaunchParameter {
} }
obj.put(PERSIST, true); obj.put(PERSIST, true);
if(usedBy!=null){
obj.put(USED_BY, usedBy);
}
return obj; return obj;
} }

View File

@ -0,0 +1,104 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration.jsonbased;
import org.json.JSONException;
import org.json.JSONObject;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class ScheduledTaskDurationInfo {
public static final String LAST = "last";
public static final String AVERAGE = "avg";
public static final String MIN = "min";
public static final String MAX = "max";
protected long last;
protected long avg;
protected long min;
protected long max;
public ScheduledTaskDurationInfo(long last, long avg, long min, long max){
this.last = last;
this.avg = avg;
this.min = min;
this.max = max;
}
public ScheduledTaskDurationInfo(JSONObject jsonObject) throws JSONException{
this.last = jsonObject.getLong(LAST);
this.avg = jsonObject.getLong(AVERAGE);
this.min = jsonObject.getLong(MIN);
this.max = jsonObject.getLong(MAX);
}
/**
* @return the last
*/
public long getLast() {
return last;
}
/**
* @param last the last to set
*/
public void setLast(long last) {
this.last = last;
}
/**
* @return the avg
*/
public long getAvg() {
return avg;
}
/**
* @param avg the avg to set
*/
public void setAvg(long avg) {
this.avg = avg;
}
/**
* @return the min
*/
public long getMin() {
return min;
}
/**
* @param min the min to set
*/
public void setMin(long min) {
this.min = min;
}
/**
* @return the max
*/
public long getMax() {
return max;
}
/**
* @param max the max to set
*/
public void setMax(long max) {
this.max = max;
}
public JSONObject toJSON() throws JSONException {
JSONObject obj = new JSONObject();
obj.put(LAST, last);
obj.put(AVERAGE, avg);
obj.put(MIN, min);
obj.put(MAX, max);
return obj;
}
}

View File

@ -3,10 +3,15 @@
*/ */
package org.gcube.vremanagement.executor.persistence; package org.gcube.vremanagement.executor.persistence;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URL; import java.net.URL;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.ArrayNode;
@ -21,10 +26,13 @@ import org.ektorp.http.StdHttpClient.Builder;
import org.ektorp.impl.StdCouchDbConnector; import org.ektorp.impl.StdCouchDbConnector;
import org.ektorp.impl.StdCouchDbInstance; import org.ektorp.impl.StdCouchDbInstance;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.configuration.LaunchConfiguration; import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter;
import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException; import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.plugin.PluginState;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -32,7 +40,7 @@ import org.slf4j.LoggerFactory;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* *
*/ */
public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnector implements LaunchConfiguration { public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnector implements ScheduledTaskConfiguration {
private static final Logger logger = LoggerFactory.getLogger(CouchDBPersistenceConnector.class); private static final Logger logger = LoggerFactory.getLogger(CouchDBPersistenceConnector.class);
@ -40,6 +48,15 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
protected CouchDbConnector couchDbConnector; protected CouchDbConnector couchDbConnector;
protected static final String DB_NAME = "dbName"; protected static final String DB_NAME = "dbName";
protected static final String _ID_JSON_FIELD = "_id";
protected static final String _REV_JSON_FIELD = "_rev";
protected static final String TYPE_JSON_FIELD = "type";
protected static final String USED_BY_FIELD = "usedBy";
protected static final String EVOLUTION_TYPE = "evolution";
protected static final String SCHEDULED_TASK_TYPE = "scheduledTask";
public CouchDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception { public CouchDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception {
super(); super();
@ -65,12 +82,31 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
return result; return result;
} }
@Override @Override
public void close() throws Exception { public void close() throws Exception {
couchDbConnector.getConnection().shutdown(); couchDbConnector.getConnection().shutdown();
} }
protected void updateItem(JSONObject obj) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(obj.toString());
couchDbConnector.update(node);
}
protected JSONObject getObjectByID(String id) throws Exception {
InputStream is = couchDbConnector.getAsStream(id);
StringWriter writer = new StringWriter();
IOUtils.copy(is, writer, "UTF-8");
JSONObject obj = new JSONObject(writer.toString());
return obj;
}
protected void createItem(String id, JSONObject obj) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(obj.toString());
createItem(node, id);
}
protected void createItem(JsonNode node, String id) throws Exception { protected void createItem(JsonNode node, String id) throws Exception {
if(id!=null && id.compareTo("")!=0){ if(id!=null && id.compareTo("")!=0){
couchDbConnector.create(id, node); couchDbConnector.create(id, node);
@ -79,6 +115,11 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
} }
} }
protected void deleteItem(String id) throws Exception {
JSONObject toDelete = getObjectByID(id);
toDelete.get("");
}
public final static String UUID_FIELD = "uuid"; public final static String UUID_FIELD = "uuid";
public final static String ITERATION_FIELD = "iteration"; public final static String ITERATION_FIELD = "iteration";
public final static String PLUGIN_NAME_FIELD = "pluginName"; public final static String PLUGIN_NAME_FIELD = "pluginName";
@ -98,6 +139,7 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
objectNode.put(TIMESTAMP_FIELD, timestamp); objectNode.put(TIMESTAMP_FIELD, timestamp);
objectNode.put(PLUGIN_NAME_FIELD, pluginName); objectNode.put(PLUGIN_NAME_FIELD, pluginName);
objectNode.put(STATE_FIELD, pluginState.toString()); objectNode.put(STATE_FIELD, pluginState.toString());
objectNode.put(TYPE_JSON_FIELD, EVOLUTION_TYPE);
createItem(objectNode, null); createItem(objectNode, null);
} }
@ -140,8 +182,14 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
} }
protected static final String MAP_REDUCE__DESIGN = "_design/"; protected static final String MAP_REDUCE__DESIGN = "_design/";
protected static final String PLUGIN_STATE = "pluginState";
protected static final String PLUGIN_STATE_DEPRECATED = "pluginStateDeprecated"; protected static final String PLUGIN_STATE_DOCUMENT = "pluginState";
protected static final String PLUGIN_STATE_VIEW = "pluginState";
protected static final String PLUGIN_STATE_DEPRECATED_VIEW = "pluginStateDeprecated";
protected static final String SCHEDULED_TASKS_DOCUMENT = "scheduledTasks";
protected static final String ACTIVE_VIEW = "active";
protected static final String ORPHAN_VIEW = "orphan";
/** /**
* @param uuid * @param uuid
@ -149,10 +197,10 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
* @return * @return
* @throws Exception * @throws Exception
*/ */
protected PluginState reallyQuery(String pluginName, UUID uuid, int iterationNumber) protected PluginState reallyQuery(String pluginName, UUID uuid, int iterationNumber)
throws Exception { throws Exception {
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, PLUGIN_STATE)); ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, PLUGIN_STATE_DOCUMENT));
ArrayNode startKey = new ObjectMapper().createArrayNode(); ArrayNode startKey = new ObjectMapper().createArrayNode();
@ -161,9 +209,9 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
if(pluginName!=null && pluginName.compareTo("")!=0){ if(pluginName!=null && pluginName.compareTo("")!=0){
startKey.add(pluginName); startKey.add(pluginName);
endKey.add(pluginName); endKey.add(pluginName);
query = query.viewName(PLUGIN_STATE); query = query.viewName(PLUGIN_STATE_VIEW);
}else{ }else{
query = query.viewName(PLUGIN_STATE_DEPRECATED); query = query.viewName(PLUGIN_STATE_DEPRECATED_VIEW);
} }
startKey.add(uuid.toString()); startKey.add(uuid.toString());
@ -183,7 +231,6 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
query.startKey(startKey); query.startKey(startKey);
query.endKey(endKey); query.endKey(endKey);
PluginState pluginState = null; PluginState pluginState = null;
ViewResult viewResult = query(query); ViewResult viewResult = query(query);
for (ViewResult.Row row : viewResult) { for (ViewResult.Row row : viewResult) {
@ -196,58 +243,88 @@ public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnect
if(pluginState==null){ if(pluginState==null){
throw new PluginStateNotRetrievedException(); throw new PluginStateNotRetrievedException();
} }
return pluginState; return pluginState;
} }
protected List<LaunchParameter> findOrphanedScheduledTasks(){
// TODO Implements after sweeper has been implemented
return null;
}
protected void freeOrphanedScheduledTasks(){
//List<LaunchParameter> orphaned = findOrphanedScheduledTasks();
// TODO
// TODO Implements after sweeper has been implemented
}
/* (non-Javadoc)
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#getAvailableScheduledTasks() /** {@inheritDoc} */
*/
@Override @Override
public List<LaunchParameter> getAvailableScheduledTasks() public List<LaunchParameter> getAvailableScheduledTasks()
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, SCHEDULED_TASKS_DOCUMENT));
return null; query = query.viewName(ORPHAN_VIEW);
List<LaunchParameter> ret = new ArrayList<LaunchParameter>();
ViewResult viewResult = query(query);
for (ViewResult.Row row : viewResult) {
//JsonNode key = row.getKeyAsNode();
JsonNode value = row.getValueAsNode();
try {
JSONObject obj = new JSONObject(value.toString());
JSONLaunchParameter jlp = new JSONLaunchParameter(obj);
ret.add(jlp);
} catch (ParseException | JSONException e) {
logger.error("Unable to parse result Row", e.getCause());
continue;
}
}
return ret;
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#addScheduledTask(org.gcube.vremanagement.executor.api.types.LaunchParameter)
*/
@Override @Override
public void addScheduledTask(LaunchParameter parameter) public void addScheduledTask(UUID uuid, String userID, LaunchParameter parameter)
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub try {
JSONLaunchParameter jlp = new JSONLaunchParameter(parameter);
JSONObject obj = jlp.toJSON();
obj.append(TYPE_JSON_FIELD, EVOLUTION_TYPE);
obj.append(USED_BY_FIELD, userID);
createItem(uuid.toString(), obj);
} catch (Exception e) {
throw new SchedulePersistenceException(e.getCause());
}
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#reserveScheduledTask(org.gcube.vremanagement.executor.api.types.LaunchParameter)
*/
@Override @Override
public void reserveScheduledTask(LaunchParameter parameter) public void reserveScheduledTask(UUID uuid)
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
throw new UnsupportedOperationException();
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#removeScheduledTask(org.gcube.vremanagement.executor.api.types.LaunchParameter)
*/
@Override @Override
public void removeScheduledTask(LaunchParameter parameter) public void removeScheduledTask(UUID uuid)
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub try {
deleteItem(uuid.toString());
} catch (Exception e) {
throw new SchedulePersistenceException(e.getCause());
}
} }
/* (non-Javadoc) /** {@inheritDoc} */
* @see org.gcube.vremanagement.executor.configuration.LaunchConfiguration#releaseScheduledTask(org.gcube.vremanagement.executor.api.types.LaunchParameter)
*/
@Override @Override
public void releaseScheduledTask(LaunchParameter parameter) public void releaseScheduledTask(UUID uuid)
throws SchedulePersistenceException { throws SchedulePersistenceException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
throw new UnsupportedOperationException();
} }
} }

View File

@ -4,19 +4,22 @@
package org.gcube.vremanagement.executor.scheduler; package org.gcube.vremanagement.executor.scheduler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.vremanagement.executor.SmartExecutorInitalizator; import org.gcube.vremanagement.executor.SmartExecutorInitalizator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfigurationFactory;
import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.LaunchException; import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException; import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.quartz.CronScheduleBuilder; import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder; import org.quartz.JobBuilder;
@ -125,6 +128,7 @@ public class SmartExecutorScheduler {
triggerBuilder.startAt(triggerStartTime); triggerBuilder.startAt(triggerStartTime);
} else { } else {
triggerBuilder.startNow(); triggerBuilder.startNow();
scheduling.setFirstStartTime(Calendar.getInstance().getTimeInMillis());
} }
if (scheduling.getEndTime() != null && scheduling.getEndTime().longValue()!=0) { if (scheduling.getEndTime() != null && scheduling.getEndTime().longValue()!=0) {
@ -133,10 +137,12 @@ public class SmartExecutorScheduler {
} }
try { try {
SmartExecutorInitalizator.getConfiguredTasks().addScheduledTask(parameter); ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
} catch (SchedulePersistenceException e) { String runningInstanceID = SmartExecutorInitalizator.getCtx().profile(GCoreEndpoint.class).id();
stc.addScheduledTask(uuid, runningInstanceID, parameter);
} catch (Exception e) {
logger.error("Unable to persist the scheduling", e.getCause()); logger.error("Unable to persist the scheduling", e.getCause());
} }
} else { } else {
triggerBuilder.startNow(); triggerBuilder.startNow();

View File

@ -14,7 +14,7 @@ import org.acme.HelloWorldPlugin;
import org.acme.HelloWorldPluginDeclaration; import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling; import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.jsonbased.FileLaunchConfiguration; import org.gcube.vremanagement.executor.configuration.jsonbased.FileScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter; import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.json.JSONException; import org.json.JSONException;
@ -29,7 +29,7 @@ public class ConfiguredTasksTest {
public static final String TEST = "test"; public static final String TEST = "test";
public void checkOriginal(FileLaunchConfiguration parser, int size){ public void checkOriginal(FileScheduledTaskConfiguration parser, int size){
List<LaunchParameter> configuredTasks = parser.getConfiguredTasks(); List<LaunchParameter> configuredTasks = parser.getConfiguredTasks();
Assert.assertEquals(size, configuredTasks.size()); Assert.assertEquals(size, configuredTasks.size());
@ -70,7 +70,7 @@ public class ConfiguredTasksTest {
public void testLaunchConfiguredTask() throws SchedulePersistenceException, IOException, JSONException, ParseException { public void testLaunchConfiguredTask() throws SchedulePersistenceException, IOException, JSONException, ParseException {
String location = new File(".").getAbsolutePath(); String location = new File(".").getAbsolutePath();
location = location + "/src/test/resources"; location = location + "/src/test/resources";
FileLaunchConfiguration parser = new FileLaunchConfiguration(location); FileScheduledTaskConfiguration parser = new FileScheduledTaskConfiguration(location);
checkOriginal(parser, 3); checkOriginal(parser, 3);
@ -80,7 +80,7 @@ public class ConfiguredTasksTest {
JSONLaunchParameter added = new JSONLaunchParameter(HelloWorldPluginDeclaration.NAME, inputs, true); JSONLaunchParameter added = new JSONLaunchParameter(HelloWorldPluginDeclaration.NAME, inputs, true);
parser.addLaunch(added); parser.addLaunch(added);
parser = new FileLaunchConfiguration(location); parser = new FileScheduledTaskConfiguration(location);
checkOriginal(parser, 4); checkOriginal(parser, 4);
List<LaunchParameter> configuredTasks = parser.getConfiguredTasks(); List<LaunchParameter> configuredTasks = parser.getConfiguredTasks();
@ -94,7 +94,7 @@ public class ConfiguredTasksTest {
parser.releaseLaunch(parameter); parser.releaseLaunch(parameter);
parser = new FileLaunchConfiguration(location); parser = new FileScheduledTaskConfiguration(location);
checkOriginal(parser, 3); checkOriginal(parser, 3);
} }

View File

@ -3,9 +3,16 @@
*/ */
package org.gcube.vremanagement.executor.persistence; package org.gcube.vremanagement.executor.persistence;
import java.util.List;
import org.gcube.common.scope.api.ScopeProvider; import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfigurationFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -13,6 +20,8 @@ import org.junit.Test;
*/ */
public class SmartExecutorPersistenceConnectorTest { public class SmartExecutorPersistenceConnectorTest {
private static Logger logger = LoggerFactory.getLogger(SmartExecutorPersistenceConnectorTest.class);
public static final String[] SCOPES = new String[]{"/gcube", "/gcube/devsec"}; public static final String[] SCOPES = new String[]{"/gcube", "/gcube/devsec"};
public static final String GCUBE_SCOPE = SCOPES[0]; public static final String GCUBE_SCOPE = SCOPES[0];
public static final String GCUBE_DEVSEC_SCOPE = SCOPES[1]; public static final String GCUBE_DEVSEC_SCOPE = SCOPES[1];
@ -26,5 +35,19 @@ public class SmartExecutorPersistenceConnectorTest {
} }
@Test
public void getAvailableScheduledTasksTest() throws Exception {
ScopeProvider.instance.set(GCUBE_DEVSEC_SCOPE);
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
Assert.assertNotNull(stc);
Assert.assertEquals(CouchDBPersistenceConnector.class, stc.getClass());
List<LaunchParameter> lc = stc.getAvailableScheduledTasks();
logger.debug("Available Scheduled Tasks", lc);
}
} }

View File

@ -2,6 +2,9 @@
{ {
"pluginName" : "HelloWorld", "pluginName" : "HelloWorld",
"pluginFeatures" : {
"version" : "1.1.0"
},
"inputs" : { "inputs" : {
"sleepTime" : 1000, "sleepTime" : 1000,
"test" : 1 "test" : 1