Merged from private branch before release 4.3.0

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@141987 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-02-01 16:01:15 +00:00
parent 6253345147
commit 59dcdec5f6
51 changed files with 1205 additions and 1444 deletions

View File

@ -25,6 +25,7 @@
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="org.eclipse.jst.component.dependency" value="/WEB-INF/lib"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">

View File

@ -5,7 +5,13 @@
<wb-resource deploy-path="/" source-path="/src/main/webapp" tag="defaultRootSource"/>
<wb-resource deploy-path="/WEB-INF/classes" source-path="/src/main/java"/>
<wb-resource deploy-path="/WEB-INF/classes" source-path="/src/main/resources"/>
<dependent-module archiveName="smart-executor-api-1.4.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/smart-executor-api/smart-executor-api">
<dependent-module archiveName="smart-executor-client-1.4.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/smart-executor-client/smart-executor-client">
<dependency-type>uses</dependency-type>
</dependent-module>
<dependent-module archiveName="information-system-model-1.3.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/information-system-model/information-system-model">
<dependency-type>uses</dependency-type>
</dependent-module>
<dependent-module archiveName="smart-executor-api-1.5.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/smart-executor-api/smart-executor-api">
<dependency-type>uses</dependency-type>
</dependent-module>
<property name="java-output-path" value="/smart-executor/target/classes"/>

View File

@ -1,9 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE xml>
<ReleaseNotes>
<Changeset component="org.gcube.vre-management.smart-executor.1.4.0" date="${buildDate}">
<Change></Change>
<Changeset component="org.gcube.vre-management.smart-executor.1.5.0" date="${buildDate}">
<Change></Change>
</Changeset>
<Changeset component="org.gcube.vre-management.smart-executor.1.4.0" date="2016-11-07">
<Change>SmartExecutor has been passed to Authorization 2.0 (refs #4944 #2112)</Change>
<Change>Provided to plugins the possibility to specify progress percentage (refs #440)</Change>
<Change>>Provided to plugins the possibility to define a custom notifier (refs #5089)</Change>
</Changeset>
<Changeset component="org.gcube.vre-management.smart-executor.1.3.0" date="2016-02-08">
<Change>Using Persistence (CouchDB) to save Scheduled Task configuration (refs #579)</Change>
<Change>Using Persistence (CouchDB) to save Scheduled Task configuration (refs #579)</Change>
<Change>Added Unscheduling feature for repetitive task (refs #521)</Change>
</Changeset>
<Changeset component="org.gcube.vre-management.smart-executor.1.2.0" date="2015-12-31">

View File

@ -1,7 +1,5 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>servicearchive</id>
<formats>
<format>tar.gz</format>

View File

@ -1,3 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE xml>
<application mode='online'>
<group>${serviceClass}</group>
<name>${name}</name>

View File

@ -1,4 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE xml>
<Resource>
<ID></ID>
<Type>Service</Type>

View File

@ -1,4 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE xml>
<endpoints
xmlns="http://java.sun.com/xml/ns/jax-ws/ri/runtime"
version="2.0">

86
pom.xml
View File

@ -10,7 +10,7 @@
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor</artifactId>
<version>1.4.0-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<name>SmartExecutor</name>
<description>Smart Executor Service</description>
<packaging>war</packaging>
@ -21,8 +21,9 @@
<distroDirectory>${project.basedir}/distro</distroDirectory>
<serviceClass>VREManagement</serviceClass>
<wiki>https://wiki.gcube-system.org/gcube/SmartExecutor</wiki>
<jackson.version>2.2.3</jackson.version>
</properties>
<scm>
<connection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/${project.artifactId}</connection>
<developerConnection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/${project.artifactId}</developerConnection>
@ -40,11 +41,19 @@
</dependency>
<dependency>
<groupId>org.gcube.distribution</groupId>
<artifactId>maven-smartgears-bom</artifactId>
<artifactId>gcube-smartgears-bom</artifactId>
<version>LATEST</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.gcube.information-system</groupId>
<artifactId>information-system-bom</artifactId>
<version>LATEST</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
@ -59,7 +68,7 @@
<artifactId>discovery-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.gcube.core</groupId>
<artifactId>common-smartgears</artifactId>
@ -74,7 +83,7 @@
<groupId>org.gcube.core</groupId>
<artifactId>common-smartgears-app</artifactId>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>authorization-client</artifactId>
@ -85,13 +94,44 @@
<artifactId>common-authorization</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor-client</artifactId>
<version>[1.4.0-SNAPSHOT,2.0.0-SNAPSHOT]</version>
</dependency>
<dependency>
<groupId>org.gcube.information-system</groupId>
<artifactId>resource-registry-publisher</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.gcube.information-system</groupId>
<artifactId>information-system-model</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-graphdb</artifactId>
</dependency>
<dependency>
<groupId>com.tinkerpop</groupId>
<artifactId>frames</artifactId>
</dependency>
<dependency>
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor-api</artifactId>
<version>[1.4.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
@ -108,45 +148,23 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
<type>jar</type>
</dependency>
<!-- CouchDB libraries -->
<dependency>
<groupId>org.ektorp</groupId>
<artifactId>org.ektorp</artifactId>
<version>1.3.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.7</version>
<type>jar</type>
</dependency>
<!-- END CouchDB libraries -->
<!-- Tests -->
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.acme</groupId>
<artifactId>HelloWorldPlugin</artifactId>
<version>[1.2.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
@ -180,7 +198,7 @@
<configuration>
<warName>smart-executor</warName>
<failOnMissingWebXml>false</failOnMissingWebXml>
<!-- webXml>src\main\webapp\WEB-INF\web.xml</webXml -->
<!-- webXml>src\main\webapp\WEB-INF\web.xml</webXml -->
</configuration>
</plugin>
</plugins>

View File

@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
/**
* Effective implementation of Executor
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
@WebService(
portName = "SmartExecutorPort",

View File

@ -1,11 +1,14 @@
package org.gcube.vremanagement.executor;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.gcube.common.authorization.client.Constants;
import org.gcube.common.authorization.library.AuthorizationEntry;
import org.gcube.common.authorization.library.ClientType;
import org.gcube.common.authorization.library.provider.ClientInfo;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.resources.gcore.Resource;
import org.gcube.common.resources.gcore.Resources;
@ -35,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SmartExecutorInitializator implements ApplicationManager {
@ -47,7 +50,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
public static final long JOIN_TIMEOUT = 1000;
public static String getScopeFromToken(){
public static String getCurrentScope(){
String token = SecurityTokenProvider.instance.get();
AuthorizationEntry authorizationEntry;
try {
@ -58,6 +61,39 @@ public class SmartExecutorInitializator implements ApplicationManager {
return authorizationEntry.getContext();
}
public static ClientInfo getClientInfo() {
String token = SecurityTokenProvider.instance.get();
AuthorizationEntry authorizationEntry;
try {
authorizationEntry = Constants.authorizationService().get(token);
} catch (Exception e) {
return new ClientInfo() {
/**
* Generated Serial Version UID
*/
private static final long serialVersionUID = 8311873203596762883L;
@Override
public ClientType getType() {
return ClientType.USER;
}
@Override
public List<String> getRoles() {
return new ArrayList<>();
}
@Override
public String getId() {
return "UNKNOWN";
}
};
}
return authorizationEntry.getClientInfo();
}
/**
* Publish the provided resource on all Service Scopes retrieved from
* Context
@ -72,7 +108,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
RegistryPublisher registryPublisher = RegistryPublisherFactory.create();
try {
logger.debug("Trying to publish to {}:\n{}", getScopeFromToken(), stringWriter);
logger.debug("Trying to publish to {}:\n{}", getCurrentScope(), stringWriter);
registryPublisher.create(resource);
} catch (Exception e) {
logger.error("The resource was not published", e);
@ -93,7 +129,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
RegistryPublisher registryPublisher = RegistryPublisherFactory.create();
String id = resource.id();
logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, getScopeFromToken());
logger.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, getCurrentScope());
registryPublisher.remove(resource);
@ -224,11 +260,11 @@ public class SmartExecutorInitializator implements ApplicationManager {
for (ServiceEndpoint serviceEndpoint : serviceEndpoints) {
try {
logger.debug("Trying to unpublish the old ServiceEndpoint with ID {} from scope {}",
serviceEndpoint.id(), getScopeFromToken());
serviceEndpoint.id(), getCurrentScope());
unPublishResource(serviceEndpoint);
} catch(Exception e){
logger.debug("Exception tryng to unpublish the old ServiceEndpoint with ID {} from scope {}",
serviceEndpoint.id(), getScopeFromToken(), e);
serviceEndpoint.id(), getCurrentScope(), e);
}
}
}catch(Exception e){
@ -245,7 +281,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
*/
@Override
public void onInit() {
String scope = getScopeFromToken();
String scope = getCurrentScope();
logger.trace(
"\n-------------------------------------------------------\n"
+ "Smart Executor is Starting on scope {}\n"
@ -299,7 +335,7 @@ public class SmartExecutorInitializator implements ApplicationManager {
"\n-------------------------------------------------------\n"
+ "Smart Executor is Stopping on scope {}\n"
+ "-------------------------------------------------------",
getScopeFromToken());
getCurrentScope());
SmartExecutorScheduler.getInstance().stopAll();
@ -310,14 +346,14 @@ public class SmartExecutorInitializator implements ApplicationManager {
} catch (Exception e) {
logger.error("Unable to correctly close {} for scope {}",
SmartExecutorPersistenceConnector.class.getSimpleName(),
getScopeFromToken(), e);
getCurrentScope(), e);
}
logger.trace(
"\n-------------------------------------------------------\n"
+ "Smart Executor Stopped Successfully on scope {}\n"
+ "-------------------------------------------------------",
getScopeFromToken());
getCurrentScope());
}
}

View File

@ -1,68 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration;
import java.util.List;
import java.util.UUID;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public interface ScheduledTaskConfiguration {
public static final String SCOPE = "scope";
/**
* Retrieve from the #SmartExecutorPersistenceConnector the orphaned
* Scheduled tasks
* @return the list of orphaned Scheduled
* @throws SchedulePersistenceException if fails
*/
public List<LaunchParameter> getAvailableScheduledTasks() throws SchedulePersistenceException;
/**
* Return the Scheduled Task if any, null otherwise
* @param uuid which identify the Scheduled Task
* @return LaunchParameter of the Scheduled task if any, null otherwise
* @throws SchedulePersistenceException if fails
*/
public LaunchParameter getScheduledTask(UUID uuid) throws SchedulePersistenceException;
/**
* Create a Scheduled Task on persistence
* @param uuid the uuid which (will) identify the task on the SmartExecutor instance
* @param parameter
* @throws SchedulePersistenceException if fails
*/
public void addScheduledTask(UUID uuid, String consumerID, LaunchParameter parameter) throws SchedulePersistenceException;
/**
* Reserve an orphan Scheduled tasks
* @param uuid the uuid which (will) identify the task on the SmartExecutor instance
* @throws SchedulePersistenceException if fails
*/
public void reserveScheduledTask(UUID uuid, String consumerID) throws SchedulePersistenceException;
/**
* Remove from persistence the Scheduled Task.
* @param uuid the uuid which (will) identify the task on the SmartExecutor instance
* @param parameter
* @throws SchedulePersistenceException
*/
public void removeScheduledTask(UUID uuid)throws SchedulePersistenceException;
/**
* Release the Scheduled Task leaving it as orphan on persistence
* @param uuid the uuid which (will) identify the task on the SmartExecutor
* instance
* @throws SchedulePersistenceException
*/
public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException;
}

View File

@ -1,18 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class ScheduledTaskConfigurationFactory {
public static ScheduledTaskConfiguration getLaunchConfiguration() throws Exception {
return (ScheduledTaskConfiguration) SmartExecutorPersistenceFactory.getPersistenceConnector();
}
}

View File

@ -1,193 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration.jsonbased;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.gcube.smartgears.ContextProvider;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.exception.ScopeNotMatchException;
import org.gcube.vremanagement.executor.utils.IOUtility;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class FileScheduledTaskConfiguration implements ScheduledTaskConfiguration {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(FileScheduledTaskConfiguration.class);
protected String configurationFileLocation;
protected List<LaunchParameter> configuredTasks;
public static final String CONFIG_TASK_FILENAME = "definedTasks.json";
public FileScheduledTaskConfiguration() throws Exception {
this(ContextProvider.get().persistence().location());
}
public FileScheduledTaskConfiguration(String location) throws IOException, JSONException {
this.configurationFileLocation = location;
this.configuredTasks = new ArrayList<LaunchParameter>();
this.configuredTasks = retriveConfiguredTask();
}
protected Scheduling getScheduling(JSONObject jsonObject)
throws JSONException, ParseException, ScopeNotMatchException {
return new JSONScheduling(jsonObject);
}
protected static String configurationFileName(String configurationFileLocation){
return configurationFileLocation + "/" + CONFIG_TASK_FILENAME;
}
public List<LaunchParameter> retriveConfiguredTask()
throws IOException, JSONException {
String configuredTasksDefinition = IOUtility.readFile(configurationFileName(configurationFileLocation));
List<LaunchParameter> tasks = new ArrayList<LaunchParameter>();
JSONArray jsonArray = new JSONArray(configuredTasksDefinition);
for(int i=0; i<jsonArray.length(); i++){
try {
JSONObject jsonObject = jsonArray.getJSONObject(i);
JSONLaunchParameter parameter = new JSONLaunchParameter(jsonObject);
tasks.add(parameter);
} catch (Exception e) {
logger.error("Error launching configurad Task", e.getCause());
}
}
return tasks;
}
protected void emptyConfigurationFile(String fileName)
throws FileNotFoundException {
PrintWriter writer = new PrintWriter(fileName);
writer.print("");
writer.close();
}
protected void writeOnConfigurationFile() throws JSONException, IOException, ParseException{
String fileName = configurationFileName(configurationFileLocation);
JSONArray jsonArray = new JSONArray();
for(LaunchParameter launchParameter : configuredTasks){
JSONLaunchParameter jsonLaunchParameter = new JSONLaunchParameter(launchParameter);
jsonArray.put(jsonLaunchParameter.toJSON());
}
String jsonArrayString = jsonArray.toString();
emptyConfigurationFile(fileName);
FileUtils.writeStringToFile(new File(fileName), jsonArrayString);
}
@Override
public synchronized void addScheduledTask(UUID uuid, String consumerID, LaunchParameter parameter) throws SchedulePersistenceException{
try {
addLaunch(new JSONLaunchParameter(parameter));
} catch (ParseException e) {
throw new SchedulePersistenceException(e.getCause());
}
}
public synchronized void addLaunch(JSONLaunchParameter parameter) throws SchedulePersistenceException {
configuredTasks.add(parameter);
try {
writeOnConfigurationFile();
} catch (JSONException | IOException | ParseException e) {
throw new SchedulePersistenceException();
}
}
public void releaseLaunch(LaunchParameter parameter)
throws SchedulePersistenceException {
try {
removeLaunch(new JSONLaunchParameter(parameter));
} catch (JSONException | IOException | ParseException e) {
throw new SchedulePersistenceException(e.getCause());
}
}
protected synchronized void removeLaunch(JSONLaunchParameter parameter)
throws ParseException, JSONException, IOException {
configuredTasks.remove(parameter);
writeOnConfigurationFile();
}
/**
* @return the configuredTasks
*/
public List<LaunchParameter> getConfiguredTasks() {
return configuredTasks;
}
/**
* @param configuredTasks the configuredTasks to set
*/
public void setConfiguredTasks(List<LaunchParameter> configuredTasks) {
this.configuredTasks = configuredTasks;
}
/** {@inheritDoc} */
@Override
public List<LaunchParameter> getAvailableScheduledTasks()
throws SchedulePersistenceException {
// TODO Auto-generated method stub
return null;
}
/** {@inheritDoc} */
@Override
public void reserveScheduledTask(UUID uuid, String consumerID)
throws SchedulePersistenceException {
// TODO Auto-generated method stub
}
/** {@inheritDoc} */
@Override
public void removeScheduledTask(UUID uuid)
throws SchedulePersistenceException {
// TODO Auto-generated method stub
}
/** {@inheritDoc} */
@Override
public void releaseScheduledTask(UUID uuid)
throws SchedulePersistenceException {
// TODO Auto-generated method stub
}
/* (non-Javadoc)
* @see org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration#getScheduledTask(java.util.UUID)
*/
@Override
public LaunchParameter getScheduledTask(UUID uuid)
throws SchedulePersistenceException {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -1,199 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration.jsonbased;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.ScopeNotMatchException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class JSONLaunchParameter extends LaunchParameter {
private static Logger logger = LoggerFactory.getLogger(JSONLaunchParameter.class);
public static final String PLUGIN_NAME = "pluginName";
public static final String PLUGIN_CAPABILITIES = "pluginCapabilites";
public static final String INPUTS = "inputs";
public static final String SCHEDULING = "scheduling";
public static final String USED_BY = "usedBy";
public static final String SCOPE = "SCOPE";
/**
* Contains the GCOREEndpoint (aka Running Instance) ID
*/
protected String usedBy;
protected String scope;
@SuppressWarnings("unused")
private JSONLaunchParameter(){}
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs) {
super(pluginName, inputs);
}
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs) {
super(pluginName, pluginCapabilities, inputs);
this.scope = SmartExecutorInitializator.getScopeFromToken();
}
public JSONLaunchParameter(String pluginName, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
super(pluginName, inputs, scheduling);
this.scope = SmartExecutorInitializator.getScopeFromToken();
}
public JSONLaunchParameter(String pluginName, Map<String, String> pluginCapabilities, Map<String, Object> inputs, Scheduling scheduling) throws ParseException {
super(pluginName, pluginCapabilities, inputs, scheduling);
this.scope = SmartExecutorInitializator.getScopeFromToken();
}
public JSONLaunchParameter(LaunchParameter parameter) throws ParseException {
super(parameter.getPluginName(), parameter.getPluginCapabilities(), parameter.getInputs(), parameter.getScheduling());
this.scheduling = new JSONScheduling(parameter.getScheduling());
this.scope = SmartExecutorInitializator.getScopeFromToken();
}
public JSONLaunchParameter(JSONObject jsonObject) throws JSONException, ParseException, ScopeNotMatchException {
super();
this.pluginName = jsonObject.getString(PLUGIN_NAME);
this.pluginCapabilities = null;
if(jsonObject.has(PLUGIN_CAPABILITIES)){
this.pluginCapabilities = new HashMap<String, String>();
JSONObject capabilities = jsonObject.getJSONObject(PLUGIN_CAPABILITIES);
JSONArray names = capabilities.names();
for(int j=0; j<names.length(); j++){
String key = names.getString(j);
this.pluginCapabilities.put(key, capabilities.getString(key));
}
}
this.inputs = new HashMap<String, Object>();
JSONObject inputsJsonObject = jsonObject.getJSONObject(INPUTS);
JSONArray names = inputsJsonObject.names();
for(int j=0; j<names.length(); j++){
String key = names.getString(j);
this.inputs.put(key, inputsJsonObject.get(key));
}
if(jsonObject.has(SCHEDULING)){
JSONObject schedulingJsonObject = jsonObject.getJSONObject(SCHEDULING);
this.scheduling = new JSONScheduling(schedulingJsonObject);
}
if(jsonObject.has(USED_BY)){
this.usedBy = jsonObject.getString(USED_BY);
}
this.scope = SmartExecutorInitializator.getScopeFromToken();
if(jsonObject.has(SCOPE)){
String jsonScope = jsonObject.getString(SCOPE);
if(jsonScope.compareTo(scope)!=0){
String message = String.format("The current scope %s differs from the one provide in %s provided as argument %s.",
scope, JSONObject.class.getSimpleName(), jsonScope);
logger.error(message);
throw new ScopeNotMatchException(message);
}
}
}
/**
* @return the scheduling
*/
public JSONScheduling getScheduling() {
return (JSONScheduling) scheduling;
}
/**
* @param scheduling the scheduling to set
*/
public void setScheduling(JSONScheduling scheduling) {
this.scheduling = scheduling;
}
public JSONObject toJSON() throws JSONException {
JSONObject obj = new JSONObject();
obj.put(PLUGIN_NAME, pluginName);
if(pluginCapabilities!=null && !pluginCapabilities.isEmpty()){
JSONObject capabilities = new JSONObject();
for(String id : pluginCapabilities.keySet()){
capabilities.put(id, pluginCapabilities.get(id));
}
obj.put(PLUGIN_CAPABILITIES, capabilities);
}
JSONObject inputJsonObject = new JSONObject();
for(String id : inputs.keySet()){
inputJsonObject.put(id, inputs.get(id));
}
obj.put(INPUTS, inputJsonObject);
if(scheduling!=null){
obj.put(SCHEDULING, getScheduling().toJSON());
}
if(usedBy!=null){
obj.put(USED_BY, usedBy);
}
if(scope!=null){
obj.put(SCOPE, scope);
}
return obj;
}
public String toString(){
try {
return toJSON().toString();
} catch (JSONException e) {
return String.format("%s : %s", this.getClass().getSimpleName(),
super.toString());
}
}
/**
* @return the usedBy
*/
public String getUsedBy() {
return usedBy;
}
/**
* @param usedBy the usedBy to set
*/
public void setUsedBy(String usedBy) {
this.usedBy = usedBy;
}
/**
* @return the scope
*/
public String getScope() {
return scope;
}
/**
* @param scope the scope to set
*/
public void setScope(String scope) {
this.scope = scope;
}
}

View File

@ -1,123 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration.jsonbased;
import java.security.InvalidParameterException;
import java.text.ParseException;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.ScopeNotMatchException;
import org.json.JSONException;
import org.json.JSONObject;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
class JSONScheduling extends Scheduling {
protected static Logger logger = LoggerFactory.getLogger(JSONScheduling.class);
public static final String CRON_EXPRESSION = "cronExpression";
public static final String DELAY = "delay";
public static final String SCHEDULING_TIMES = "schedulingTimes";
public static final String FIRST_START_TIME = "firstStartTime";
public static final String END_TIME = "endTime";
public static final String PREVIUOS_EXECUTION_MUST_BE_COMPLETED = "previuosExecutionsMustBeCompleted";
public static final String GLOBAL = "global";
public JSONScheduling(Scheduling scheduling) throws ParseException {
super();
if(scheduling==null){
throw new InvalidParameterException("Scheduling null");
}
CronExpression cronExpression = null;
if(scheduling.getCronExpression()!=null){
cronExpression = new CronExpression(scheduling.getCronExpression());
}
init(cronExpression, scheduling.getDelay(),
scheduling.getSchedulingTimes(),
scheduling.getFirtStartTime(), scheduling.getEndTime(),
scheduling.mustPreviousExecutionsCompleted(), scheduling.getGlobal());
}
public JSONScheduling(JSONObject jsonObject) throws JSONException,
ParseException, ScopeNotMatchException {
super();
CronExpression cronExpression = null;
if (jsonObject.has(CRON_EXPRESSION)) {
String cronExpressionString = jsonObject.getString(CRON_EXPRESSION);
cronExpression = new CronExpression(cronExpressionString);
}
Integer delay = null;
if (jsonObject.has(DELAY)) {
delay = jsonObject.getInt(DELAY);
}
Integer schedulingTimes = 0;
if (jsonObject.has(SCHEDULING_TIMES)) {
schedulingTimes = jsonObject.getInt(SCHEDULING_TIMES);
}
Long firstStartTime = null;
if (jsonObject.has(FIRST_START_TIME)) {
firstStartTime = jsonObject.getLong(FIRST_START_TIME);
}
Long endTime = null;
if (jsonObject.has(END_TIME)) {
endTime = jsonObject.getLong(END_TIME);
}
Boolean previuosExecutionsMustBeCompleted = false;
if (jsonObject.has(PREVIUOS_EXECUTION_MUST_BE_COMPLETED)) {
previuosExecutionsMustBeCompleted = jsonObject
.getBoolean(PREVIUOS_EXECUTION_MUST_BE_COMPLETED);
}
Boolean global = false;
if (jsonObject.has(GLOBAL)) {
global = jsonObject
.getBoolean(PREVIUOS_EXECUTION_MUST_BE_COMPLETED);
}
init(cronExpression, delay, schedulingTimes.intValue(), firstStartTime,
endTime, previuosExecutionsMustBeCompleted, global);
}
public JSONObject toJSON() throws JSONException {
JSONObject obj = new JSONObject();
if (this.cronExpression != null) {
obj.put(CRON_EXPRESSION, this.cronExpression);
}
if (this.delay != null) {
obj.put(DELAY, this.delay.intValue());
}
obj.put(SCHEDULING_TIMES, this.schedulingTimes);
if (this.firstStartTime != null) {
obj.put(FIRST_START_TIME, this.firstStartTime);
}
if (this.endTime != null) {
obj.put(END_TIME, this.endTime);
}
obj.put(PREVIUOS_EXECUTION_MUST_BE_COMPLETED, this.previuosExecutionsMustBeCompleted);
obj.put(GLOBAL, global==null ? false : global.booleanValue());
return obj;
}
}

View File

@ -4,7 +4,7 @@
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class AlreadyInFinalStateException extends Exception {

View File

@ -4,7 +4,7 @@
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class InvalidPluginStateEvolutionException extends Exception {

View File

@ -4,7 +4,7 @@
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class MaxIterationRuntimeException extends RuntimeException {

View File

@ -4,7 +4,7 @@
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class PluginStateNotRetrievedException extends Exception {

View File

@ -4,7 +4,7 @@
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SchedulePersistenceException extends Exception {

View File

@ -6,7 +6,7 @@ package org.gcube.vremanagement.executor.exception;
import org.quartz.SchedulerException;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SchedulerNotFoundException extends SchedulerException {

View File

@ -4,7 +4,7 @@
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class ScopeNotMatchException extends Exception {

View File

@ -4,7 +4,7 @@
package org.gcube.vremanagement.executor.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class StopRuntimeException extends RuntimeException {

View File

@ -0,0 +1,64 @@
/**
*
*/
package org.gcube.vremanagement.executor.json;
import org.gcube.common.authorization.library.provider.ClientInfo;
import org.gcube.common.authorization.library.provider.ContainerInfo;
import org.gcube.common.authorization.library.provider.ExternalServiceInfo;
import org.gcube.common.authorization.library.provider.ServiceInfo;
import org.gcube.common.authorization.library.provider.UserInfo;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.Ref;
import org.gcube.vremanagement.executor.plugin.RunOn;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class ObjectMapperManager {
protected static final ObjectMapper mapper;
static{
mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.registerSubtypes(ScheduledTask.class);
mapper.registerSubtypes(RunOn.class);
mapper.registerSubtypes(Ref.class);
mapper.registerSubtypes(PluginDeclaration.class);
mapper.registerSubtypes(PluginStateEvolution.class);
mapper.addMixIn(ClientInfo.class, ClientInfoMixIn.class);
//mapper.registerSubtypes(ClientInfo.class);
mapper.registerSubtypes(UserInfo.class);
mapper.registerSubtypes(ServiceInfo.class);
mapper.registerSubtypes(ExternalServiceInfo.class);
mapper.registerSubtypes(ContainerInfo.class);
}
/**
* @return
*/
public static ObjectMapper getObjectMapper() {
return mapper;
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=Scheduling.CLASS_PROPERTY)
class ClientInfoMixIn {
}
}

View File

@ -3,9 +3,7 @@
*/
package org.gcube.vremanagement.executor.persistence;
import java.net.URI;
import java.security.Key;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -21,18 +19,20 @@ import org.gcube.resources.discovery.icclient.ICFactory;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
public class SmartExecutorPersistenceConfiguration {
public final String SERVICE_ENDPOINT_CATEGORY = "VREManagement";
//public final String SERVICE_ENDPOINT_NAME = "SmartExecutor";
public final String SERVICE_ENDPOINT_NAME = "SmartExecutorPersistenceConfiguration";
protected static final String PERSISTENCE_CLASS_NAME = "persistenceClassName";
protected static final String TARGET_SCOPE = "targetScope";
protected URI uri;
protected String url;
protected String username;
protected String password;
@ -46,9 +46,9 @@ public class SmartExecutorPersistenceConfiguration {
init();
}
public SmartExecutorPersistenceConfiguration(URI uri, String username, String password){
public SmartExecutorPersistenceConfiguration(String url, String username, String password){
init();
this.uri = uri;
this.url = url;
this.username = username;
this.password = password;
}
@ -62,15 +62,15 @@ public class SmartExecutorPersistenceConfiguration {
/**
* @return the uri
*/
public URI getUri() {
return uri;
public String getURL() {
return url;
}
/**
* @param uri the uri to set
*/
public void setUri(URI uri) {
this.uri = uri;
public void setURL(String url) {
this.url = url;
}
/**
@ -114,19 +114,23 @@ public class SmartExecutorPersistenceConfiguration {
return value;
}
protected ServiceEndpoint getServiceEndpoint(String serviceEndpointCategory, String serviceEndpointName, String persistenceClassName){
public ServiceEndpoint getServiceEndpoint(String serviceEndpointCategory, String serviceEndpointName, String persistenceClassName){
SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class);
query.addCondition(String.format("$resource/Profile/Category/text() eq '%s'", serviceEndpointCategory));
query.addCondition(String.format("$resource/Profile/Name/text() eq '%s'", serviceEndpointName));
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", PERSISTENCE_CLASS_NAME));
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", persistenceClassName));
query.addCondition(String.format("$resource/Profile/AccessPoint/Interface/Endpoint/@EntryName eq '%s'", persistenceClassName));
/*
* Used in old version
* query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", PERSISTENCE_CLASS_NAME));
* query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", persistenceClassName));
*/
query.setResult("$resource");
DiscoveryClient<ServiceEndpoint> client = ICFactory.clientFor(ServiceEndpoint.class);
List<ServiceEndpoint> serviceEndpoints = client.submit(query);
if(serviceEndpoints.size()>1){
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", TARGET_SCOPE));
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", SmartExecutorInitializator.getScopeFromToken()));
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", SmartExecutorInitializator.getCurrentScope()));
serviceEndpoints = client.submit(query);
}
return serviceEndpoints.get(0);
@ -139,10 +143,8 @@ public class SmartExecutorPersistenceConfiguration {
protected void setValues(ServiceEndpoint serviceEndpoint, String persistenceClassName) throws Exception{
Group<AccessPoint> accessPoints = serviceEndpoint.profile().accessPoints();
for(AccessPoint accessPoint : accessPoints){
Collection<Property> properties = accessPoint.propertyMap().values();
if(properties.contains(new ServiceEndpoint.Property().nameAndValue(PERSISTENCE_CLASS_NAME, persistenceClassName))){
this.uri = new URI(accessPoint.address());
if(accessPoint.name().compareTo(persistenceClassName)==0){
this.url = accessPoint.address();
this.username = accessPoint.username();
String encryptedPassword = accessPoint.password();

View File

@ -10,12 +10,13 @@ import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence;
/**
* Model the connector which create or open the connection to DB.
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
public abstract class SmartExecutorPersistenceConnector extends PluginStateNotification {
public abstract class SmartExecutorPersistenceConnector extends PluginStateNotification implements ScheduledTaskPersistence {
public SmartExecutorPersistenceConnector() {
super(new HashMap<String, String>());
@ -35,8 +36,8 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif
* @return the actual/last {@link PluginState} of the Plugin
* @throws Exception if fails
*/
public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber)
throws Exception;
public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber) throws Exception;
/**
* Retrieve the status of the iterationNumber of the last running/run {@link Plugin} which is/was identified
* by the UUID passed as parameter
@ -44,7 +45,6 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif
* @return the actual/last {@link PluginState} of the Plugin
* @throws Exception if fails
*/
public abstract PluginStateEvolution getLastPluginInstanceState(UUID uuid)
throws Exception;
public abstract PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception;
}

View File

@ -7,12 +7,12 @@ import java.util.HashMap;
import java.util.Map;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.persistence.couchdb.CouchDBPersistenceConnector;
import org.gcube.vremanagement.executor.persistence.orientdb.OrientDBPersistenceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public abstract class SmartExecutorPersistenceFactory {
@ -42,7 +42,7 @@ public abstract class SmartExecutorPersistenceFactory {
* @return the persistenceConnector
*/
public static synchronized SmartExecutorPersistenceConnector getPersistenceConnector() throws Exception {
String scope = SmartExecutorInitializator.getScopeFromToken();
String scope = SmartExecutorInitializator.getCurrentScope();
SmartExecutorPersistenceConnector persistence =
getPersistenceConnector(scope);
@ -51,12 +51,12 @@ public abstract class SmartExecutorPersistenceFactory {
SmartExecutorPersistenceConnector.class.getSimpleName(),
scope, Map.class.getSimpleName());
String className = CouchDBPersistenceConnector.class.getSimpleName();
String className = OrientDBPersistenceConnector.class.getSimpleName();
SmartExecutorPersistenceConfiguration configuration =
new SmartExecutorPersistenceConfiguration(className);
persistence = new CouchDBPersistenceConnector(configuration);
persistenceConnectors.put(SmartExecutorInitializator.getScopeFromToken(),
persistence = new OrientDBPersistenceConnector(scope, configuration);
persistenceConnectors.put(SmartExecutorInitializator.getCurrentScope(),
persistence);
}
@ -64,7 +64,7 @@ public abstract class SmartExecutorPersistenceFactory {
}
public static synchronized void closePersistenceConnector() throws Exception {
String scope = SmartExecutorInitializator.getScopeFromToken();
String scope = SmartExecutorInitializator.getCurrentScope();
SmartExecutorPersistenceConnector persistence =
getPersistenceConnector(scope);
if(persistence!=null){

View File

@ -1,389 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.persistence.couchdb;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URL;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;
import org.ektorp.CouchDbConnector;
import org.ektorp.CouchDbInstance;
import org.ektorp.DocumentNotFoundException;
import org.ektorp.UpdateConflictException;
import org.ektorp.ViewQuery;
import org.ektorp.ViewResult;
import org.ektorp.http.HttpClient;
import org.ektorp.http.StdHttpClient;
import org.ektorp.http.StdHttpClient.Builder;
import org.ektorp.impl.StdCouchDbConnector;
import org.ektorp.impl.StdCouchDbInstance;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter;
import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.exception.ScopeNotMatchException;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class CouchDBPersistenceConnector extends SmartExecutorPersistenceConnector implements ScheduledTaskConfiguration {
private static final Logger logger = LoggerFactory.getLogger(CouchDBPersistenceConnector.class);
protected CouchDbInstance couchDbInstance;
protected CouchDbConnector couchDbConnector;
protected static final String DB_NAME = "dbName";
protected static final String _ID_JSON_FIELD = "_id";
protected static final String _REV_JSON_FIELD = "_rev";
protected static final String TYPE_JSON_FIELD = "type";
public CouchDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception {
super();
prepareConnection(configuration);
}
protected HttpClient initHttpClient(URL url, String username, String password){
Builder builder = new StdHttpClient.Builder().url(url);
builder.username(username).password(password);
HttpClient httpClient = builder.build();
return httpClient;
}
protected void prepareConnection(SmartExecutorPersistenceConfiguration configuration) throws Exception {
logger.debug("Preparing Connection for {}", this.getClass().getSimpleName());
HttpClient httpClient = initHttpClient(configuration.getUri().toURL(), configuration.getUsername(), configuration.getPassword());
couchDbInstance = new StdCouchDbInstance(httpClient);
couchDbConnector = new StdCouchDbConnector(configuration.getProperty(DB_NAME), couchDbInstance);
}
protected ViewResult query(ViewQuery query){
ViewResult result = couchDbConnector.queryView(query);
return result;
}
@Override
public void close() throws Exception {
couchDbConnector.getConnection().shutdown();
}
protected void updateItem(JSONObject obj) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(obj.toString());
couchDbConnector.update(node);
}
protected JSONObject getObjectByID(String id) throws Exception {
InputStream is = couchDbConnector.getAsStream(id);
StringWriter writer = new StringWriter();
IOUtils.copy(is, writer, "UTF-8");
JSONObject obj = new JSONObject(writer.toString());
return obj;
}
protected void createItem(JSONObject obj, String id) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(obj.toString());
createItem(node, id);
}
protected void createItem(JsonNode node, String id) throws Exception {
if(id!=null && id.compareTo("")!=0){
couchDbConnector.create(id, node);
}else{
couchDbConnector.create(node);
}
}
protected void deleteItem(String id, String revision) throws UpdateConflictException, Exception {
if(revision==null || revision.compareTo("")==0){
JSONObject toDelete = getObjectByID(id);
revision = toDelete.getString(_REV_JSON_FIELD);
}
couchDbConnector.delete(id, revision);
}
/**
* {@inheritDoc}
*/
@Override
public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution, Exception e) throws Exception {
ObjectNode objectNode = PluginStateEvolutionObjectNode.getObjectMapper(pluginStateEvolution);
createItem(objectNode, null);
}
/**
* {@inheritDoc}
*/
@Override
@Deprecated
public PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber)
throws Exception {
return reallyQuery(null, uuid, iterationNumber);
}
/**
* {@inheritDoc}
*/
@Override
@Deprecated
public PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception {
return reallyQuery(null, uuid, LAST);
}
protected final static int LAST = -1;
/* *
* {@inheritDoc}
* /
@Override
public PluginState getPluginInstanceState(PluginDeclaration pluginDeclaration, UUID uuid, int iterationNumber)
throws Exception {
return reallyQuery(pluginDeclaration, uuid, iterationNumber);
}
/* *
* {@inheritDoc}
* /
@Override
public PluginState getLastPluginInstanceState(PluginDeclaration pluginDeclaration, UUID uuid) throws Exception {
return reallyQuery(pluginDeclaration, uuid, LAST);
}
*/
protected static final String MAP_REDUCE__DESIGN = "_design/";
protected static final String PLUGIN_STATE_DOCUMENT = "pluginState";
protected static final String PLUGIN_STATE = "pluginState";
protected static final String PLUGIN_STATE_VIEW_ABANDONED = "pluginStateABANDONED";
protected static final String SCHEDULED_TASKS_DOCUMENT = "scheduledTasks";
protected static final String ACTIVE_VIEW = "active";
protected static final String ORPHAN_VIEW = "orphan";
protected static final String USED_BY_FIELD = "usedBy";
protected static final String STOPPED = "stopped";
protected static final String RESERVED_BY = "reservedBy";
protected static final String PREVIOUSLY_USED_BY = "previouslyUsedBy";
protected static final String RESERVATION_TIMESTAMP = "reservationTimestamp";
protected static final String SCHEDULED_TASK_TYPE = "scheduledTask";
/**
* @param uuid
* @param iterationNumber -1 means LAST
* @return
* @throws Exception
*/
protected PluginStateEvolution reallyQuery(PluginDeclaration pluginDeclaration, UUID uuid, int iterationNumber)
throws Exception {
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, PLUGIN_STATE_DOCUMENT));
String scope = SmartExecutorInitializator.getScopeFromToken();
ArrayNode startKey = new ObjectMapper().createArrayNode();
startKey.add(scope);
ArrayNode endKey = new ObjectMapper().createArrayNode();
endKey.add(scope);
if(pluginDeclaration!=null && pluginDeclaration.getName()!=null && pluginDeclaration.getName().compareTo("")!=0){
startKey.add(pluginDeclaration.getName());
endKey.add(pluginDeclaration.getName());
query = query.viewName(PLUGIN_STATE_VIEW_ABANDONED);
}else{
query = query.viewName(PLUGIN_STATE);
}
startKey.add(uuid.toString());
endKey.add(uuid.toString());
if(iterationNumber != LAST){
startKey.add(iterationNumber);
endKey.add(iterationNumber);
startKey.add(1);
endKey.add("{}");
}else{
// Adding time interval
startKey.add(1);
endKey.add("{}");
}
query.startKey(startKey);
query.endKey(endKey);
query.reduce(false);
PluginStateEvolution pluginStateEvolution = null;
ViewResult viewResult = query(query);
for (ViewResult.Row row : viewResult) {
//JsonNode key = row.getKeyAsNode();
JsonNode value = row.getValueAsNode();
pluginStateEvolution = PluginStateEvolutionObjectNode.getPluginStateEvolution(value);
}
if(pluginStateEvolution==null){
throw new PluginStateNotRetrievedException();
}
return pluginStateEvolution;
}
protected List<LaunchParameter> findOrphanedScheduledTasks(){
// TODO Implements after sweeper has been implemented
return null;
}
protected void freeOrphanedScheduledTasks(){
//List<LaunchParameter> orphaned = findOrphanedScheduledTasks();
// TODO
// TODO Implements after sweeper has been implemented
}
/** {@inheritDoc} */
@Override
public List<LaunchParameter> getAvailableScheduledTasks()
throws SchedulePersistenceException {
ViewQuery query = new ViewQuery().designDocId(String.format("%s%s", MAP_REDUCE__DESIGN, SCHEDULED_TASKS_DOCUMENT));
query = query.viewName(ORPHAN_VIEW);
String scope = SmartExecutorInitializator.getScopeFromToken();
ArrayNode startKey = new ObjectMapper().createArrayNode();
startKey.add(scope);
ArrayNode endKey = new ObjectMapper().createArrayNode();
endKey.add(scope);
endKey.add("{}");
query.startKey(startKey);
query.endKey(endKey);
List<LaunchParameter> ret = new ArrayList<LaunchParameter>();
ViewResult viewResult = query(query);
for (ViewResult.Row row : viewResult) {
//JsonNode key = row.getKeyAsNode();
JsonNode value = row.getValueAsNode();
try {
JSONObject obj = new JSONObject(value.toString());
JSONLaunchParameter jlp = new JSONLaunchParameter(obj);
ret.add(jlp);
} catch (ParseException | JSONException e) {
logger.error("Unable to parse result Row", e.getCause());
continue;
} catch (ScopeNotMatchException ex){
logger.error("The result row does not macth the current Scope. This should indicate a query error.", ex.getCause());
continue;
}
}
return ret;
}
/** {@inheritDoc} */
@Override
public void addScheduledTask(UUID uuid, String consumerID, LaunchParameter parameter)
throws SchedulePersistenceException {
try {
JSONLaunchParameter jlp = new JSONLaunchParameter(parameter);
JSONObject obj = jlp.toJSON();
obj.append(TYPE_JSON_FIELD, SCHEDULED_TASK_TYPE);
obj.append(USED_BY_FIELD, consumerID);
obj.append(ScheduledTaskConfiguration.SCOPE, SmartExecutorInitializator.getScopeFromToken());
createItem(obj, uuid.toString());
} catch (Exception e) {
logger.error("Error Adding Scheduled Task UUID : {}, Consumer : {}, LaunchParameter : {}",
uuid, consumerID, parameter, e);
throw new SchedulePersistenceException(e.getCause());
}
}
/** {@inheritDoc} */
@Override
public void reserveScheduledTask(UUID uuid, String consumerID) throws SchedulePersistenceException {
try {
JSONObject obj = getObjectByID(uuid.toString());
// TODO change it
String previousConsumerID = obj.getString(USED_BY_FIELD);
obj.put(PREVIOUSLY_USED_BY, previousConsumerID);
obj.remove(USED_BY_FIELD);
obj.put(RESERVED_BY, consumerID);
obj.put(RESERVATION_TIMESTAMP, Calendar.getInstance().getTimeInMillis());
updateItem(obj);
} catch (Exception e) {
logger.error("Error Reserving Scheduled Task UUID : {} Consumer : {}",
uuid, consumerID, e);
throw new SchedulePersistenceException(e.getCause());
}
}
/** {@inheritDoc} */
@Override
public void removeScheduledTask(UUID uuid) throws SchedulePersistenceException {
try {
JSONObject obj = getObjectByID(uuid.toString());
obj.remove(USED_BY_FIELD);
obj.put(STOPPED, true);
updateItem(obj);
} catch (Exception e) {
logger.error("Error Removing Scheduled Task UUID : {}", uuid, e);
throw new SchedulePersistenceException(e.getCause());
}
}
/** {@inheritDoc} */
@Override
public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException {
try {
JSONObject obj = getObjectByID(uuid.toString());
obj.remove(USED_BY_FIELD);
updateItem(obj);
} catch (Exception e) {
logger.error("Error Releasing Scheduled Task UUID : {}", uuid, e);
throw new SchedulePersistenceException(e.getCause());
}
}
/** {@inheritDoc} */
@Override
public LaunchParameter getScheduledTask(UUID uuid) throws SchedulePersistenceException {
try {
JSONObject jsonObject = getObjectByID(uuid.toString());
return new JSONLaunchParameter(jsonObject);
} catch (DocumentNotFoundException e) {
return null;
} catch (Exception e) {
throw new SchedulePersistenceException(e.getCause());
}
}
}

View File

@ -1,169 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.persistence.couchdb;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.smartgears.ContextProvider;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class PluginStateEvolutionObjectNode {
protected static final String EVOLUTION_TYPE = "evolution";
public final static String UUID_FIELD = "uuid";
public final static String ITERATION_FIELD = "iteration";
public final static String PLUGIN_DECLARATION_FIELD = "pluginDeclaration";
public final static String PLUGIN_DECLARATION_NAME_FIELD = "name";
public final static String PLUGIN_DECLARATION_DESCRIPTION_FIELD = "description";
public final static String PLUGIN_DECLARATION_VERSION_FIELD = "version";
public final static String PLUGIN_DECLARATION_HOST_DISCOVERED_CAPABILITIES_FIELD = "hostDiscoveredCapabilities";
public final static String TIMESTAMP_FIELD = "timestamp";
public final static String STATE_FIELD = "state";
protected static final String RUN_ON_FIELD = "runOn";
public final static String GHN_HOSTNAME_FIELD = "ghnHostname";
public final static String GHN_ID_FIELD = "ghnID";
public final static String SCOPE_FIELD = "scope";
public final static String LOCALHOST = "localhost";
public final static String PERCENTAGE = "percentage";
protected static ObjectNode getRunOn(){
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode objectNode = objectMapper.createObjectNode();
try {
GCoreEndpoint gCoreEndpoint = ContextProvider.get().profile(GCoreEndpoint.class);
objectNode.put(GHN_ID_FIELD, gCoreEndpoint.profile().ghnId());
objectNode.put(GHN_HOSTNAME_FIELD, ContextProvider.get().container().configuration().hostname());
}catch(Exception e){
objectNode.put(GHN_ID_FIELD, LOCALHOST + "_" + UUID.randomUUID());
objectNode.put(GHN_HOSTNAME_FIELD, LOCALHOST);
}
return objectNode;
}
protected static ObjectNode getPluginInfo(PluginDeclaration pluginDeclaration){
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode objectNode = objectMapper.createObjectNode();
objectNode.put(PLUGIN_DECLARATION_NAME_FIELD, pluginDeclaration.getName());
objectNode.put(PLUGIN_DECLARATION_DESCRIPTION_FIELD, pluginDeclaration.getDescription());
objectNode.put(PLUGIN_DECLARATION_VERSION_FIELD, pluginDeclaration.getVersion());
Map<String, String> capabilites = pluginDeclaration.getSupportedCapabilities();
ObjectNode capabilitiesObjectNode = objectMapper.createObjectNode();
if(capabilites!=null){
for(String key : capabilites.keySet()){
capabilitiesObjectNode.put(key, capabilites.get(key));
}
}
objectNode.put(PLUGIN_DECLARATION_HOST_DISCOVERED_CAPABILITIES_FIELD, capabilitiesObjectNode);
return objectNode;
}
protected static PluginDeclaration getPluginDeclaration(final JsonNode jsonNode){
PluginDeclaration pluginDeclaration = new PluginDeclaration() {
@Override
public void init() throws Exception {}
@Override
public String getVersion() {
return jsonNode.get(PLUGIN_DECLARATION_VERSION_FIELD).asText();
}
@Override
public Map<String, String> getSupportedCapabilities() {
Map<String, String> capabilities = new HashMap<>();
JsonNode node = jsonNode.get(PLUGIN_DECLARATION_VERSION_FIELD);
Iterator<String> iterator = node.getFieldNames();
while(iterator.hasNext()) {
String key = iterator.next();
capabilities.put(key, node.get(key).asText());
}
return capabilities;
}
@Override
public Class<? extends Plugin<? extends PluginDeclaration>> getPluginImplementation() {
return null;
}
@Override
public String getName() {
return jsonNode.get(PLUGIN_DECLARATION_NAME_FIELD).asText();
}
@Override
public String getDescription() {
return jsonNode.get(PLUGIN_DECLARATION_DESCRIPTION_FIELD).asText();
}
};
return pluginDeclaration;
}
public static void addScope(ObjectNode objectNode){
objectNode.put(SCOPE_FIELD, SmartExecutorInitializator.getScopeFromToken());
}
public static ObjectNode getObjectMapper(PluginStateEvolution pluginStateEvolution){
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode objectNode = objectMapper.createObjectNode();
objectNode.put(UUID_FIELD, pluginStateEvolution.getUuid().toString());
objectNode.put(ITERATION_FIELD, pluginStateEvolution.getIteration());
objectNode.put(TIMESTAMP_FIELD, pluginStateEvolution.getTimestamp());
objectNode.put(PLUGIN_DECLARATION_FIELD, getPluginInfo(pluginStateEvolution.getPluginDeclaration()));
objectNode.put(STATE_FIELD, pluginStateEvolution.getPluginState().toString());
objectNode.put(PERCENTAGE, pluginStateEvolution.getPercentage());
addScope(objectNode);
objectNode.put(CouchDBPersistenceConnector.TYPE_JSON_FIELD, EVOLUTION_TYPE);
try {
objectNode.put(RUN_ON_FIELD, getRunOn());
}catch(Exception e){
// TODO
}
return objectNode;
}
public static PluginStateEvolution getPluginStateEvolution(JsonNode jsonNode)
throws InvalidPluginStateEvolutionException{
UUID uuid = UUID.fromString(jsonNode.get(UUID_FIELD).asText());
int iteration = jsonNode.get(ITERATION_FIELD).asInt();
long timestamp = jsonNode.get(TIMESTAMP_FIELD).asInt();
PluginDeclaration pluginDeclaration = getPluginDeclaration(jsonNode.get(PLUGIN_DECLARATION_FIELD));
PluginState pluginState = PluginState.valueOf(jsonNode.get(STATE_FIELD).asText());
int percentage = jsonNode.get(PERCENTAGE).asInt();
return new PluginStateEvolution(uuid, iteration, timestamp, pluginDeclaration, pluginState, percentage);
}
}

View File

@ -0,0 +1,487 @@
/**
*
*/
package org.gcube.vremanagement.executor.persistence.orientdb;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.RunOn;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.orientechnologies.orient.core.db.OPartitionedDatabasePool;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class OrientDBPersistenceConnector extends
SmartExecutorPersistenceConnector {
private static final Logger logger = LoggerFactory
.getLogger(OrientDBPersistenceConnector.class);
protected final static int LAST = -1;
protected final String scope;
protected final String SCOPE = "scope";
protected final String UUID = "uuid";
protected final String ITERATION = "iteration";
protected final String TIMESTAMP = "timestamp";
protected final String RUN_ON = "runOn";
protected OPartitionedDatabasePool oPartitionedDatabasePool;
protected ObjectMapper mapper;
public OrientDBPersistenceConnector(String scope,
SmartExecutorPersistenceConfiguration configuration)
throws Exception {
super();
this.scope = scope;
prepareConnection(configuration);
this.mapper = ObjectMapperManager.getObjectMapper();
}
public OrientDBPersistenceConnector(
SmartExecutorPersistenceConfiguration configuration)
throws Exception {
this(SmartExecutorInitializator.getCurrentScope(), configuration);
}
protected void prepareConnection(
SmartExecutorPersistenceConfiguration configuration)
throws Exception {
logger.debug("Preparing Connection for {}", this.getClass()
.getSimpleName());
String url = configuration.getURL();
String username = configuration.getUsername();
String password = configuration.getPassword();
this.oPartitionedDatabasePool = new OPartitionedDatabasePool(url,
username, password);
}
protected void prepareObjectMapper() {
this.mapper = new ObjectMapper();
}
@Override
public void close() throws Exception {
oPartitionedDatabasePool.close();
}
protected PluginStateEvolution getPluginStateEvolution(UUID uuid,
int iterationNumber) throws Exception {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
String type = PluginStateEvolution.class.getSimpleName();
Map<String, Object> params = new HashMap<String, Object>();
params.put(UUID, uuid.toString());
params.put(SCOPE, scope);
OSQLSynchQuery<ODocument> query = null;
if (iterationNumber != LAST) {
query = new OSQLSynchQuery<ODocument>(
String.format(
"SELECT FROM %s WHERE %s = :%s AND %s = :%s AND %s = :%s ORDER BY %s DESC LIMIT 1",
type, SCOPE, SCOPE, UUID, UUID, ITERATION,
ITERATION, TIMESTAMP));
params.put(ITERATION, iterationNumber);
} else {
query = new OSQLSynchQuery<ODocument>(
String.format(
"SELECT FROM %s WHERE %s = :%s AND %s = :%s ORDER BY %s DESC",
type, SCOPE, SCOPE, UUID, UUID, ITERATION));
}
List<ODocument> result = query.execute(params);
ODocument resDoc = null;
if (iterationNumber != LAST) {
resDoc = result.get(0);
} else {
// TODO manage better
long maxTimestamp = 0;
for (ODocument oDoc : result) {
long tm = (long) oDoc.field(TIMESTAMP);
if (maxTimestamp <= tm) {
maxTimestamp = tm;
resDoc = oDoc;
}
}
}
String json = resDoc.toJSON("class");
PluginStateEvolution pluginStateEvolution = mapper.readValue(json,
PluginStateEvolution.class);
return pluginStateEvolution;
} catch (Exception e) {
throw new PluginStateNotRetrievedException(e);
} finally {
db.close();
}
}
@Override
public PluginStateEvolution getPluginInstanceState(UUID uuid,
int iterationNumber) throws Exception {
return getPluginStateEvolution(uuid, iterationNumber);
}
@Override
public PluginStateEvolution getLastPluginInstanceState(UUID uuid)
throws Exception {
return getPluginStateEvolution(uuid, LAST);
}
@Override
public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution,
Exception exception) throws Exception {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
ODocument doc = new ODocument(
PluginStateEvolution.class.getSimpleName());
String json = mapper.writeValueAsString(pluginStateEvolution);
doc.fromJSON(json);
doc.field(SCOPE, scope);
doc.save();
db.commit();
} catch (Exception e) {
if (db != null) {
db.rollback();
}
throw e;
} finally {
if (db != null) {
db.close();
}
}
}
@Override
public void addScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
ODocument doc = new ODocument(ScheduledTask.class.getSimpleName());
long timestamp = Calendar.getInstance().getTimeInMillis();
doc.field(TIMESTAMP, timestamp);
String json = mapper.writeValueAsString(scheduledTask);
doc.fromJSON(json);
doc.save();
db.commit();
} catch (Exception e) {
if (db != null) {
db.rollback();
}
throw new SchedulePersistenceException(e);
} finally {
db.close();
}
}
protected boolean isOrphan(ScheduledTask scheduledTask) throws Exception {
try {
UUID uuid = scheduledTask.getUUID();
RunOn runOn = scheduledTask.getRunOn();
String address = runOn.getEService().getAddress();
SpecificEndpointDiscoveryFilter specificEndpointDiscoveryFilter = new SpecificEndpointDiscoveryFilter(
address);
String pluginName = scheduledTask.getLaunchParameter()
.getPluginName();
try {
SmartExecutorProxy proxy = ExecutorPlugin
.getExecutorProxy(pluginName, null, null,
specificEndpointDiscoveryFilter).build();
proxy.getStateEvolution(uuid.toString());
} catch (Exception e) {
// The instance was not found or the request failed.
// The scheduledTask is considered orphan
logger.trace("{} is considered orphan.", ObjectMapperManager
.getObjectMapper().writeValueAsString(scheduledTask));
return true;
}
} catch (Exception e) {
String string = ObjectMapperManager.getObjectMapper()
.writeValueAsString(scheduledTask);
logger.error("Error while checking orphanity of " + string
+ ". Considering as not orphan.");
}
return false;
}
@Override
public List<ScheduledTask> getOrphanScheduledTasks(
List<? extends PluginDeclaration> pluginDeclarations)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
String type = ScheduledTask.class.getSimpleName();
OSQLSynchQuery<ODocument> query = new OSQLSynchQuery<ODocument>(
String.format("SELECT * FROM %s WHERE %s = :%s", type)
// TODO filter for task the instance can run
);
List<ODocument> result = query.execute();
List<ScheduledTask> scheduledTasks = new ArrayList<>();
for (ODocument doc : result) {
String json = doc.toJSON("class");
ScheduledTask scheduledTask = mapper.readValue(json,
ScheduledTask.class);
try {
if (isOrphan(scheduledTask)) {
scheduledTasks.add(scheduledTask);
}
} catch (Exception e) {
logger.error(
"An Exception occurred while evaluating if {} is orphan",
json);
}
}
return scheduledTasks;
} catch (Exception e) {
throw new SchedulePersistenceException(e);
} finally {
if (db != null) {
db.close();
}
}
}
protected ODocument getScheduledTaskDocument(ODatabaseDocumentTx db,
UUID uuid) throws SchedulePersistenceException {
try {
String type = ScheduledTask.class.getSimpleName();
Map<String, Object> params = new HashMap<String, Object>();
params.put(UUID, uuid.toString());
OSQLSynchQuery<ODocument> query = new OSQLSynchQuery<ODocument>(
String.format("SELECT FROM %s WHERE %s = :%s", type, UUID,
UUID));
List<ODocument> result = query.execute(params);
if (result.size() > 1) {
String error = String.format(
"Found more than one %s with UUID=%s. %s. %s.", type,
uuid.toString(),
"This is really strange and should not occur",
"Please contact the smart-executor administrator");
logger.error(error);
throw new SchedulePersistenceException(error);
} else if (result.size() == 0) {
String error = String.format("No %s with UUID=%s found.", type,
uuid.toString());
logger.error(error);
throw new SchedulePersistenceException(error);
}
return result.get(0);
} catch (Exception e) {
throw new SchedulePersistenceException(e);
}
}
@Override
public ScheduledTask getScheduledTask(UUID uuid)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
ODocument doc = getScheduledTaskDocument(db, uuid);
String json = doc.toJSON("class");
return mapper.readValue(json, ScheduledTask.class);
} catch (Exception e) {
throw new SchedulePersistenceException(e);
} finally {
if (db != null) {
db.close();
}
}
}
@Override
public void reserveScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
UUID uuid = scheduledTask.getUUID();
ODocument doc = getScheduledTaskDocument(db, uuid);
long timestamp = doc.field(TIMESTAMP);
if (timestamp != scheduledTask.getTimestamp()) {
throw new SchedulePersistenceException(
"The ScheduledTask has been already reserved.");
}
RunOn runOn = ScheduledTask.generateRunOn();
String json = mapper.writeValueAsString(runOn);
doc.field(RUN_ON, json);
timestamp = Calendar.getInstance().getTimeInMillis();
doc.field(TIMESTAMP, timestamp);
doc.save();
db.commit();
} catch (Exception e) {
if (db != null) {
db.rollback();
}
throw new SchedulePersistenceException(e);
} finally {
if (db != null) {
db.close();
}
}
}
@Override
public void removeScheduledTask(UUID uuid)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
ODocument doc = getScheduledTaskDocument(db, uuid);
doc.delete();
db.commit();
} catch (Exception e) {
if (db != null) {
db.rollback();
}
throw new SchedulePersistenceException(e);
} finally {
if (db != null) {
db.close();
}
}
}
@Override
public void removeScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
UUID uuid = scheduledTask.getUUID();
ODocument doc = getScheduledTaskDocument(db, uuid);
long timestamp = doc.field(TIMESTAMP);
if (timestamp != scheduledTask.getTimestamp()) {
throw new SchedulePersistenceException(
"The ScheduledTask has been changed.");
}
doc.delete();
db.commit();
} catch (Exception e) {
if (db != null) {
db.rollback();
}
throw new SchedulePersistenceException(e);
} finally {
if (db != null) {
db.close();
}
}
}
@Override
public void releaseScheduledTask(UUID uuid)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
ODocument doc = getScheduledTaskDocument(db, uuid);
doc.removeField(RUN_ON);
doc.save();
} catch (Exception e) {
if (db != null) {
db.rollback();
}
throw new SchedulePersistenceException(e);
} finally {
if (db != null) {
db.close();
}
}
}
@Override
public void releaseScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException {
ODatabaseDocumentTx db = null;
try {
db = oPartitionedDatabasePool.acquire();
UUID uuid = scheduledTask.getUUID();
ODocument doc = getScheduledTaskDocument(db, uuid);
long timestamp = doc.field(TIMESTAMP);
if (timestamp != scheduledTask.getTimestamp()) {
throw new SchedulePersistenceException(
"The ScheduledTask has been changed.");
}
doc.removeField(RUN_ON);
doc.save();
} catch (Exception e) {
if (db != null) {
db.rollback();
}
throw new SchedulePersistenceException(e);
} finally {
if (db != null) {
db.close();
}
}
}
}

