From 1daca05f525b15ef8db42968faa37db9af439a41 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Tue, 25 Aug 2015 16:03:55 +0000 Subject: [PATCH] refs #89: Save Task Evolution on NoSQL global DB https://support.d4science.org/issues/89 Implementing Feature git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@117707 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 28 ++++ .../executor/SmartExecutorImpl.java | 10 +- .../executor/SmartExecutorInitalizator.java | 13 +- .../CouchDBPersistenceConnector.java | 116 ++++++++++++++ .../persistence/JDBCPersistenceConnector.java | 9 +- ...SmartExecutorPersistenceConfiguration.java | 149 ++++++++++++++++++ .../executor/ExecutorImplTest.java | 2 +- .../JDBCPersistenceConnectorTest.java | 2 +- 8 files changed, 309 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/gcube/vremanagement/executor/persistence/CouchDBPersistenceConnector.java create mode 100644 src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java diff --git a/pom.xml b/pom.xml index 29e36ed..f488e64 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,15 @@ + + org.gcube.resources.discovery + ic-client + provided + + + org.gcube.core + common-encryption + org.slf4j slf4j-api @@ -68,11 +77,30 @@ [3.0.1, 4.0.0) provided + + com.h2database h2 [1.4.185, 1.5.0) + + + + + org.ektorp + org.ektorp + 1.3.0 + jar + + + org.codehaus.jackson + jackson-core-asl + 1.9.7 + jar + + + diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java index 3fc85ad..dc7913a 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorImpl.java @@ -11,7 +11,7 @@ import org.gcube.vremanagement.executor.exception.InputsNullException; import org.gcube.vremanagement.executor.exception.LaunchException; import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException; import org.gcube.vremanagement.executor.exception.PluginNotFoundException; -import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector; +import org.gcube.vremanagement.executor.persistence.PersistenceConnector; import org.gcube.vremanagement.executor.plugin.PluginState; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; import org.slf4j.Logger; @@ -50,8 +50,8 @@ public class SmartExecutorImpl implements SmartExecutor { public PluginState getState(String executionIdentifier) throws PluginInstanceNotFoundException, ExecutorException { try { - JDBCPersistenceConnector jdbcPersistenceConnector = SmartExecutorInitalizator.getJdbcPersistenceConnector(); - return jdbcPersistenceConnector.getLastPluginInstanceState(UUID.fromString(executionIdentifier)); + PersistenceConnector persistenceConnector = SmartExecutorInitalizator.getPersistenceConnector(); + return persistenceConnector.getLastPluginInstanceState(UUID.fromString(executionIdentifier)); } catch (Exception e) { throw new PluginInstanceNotFoundException(); } @@ -62,8 +62,8 @@ public class SmartExecutorImpl implements SmartExecutor { public PluginState getIterationState(String executionIdentifier, int iterationNumber) throws PluginInstanceNotFoundException, ExecutorException { try { - JDBCPersistenceConnector jdbcPersistenceConnector = SmartExecutorInitalizator.getJdbcPersistenceConnector(); - return jdbcPersistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), iterationNumber); + PersistenceConnector persistenceConnector = SmartExecutorInitalizator.getPersistenceConnector(); + return persistenceConnector.getPluginInstanceState(UUID.fromString(executionIdentifier), iterationNumber); } catch (Exception e) { throw new PluginInstanceNotFoundException(); } diff --git a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitalizator.java b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitalizator.java index dcb5a58..6d89c5b 100644 --- a/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitalizator.java +++ b/src/main/java/org/gcube/vremanagement/executor/SmartExecutorInitalizator.java @@ -35,6 +35,7 @@ import org.gcube.smartgears.context.application.ApplicationContext; import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent; import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler; import org.gcube.vremanagement.executor.persistence.JDBCPersistenceConnector; +import org.gcube.vremanagement.executor.persistence.PersistenceConnector; import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.pluginmanager.PluginManager; import org.gcube.vremanagement.executor.scheduler.SmartExecutorScheduler; @@ -63,7 +64,7 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler { /** * Represent the connector to DB */ - protected static JDBCPersistenceConnector jdbcPersistenceConnector; + protected static PersistenceConnector persistenceConnector; /** * The application context @@ -78,8 +79,8 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler { /** * @return the jdbcPersistenceConnector */ - public static JDBCPersistenceConnector getJdbcPersistenceConnector() { - return jdbcPersistenceConnector; + public static PersistenceConnector getPersistenceConnector() { + return persistenceConnector; } /** @@ -338,12 +339,14 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler { } try { - jdbcPersistenceConnector = new JDBCPersistenceConnector(ctx.persistence().location()); + persistenceConnector = new JDBCPersistenceConnector(ctx.persistence().location()); } catch (Exception e) { logger.error("Unable to initialize PersistenceConnector. The Service will be aborted", e); return; } + + logger.debug( "\n-------------------------------------------------------\n" + "Smart Executor Started Successfully\n" @@ -378,7 +381,7 @@ public class SmartExecutorInitalizator extends ApplicationLifecycleHandler { } try { - jdbcPersistenceConnector.close(); + persistenceConnector.close(); } catch (Exception e) { logger.error("Unable to close Persistence", e); } diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/CouchDBPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/CouchDBPersistenceConnector.java new file mode 100644 index 0000000..331e8bf --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/CouchDBPersistenceConnector.java @@ -0,0 +1,116 @@ +/** + * + */ +package org.gcube.vremanagement.executor.persistence; + +import java.net.URL; +import java.util.UUID; + +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ObjectNode; +import org.ektorp.CouchDbConnector; +import org.ektorp.CouchDbInstance; +import org.ektorp.ViewQuery; +import org.ektorp.ViewResult; +import org.ektorp.http.HttpClient; +import org.ektorp.http.StdHttpClient; +import org.ektorp.http.StdHttpClient.Builder; +import org.ektorp.impl.StdCouchDbConnector; +import org.ektorp.impl.StdCouchDbInstance; +import org.gcube.vremanagement.executor.plugin.PluginState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public class CouchDBPersistenceConnector extends PersistenceConnector { + + private static final Logger logger = LoggerFactory.getLogger(CouchDBPersistenceConnector.class); + + protected CouchDbInstance couchDbInstance; + protected CouchDbConnector couchDbConnector; + + protected static final String DB_NAME = "dbName"; + + public CouchDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception { + prepareConnection(configuration); + } + + protected HttpClient initHttpClient(URL url, String username, String password){ + Builder builder = new StdHttpClient.Builder().url(url); + builder.username(username).password(password); + HttpClient httpClient = builder.build(); + return httpClient; + } + + protected void prepareConnection(SmartExecutorPersistenceConfiguration configuration) throws Exception { + logger.debug("Preparing Connection for {}", this.getClass().getSimpleName()); + HttpClient httpClient = initHttpClient(configuration.getUri().toURL(), configuration.getUsername(), configuration.getPassword()); + couchDbInstance = new StdCouchDbInstance(httpClient); + couchDbConnector = new StdCouchDbConnector(configuration.getProperty(DB_NAME), couchDbInstance); + } + + protected ViewResult query(ViewQuery query){ + ViewResult result = couchDbConnector.queryView(query); + return result; + } + + + @Override + public void close() throws Exception { + couchDbConnector.getConnection().shutdown(); + } + + protected void createItem(JsonNode node, String id) throws Exception { + if(id!=null && id.compareTo("")!=0){ + couchDbConnector.create(id, node); + }else{ + couchDbConnector.create(node); + } + } + + public final static String UUID_FIELD = "uuid"; + public final static String ITERATION_FIELD = "iteration"; + public final static String PLUGIN_NAME_FIELD = "pluginName"; + public final static String TIMESTAMP_FIELD = "timestamp"; + public final static String STATE_FIELD = "state"; + + /** + * {@inheritDoc} + */ + @Override + public void pluginStateEvolution(UUID uuid, int iteration, long timestamp, + String pluginName, PluginState pluginState) throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode objectNode = objectMapper.createObjectNode(); + objectNode.put(UUID_FIELD, uuid.toString()); + objectNode.put(ITERATION_FIELD, iteration); + objectNode.put(TIMESTAMP_FIELD, timestamp); + objectNode.put(PLUGIN_NAME_FIELD, pluginName); + objectNode.put(STATE_FIELD, pluginState.toString()); + createItem(objectNode, null); + } + + /** + * {@inheritDoc} + */ + @Override + public PluginState getPluginInstanceState(UUID uuid, int iterationNumber) + throws Exception { + // TODO Auto-generated method stub + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public PluginState getLastPluginInstanceState(UUID uuid) throws Exception { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnector.java b/src/main/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnector.java index d44c328..251d112 100644 --- a/src/main/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnector.java +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnector.java @@ -158,7 +158,7 @@ public class JDBCPersistenceConnector extends PersistenceConnector { } @Override - public void addEvolution(UUID uuid, int iteration, long timestamp, String pluginName, PluginState pluginState) + public void pluginStateEvolution(UUID uuid, int iteration, long timestamp, String pluginName, PluginState pluginState) throws Exception { connection.setAutoCommit(false); // transaction block start @@ -193,11 +193,4 @@ public class JDBCPersistenceConnector extends PersistenceConnector { connection.close(); } - /**{@inheritDoc} */ - @Override - public void pluginStateEvolution(UUID uuid, int iteration, - long timestamp, String pluginName, PluginState pluginState) throws Exception { - addEvolution(uuid, iteration, timestamp, pluginName, pluginState); - } - } diff --git a/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java new file mode 100644 index 0000000..0c4326e --- /dev/null +++ b/src/main/java/org/gcube/vremanagement/executor/persistence/SmartExecutorPersistenceConfiguration.java @@ -0,0 +1,149 @@ +/** + * + */ +package org.gcube.vremanagement.executor.persistence; + +import java.net.URI; +import java.security.Key; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.gcube.common.encryption.StringEncrypter; +import org.gcube.common.resources.gcore.ServiceEndpoint; +import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; +import org.gcube.common.resources.gcore.ServiceEndpoint.Property; +import org.gcube.common.resources.gcore.utils.Group; +import org.gcube.resources.discovery.client.api.DiscoveryClient; +import org.gcube.resources.discovery.client.queries.api.SimpleQuery; +import org.gcube.resources.discovery.icclient.ICFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +public class SmartExecutorPersistenceConfiguration { + + public final String SERVICE_ENDPOINT_CATEGORY = "VREManagement"; + public final String SERVICE_ENDPOINT_NAME = "SmartExecutor"; + + protected static final String PERSISTENCE_CLASS_NAME = "persistenceClassName"; + + protected URI uri; + protected String username; + protected String password; + + protected Map propertyMap; + + protected void init(){ + this.propertyMap = new HashMap(); + } + + public SmartExecutorPersistenceConfiguration(){ + init(); + } + + public SmartExecutorPersistenceConfiguration(URI uri, String username, String password){ + init(); + this.uri = uri; + this.username = username; + this.password = password; + } + + public SmartExecutorPersistenceConfiguration(String persistenceClassName) throws Exception { + init(); + ServiceEndpoint serviceEndpoint = getServiceEndpoint(SERVICE_ENDPOINT_CATEGORY, SERVICE_ENDPOINT_NAME, persistenceClassName); + setValues(serviceEndpoint,persistenceClassName); + } + + /** + * @return the uri + */ + public URI getUri() { + return uri; + } + + /** + * @param uri the uri to set + */ + public void setUri(URI uri) { + this.uri = uri; + } + + /** + * @return the username + */ + public String getUsername() { + return username; + } + + /** + * @param username the username to set + */ + public void setUsername(String username) { + this.username = username; + } + + /** + * @return the password + */ + public String getPassword() { + return password; + } + + /** + * @param password the password to set + */ + public void setPassword(String password) { + this.password = password; + } + + /** + * @return the propertyMap + * @throws Exception + */ + public String getProperty(String propertyKey) throws Exception { + Property propertyValue = propertyMap.get(propertyKey); + String value = propertyValue.value(); + if(propertyValue.isEncrypted()){ + value = decrypt(value); + } + return value; + } + + protected ServiceEndpoint getServiceEndpoint(String serviceEndpointCategory, String serviceEndpointName, String persistenceClassName){ + SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class); + query.addCondition(String.format("$resource/Profile/Category/text() eq '%s'", serviceEndpointCategory)); + query.addCondition(String.format("$resource/Profile/Name/text() eq '%s'", serviceEndpointName)); + query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Name/text() eq '%s'", PERSISTENCE_CLASS_NAME)); + query.addCondition(String.format("$resource/Profile/AccessPoint/Properties/Property/Value/text() eq '%s'", persistenceClassName)); + query.setResult("$resource"); + + DiscoveryClient client = ICFactory.clientFor(ServiceEndpoint.class); + List serviceEndpoints = client.submit(query); + return serviceEndpoints.get(0); + } + + private static String decrypt(String encrypted, Key... key) throws Exception { + return StringEncrypter.getEncrypter().decrypt(encrypted); + } + + protected void setValues(ServiceEndpoint serviceEndpoint, String persistenceClassName) throws Exception{ + Group accessPoints = serviceEndpoint.profile().accessPoints(); + for(AccessPoint accessPoint : accessPoints){ + Collection properties = accessPoint.propertyMap().values(); + + if(properties.contains(new ServiceEndpoint.Property().nameAndValue(PERSISTENCE_CLASS_NAME, persistenceClassName))){ + this.uri = new URI(accessPoint.address()); + this.username = accessPoint.username(); + + String encryptedPassword = accessPoint.password(); + String password = decrypt(encryptedPassword); + + this.password = password; + this.propertyMap = accessPoint.propertyMap(); + } + } + } + +} diff --git a/src/test/java/org/gcube/vremanagement/executor/ExecutorImplTest.java b/src/test/java/org/gcube/vremanagement/executor/ExecutorImplTest.java index 93f5745..f7c0fbf 100644 --- a/src/test/java/org/gcube/vremanagement/executor/ExecutorImplTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/ExecutorImplTest.java @@ -51,7 +51,7 @@ public class ExecutorImplTest { @Test public void myJavaTest() throws Exception{ - SmartExecutorInitalizator.jdbcPersistenceConnector = new JDBCPersistenceConnector("."); + SmartExecutorInitalizator.persistenceConnector = new JDBCPersistenceConnector("."); Map inputs = new HashMap(); long sleepTime = 10000; diff --git a/src/test/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnectorTest.java b/src/test/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnectorTest.java index cd0c08a..e6de6b1 100644 --- a/src/test/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnectorTest.java +++ b/src/test/java/org/gcube/vremanagement/executor/persistence/JDBCPersistenceConnectorTest.java @@ -36,7 +36,7 @@ public class JDBCPersistenceConnectorTest { PluginState[] states = PluginState.values(); for(int i=0; i