refs #1665: Create Accounting Analytics Persistence CouchBase

https://support.d4science.org/issues/1665

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-analytics-persistence-couchbase@122185 82a268e6-3cf1-43bd-a215-b396298e98cf
feature/19115
Luca Frosini 8 years ago
parent 9499d0ca05
commit 6700f27202

@ -66,7 +66,7 @@
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<!-- END CouchBase libraries -->

@ -6,6 +6,7 @@ package org.gcube.accounting.analytics.persistence.couchbase;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Set;
import org.gcube.accounting.analytics.Filter;
@ -20,6 +21,11 @@ import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow;
import com.couchbase.client.java.query.Select;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -34,8 +40,15 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa
public static final String PASSWORD_PROPERTY_KEY = "password";
public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName";
/* The environment configuration */
protected static final CouchbaseEnvironment ENV =
DefaultCouchbaseEnvironment.builder()
.queryEnabled(true)
.build();
protected Cluster cluster;
protected Bucket bucket;
protected String bucketName;
/**
* {@inheritDoc}
@ -46,9 +59,9 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa
// String username = configuration.getProperty(USERNAME_PROPERTY_KEY);
String password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
cluster = CouchbaseCluster.create(url);
bucket = cluster.openBucket(
configuration.getProperty(BUCKET_NAME_PROPERTY_KEY), password);
cluster = CouchbaseCluster.create(ENV, url);
bucketName = configuration.getProperty(BUCKET_NAME_PROPERTY_KEY);
bucket = cluster.openBucket(bucketName, password);
}
/**
@ -67,7 +80,13 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa
@SuppressWarnings("rawtypes") Class<? extends AggregatedRecord> recordClass,
TemporalConstraint temporalConstraint, List<Filter> filters)
throws Exception {
// TODO Auto-generated method stub
N1qlQueryResult result = bucket.query(Select.select("*").from(bucketName).limit(10));
List<N1qlQueryRow> rows = result.allRows();
for(N1qlQueryRow row : rows){
logger.debug(row.toString());
}
return null;
}

@ -0,0 +1 @@
org.gcube.accounting.analytics.persistence.couchbase.AccountingPersistenceQueryCouchBase

@ -0,0 +1,27 @@
/**
*
*/
package org.gcube.accounting.analytics.persistence;
import org.gcube.accounting.analytics.persistence.couchbase.AccountingPersistenceQueryCouchBase;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class AccountingPersistenceQueryConfigurationTest {
private static Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryConfigurationTest.class);
@Test
public void test() throws Exception {
ScopeProvider.instance.set("/gcube/devsec/devVRE");
AccountingPersistenceBackendQueryConfiguration acbqc = new AccountingPersistenceBackendQueryConfiguration(AccountingPersistenceQueryCouchBase.class);
logger.debug("{}", acbqc);
}
}