View File

@ -9,7 +9,7 @@ import org.gcube.vremanagement.executor.plugin.Plugin;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class PercentageSetterImpl<T extends Plugin<? extends PluginDeclaration>> implements PercentageSetter {

View File

@ -17,7 +17,7 @@ import org.slf4j.LoggerFactory;
* This is a singleton class which discover on classpath the available plugins
* and map the plugin name to its implementation class.
* The plugin implementation class can be retrieved using its name.
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
@SuppressWarnings("deprecation")
public class PluginManager {

View File

@ -19,7 +19,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class RunnablePlugin<T extends Plugin<? extends PluginDeclaration>> implements Runnable {
@ -66,8 +66,9 @@ public class RunnablePlugin<T extends Plugin<? extends PluginDeclaration>> imple
public void run(){
try {
setState(PluginState.RUNNING);
plugin.launch(inputs);
this.plugin.setUUID(uuid);
this.plugin.setIterationNumber(iterationNumber);
this.plugin.launch(inputs);
setState(PluginState.DONE);
} catch (AlreadyInFinalStateException e1) {
return;

View File

@ -0,0 +1,121 @@
/**
*
*/
package org.gcube.vremanagement.executor.scheduledtask;
import java.util.UUID;
import org.gcube.common.authorization.library.provider.ClientInfo;
import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.common.resources.gcore.GCoreEndpoint.Profile.Endpoint;
import org.gcube.common.resources.gcore.HostingNode;
import org.gcube.common.resources.gcore.utils.Group;
import org.gcube.smartgears.Constants;
import org.gcube.smartgears.ContextProvider;
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.plugin.Ref;
import org.gcube.vremanagement.executor.plugin.RunOn;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* @author Luca Frosini (ISTI - CNR)
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property=Scheduling.CLASS_PROPERTY)
public class ScheduledTask {
protected Long timestamp;
protected UUID uuid;
protected LaunchParameter launchParameter;
protected String scope;
protected ClientInfo clientInfo;
protected RunOn runOn;
protected ScheduledTask(){}
public ScheduledTask(UUID uuid, LaunchParameter launchParameter) {
this(uuid, launchParameter, generateRunOn());
}
public ScheduledTask(UUID uuid, LaunchParameter launchParameter, RunOn runOn) {
this.uuid = uuid;
this.launchParameter = launchParameter;
this.scope = SmartExecutorInitializator.getCurrentScope();
this.clientInfo = SmartExecutorInitializator.getClientInfo();
this.runOn = runOn;
}
/**
* @return the timestamp
*/
public Long getTimestamp() {
return timestamp;
}
/**
* @return the uuid
*/
public UUID getUUID() {
return uuid;
}
/**
* @return the scope
*/
public String getScope() {
return scope;
}
/**
* @return the clientInfo
*/
public ClientInfo getClientInfo() {
return clientInfo;
}
/**
* @return the runOn
*/
public RunOn getRunOn() {
return runOn;
}
/**
* @param runOn the runOn to set
*/
public static RunOn generateRunOn() {
HostingNode hostingNode = ContextProvider.get().profile(HostingNode.class);
Ref hostingNodeRef = new Ref(hostingNode.id(), hostingNode.profile().description().name());
GCoreEndpoint gCoreEndpoint = ContextProvider.get().profile(GCoreEndpoint.class);
String address = "";
Group<Endpoint> endpoints = gCoreEndpoint.profile().endpoints();
for(Endpoint endpoint : endpoints){
if(endpoint.name().contains(Constants.remote_management)){
continue;
}else{
address = endpoint.uri().toString();
break;
}
}
Ref eServiceRef = new Ref(gCoreEndpoint.id(), address);
RunOn runOn = new RunOn(hostingNodeRef, eServiceRef);
return runOn;
}
/**
* @return the launchParameter
*/
public LaunchParameter getLaunchParameter(){
return launchParameter;
}
}

View File

@ -1,27 +1,21 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration.jsonbased;
package org.gcube.vremanagement.executor.scheduledtask;
import org.json.JSONException;
import org.json.JSONObject;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
* @author Luca Frosini (ISTI - CNR)
*/
public class ScheduledTaskDurationInfo {
public static final String LAST = "last";
public static final String AVERAGE = "avg";
public static final String MIN = "min";
public static final String MAX = "max";
protected long last;
protected long avg;
protected long min;
protected long max;
protected ScheduledTaskDurationInfo(){}
public ScheduledTaskDurationInfo(long last, long avg, long min, long max){
this.last = last;
this.avg = avg;
@ -29,13 +23,6 @@ public class ScheduledTaskDurationInfo {
this.max = max;
}
public ScheduledTaskDurationInfo(JSONObject jsonObject) throws JSONException{
this.last = jsonObject.getLong(LAST);
this.avg = jsonObject.getLong(AVERAGE);
this.min = jsonObject.getLong(MIN);
this.max = jsonObject.getLong(MAX);
}
/**
* @return the last
*/
@ -92,13 +79,4 @@ public class ScheduledTaskDurationInfo {
this.max = max;
}
public JSONObject toJSON() throws JSONException {
JSONObject obj = new JSONObject();
obj.put(LAST, last);
obj.put(AVERAGE, avg);
obj.put(MIN, min);
obj.put(MAX, max);
return obj;
}
}

View File

@ -0,0 +1,107 @@
/**
*
*/
package org.gcube.vremanagement.executor.scheduledtask;
import java.util.List;
import java.util.UUID;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public interface ScheduledTaskPersistence {
public static final String SCOPE = "scope";
/**
* Retrieve from the #SmartExecutorPersistenceConnector the orphaned
* Scheduled tasks
* @param pluginDeclarations
* @return the list of orphaned Scheduled
* @throws SchedulePersistenceException
* if fails
*/
public List<ScheduledTask> getOrphanScheduledTasks(
List<? extends PluginDeclaration> pluginDeclarations)
throws SchedulePersistenceException;
/**
* Return the Scheduled Task if any, null otherwise
*
* @param uuid
* which identify the Scheduled Task
* @return LaunchParameter of the Scheduled task if any, null otherwise
* @throws SchedulePersistenceException
* if fails
*/
public ScheduledTask getScheduledTask(UUID uuid)
throws SchedulePersistenceException;
/**
* Create a Scheduled Task on persistence
*
* @param uuid
* the uuid which (will) identify the task on the SmartExecutor
* instance
* @param parameter
* @throws SchedulePersistenceException
* if fails
*/
public void addScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException;
/**
* Release the Scheduled Task leaving it as orphan on persistence
*
* @param uuid
* the uuid which (will) identify the task on the SmartExecutor
* instance
* @throws SchedulePersistenceException
*/
public void releaseScheduledTask(UUID uuid)
throws SchedulePersistenceException;
/**
* Remove from persistence the Scheduled Task.
*
* @param scheduledTask
* @param parameter
* @throws SchedulePersistenceException
*/
public void removeScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException;
/**
* Remove from persistence the Scheduled Task.
*
* @param uuid
* the uuid which (will) identify the task on the SmartExecutor
* instance
* @param parameter
* @throws SchedulePersistenceException
*/
public void removeScheduledTask(UUID uuid)
throws SchedulePersistenceException;
/**
* Release the Scheduled Task leaving it as orphan on persistence
*
* @param scheduledTask
* @throws SchedulePersistenceException
*/
public void releaseScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException;
/**
* Reserve an orphan Scheduled tasks
*
* @param scheduledTask
* @throws SchedulePersistenceException
* if fails
*/
public void reserveScheduledTask(ScheduledTask scheduledTask)
throws SchedulePersistenceException;
}

View File

@ -0,0 +1,18 @@
/**
*
*/
package org.gcube.vremanagement.executor.scheduledtask;
import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class ScheduledTaskPersistenceFactory {
public static ScheduledTaskPersistence getLaunchConfiguration() throws Exception {
return (ScheduledTaskPersistence) SmartExecutorPersistenceFactory.getPersistenceConnector();
}
}

View File

@ -16,7 +16,7 @@ import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
* in the running plugin evolution.
* Future use of this possibility are possibility to send an email to
* the job owner, notify a registered process. Send a tweet and so on.
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
@Deprecated
public class JobCompletedNotification extends PluginStateNotification {

View File

@ -11,12 +11,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.smartgears.ContextProvider;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfigurationFactory;
import org.gcube.vremanagement.executor.exception.InputsNullException;
import org.gcube.vremanagement.executor.exception.LaunchException;
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
@ -25,6 +21,9 @@ import org.gcube.vremanagement.executor.exception.SchedulerNotFoundException;
import org.gcube.vremanagement.executor.exception.SchedulerRemoveException;
import org.gcube.vremanagement.executor.exception.UnableToInterruptTaskException;
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistenceFactory;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
@ -43,7 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
public class SmartExecutorScheduler {
@ -136,8 +135,8 @@ public class SmartExecutorScheduler {
triggerBuilder = getTriggerBuilderWithScheduling(uuid, scheduling);
if (scheduling.getFirtStartTime() != null && scheduling.getFirtStartTime().longValue()!=0) {
Date triggerStartTime = new Date(scheduling.getFirtStartTime());
if (scheduling.getFirstStartTime() != null && scheduling.getFirstStartTime().longValue()!=0) {
Date triggerStartTime = new Date(scheduling.getFirstStartTime());
triggerBuilder.startAt(triggerStartTime);
} else {
triggerBuilder.startNow();
@ -150,11 +149,11 @@ public class SmartExecutorScheduler {
}
try {
String runningInstanceID = ContextProvider.get().profile(GCoreEndpoint.class).id();
logger.debug("Going to persist Scheduled Task {} which will be assigned to Running Instance {}. LaunchParameters : {} ",
uuid.toString(), runningInstanceID, parameter);
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
stc.addScheduledTask(uuid, runningInstanceID, parameter);
ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getLaunchConfiguration();
ScheduledTask scheduledTask = new ScheduledTask(uuid, parameter);
logger.debug("Going to persist Scheduled Task {} : {} ",
scheduledTask);
stc.addScheduledTask(scheduledTask);
} catch (Exception e) {
logger.error("Unable to persist Scheduled Task {}", uuid.toString(), e.getCause());
}
@ -287,7 +286,7 @@ public class SmartExecutorScheduler {
protected void removeFromPersistence(boolean global, UUID uuid, boolean remove) throws SchedulePersistenceException{
try {
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getLaunchConfiguration();
if(remove){
logger.debug("Going to remove the SmartExecutor Scheduled Task {} from global scheduling", uuid);
stc.removeScheduledTask(uuid);

View File

@ -37,7 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SmartExecutorTask implements InterruptableJob {

View File

@ -8,7 +8,7 @@ import org.quartz.JobExecutionException;
import org.quartz.JobListener;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SmartExecutorTaskListener implements JobListener {

View File

@ -0,0 +1,90 @@
/**
*
*/
package org.gcube.vremanagement.executor;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.gcube.common.authorization.client.Constants;
import org.gcube.common.authorization.client.exceptions.ObjectNotFound;
import org.gcube.common.authorization.library.AuthorizationEntry;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class ScopedTest {
private static final Logger logger = LoggerFactory.getLogger(ScopedTest.class);
protected static final String PROPERTIES_FILENAME = "token.properties";
private static final String GCUBE_DEVNEXT_VARNAME = "GCUBE_DEVNEXT";
public static final String GCUBE_DEVNEXT;
private static final String GCUBE_DEVNEXT_NEXTNEXT_VARNAME = "GCUBE_DEVNEXT_NEXTNEXT";
public static final String GCUBE_DEVNEXT_NEXTNEXT;
public static final String GCUBE_DEVSEC_VARNAME = "GCUBE_DEVSEC";
public static final String GCUBE_DEVSEC;
public static final String GCUBE_DEVSEC_DEVVRE_VARNAME = "GCUBE_DEVSEC_DEVVRE";
public static final String GCUBE_DEVSEC_DEVVRE;
public static final String DEFAULT_TEST_SCOPE;
public static final String ALTERNATIVE_TEST_SCOPE;
static {
Properties properties = new Properties();
InputStream input = ScopedTest.class.getClassLoader().getResourceAsStream(PROPERTIES_FILENAME);
try {
// load the properties file
properties.load(input);
} catch (IOException e) {
throw new RuntimeException(e);
}
GCUBE_DEVNEXT = properties.getProperty(GCUBE_DEVNEXT_VARNAME);
GCUBE_DEVNEXT_NEXTNEXT = properties.getProperty(GCUBE_DEVNEXT_NEXTNEXT_VARNAME);
GCUBE_DEVSEC = properties.getProperty(GCUBE_DEVSEC_VARNAME);
GCUBE_DEVSEC_DEVVRE = properties.getProperty(GCUBE_DEVSEC_DEVVRE_VARNAME);
DEFAULT_TEST_SCOPE = GCUBE_DEVNEXT;
ALTERNATIVE_TEST_SCOPE = GCUBE_DEVSEC;
}
public static String getCurrentScope(String token) throws ObjectNotFound, Exception{
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
String context = authorizationEntry.getContext();
logger.info("Context of token {} is {}", token, context);
return context;
}
public static void setContext(String token) throws ObjectNotFound, Exception{
SecurityTokenProvider.instance.set(token);
ScopeProvider.instance.set(getCurrentScope(token));
}
@BeforeClass
public static void beforeClass() throws Exception{
setContext(DEFAULT_TEST_SCOPE);
}
@AfterClass
public static void afterClass() throws Exception{
SecurityTokenProvider.instance.reset();
ScopeProvider.instance.reset();
}
}

View File

@ -0,0 +1,104 @@
/**
*
*/
package org.gcube.vremanagement.executor;
import java.io.IOException;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException;
import org.gcube.vremanagement.executor.json.ObjectMapperManager;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.Ref;
import org.gcube.vremanagement.executor.plugin.RunOn;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SerializationTest extends ScopedTest {
private static Logger logger = LoggerFactory.getLogger(SerializationTest.class);
@Test
public void testScheduling() throws JsonGenerationException, JsonMappingException, IOException {
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put("Hello", "World");
long sleepTime = 10000;
inputs.put("sleepTime", sleepTime);
Scheduling scheduling = new Scheduling(20);
scheduling.setGlobal(true);
LaunchParameter launchParameter = new LaunchParameter("HelloWorld", inputs, scheduling);
logger.debug("{} to be Marshalled : {}", launchParameter.getClass().getSimpleName(), launchParameter);
ObjectMapper objectMapper = new ObjectMapper();
String launchParameterJSONString = objectMapper.writeValueAsString(launchParameter);
logger.debug("Marshalled : {}", launchParameterJSONString);
LaunchParameter launchParameterUnmarshalled = objectMapper.readValue(launchParameterJSONString, LaunchParameter.class);
logger.debug("UnMarshalled : {}", launchParameterUnmarshalled);
}
@Test
public void testScheduledTask() throws JsonGenerationException, JsonMappingException, IOException {
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put("Hello", "World");
long sleepTime = 10000;
inputs.put("sleepTime", sleepTime);
Scheduling scheduling = new Scheduling(20);
scheduling.setGlobal(true);
LaunchParameter launchParameter = new LaunchParameter("HelloWorld", inputs, scheduling);
UUID uuid = UUID.randomUUID();
Ref hostingNode = new Ref(UUID.randomUUID().toString(), "localhost");
Ref eService = new Ref(UUID.randomUUID().toString(), "localhost");
RunOn runOn = new RunOn(hostingNode, eService);
ScheduledTask scheduledTask = new ScheduledTask(uuid, launchParameter, runOn);
logger.debug("{} to be Marshalled : {}", scheduledTask.getClass().getSimpleName(), launchParameter);
ObjectMapper mapper = ObjectMapperManager.getObjectMapper();
String scheduledTaskJSONString = mapper.writeValueAsString(scheduledTask);
logger.debug("Marshalled : {}", scheduledTaskJSONString);
ScheduledTask scheduledTaskUnmarshalled = mapper.readValue(scheduledTaskJSONString, ScheduledTask.class);
logger.debug("UnMarshalled : {}", scheduledTaskUnmarshalled);
}
@Test
public void testPluginEvolutionState() throws JsonGenerationException, JsonMappingException, IOException, InvalidPluginStateEvolutionException {
PluginStateEvolution pes = new PluginStateEvolution(UUID.randomUUID(), 1, Calendar.getInstance().getTimeInMillis(), new HelloWorldPluginDeclaration(), PluginState.RUNNING, 10);
logger.debug("{} to be Marshalled : {}", pes.getClass().getSimpleName(), pes);
ObjectMapper objectMapper = new ObjectMapper();
String scheduledTaskJSONString = objectMapper.writeValueAsString(pes);
logger.debug("Marshalled : {}", scheduledTaskJSONString);
PluginStateEvolution pesUnmarshalled = objectMapper.readValue(scheduledTaskJSONString, PluginStateEvolution.class);
logger.debug("UnMarshalled : {}", pesUnmarshalled);
}
}

View File

@ -15,7 +15,7 @@ import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
import org.junit.Test;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SmartExecutorImplTest {

View File

@ -1,22 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.Before;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class TokenBasedTests {
@Before
public void before(){
SecurityTokenProvider.instance.set("7c66c94c-7f6e-49cd-9a34-909cd3832f3e-98187548");
ScopeProvider.instance.set("/gcube/devNext/NextNext");
}
}

View File

@ -1,106 +0,0 @@
/**
*
*/
package org.gcube.vremanagement.executor.configuration;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.acme.HelloWorldPlugin;
import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.TokenBasedTests;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.api.types.Scheduling;
import org.gcube.vremanagement.executor.configuration.jsonbased.FileScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.jsonbased.JSONLaunchParameter;
import org.gcube.vremanagement.executor.exception.SchedulePersistenceException;
import org.json.JSONException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class ConfiguredTasksTest extends TokenBasedTests {
private static Logger logger = LoggerFactory.getLogger(ConfiguredTasksTest.class);
public static final String TEST = "test";
public void checkOriginal(FileScheduledTaskConfiguration parser, int size){
List<LaunchParameter> configuredTasks = parser.getConfiguredTasks();
Assert.assertEquals(size, configuredTasks.size());
JSONLaunchParameter parameter = (JSONLaunchParameter) configuredTasks.get(0);
Assert.assertEquals(HelloWorldPluginDeclaration.NAME, parameter.getPluginName());
Map<String, Object> inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
Assert.assertEquals(1, inputs.get(TEST));
Assert.assertEquals(null, parameter.getScheduling());
parameter = (JSONLaunchParameter) configuredTasks.get(1);
Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME);
inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
Assert.assertEquals(2, inputs.get(TEST));
Scheduling scheduling = parameter.getScheduling();
Assert.assertEquals(null, scheduling.getCronExpression());
Assert.assertEquals(new Integer(2000), scheduling.getDelay());
Assert.assertEquals(2, scheduling.getSchedulingTimes());
Assert.assertEquals(null, scheduling.getFirtStartTime());
Assert.assertEquals(null, scheduling.getEndTime());
Assert.assertEquals(false, scheduling.mustPreviousExecutionsCompleted());
Assert.assertEquals(true, scheduling.getGlobal());
parameter = (JSONLaunchParameter) configuredTasks.get(2);
Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME);
inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
Assert.assertEquals(3, inputs.get(TEST));
Assert.assertEquals(null, parameter.getScheduling());
Assert.assertEquals(true, scheduling.getGlobal());
}
public static final String TASK_FILE_PATH = "/src/test/resources/";
@Test
public void testLaunchConfiguredTask() throws SchedulePersistenceException, IOException, JSONException, ParseException {
File file = new File(".", TASK_FILE_PATH);
String location = file.getAbsolutePath();
logger.trace("File location : {}", location);
FileScheduledTaskConfiguration parser = new FileScheduledTaskConfiguration(location);
checkOriginal(parser, 3);
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put(HelloWorldPlugin.SLEEP_TIME, 1000);
inputs.put(TEST, 4);
JSONLaunchParameter added = new JSONLaunchParameter(HelloWorldPluginDeclaration.NAME, inputs);
parser.addLaunch(added);
parser = new FileScheduledTaskConfiguration(location);
checkOriginal(parser, 4);
List<LaunchParameter> configuredTasks = parser.getConfiguredTasks();
JSONLaunchParameter parameter = (JSONLaunchParameter) configuredTasks.get(3);
Assert.assertEquals(parameter.getPluginName(), HelloWorldPluginDeclaration.NAME);
inputs = parameter.getInputs();
Assert.assertEquals(1000, inputs.get(HelloWorldPlugin.SLEEP_TIME));
Assert.assertEquals(4, inputs.get(TEST));
Assert.assertEquals(null, parameter.getScheduling());
parser.releaseLaunch(parameter);
parser = new FileScheduledTaskConfiguration(location);
checkOriginal(parser, 3);
}
}

View File

@ -9,23 +9,23 @@ import java.util.List;
import java.util.UUID;
import org.acme.HelloWorldPluginDeclaration;
import org.gcube.vremanagement.executor.TokenBasedTests;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfiguration;
import org.gcube.vremanagement.executor.configuration.ScheduledTaskConfigurationFactory;
import org.gcube.vremanagement.executor.persistence.couchdb.CouchDBPersistenceConnector;
import org.gcube.vremanagement.executor.ScopedTest;
import org.gcube.vremanagement.executor.persistence.orientdb.OrientDBPersistenceConnector;
import org.gcube.vremanagement.executor.plugin.PluginState;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistence;
import org.gcube.vremanagement.executor.scheduledtask.ScheduledTaskPersistenceFactory;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class SmartExecutorPersistenceConnectorTest extends TokenBasedTests {
public class SmartExecutorPersistenceConnectorTest extends ScopedTest {
private static Logger logger = LoggerFactory.getLogger(SmartExecutorPersistenceConnectorTest.class);
@ -33,7 +33,7 @@ public class SmartExecutorPersistenceConnectorTest extends TokenBasedTests {
public void getConnectionTest() throws Exception {
SmartExecutorPersistenceConnector persistenceConnector = SmartExecutorPersistenceFactory.getPersistenceConnector();
Assert.assertNotNull(persistenceConnector);
Assert.assertEquals(CouchDBPersistenceConnector.class, persistenceConnector.getClass());
Assert.assertEquals(OrientDBPersistenceConnector.class, persistenceConnector.getClass());
SmartExecutorPersistenceFactory.closePersistenceConnector();
}
@ -55,20 +55,25 @@ public class SmartExecutorPersistenceConnectorTest extends TokenBasedTests {
endTime = Calendar.getInstance().getTimeInMillis();
}
PluginState ps = persistenceConnector.getPluginInstanceState(uuid, 1).getPluginState();
PluginStateEvolution pse = persistenceConnector.getPluginInstanceState(uuid, 1);
PluginState ps = pse.getPluginState();
Assert.assertEquals(states[i], ps);
}
PluginStateEvolution pse = persistenceConnector.getLastPluginInstanceState(uuid);
PluginState ps = pse.getPluginState();
Assert.assertEquals(states[states.length-1], ps);
SmartExecutorPersistenceFactory.closePersistenceConnector();
}
@Test
public void getAvailableScheduledTasksTest() throws Exception {
ScheduledTaskConfiguration stc = ScheduledTaskConfigurationFactory.getLaunchConfiguration();
ScheduledTaskPersistence stc = ScheduledTaskPersistenceFactory.getLaunchConfiguration();
Assert.assertNotNull(stc);
Assert.assertEquals(CouchDBPersistenceConnector.class, stc.getClass());
Assert.assertEquals(OrientDBPersistenceConnector.class, stc.getClass());
List<LaunchParameter> lc = stc.getAvailableScheduledTasks();
List<ScheduledTask> lc = stc.getOrphanScheduledTasks(null);
logger.debug("Available Scheduled Tasks : {}", lc);
}

View File

@ -7,7 +7,7 @@ import org.junit.Assert;
import org.junit.Test;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
public class PluginManagerTest {

View File

@ -21,7 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*
*/
public class RunnablePluginTest {

View File

@ -25,7 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
* @author Luca Frosini (ISTI - CNR)
*/
public class SmartExecutorSchedulerTest {

View File

@ -1,3 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE xml>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">