Discover bucket in relation of dinamically discovered buckets. fixes #9612

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib-couchbase@152685 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-09-05 15:38:18 +00:00
parent 0507bd691c
commit 0dbaeb4c63
6 changed files with 278 additions and 151 deletions

View File

@ -1,7 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE xml>
<ReleaseNotes>
<Changeset component="org.gcube.data-publishing.document-store-lib-couchbase.1-4-0" date="${buildDate}">
<Changeset component="org.gcube.data-publishing.document-store-lib-couchbase.1-5-0" date="${buildDate}">
<Change>Added isConnectionActive() in PersistenceBackend</Change>
<Change>Discover bucket in relation of dinamically discovered buckets #9612</Change>
<Change>Used DSMapper to get Record from JsonNode</Change>
</Changeset>
<Changeset component="org.gcube.data-publishing.document-store-lib-couchbase.1-4-0" date="2017-06-07">
<Change>Fixed bug on shutdown() method to properly close the connection</Change>
</Changeset>
<Changeset component="org.gcube.data-publishing.document-store-lib-couchbase.1-3-1" date="2017-05-02">

32
pom.xml
View File

@ -8,7 +8,7 @@
</parent>
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib-couchbase</artifactId>
<version>1.4.0-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<name>Document Store CouchBase Connector</name>
<description>Document Store Connector for CouchBase</description>
@ -45,18 +45,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>core-io</artifactId>
<version>[1.2.3,2.0.0)</version>
<scope>compile</scope>
<version>2.2.7</version>
</dependency>
<!-- Test Dependency -->
@ -72,12 +65,27 @@
<version>1.0.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>authorization-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>common-authorization</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.gcube.accounting</groupId>
<artifactId>accounting-lib</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>

View File