@ -0,0 +1,258 @@
/**
*
*/
package org.gcube.accounting.analytics.persistence;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.ResourceRecordQuery;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.exception.NoAvailableScopeException;
import org.gcube.accounting.analytics.exception.NoUsableAccountingPersistenceQueryFound;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.basetypes.TestUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class AccountingPersistenceQueryFactoryTest {
private static Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryFactoryTest.class);
protected AccountingPersistenceBackendQuery apq;
@Before
public void before() throws NoAvailableScopeException, NoUsableAccountingPersistenceQueryFound{
ScopeProvider.instance.set("/gcube/devNext");
apq = AccountingPersistenceBackendQueryFactory.getInstance();
}
@After
public void after(){
ScopeProvider.instance.reset();
apq = null;
}
@Test
public void testNullFilters() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.setTimeInMillis(startTime.getTimeInMillis()-(1000*60*60*24*3));
Calendar endTime = Calendar.getInstance();
TemporalConstraint temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.SECONDLY);
Map<Calendar, Info> infos = apq.query(AggregatedServiceUsageRecord.class, temporalConstraint, null);
Assert.assertTrue(infos!=null);
for(Info info : infos.values()){
logger.debug(info.toString());
}
}
@Test
public void testEmptyFilters() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.setTimeInMillis(startTime.getTimeInMillis()-(1000*60*60*24*3));
Calendar endTime = Calendar.getInstance();
TemporalConstraint temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.SECONDLY);
List<Filter> filters = new ArrayList<Filter>();
Map<Calendar, Info> infos = apq.query(AggregatedServiceUsageRecord.class, temporalConstraint, filters);
Assert.assertTrue(infos!=null);
for(Info info : infos.values()){
logger.debug(info.toString());
}
}
@Test(expected=Exception.class)
public void testFakeFilters() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.setTimeInMillis(startTime.getTimeInMillis()-(1000*60*60*24*3));
Calendar endTime = Calendar.getInstance();
TemporalConstraint temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.SECONDLY);
List<Filter> filters = new ArrayList<Filter>();
filters.add(new Filter("AUX", "AUX"));
apq.query(AggregatedServiceUsageRecord.class, temporalConstraint, filters);
}
@Test
public void testFiltersGoodKeyFakeValue() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.setTimeInMillis(startTime.getTimeInMillis()-(1000*60*60*24*3));
Calendar endTime = Calendar.getInstance();
TemporalConstraint temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.SECONDLY);
List<Filter> filters = new ArrayList<Filter>();
filters.add(new Filter(ServiceUsageRecord.SERVICE_CLASS, "AUX"));
Map<Calendar, Info> infos = apq.query(AggregatedServiceUsageRecord.class, temporalConstraint, filters);
Assert.assertTrue(infos!=null);
Assert.assertTrue(infos.isEmpty());
}
public static final long timeout = 5000;
public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@Test
public void testFiltersGoodKeyGoodValue() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.setTimeInMillis(startTime.getTimeInMillis()-(1000*60*60*24*365));
Calendar endTime = Calendar.getInstance();
TemporalConstraint temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.SECONDLY);
List<Filter> filters = new ArrayList<Filter>();
filters.add(new Filter(ServiceUsageRecord.SERVICE_CLASS, TestUsageRecord.TEST_SERVICE_CLASS));
Map<Calendar, Info> infos = apq.query(AggregatedServiceUsageRecord.class, temporalConstraint, filters);
Assert.assertTrue(infos!=null);
Assert.assertTrue(!infos.isEmpty());
for(Info info : infos.values()){
logger.debug(info.toString());
}
filters = new ArrayList<Filter>();
filters.add(new Filter(StorageUsageRecord.RESOURCE_OWNER, TestUsageRecord.TEST_RESOUCE_OWNER));
infos = apq.query(AggregatedStorageUsageRecord.class, temporalConstraint, filters);
Assert.assertTrue(infos!=null);
Assert.assertTrue(!infos.isEmpty());
for(Info info : infos.values()){
logger.debug(info.toString());
}
}
//@ Test
public void testRange() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.set(Calendar.MONTH, Calendar.AUGUST);
startTime.set(Calendar.DAY_OF_MONTH, 7);
Calendar endTime = Calendar.getInstance();
endTime.set(Calendar.MONTH, Calendar.SEPTEMBER);
endTime.set(Calendar.DAY_OF_MONTH, 7);
TemporalConstraint temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.DAILY);
logger.trace("{} : {}", TemporalConstraint.class.getSimpleName(), temporalConstraint.toString());
List<Filter> filters = new ArrayList<Filter>();
/*
filters.add(new Filter(ServiceUsageRecord.SERVICE_CLASS, TestUsageRecord.TEST_SERVICE_CLASS));
Map<Calendar, Info> infos = apq.query(AggregatedServiceUsageRecord.class, temporalConstraint, filters);
Assert.assertTrue(infos!=null);
Assert.assertTrue(!infos.isEmpty());
for(Info info : infos.values()){
logger.debug(info.toString());
}
*/
filters = new ArrayList<Filter>();
filters.add(new Filter(UsageRecord.CONSUMER_ID, "gianpaolo.coro"));
AccountingPersistenceBackendQuery apq = AccountingPersistenceBackendQueryFactory.getInstance();
Map<Calendar, Info> infos = apq.query(AggregatedStorageUsageRecord.class, temporalConstraint, filters);
Assert.assertTrue(infos!=null);
Assert.assertTrue(!infos.isEmpty());
for(Info info : infos.values()){
logger.debug(info.toString());
}
ResourceRecordQuery resourceRecordQuery = new ResourceRecordQuery();
List<Info> padded = resourceRecordQuery.getInfo(AggregatedStorageUsageRecord.class, temporalConstraint, filters, true);
Assert.assertTrue(padded!=null);
Assert.assertTrue(!padded.isEmpty());
for(Info info : padded){
logger.debug(info.toString());
}
}
@Test
public void getKeysTest() throws Exception {
Set<String> keys = apq.getKeys(AggregatedServiceUsageRecord.class);
logger.debug("Got keys : {}", keys);
Set<String> required = AggregatedServiceUsageRecord.class.newInstance().getRequiredFields();
logger.debug("Required fields : {}", required);
Assert.assertTrue(required.containsAll(keys));
}
@Test
public void getPossibileValuesTest() throws Exception {
@SuppressWarnings("rawtypes")
Class[] classes = new Class[]{
AggregatedServiceUsageRecord.class,
AggregatedStorageUsageRecord.class
};
for(@SuppressWarnings("rawtypes") Class cls : classes){
@SuppressWarnings("unchecked")
Set<String> keys = apq.getKeys(cls);
for(String key : keys){
logger.debug("Querying Possible values of {} for key {}", cls.getSimpleName(), key);
@SuppressWarnings("unchecked")
Set<String> values = apq.getPossibleValuesForKey(cls, key);
logger.debug("Possible values of {} for key {} : {}", cls.getSimpleName(), key, values);
}
}
}
@Test
public void testMontly() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.set(Calendar.MONTH, Calendar.JANUARY);
startTime.set(Calendar.DAY_OF_MONTH, 1);
Calendar endTime = Calendar.getInstance();
endTime.set(Calendar.MONTH, Calendar.DECEMBER);
endTime.set(Calendar.DAY_OF_MONTH, 31);
TemporalConstraint temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.MONTHLY);
logger.trace("{} : {}", TemporalConstraint.class.getSimpleName(), temporalConstraint.toString());
List<Filter> filters = new ArrayList<Filter>();
AccountingPersistenceBackendQuery apq = AccountingPersistenceBackendQueryFactory.getInstance();
Map<Calendar, Info> infos = apq.query(AggregatedStorageUsageRecord.class, temporalConstraint, filters);
Assert.assertTrue(infos!=null);
Assert.assertTrue(!infos.isEmpty());
for(Info info : infos.values()){
logger.debug(info.toString());
}
ResourceRecordQuery resourceRecordQuery = new ResourceRecordQuery();
List<Info> padded = resourceRecordQuery.getInfo(AggregatedStorageUsageRecord.class, temporalConstraint, filters, true);
Assert.assertTrue(padded!=null);
Assert.assertTrue(!padded.isEmpty());
for(Info info : padded){
logger.debug(info.toString());
}
}
}

