Share the Couchbase connection across multiple threads

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib-couchbase@141603 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Alessandro Pieve 2017-01-17 11:11:18 +00:00
parent 4c557d140c
commit 6d41713016
5 changed files with 189 additions and 59 deletions
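
In outline, the change drops the per-instance Cluster and Bucket fields and keeps one CouchbaseCluster per node list in a process-wide registry, so every thread persisting records against the same nodes reuses the same connection. A minimal sketch of that pattern, assuming the Couchbase Java SDK 2.x API used in the diff (class and method names in the sketch are illustrative, not part of the commit):

// Illustrative sketch (not from the commit): one shared Cluster per node
// list, created once by the first caller and reused by every thread.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;

public class SharedClusterSketch {

    // Process-wide registry: one entry per node list.
    private static final Map<String, Cluster> CLUSTERS =
            Collections.synchronizedMap(new HashMap<String, Cluster>());

    public static Cluster clusterFor(String nodes) {
        synchronized (CLUSTERS) {
            Cluster cluster = CLUSTERS.get(nodes);
            if (cluster == null) {
                // First caller for these nodes pays the connection cost once.
                cluster = CouchbaseCluster.create(nodes);
                CLUSTERS.put(nodes, cluster);
            }
            return cluster;
        }
    }
}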

pom.xml

@@ -8,7 +8,7 @@
</parent>
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib-couchbase</artifactId>
-<version>1.2.0-SNAPSHOT</version>
+<version>1.3.0-SNAPSHOT</version>
<name>Document Store CouchBase Connector</name>
<description>Document Store Connector for CouchBase</description>

src/main/java/org/gcube/documentstore/persistence/PersistenceCouchBase.java

@@ -4,11 +4,12 @@
package org.gcube.documentstore.persistence;
import java.io.Serializable;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.gcube.documentstore.persistence.connections.Connection;
import org.gcube.documentstore.persistence.connections.Connections;
import org.gcube.documentstore.persistence.connections.Nodes;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
@@ -41,6 +42,9 @@ public class PersistenceCouchBase extends PersistenceBackend {
/*Different bucket for aggregated*/
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord";
public static final String BUCKET_STORAGE_TYPE="StorageUsageRecord";
public static final String BUCKET_STORAGE_STATUS_NAME_PROPERTY_KEY="AggregatedStorageStatusRecord";
public static final String BUCKET_STORAGE_STATUS_TYPE="StorageStatusRecord";
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord";
public static final String BUCKET_SERVICE_TYPE="ServiceUsageRecord";
@@ -57,6 +61,9 @@ public class PersistenceCouchBase extends PersistenceBackend {
public static final Integer TIMEOUT_BUCKET=180;
public static final Integer ALIVE_INTERVAL=3600;
protected Map<String, String> bucketNames;
/* The environment configuration */
protected static final CouchbaseEnvironment ENV =
DefaultCouchbaseEnvironment.builder()
@@ -64,28 +71,10 @@ public class PersistenceCouchBase extends PersistenceBackend {
.keepAliveInterval(ALIVE_INTERVAL * 1000) // 3600 seconds, expressed in milliseconds
.build();
protected Cluster cluster;
/* One Bucket for type*/
protected Bucket bucketStorage;
protected String bucketNameStorage;
private Nodes nodes;
protected Bucket bucketService;
protected String bucketNameService;
protected Bucket bucketPortlet;
protected String bucketNamePortlet;
protected Bucket bucketJob;
protected String bucketNameJob;
protected Bucket bucketTask;
protected String bucketNameTask;
private Map <String, Bucket> connectionMap;
//TEST
private static Integer count=0;
private String password;
/**
* {@inheritDoc}
@@ -93,47 +82,67 @@ public class PersistenceCouchBase extends PersistenceBackend {
@Override
protected void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception {
String url = configuration.getProperty(URL_PROPERTY_KEY);
-String password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
+password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
nodes = new Nodes(url);
logger.debug("PersistenceCouchBase prepareConnection url:{}, current connectionsMap:{}", url, Connections.connectionsMap);
try {
cluster = CouchbaseCluster.create(ENV, url);
bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY);
bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY);
bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY);
bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY);
bucketNameTask = configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY);
connectionMap = new HashMap<String, Bucket>();
bucketStorage = cluster.openBucket( bucketNameStorage,password);
connectionMap.put(BUCKET_STORAGE_TYPE, bucketStorage);
bucketService = cluster.openBucket( bucketNameService,password);
connectionMap.put(BUCKET_SERVICE_TYPE, bucketService);
bucketJob = cluster.openBucket( bucketNameJob,password);
connectionMap.put(BUCKET_JOB_TYPE, bucketJob);
bucketPortlet = cluster.openBucket( bucketNamePortlet,password);
connectionMap.put(BUCKET_PORTLET_TYPE, bucketPortlet);
bucketTask = cluster.openBucket( bucketNameTask,password);
connectionMap.put(BUCKET_TASK_TYPE, bucketTask);
} catch(Exception e) {
cluster.disconnect();
logger.error("Bucket connection error", e);
throw e;
}
bucketNames = new HashMap<>();
bucketNames.put(BUCKET_STORAGE_TYPE, configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_STORAGE_STATUS_TYPE, configuration.getProperty(BUCKET_STORAGE_STATUS_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_SERVICE_TYPE, configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_JOB_TYPE, configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_PORTLET_TYPE, configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY));
bucketNames.put(BUCKET_TASK_TYPE, configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY));
}
@Override
protected void openConnection() throws Exception {
synchronized (Connections.connectionsMap) {
if (!Connections.connectionsMap.containsKey(nodes)){
//open cluster and add into map
logger.debug("PersistenceCouchBase openConnection bucketNames :{}",bucketNames);
Cluster cluster = null;
try {
cluster = CouchbaseCluster.create(ENV, nodes.getNodes());
Connections.connectionsMap.put(nodes, new Connection(cluster));
logger.debug("PersistenceCouchBase openConnection insert nodes:{}",Connections.connectionsMap );
} catch(Exception e) {
if (cluster != null) {
cluster.disconnect();
}
logger.error("Cluster connection error", e);
throw e;
}
}
else{
logger.debug("PersistenceCouchBase openConnection contains node use an existing cluster env");
}
}
}
protected Bucket getBucketConnection(String recordType){
//Bucket bucket = connectionMap.get(recordType);
Bucket bucket = null;
synchronized (Connections.connectionsMap) {
bucket = Connections.connectionsMap.get(nodes).getBucketsMap().get(bucketNames.get(recordType));
logger.debug("PersistenceCouchBase getBucketConnection recordType:{}, bucket name:{}", recordType, bucketNames.get(recordType));
if(bucket == null){
//bucket = cluster.openBucket(recordType, password);
bucket = Connections.connectionsMap.get(nodes).getCluster().openBucket(bucketNames.get(recordType), password);
logger.debug("PersistenceCouchBase getBucketConnection: bucket not cached, opened:{}", bucket);
//connectionMap.put(recordType, bucket);
Connections.connectionsMap.get(nodes).getBucketsMap().put(bucketNames.get(recordType), bucket);
logger.debug("PersistenceCouchBase getBucketConnection bucketsMap:{}", Connections.connectionsMap.get(nodes).getBucketsMap());
}
}
return bucket;
}
protected JsonDocument createItem(JsonObject jsonObject, String id,String recordType) throws Exception {
JsonDocument doc = JsonDocument.create(id, jsonObject);
-return connectionMap.get(recordType).upsert(doc);
+return getBucketConnection(recordType).upsert(doc);
}
public static JsonNode usageRecordToJsonNode(Record record) throws Exception {
@@ -163,12 +172,40 @@ public class PersistenceCouchBase extends PersistenceBackend {
createItem(jsonObject, record.getId(),recordType);
}
@Override
protected void disconnect() throws Exception {
synchronized (Connections.connectionsMap) {
for (Map.Entry<String, Bucket> entry : Connections.connectionsMap.get(nodes).getBucketsMap().entrySet()){
Boolean closed = entry.getValue().close();
if (!closed){
logger.warn("bucket not closed: {}", entry.getKey());
}
}
Boolean clusterClosed = Connections.connectionsMap.get(nodes).getCluster().disconnect();
if (!clusterClosed){
logger.warn("cluster not disconnected");
}
Connections.connectionsMap.remove(nodes);
logger.debug("PersistenceCouchBase disconnect");
}
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws Exception {
-cluster.disconnect();
+logger.debug("PersistenceCouchBase close");
+//Boolean closed = Connections.connectionsMap.get(nodes).getCluster().disconnect();
+//cluster.disconnect();
}
}
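
getBucketConnection above opens each bucket lazily and caches it in the shared Connection, so later calls reuse the already open Bucket. A standalone sketch of that get-or-open step under the same assumptions (names are illustrative; openBucket(name, password) is the SDK 2.x blocking call):

import java.util.Map;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;

public class LazyBucketSketch {

    // Returns the cached bucket for name, opening and caching it on first use.
    static Bucket getOrOpen(Cluster cluster, Map<String, Bucket> cache,
                            String name, String password) {
        synchronized (cache) {
            Bucket bucket = cache.get(name);
            if (bucket == null) {
                bucket = cluster.openBucket(name, password); // blocking open
                cache.put(name, bucket);
            }
            return bucket;
        }
    }
}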

src/main/java/org/gcube/documentstore/persistence/connections/Connection.java

@@ -0,0 +1,35 @@
package org.gcube.documentstore.persistence.connections;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
public class Connection {
Map<String, Bucket> bucketsMap = Collections.synchronizedMap(new HashMap<String, Bucket>());
Cluster cluster;
public Connection(Cluster cluster) {
super();
this.cluster = cluster;
}
public Map<String, Bucket> getBucketsMap() {
return bucketsMap;
}
public Cluster getCluster(){
return this.cluster;
}
@Override
public String toString() {
return "Connection [bucketsMap=" + bucketsMap + ", cluster=" + cluster
+ "]";
}
}
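
A hypothetical usage of Connection from the same package: wrap a freshly created cluster, then cache each opened bucket in its synchronized map (node address, bucket name, and password below are placeholders):

package org.gcube.documentstore.persistence.connections;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;

public class ConnectionUsageSketch {
    public static void main(String[] args) {
        Cluster cluster = CouchbaseCluster.create("couchbase-node-1"); // placeholder node
        Connection connection = new Connection(cluster);
        // Open once, cache, and let every thread reuse the same Bucket.
        Bucket bucket = cluster.openBucket("AccountingBucket", "secret"); // placeholder credentials
        connection.getBucketsMap().put("AccountingBucket", bucket);
        System.out.println(connection);
    }
}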

src/main/java/org/gcube/documentstore/persistence/connections/Connections.java

@@ -0,0 +1,10 @@
package org.gcube.documentstore.persistence.connections;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class Connections {
public static Map<Nodes, Connection> connectionsMap = Collections.synchronizedMap(new HashMap<Nodes, Connection>());
}
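
At shutdown, the disconnect() method above drains a single entry of this registry; a compact sketch of draining all of it, assuming the same package (the helper name closeAll is illustrative):

package org.gcube.documentstore.persistence.connections;

import com.couchbase.client.java.Bucket;

public class ShutdownSketch {

    // Closes every cached bucket and cluster, then empties the registry.
    static void closeAll() {
        synchronized (Connections.connectionsMap) {
            for (Connection connection : Connections.connectionsMap.values()) {
                for (Bucket bucket : connection.getBucketsMap().values()) {
                    bucket.close(); // returns false if already closed
                }
                connection.getCluster().disconnect();
            }
            Connections.connectionsMap.clear();
        }
    }
}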

src/main/java/org/gcube/documentstore/persistence/connections/Nodes.java

@@ -0,0 +1,48 @@
package org.gcube.documentstore.persistence.connections;
public class Nodes {
private String nodes;
public Nodes(String nodes) {
super();
this.nodes = nodes;
}
public String getNodes() {
return nodes;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((nodes == null) ? 0 : nodes.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Nodes other = (Nodes) obj;
if (nodes == null) {
if (other.nodes != null)
return false;
} else if (!nodes.equals(other.nodes))
return false;
return true;
}
@Override
public String toString() {
return "Nodes [nodes=" + nodes + "]";
}
}
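
equals() and hashCode() compare only the underlying node string, which is what lets two independently constructed Nodes instances resolve to the same connectionsMap entry. A small check of that behavior (same package assumed):

package org.gcube.documentstore.persistence.connections;

import java.util.HashMap;
import java.util.Map;

public class NodesKeySketch {
    public static void main(String[] args) {
        Map<Nodes, String> map = new HashMap<>();
        map.put(new Nodes("host-a,host-b"), "shared cluster");
        // A distinct instance with the same node string finds the same entry.
        System.out.println(map.get(new Nodes("host-a,host-b")));
    }
}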