From 6d41713016472613211a44155286abf24f889273 Mon Sep 17 00:00:00 2001 From: Alessandro Pieve Date: Tue, 17 Jan 2017 11:11:18 +0000 Subject: [PATCH] Add a share connection into multiple threads git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib-couchbase@141603 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 2 +- .../persistence/PersistenceCouchBase.java | 153 +++++++++++------- .../persistence/connections/Connection.java | 35 ++++ .../persistence/connections/Connections.java | 10 ++ .../persistence/connections/Nodes.java | 48 ++++++ 5 files changed, 189 insertions(+), 59 deletions(-) create mode 100644 src/main/java/org/gcube/documentstore/persistence/connections/Connection.java create mode 100644 src/main/java/org/gcube/documentstore/persistence/connections/Connections.java create mode 100644 src/main/java/org/gcube/documentstore/persistence/connections/Nodes.java diff --git a/pom.xml b/pom.xml index d8310e8..93fbd3e 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ org.gcube.data.publishing document-store-lib-couchbase - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT Document Store CouchBase Connector Document Store Connector for CouchBase diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceCouchBase.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceCouchBase.java index 3a7c0fa..3492687 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceCouchBase.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceCouchBase.java @@ -4,11 +4,12 @@ package org.gcube.documentstore.persistence; import java.io.Serializable; -import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeoutException; +import org.gcube.documentstore.persistence.connections.Connection; +import org.gcube.documentstore.persistence.connections.Connections; +import org.gcube.documentstore.persistence.connections.Nodes; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; import org.slf4j.Logger; @@ -41,6 +42,9 @@ public class PersistenceCouchBase extends PersistenceBackend { /*Different bucket for aggregated*/ public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord"; public static final String BUCKET_STORAGE_TYPE="StorageUsageRecord"; + + public static final String BUCKET_STORAGE_STATUS_NAME_PROPERTY_KEY="AggregatedStorageStatusRecord"; + public static final String BUCKET_STORAGE_STATUS_TYPE="StorageStatusRecord"; public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord"; public static final String BUCKET_SERVICE_TYPE="ServiceUsageRecord"; @@ -57,6 +61,9 @@ public class PersistenceCouchBase extends PersistenceBackend { public static final Integer TIMEOUT_BUCKET=180; public static final Integer ALIVE_INTERVAL=3600; + + protected Map bucketNames; + /* The environment configuration */ protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment.builder() @@ -64,28 +71,10 @@ public class PersistenceCouchBase extends PersistenceBackend { .keepAliveInterval(ALIVE_INTERVAL * 1000) // 3600 Seconds in milliseconds .build(); - protected Cluster cluster; - /* One Bucket for type*/ - protected Bucket bucketStorage; - protected String bucketNameStorage; + private Nodes nodes; - protected Bucket bucketService; - protected String bucketNameService; - - protected Bucket bucketPortlet; - protected String bucketNamePortlet; - - protected Bucket bucketJob; - protected String bucketNameJob; - - protected Bucket bucketTask; - protected String bucketNameTask; - - private Map connectionMap; - - //TEST - private static Integer count=0; + private String password ; /** * {@inheritDoc} @@ -93,47 +82,67 @@ public class PersistenceCouchBase extends PersistenceBackend { @Override protected void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception { String url = configuration.getProperty(URL_PROPERTY_KEY); - String password = configuration.getProperty(PASSWORD_PROPERTY_KEY); + password = configuration.getProperty(PASSWORD_PROPERTY_KEY); + nodes= new Nodes(url); + logger.debug("PersistenceCouchBase prepareConnection url:{} and now is connectionsMap:{}",url,Connections.connectionsMap ); - try { - - cluster = CouchbaseCluster.create(ENV, url); - - bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY); - bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY); - bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY); - bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY); - bucketNameTask = configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY); - - connectionMap = new HashMap(); - - bucketStorage = cluster.openBucket( bucketNameStorage,password); - connectionMap.put(BUCKET_STORAGE_TYPE, bucketStorage); - - bucketService = cluster.openBucket( bucketNameService,password); - connectionMap.put(BUCKET_SERVICE_TYPE, bucketService); - - bucketJob = cluster.openBucket( bucketNameJob,password); - connectionMap.put(BUCKET_JOB_TYPE, bucketJob); - - bucketPortlet = cluster.openBucket( bucketNamePortlet,password); - connectionMap.put(BUCKET_PORTLET_TYPE, bucketPortlet); - - - bucketTask = cluster.openBucket( bucketNameTask,password); - connectionMap.put(BUCKET_TASK_TYPE, bucketTask); - - } catch(Exception e) { - cluster.disconnect(); - logger.error("Bucket connection error", e); - throw e; - } + bucketNames = new HashMap<>(); + bucketNames.put(BUCKET_STORAGE_TYPE, configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY)); + bucketNames.put(BUCKET_STORAGE_STATUS_TYPE, configuration.getProperty(BUCKET_STORAGE_STATUS_NAME_PROPERTY_KEY)); + bucketNames.put(BUCKET_SERVICE_TYPE, configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY)); + bucketNames.put(BUCKET_JOB_TYPE, configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY)); + bucketNames.put(BUCKET_PORTLET_TYPE, configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY)); + bucketNames.put(BUCKET_TASK_TYPE, configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY)); } + @Override + protected void openConnection() throws Exception { + synchronized (Connections.connectionsMap) { + if (!Connections.connectionsMap.containsKey(nodes)){ + //open cluster and add into map + logger.debug("PersistenceCouchBase openConnection bucketNames :{}",bucketNames); + Cluster cluster = null; + try { + cluster = CouchbaseCluster.create(ENV, nodes.getNodes()); + Connections.connectionsMap.put(nodes, new Connection(cluster)); + logger.debug("PersistenceCouchBase openConnection insert nodes:{}",Connections.connectionsMap ); + } catch(Exception e) { + cluster.disconnect(); + logger.error("Bucket connection error", e); + throw e; + } + } + else{ + logger.debug("PersistenceCouchBase openConnection contains node use an existing cluster env"); + } + } + + + } + protected Bucket getBucketConnection(String recordType){ + //Bucket bucket = connectionMap.get(recordType); + Bucket bucket = null; + synchronized (Connections.connectionsMap) { + bucket =Connections.connectionsMap.get(nodes).getBucketsMap().get(bucketNames.get(recordType)); + logger.debug("PersistenceCouchBase getBucketConnection recordType:{}, bucket name:{}",recordType,bucketNames.get(recordType)); + if(bucket == null){ + //bucket = cluster.openBucket(recordType, password); + bucket = Connections.connectionsMap.get(nodes).getCluster().openBucket(bucketNames.get(recordType), password); + logger.debug("PersistenceCouchBase getBucketConnection bucket close, open:{}",bucket.toString() ); + //connectionMap.put(recordType, bucket); + Connections.connectionsMap.get(nodes).getBucketsMap().put(bucketNames.get(recordType), bucket); + logger.debug("PersistenceCouchBase getBucketConnection connectionMap:{}",Connections.connectionsMap.get(nodes).getBucketsMap()); + } + } + return bucket; + } + + + protected JsonDocument createItem(JsonObject jsonObject, String id,String recordType) throws Exception { JsonDocument doc = JsonDocument.create(id, jsonObject); - return connectionMap.get(recordType).upsert(doc); + return getBucketConnection(recordType).upsert(doc); } public static JsonNode usageRecordToJsonNode(Record record) throws Exception { @@ -163,12 +172,40 @@ public class PersistenceCouchBase extends PersistenceBackend { createItem(jsonObject, record.getId(),recordType); } + @Override + protected void disconnect() throws Exception { + + synchronized (Connections.connectionsMap) { + for (Map.Entry entry : Connections.connectionsMap.get(nodes).getBucketsMap().entrySet()) + { + Boolean closed = entry.getValue().close(); + if (!closed){ + logger.warn("bucket not close :{}",entry.getKey()); + } + } + Boolean clusterClosed= Connections.connectionsMap.get(nodes).getCluster().disconnect(); + if (!clusterClosed){ + logger.warn("cluster not disconnect"); + } + Connections.connectionsMap.remove(nodes); + + logger.debug("PersistenceCouchBase disconnect" ); + + + + } + }; + /** * {@inheritDoc} */ @Override public void close() throws Exception { - cluster.disconnect(); + logger.debug("PersistenceCouchBase close" ); + //Boolean closed =connections.connectionsMap.get(nodes).getCluster().disconnect(); + //cluster.disconnect(); } + + } diff --git a/src/main/java/org/gcube/documentstore/persistence/connections/Connection.java b/src/main/java/org/gcube/documentstore/persistence/connections/Connection.java new file mode 100644 index 0000000..0b350f1 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/connections/Connection.java @@ -0,0 +1,35 @@ +package org.gcube.documentstore.persistence.connections; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; + +public class Connection { + + Map bucketsMap = Collections.synchronizedMap(new HashMap()); + Cluster cluster; + + public Connection(Cluster cluster) { + super(); + this.cluster = cluster; + } + + public Map getBucketsMap() { + return bucketsMap; + } + + public Cluster getCluster(){ + return this.cluster; + } + + @Override + public String toString() { + return "Connection [bucketsMap=" + bucketsMap + ", cluster=" + cluster + + "]"; + } + + +} diff --git a/src/main/java/org/gcube/documentstore/persistence/connections/Connections.java b/src/main/java/org/gcube/documentstore/persistence/connections/Connections.java new file mode 100644 index 0000000..73c0cb1 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/connections/Connections.java @@ -0,0 +1,10 @@ +package org.gcube.documentstore.persistence.connections; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class Connections { + + public static Map connectionsMap = Collections.synchronizedMap(new HashMap()); +} diff --git a/src/main/java/org/gcube/documentstore/persistence/connections/Nodes.java b/src/main/java/org/gcube/documentstore/persistence/connections/Nodes.java new file mode 100644 index 0000000..737a2b5 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/connections/Nodes.java @@ -0,0 +1,48 @@ +package org.gcube.documentstore.persistence.connections; + + +public class Nodes { + + private String nodes; + + public Nodes(String nodes) { + super(); + this.nodes = nodes; + } + + public String getNodes() { + return nodes; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((nodes == null) ? 0 : nodes.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Nodes other = (Nodes) obj; + if (nodes == null) { + if (other.nodes != null) + return false; + } else if (!nodes.equals(other.nodes)) + return false; + return true; + } + + @Override + public String toString() { + return "Nodes [nodes=" + nodes + "]"; + } + + +}