@ -3,20 +3,21 @@
*/
package org.gcube.documentstore.persistence;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.documentstore.persistence.connections.Connection;
import org.gcube.documentstore.persistence.connections.Connections;
import org.gcube.documentstore.persistence.connections.Nodes;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.ObjectMapper;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
@ -24,6 +25,7 @@ import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.fasterxml.jackson.databind.JsonNode;
/**
* @author Luca Frosini (ISTI - CNR)
@ -31,217 +33,181 @@ import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
*/
public class PersistenceCouchBase extends PersistenceBackend {
private static final Logger logger = LoggerFactory
.getLogger(PersistenceCouchBase.class);
private static final Logger logger = LoggerFactory.getLogger(PersistenceCouchBase.class);
public static final String URL_PROPERTY_KEY = "URL";
//public static final String USERNAME_PROPERTY_KEY = "username";
// public static final String USERNAME_PROPERTY_KEY = "username";
public static final String PASSWORD_PROPERTY_KEY = "password";
public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName";
/*Different bucket for aggregated*/
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord";
public static final String BUCKET_STORAGE_TYPE="StorageUsageRecord";
public static final String BUCKET_STORAGE_STATUS_NAME_PROPERTY_KEY="AggregatedStorageStatusRecord";
public static final String BUCKET_STORAGE_STATUS_TYPE="StorageStatusRecord";
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord";
public static final String BUCKET_SERVICE_TYPE="ServiceUsageRecord";
public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY="AggregatedPortletUsageRecord";
public static final String BUCKET_PORTLET_TYPE="PortletUsageRecord";
public static final String BUCKET_JOB_NAME_PROPERTY_KEY="AggregatedJobUsageRecord";
public static final String BUCKET_JOB_TYPE="JobUsageRecord";
public static final String BUCKET_TASK_NAME_PROPERTY_KEY="AggregatedTaskUsageRecord";
public static final String BUCKET_TASK_TYPE="TaskUsageRecord";
public static final Integer TIMEOUT_BUCKET=180;
public static final Integer ALIVE_INTERVAL=3600;
public static final long TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(180);
public static final long ALIVE_INTERVAL = TimeUnit.HOURS.toMillis(1);
private final static String AGGREGATED_PREFIX = "Aggregated";
protected Map<String, String> bucketNames;
/* The environment configuration */
protected static final CouchbaseEnvironment ENV =
DefaultCouchbaseEnvironment.builder()
.connectTimeout(TIMEOUT_BUCKET * 1000) // 180 Seconds in milliseconds
.keepAliveInterval(ALIVE_INTERVAL * 1000) // 3600 Seconds in milliseconds
.build();
protected static final CouchbaseEnvironment ENV;
private Nodes nodes;
private String password ;
/**
* {@inheritDoc}
*/
private String password;
static {
ENV = DefaultCouchbaseEnvironment.builder()
.connectTimeout(TIMEOUT_BUCKET * 1000)
.keepAliveInterval(ALIVE_INTERVAL * 1000)
.build();
}
@Override
protected void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception {
String url = configuration.getProperty(URL_PROPERTY_KEY);
password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
nodes= new Nodes(url);
logger.debug("PersistenceCouchBase prepareConnection url:{} and now is connectionsMap:{}",url,Connections.connectionsMap );
nodes = new Nodes(url);
logger.debug("PersistenceCouchBase prepareConnection url:{} and now is connectionsMap:{}", url,
Connections.connectionsMap);
bucketNames = new HashMap<>();
bucketNames.put(BUCKET_STORAGE_TYPE, configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_STORAGE_STATUS_TYPE, configuration.getProperty(BUCKET_STORAGE_STATUS_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_SERVICE_TYPE, configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_JOB_TYPE, configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_PORTLET_TYPE, configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_TASK_TYPE, configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY));
Map<String, Class<? extends Record>> recordClasses = RecordUtility.getRecordClassesFound();
for (Class<? extends Record> recordClass : recordClasses.values()) {
Record recordInstance = recordClass.newInstance();
if (recordInstance instanceof UsageRecord && !(recordInstance instanceof AggregatedUsageRecord<?,?>)) {
try {
@SuppressWarnings("unchecked")
Class<? extends UsageRecord> usageRecordClazz = (Class<? extends UsageRecord>) recordClass;
logger.debug("Trying to get the Bucket for {}", usageRecordClazz);
String recordType = recordInstance.getRecordType();
String bucketName = configuration.getProperty(AGGREGATED_PREFIX + recordType);
logger.debug("Bucket for {} is {}.", usageRecordClazz, bucketName);
bucketNames.put(recordType, bucketName);
}catch (Exception e) {
logger.info("Unable to open Bucket for type {}", recordClass, e);
}
}
}
}
@Override
protected void openConnection() throws Exception {
synchronized (Connections.connectionsMap) {
if (!Connections.connectionsMap.containsKey(nodes)){
//open cluster and add into map
//logger.trace("PersistenceCouchBase openConnection bucketNames :{}",bucketNames);
if (!Connections.connectionsMap.containsKey(nodes)) {
// open cluster and add into map
// logger.trace("PersistenceCouchBase openConnection bucketNames
// :{}",bucketNames);
Cluster cluster = null;
try {
cluster = CouchbaseCluster.create(ENV, nodes.getNodes());
Connections.connectionsMap.put(nodes, new Connection(cluster));
logger.trace("PersistenceCouchBase openConnection insert nodes:{}",Connections.connectionsMap );
} catch(Exception e) {
cluster.disconnect();
logger.error("Bucket connection error", e);
logger.trace("PersistenceCouchBase openConnection insert nodes:{}", Connections.connectionsMap);
} catch (Exception e) {
if(cluster!=null){
cluster.disconnect();
}
logger.error("Bucket connection error", e);
throw e;
}
}
else{
//logger.debug("PersistenceCouchBase openConnection contains node use an existing cluster env");
}
} else {
// logger.debug("PersistenceCouchBase openConnection contains
// node use an existing cluster env");
}
}
}
protected Bucket getBucketConnection(String recordType){
//Bucket bucket = connectionMap.get(recordType);
protected Bucket getBucketConnection(String recordType) {
// Bucket bucket = connectionMap.get(recordType);
Bucket bucket = null;
synchronized (Connections.connectionsMap) {
/*
String bucketNamesValue=bucketNames.get(recordType);
logger.trace("bucketNamesValue:{}",bucketNamesValue);
Map<Nodes, Connection>conMap=Connections.connectionsMap;
logger.trace("conMap:{}",conMap.toString());
logger.debug("nodes:{}",nodes.toString());
Map<String,Bucket> mapStringBucket=conMap.get(nodes).getBucketsMap();
logger.trace("mapStringBucket:{}",mapStringBucket.toString());
bucket =mapStringBucket.get(bucketNamesValue);
*/
//Old code
bucket =Connections.connectionsMap.get(nodes).getBucketsMap().get(bucketNames.get(recordType));
bucket = Connections.connectionsMap.get(nodes).getBucketsMap().get(bucketNames.get(recordType));
try {
//logger.debug("PersistenceCouchBase getBucketConnection recordType:{}, bucket name:{}",recordType,bucketNames.get(recordType));
if(bucket == null){
//bucket = cluster.openBucket(recordType, password);
bucket = Connections.connectionsMap.get(nodes).getCluster().openBucket(bucketNames.get(recordType), password);
logger.trace("PersistenceCouchBase getBucketConnection bucket close, open:{}",bucket.toString() );
//connectionMap.put(recordType, bucket);
// logger.debug("PersistenceCouchBase getBucketConnection
// recordType:{}, bucket
// name:{}",recordType,bucketNames.get(recordType));
if (bucket == null) {
// bucket = cluster.openBucket(recordType, password);
bucket = Connections.connectionsMap.get(nodes).getCluster().openBucket(bucketNames.get(recordType),
password);
logger.trace("PersistenceCouchBase getBucketConnection bucket close, open:{}", bucket.toString());
// connectionMap.put(recordType, bucket);
Connections.connectionsMap.get(nodes).getBucketsMap().put(bucketNames.get(recordType), bucket);
logger.trace("PersistenceCouchBase getBucketConnection connectionMap:{}",Connections.connectionsMap.get(nodes).getBucketsMap());
logger.trace("PersistenceCouchBase getBucketConnection connectionMap:{}",
Connections.connectionsMap.get(nodes).getBucketsMap());
}
} catch(Exception e) {
logger.error("getBucketConnection connection error", e);
} catch (Exception e) {
logger.error("getBucketConnection connection error", e);
throw e;
}
}
}
return bucket;
}
protected JsonDocument createItem(JsonObject jsonObject, String id,String recordType) throws Exception {
protected JsonDocument createItem(JsonObject jsonObject, String id, String recordType) throws Exception {
JsonDocument doc = JsonDocument.create(id, jsonObject);
return getBucketConnection(recordType).upsert(doc);
}
public static JsonNode usageRecordToJsonNode(Record record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.valueToTree(record.getResourceProperties());
JsonNode node = DSMapper.getObjectMapper().valueToTree(DSMapper.marshal(record));
return node;
}
public static Record jsonNodeToUsageRecord(JsonNode jsonNode) throws Exception {
ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked")
Map<String, ? extends Serializable> result = mapper.convertValue(jsonNode, Map.class);
Record record = RecordUtility.getRecord(result);
return record;
return DSMapper.getObjectMapper().convertValue(jsonNode, Record.class);
}
/**
* {@inheritDoc}
*/
@Override
protected void reallyAccount(Record record) throws Exception {
JsonNode node = PersistenceCouchBase.usageRecordToJsonNode(record);
JsonObject jsonObject = JsonObject.fromJson(node.toString());
//get a bucket association
String recordType=record.getRecordType();
createItem(jsonObject, record.getId(),recordType);
protected void reallyAccount(Record record) throws Exception {
JsonObject jsonObject = JsonObject.fromJson(DSMapper.marshal(record));
// get a bucket association
String recordType = record.getRecordType();
createItem(jsonObject, record.getId(), recordType);
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws Exception {
logger.trace("PersistenceCouchBase close" );
public boolean isConnectionActive() throws Exception {
return !Connections.connectionsMap.get(nodes).getBucketsMap().values().iterator().next().isClosed();
}
@Override
public boolean isConnectionActive() throws Exception{
return ! Connections.connectionsMap.get(nodes).getBucketsMap().get(BUCKET_SERVICE_NAME_PROPERTY_KEY).isClosed();
}
@Override
protected void closeAndClean() throws Exception {
protected void clean() throws Exception {
synchronized (Connections.connectionsMap) {
try {
if (!Connections.connectionsMap.isEmpty()){
for (Map.Entry<String, Bucket> entry : Connections.connectionsMap.get(nodes).getBucketsMap().entrySet())
{
if (!Connections.connectionsMap.isEmpty()) {
for (Map.Entry<String, Bucket> entry : Connections.connectionsMap.get(nodes).getBucketsMap()
.entrySet()) {
Boolean closed = entry.getValue().close();
if (!closed){
logger.warn("bucket not close :{}",entry.getKey());
if (!closed) {
logger.warn("bucket not close :{}", entry.getKey());
}
}
Boolean clusterClosed= Connections.connectionsMap.get(nodes).getCluster().disconnect();
if (!clusterClosed){
Boolean clusterClosed = Connections.connectionsMap.get(nodes).getCluster().disconnect();
if (!clusterClosed) {
logger.warn("cluster not disconnect");
}
Connections.connectionsMap.remove(nodes);
logger.trace("PersistenceCouchBase disconnect" );
}
else{
Connections.connectionsMap.remove(nodes);
logger.trace("PersistenceCouchBase disconnect");
} else {
logger.warn("cluster not open");
}
} catch(Exception e) {
logger.error("closeAndClean error with close and clean", e);
} catch (Exception e) {
logger.error("closeAndClean error with close and clean", e);
throw e;
}
}
}
}
@Override
protected void closeConnection() throws Exception {
logger.trace("PersistenceCouchBase closeConnection" );
logger.trace("PersistenceCouchBase closeConnection");
};
}

View File

@ -0,0 +1,38 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.util.concurrent.TimeUnit;
import org.gcube.common.authorization.client.exceptions.ObjectNotFound;
import org.gcube.testutility.ScopedTest;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class PersistenceCouchBaseTest extends ScopedTest {
private static final Logger logger = LoggerFactory.getLogger(PersistenceCouchBaseTest.class);
public static final long timeout = 5000;
public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@Test
public void persistenceIsCouchBase() throws ObjectNotFound, Exception {
logger.debug("Going to check if the Persistence is CouchBase");
PersistenceBackendFactory.setFallbackLocation(null);
FallbackPersistenceBackend fallbackPersistenceBackend = PersistenceBackendFactory.createFallback(ScopedTest.getCurrentContext());
PersistenceBackend persistenceBackend = PersistenceBackendFactory.rediscoverPersistenceBackend(fallbackPersistenceBackend, ScopedTest.getCurrentContext());
Assert.assertTrue(persistenceBackend instanceof PersistenceCouchBase);
}
}

View File

@ -0,0 +1,103 @@
/**
*
*/
package org.gcube.testutility;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.gcube.common.authorization.client.Constants;
import org.gcube.common.authorization.client.exceptions.ObjectNotFound;
import org.gcube.common.authorization.library.AuthorizationEntry;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class ScopedTest {
private static final Logger logger = LoggerFactory.getLogger(ScopedTest.class);
protected static final String PROPERTIES_FILENAME = "token.properties";
private static final String GCUBE_DEVNEXT_VARNAME = "GCUBE_DEVNEXT";
public static final String GCUBE_DEVNEXT;
private static final String GCUBE_DEVNEXT_NEXTNEXT_VARNAME = "GCUBE_DEVNEXT_NEXTNEXT";
public static final String GCUBE_DEVNEXT_NEXTNEXT;
public static final String GCUBE_DEVSEC_VARNAME = "GCUBE_DEVSEC";
public static final String GCUBE_DEVSEC;
public static final String GCUBE_DEVSEC_DEVVRE_VARNAME = "GCUBE_DEVSEC_DEVVRE";
public static final String GCUBE_DEVSEC_DEVVRE;
public static final String GCUBE_VARNAME = "GCUBE";
public static final String GCUBE;
public static final String DEFAULT_TEST_SCOPE;
public static final String ALTERNATIVE_TEST_SCOPE;
static {
Properties properties = new Properties();
InputStream input = ScopedTest.class.getClassLoader().getResourceAsStream(PROPERTIES_FILENAME);
try {
// load the properties file
properties.load(input);
} catch (IOException e) {
throw new RuntimeException(e);
}
GCUBE_DEVNEXT = properties.getProperty(GCUBE_DEVNEXT_VARNAME);
GCUBE_DEVNEXT_NEXTNEXT = properties.getProperty(GCUBE_DEVNEXT_NEXTNEXT_VARNAME);
GCUBE_DEVSEC = properties.getProperty(GCUBE_DEVSEC_VARNAME);
GCUBE_DEVSEC_DEVVRE = properties.getProperty(GCUBE_DEVSEC_DEVVRE_VARNAME);
GCUBE = properties.getProperty(GCUBE_VARNAME);
DEFAULT_TEST_SCOPE = GCUBE_DEVNEXT;
ALTERNATIVE_TEST_SCOPE = GCUBE_DEVSEC_DEVVRE;
}
public static String getCurrentContext() throws ObjectNotFound, Exception{
String token = SecurityTokenProvider.instance.get();
return getCurrentContext(token);
}
public static String getCurrentContext(String token) throws ObjectNotFound, Exception{
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
String context = authorizationEntry.getContext();
logger.info("Context of token {} is {}", token, context);
return context;
}
public static void setContext(String token) throws ObjectNotFound, Exception{
SecurityTokenProvider.instance.set(token);
ScopeProvider.instance.set(getCurrentContext(token));
}
@BeforeClass
public static void beforeClass() throws Exception{
setContext(DEFAULT_TEST_SCOPE);
}
@AfterClass
public static void afterClass() throws Exception{
SecurityTokenProvider.instance.reset();
ScopeProvider.instance.reset();
}
}

View File

@ -0,0 +1,7 @@
GCUBE=0b47600a-1c53-4a47-b07b-03851cc28c8a-98187548
GCUBE_DEVNEXT=577013f6-2d07-4071-af7b-c3c6a064fbda-98187548
GCUBE_DEVNEXT_NEXTNEXT=7c66c94c-7f6e-49cd-9a34-909cd3832f3e-98187548
GCUBE_DEVSEC=a2c82e3a-82ca-4fd9-b37d-f1eb49a22bd5-98187548
GCUBE_DEVSEC_DEVVRE=4646ff97-40d1-443c-8cf9-5892957d3d64-98187548