@ -0,0 +1,23 @@
package org.gcube.accounting.analytics.utils;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public abstract class IOUtility {
public static String readFile(String filePath) throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(filePath));
String line = null;
StringBuilder stringBuilder = new StringBuilder();
String ls = System.getProperty("line.separator");
while ((line = reader.readLine()) != null) {
stringBuilder.append(line);
stringBuilder.append(ls);
}
reader.close();
return stringBuilder.toString();
}
}

@ -0,0 +1,40 @@
/**
*
*/
package org.gcube.accounting.testutility;
import java.util.Calendar;
import java.util.GregorianCalendar;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class StressTestUtility {
private static final Logger logger = LoggerFactory.getLogger(StressTestUtility.class);
protected final static int DEFAULT_NUMBER_OF_RECORDS = 3000;
public static void stressTest(TestOperation operation) throws Exception {
stressTest(operation, DEFAULT_NUMBER_OF_RECORDS);
}
public static void stressTest(TestOperation operation, int runs) throws Exception {
Calendar startTestTime = new GregorianCalendar();
for(int i=0; i< runs; i++){
operation.operate(i);
}
Calendar stopTestTime = new GregorianCalendar();
double startMillis = startTestTime.getTimeInMillis();
double stopMillis = stopTestTime.getTimeInMillis();
double duration = stopMillis - startMillis;
double average = (duration/runs);
logger.debug("Duration (in millisec) : " + duration);
logger.debug("Average (in millisec) : " + average);
}
}

@ -0,0 +1,15 @@
/**
*
*/
package org.gcube.accounting.testutility;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public interface TestOperation {
public void operate(int i) throws Exception;
}

@ -0,0 +1,16 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{0}: %msg%n</pattern>
</encoder>
</appender>
<logger name="org.gcube" level="TRACE" />
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
</configuration>