refs #89: Save Task Evolution on NoSQL global DB
https://support.d4science.org/issues/89 Implementing Feature git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@117707 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
315edeebf1
commit
1daca05f52
28
pom.xml
28
pom.xml
|
@ -40,6 +40,15 @@
|
|||
</dependencyManagement>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.gcube.resources.discovery</groupId>
|
||||
<artifactId>ic-client</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.gcube.core</groupId>
|
||||
<artifactId>common-encryption</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
@ -68,11 +77,30 @@
|
|||
<version>[3.0.1, 4.0.0)</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- H2 libraries -->
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>[1.4.185, 1.5.0)</version>
|
||||
</dependency>
|
||||
<!-- END H2 libraries -->
|
||||
|
||||
<!-- CouchDB libraries -->
|
||||
<dependency>
|
||||
<groupId>org.ektorp</groupId>
|
||||
<artifactId>org.ektorp</artifactId>
|
||||
<version>1.3.0</version>
|
||||
<type>jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-core-asl</artifactId>
|
||||
<version>1.9.7</version>
|
||||
<type>jar</type>
|
||||
</dependency>
|
||||
<!-- END CouchDB libraries -->
|
||||
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
|
|
@ -11,7 +11,7 @@ import org.gcube.vremanagement.executor.exception.InputsNullException;
|
|||
import org.gcube.vremanagement.executor.exception.LaunchException;
|
||||
import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException;
|
||||
import org.gcube.vremanagement.executor.exception.PluginNotFoundException;
|
||||
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
|
||||
import org.gcube.vremanagement.executor.persistence.PersistenceConnector;
|
||||
import org.gcube.vremanagement.executor.plugin.PluginState;
|
||||
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -50,8 +50,8 @@ public class SmartExecutorImpl implements SmartExecutor {
|
|||
public PluginState getState(String executionIdentifier)
|
||||
throws PluginInstanceNotFoundException, ExecutorException {
|
||||
try {
|
||||
JDBCPersistenceConnector jdbcPersistenceConnector = SmartExecutorInitalizator.getJdbcPersistenceConnector();
|
||||
return jdbcPersistenceConnector.getLastPluginInstanceState(UUID.fromString(executionIdentifier));
|
||||
PersistenceConnector persistenceConnector = SmartExecutorInitalizator.getPersistenceConnector();
|
||||
return persistenceConnector.getLastPluginInstanceState(UUID.fromString(executionIdentifier));
|
||||
} catch (Exception e) {
|
||||
throw new PluginInstanceNotFoundException();
|
||||
}
|
||||
|
@ -62,8 +62,8 @@ public class SmartExecutorImpl implements SmartExecutor {
|
|||
public PluginState getIterationState(String executionIdentifier, int iterationNumber)
|
||||
throws PluginInstanceNotFoundException, ExecutorException {
|
||||
try {
|
||||
JDBCPersistenceConnector jdbcPersistenceConnector = SmartExecutorInitalizator.getJdbcPersistenceConnector();
|
||||
return jdbcPersistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), iterationNumber);
|
||||
PersistenceConnector persistenceConnector = SmartExecutorInitalizator.getPersistenceConnector();
|
||||
return persistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), iterationNumber);
|
||||
} catch (Exception e) {
|
||||
throw new PluginInstanceNotFoundException();
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.gcube.smartgears.context.application.ApplicationContext;
|
|||
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent;
|
||||
import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler;
|
||||
import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector;
|
||||
import org.gcube.vremanagement.executor.persistence.PersistenceConnector;
|
||||
import org.gcube.vremanagement.executor.plugin.PluginDeclaration;
|
||||
import org.gcube.vremanagement.executor.pluginmanager.PluginManager;
|
||||
import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler;
|
||||
|
@ -63,7 +64,7 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
|
|||
/**
|
||||
* Represent the connector to DB
|
||||
*/
|
||||
protected static JDBCPersistenceConnector jdbcPersistenceConnector;
|
||||
protected static PersistenceConnector persistenceConnector;
|
||||
|
||||
/**
|
||||
* The application context
|
||||
|
@ -78,8 +79,8 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
|
|||
/**
|
||||
* @return the jdbcPersistenceConnector
|
||||
*/
|
||||
public static JDBCPersistenceConnector getJdbcPersistenceConnector() {
|
||||
return jdbcPersistenceConnector;
|
||||
public static PersistenceConnector getPersistenceConnector() {
|
||||
return persistenceConnector;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -338,12 +339,14 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
|
|||
}
|
||||
|
||||
try {
|
||||
jdbcPersistenceConnector = new JDBCPersistenceConnector(ctx.persistence().location());
|
||||
persistenceConnector = new JDBCPersistenceConnector(ctx.persistence().location());
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to initialize PersistenceConnector. The Service will be aborted", e);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
logger.debug(
|
||||
"\n-------------------------------------------------------\n"
|
||||
+ "Smart Executor Started Successfully\n"
|
||||
|
@ -378,7 +381,7 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler {
|
|||
}
|
||||
|
||||
try {
|
||||
jdbcPersistenceConnector.close();
|
||||
persistenceConnector.close();
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to close Persistence", e);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package org.gcube.vremanagement.executor.persistence;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.codehaus.jackson.JsonNode;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.node.ObjectNode;
|
||||
import org.ektorp.CouchDbConnector;
|
||||
import org.ektorp.CouchDbInstance;
|
||||
import org.ektorp.ViewQuery;
|
||||
import org.ektorp.ViewResult;
|
||||
import org.ektorp.http.HttpClient;
|
||||
import org.ektorp.http.StdHttpClient;
|
||||
import org.ektorp.http.StdHttpClient.Builder;
|
||||
import org.ektorp.impl.StdCouchDbConnector;
|
||||
import org.ektorp.impl.StdCouchDbInstance;
|
||||
import org.gcube.vremanagement.executor.plugin.PluginState;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||
*
|
||||
*/
|
||||
public class CouchDBPersistenceConnector extends PersistenceConnector {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(CouchDBPersistenceConnector.class);
|
||||
|
||||
protected CouchDbInstance couchDbInstance;
|
||||
protected CouchDbConnector couchDbConnector;
|
||||
|
||||
protected static final String DB_NAME = "dbName";
|
||||
|
||||
public CouchDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception {
|
||||
prepareConnection(configuration);
|
||||
}
|
||||
|
||||
protected HttpClient initHttpClient(URL url, String username, String password){
|
||||
Builder builder = new StdHttpClient.Builder().url(url);
|
||||
builder.username(username).password(password);
|
||||
HttpClient httpClient = builder.build();
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
protected void prepareConnection(SmartExecutorPersistenceConfiguration configuration) throws Exception {
|
||||
logger.debug("Preparing Connection for {}", this.getClass().getSimpleName());
|
||||
HttpClient httpClient = initHttpClient(configuration.getUri().toURL(), configuration.getUsername(), configuration.getPassword());
|
||||
couchDbInstance = new StdCouchDbInstance(httpClient);
|
||||
couchDbConnector = new StdCouchDbConnector(configuration.getProperty(DB_NAME), couchDbInstance);
|
||||
}
|
||||
|
||||
protected ViewResult query(ViewQuery query){
|
||||
ViewResult result = couchDbConnector.queryView(query);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
couchDbConnector.getConnection().shutdown();
|
||||
}
|
||||
|
||||
protected void createItem(JsonNode node, String id) throws Exception {
|
||||
if(id!=null && id.compareTo("")!=0){
|
||||
couchDbConnector.create(id, node);
|
||||
}else{
|
||||
couchDbConnector.create(node);
|
||||
}
|
||||
}
|
||||
|
||||
public final static String UUID_FIELD = "uuid";
|
||||
public final static String ITERATION_FIELD = "iteration";
|
||||
public final static String PLUGIN_NAME_FIELD = "pluginName";
|
||||
public final static String TIMESTAMP_FIELD = "timestamp";
|
||||
public final static String STATE_FIELD = "state";
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void pluginStateEvolution(UUID uuid, int iteration, long timestamp,
|
||||
String pluginName, PluginState pluginState) throws Exception {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
ObjectNode objectNode = objectMapper.createObjectNode();
|
||||
objectNode.put(UUID_FIELD, uuid.toString());
|
||||
objectNode.put(ITERATION_FIELD, iteration);
|
||||
objectNode.put(TIMESTAMP_FIELD, timestamp);
|
||||
objectNode.put(PLUGIN_NAME_FIELD, pluginName);
|
||||
objectNode.put(STATE_FIELD, pluginState.toString());
|
||||
createItem(objectNode, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public PluginState getPluginInstanceState(UUID uuid, int iterationNumber)
|
||||
throws Exception {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public PluginState getLastPluginInstanceState(UUID uuid) throws Exception {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -158,7 +158,7 @@ public class JDBCPersistenceConnector extends PersistenceConnector {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addEvolution(UUID uuid, int iteration, long timestamp, String pluginName, PluginState pluginState)
|
||||
public void pluginStateEvolution(UUID uuid, int iteration, long timestamp, String pluginName, PluginState pluginState)
|
||||
throws Exception {
|
||||
|
||||
connection.setAutoCommit(false); // transaction block start
|
||||
|
@ -193,11 +193,4 @@ public class JDBCPersistenceConnector extends PersistenceConnector {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
/**{@inheritDoc} */
|
||||
@Override
|
||||
public void pluginStateEvolution(UUID uuid, int iteration,
|
||||
long timestamp, String pluginName, PluginState pluginState) throws Exception {
|
||||
addEvolution(uuid, iteration, timestamp, pluginName, pluginState);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package org.gcube.vremanagement.executor.persistence;
|
||||
|
||||
import java.net.URI;
|
||||
import java.security.Key;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.gcube.common.encryption.StringEncrypter;
|
||||
import org.gcube.common.resources.gcore.ServiceEndpoint;
|
||||
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
|
||||
import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
|
||||
import org.gcube.common.resources.gcore.utils.Group;
|
||||
import org.gcube.resources.discovery.client.api.DiscoveryClient;
|
||||
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
|
||||
import org.gcube.resources.discovery.icclient.ICFactory;
|
||||
|
||||
/**
|
||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||
*/
|
||||
public class SmartExecutorPersistenceConfiguration {
|
||||
|
||||
public final String SERVICE_ENDPOINT_CATEGORY = "VREManagement";
|
||||
public final String SERVICE_ENDPOINT_NAME = "SmartExecutor";
|
||||
|
||||
protected static final String PERSISTENCE_CLASS_NAME = "persistenceClassName";
|
||||
|
||||
protected URI uri;
|
||||
protected String username;
|
||||
protected String password;
|
||||
|
||||
protected Map<String, Property> propertyMap;
|
||||
|
||||
protected void init(){
|
||||
this.propertyMap = new HashMap<String, Property>();
|
||||
}
|
||||
|
||||
public SmartExecutorPersistenceConfiguration(){
|
||||
init();
|
||||
}
|
||||
|
||||
public SmartExecutorPersistenceConfiguration(URI uri, String username, String password){
|
||||
init();
|
||||
this.uri = uri;
|
||||
this.username = username;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public SmartExecutorPersistenceConfiguration(String persistenceClassName) throws Exception {
|
||||
init();
|
||||
ServiceEndpoint serviceEndpoint = getServiceEndpoint(SERVICE_ENDPOINT_CATEGORY, SERVICE_ENDPOINT_NAME, persistenceClassName);
|
||||
setValues(serviceEndpoint,persistenceClassName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the uri
|
||||
*/
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param uri the uri to set
|
||||
*/
|
||||
public void setUri(URI uri) {
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the username
|
||||
*/
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param username the username to set
|
||||
*/
|
||||
public void setUsername(String username) {
|
||||
this.username = username;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the password
|
||||
*/
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param password the password to set
|
||||
*/
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the propertyMap
|
||||
* @throws Exception
|
||||
*/
|
||||
public String getProperty(String propertyKey) throws Exception {
|
||||
Property propertyValue = propertyMap.get(propertyKey);
|
||||
String value = propertyValue.value();
|
||||
if(propertyValue.isEncrypted()){
|
||||
value = decrypt(value);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
protected ServiceEndpoint getServiceEndpoint(String serviceEndpointCategory, String serviceEndpointName, String persistenceClassName){
|
||||
SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class);
|
||||
query.addCondition(String.format("$resource/Profile/Category/text() eq '%s'", serviceEndpointCategory));
|
||||
query.addCondition(String.format("$resource/Profile/Name/text() eq '%s'", serviceEndpointName));
|
||||
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", PERSISTENCE_CLASS_NAME));
|
||||
query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", persistenceClassName));
|
||||
query.setResult("$resource");
|
||||
|
||||
DiscoveryClient<ServiceEndpoint> client = ICFactory.clientFor(ServiceEndpoint.class);
|
||||
List<ServiceEndpoint> serviceEndpoints = client.submit(query);
|
||||
return serviceEndpoints.get(0);
|
||||
}
|
||||
|
||||
private static String decrypt(String encrypted, Key... key) throws Exception {
|
||||
return StringEncrypter.getEncrypter().decrypt(encrypted);
|
||||
}
|
||||
|
||||
protected void setValues(ServiceEndpoint serviceEndpoint, String persistenceClassName) throws Exception{
|
||||
Group<AccessPoint> accessPoints = serviceEndpoint.profile().accessPoints();
|
||||
for(AccessPoint accessPoint : accessPoints){
|
||||
Collection<Property> properties = accessPoint.propertyMap().values();
|
||||
|
||||
if(properties.contains(new ServiceEndpoint.Property().nameAndValue(PERSISTENCE_CLASS_NAME, persistenceClassName))){
|
||||
this.uri = new URI(accessPoint.address());
|
||||
this.username = accessPoint.username();
|
||||
|
||||
String encryptedPassword = accessPoint.password();
|
||||
String password = decrypt(encryptedPassword);
|
||||
|
||||
this.password = password;
|
||||
this.propertyMap = accessPoint.propertyMap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -51,7 +51,7 @@ public class ExecutorImplTest {
|
|||
@Test
|
||||
public void myJavaTest() throws Exception{
|
||||
|
||||
SmartExecutorInitalizator.jdbcPersistenceConnector = new JDBCPersistenceConnector(".");
|
||||
SmartExecutorInitalizator.persistenceConnector = new JDBCPersistenceConnector(".");
|
||||
|
||||
Map<String, Object> inputs = new HashMap<String, Object>();
|
||||
long sleepTime = 10000;
|
||||
|
|
|
@ -36,7 +36,7 @@ public class JDBCPersistenceConnectorTest {
|
|||
PluginState[] states = PluginState.values();
|
||||
for(int i=0; i<states.length; i++){
|
||||
long timestamp = new Date().getTime();
|
||||
jdbcPersistenceConnector.addEvolution(uuid, 1, timestamp, HelloWorldPluginDeclaration.NAME, states[i]);
|
||||
jdbcPersistenceConnector.pluginStateEvolution(uuid, 1, timestamp, HelloWorldPluginDeclaration.NAME, states[i]);
|
||||
PluginState ps = jdbcPersistenceConnector.getPluginInstanceState(uuid, 1);
|
||||
Assert.assertEquals(states[i], ps);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue