Alessandro Pieve 2016-06-28 13:02:34 +00:00
parent 1571c06111
commit 5ab1d06365
1 changed files with 67 additions and 88 deletions

View File

@ -7,8 +7,6 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.gcube.documentstore.persistence.PersistenceBackend;
import org.gcube.documentstore.persistence.PersistenceBackendConfiguration;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
@ -23,7 +21,6 @@ import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.query.N1qlQueryResult;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -31,40 +28,42 @@ import com.couchbase.client.java.query.N1qlQueryResult;
*/
public class PersistenceCouchBase extends PersistenceBackend {
private static final Logger logger = LoggerFactory
.getLogger(PersistenceCouchBase.class);
public static final String URL_PROPERTY_KEY = "URL";
//public static final String USERNAME_PROPERTY_KEY = "username";
public static final String PASSWORD_PROPERTY_KEY = "password";
public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName";
/*Different bucket for aggregated*/
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord";
public static final String BUCKET_STORAGE_TYPE="StorageUsageRecord";
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord";
public static final String BUCKET_SERVICE_TYPE="ServiceUsageRecord";
public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY="AggregatedPortletUsageRecord";
public static final String BUCKET_PORTLET_TYPE="PortletUsageRecord";
public static final String BUCKET_JOB_NAME_PROPERTY_KEY="AggregatedJobUsageRecord";
public static final String BUCKET_JOB_TYPE="JobUsageRecord";
public static final String BUCKET_TASK_NAME_PROPERTY_KEY="AggregatedTaskUsageRecord";
public static final String BUCKET_TASK_TYPE="TaskUsageRecord";
private static final Logger logger = LoggerFactory
.getLogger(PersistenceCouchBase.class);
/* The environment configuration */
protected static final CouchbaseEnvironment ENV =
DefaultCouchbaseEnvironment.builder()
.connectTimeout(8 * 1000) // 8 Seconds in milliseconds
.keepAliveInterval(3600 * 1000) // 3600 Seconds in milliseconds
.build();
protected Cluster cluster;
/* One Bucket for type*/
/* The environment configuration */
protected static final CouchbaseEnvironment ENV =
DefaultCouchbaseEnvironment.builder()
.connectTimeout(8 * 1000) // 8 Seconds in milliseconds
.keepAliveInterval(3600 * 1000) // 3600 Seconds in milliseconds
.build();
protected Cluster cluster;
/* One Bucket for type*/
protected Bucket bucketStorage;
protected String bucketNameStorage;
@ -79,88 +78,70 @@ public class PersistenceCouchBase extends PersistenceBackend {
protected Bucket bucketTask;
protected String bucketNameTask;
/*DISABLE */
//protected Bucket bucket;
private Map <String, Bucket> connectionMap;
//protected Bucket bucket;
private Map <String, Bucket> connectionMap;
/**
* {@inheritDoc}
*/
@Override
protected void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception {
String url = configuration.getProperty(URL_PROPERTY_KEY);
//String username = configuration.getProperty(USERNAME_PROPERTY_KEY);
String password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
cluster = CouchbaseCluster.create(ENV, url);
logger.trace("cluster: {}",url);
bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY);
bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY);
bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY);
bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY);
bucketNameTask = configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY);
connectionMap = new HashMap<String, Bucket>();
bucketStorage = cluster.openBucket(bucketNameStorage, password);
connectionMap.put(BUCKET_STORAGE_TYPE, bucketStorage);
logger.trace("open bucket: {} with {}",bucketNameStorage,BUCKET_STORAGE_TYPE);
bucketService = cluster.openBucket(bucketNameService, password);
connectionMap.put(BUCKET_SERVICE_TYPE, bucketService);
logger.trace("open bucket: {} with {}",bucketNameService,BUCKET_SERVICE_TYPE);
bucketJob= cluster.openBucket(bucketNameJob, password);
connectionMap.put(BUCKET_JOB_TYPE, bucketJob);
logger.trace("open bucket: {} with {}",bucketNameJob,BUCKET_JOB_TYPE);
bucketPortlet= cluster.openBucket(bucketNamePortlet, password);
connectionMap.put(BUCKET_PORTLET_TYPE, bucketPortlet);
logger.trace("open bucket: {} with {}",bucketNamePortlet,BUCKET_PORTLET_TYPE);
try {
cluster = CouchbaseCluster.create(ENV, url);
bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY);
bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY);
bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY);
bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY);
bucketNameTask = configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY);
connectionMap = new HashMap<String, Bucket>();
bucketStorage = cluster.openBucket(bucketNameStorage, password);
connectionMap.put(BUCKET_STORAGE_TYPE, bucketStorage);
bucketService = cluster.openBucket(bucketNameService, password);
connectionMap.put(BUCKET_SERVICE_TYPE, bucketService);
bucketJob= cluster.openBucket(bucketNameJob, password);
connectionMap.put(BUCKET_JOB_TYPE, bucketJob);
bucketPortlet= cluster.openBucket(bucketNamePortlet, password);
connectionMap.put(BUCKET_PORTLET_TYPE, bucketPortlet);
bucketTask= cluster.openBucket(bucketNameTask, password);
connectionMap.put(BUCKET_TASK_TYPE, bucketTask);
} catch(Exception e) {
logger.trace("Bucket connection error");
}
bucketTask= cluster.openBucket(bucketNameTask, password);
connectionMap.put(BUCKET_TASK_TYPE, bucketTask);
logger.trace("open bucket: {} with {}",bucketNameTask,BUCKET_TASK_TYPE);
/*DISABLE
bucket = cluster.openBucket(
configuration.getProperty(BUCKET_NAME_PROPERTY_KEY), password);
*/
}
protected JsonDocument createItem(JsonObject jsonObject, String id,String recordType) throws Exception {
JsonDocument doc = JsonDocument.create(id, jsonObject);
logger.trace("record type:{} accounting on:{}",recordType,connectionMap.get(recordType).toString());
return connectionMap.get(recordType).upsert(doc);
//return bucket.upsert(doc);
}
public static JsonNode usageRecordToJsonNode(Record record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.valueToTree(record.getResourceProperties());
return node;
}
public static Record jsonNodeToUsageRecord(JsonNode jsonNode) throws Exception {
ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked")
@ -168,8 +149,8 @@ public class PersistenceCouchBase extends PersistenceBackend {
Record record = RecordUtility.getRecord(result);
return record;
}
/**
* {@inheritDoc}
*/
@ -177,10 +158,8 @@ public class PersistenceCouchBase extends PersistenceBackend {
protected void reallyAccount(Record record) throws Exception {
JsonNode node = PersistenceCouchBase.usageRecordToJsonNode(record);
JsonObject jsonObject = JsonObject.fromJson(node.toString());
//get a bucket association
String recordType=record.getRecordType();
createItem(jsonObject, record.getId(),recordType);
}