2016-01-11 11:29:26 +01:00
/ * *
*
* /
package org.gcube.accounting.analytics.persistence.couchbase ;
2016-02-09 18:40:59 +01:00
import static com.couchbase.client.java.query.Select.select ;
import static com.couchbase.client.java.query.dsl.Expression.s ;
import static com.couchbase.client.java.query.dsl.Expression.x ;
2016-03-25 19:18:17 +01:00
import java.security.KeyException ;
import java.util.ArrayList ;
2016-01-11 11:29:26 +01:00
import java.util.Calendar ;
2016-03-25 19:18:17 +01:00
import java.util.Collection ;
2016-06-28 17:18:03 +02:00
import java.util.Collections ;
2016-03-25 19:18:17 +01:00
import java.util.Comparator ;
2016-06-28 17:18:03 +02:00
import java.util.HashMap ;
import java.util.Iterator ;
2016-01-11 11:29:26 +01:00
import java.util.List ;
2016-06-28 17:18:03 +02:00
import java.util.Map ;
2016-01-11 11:29:26 +01:00
import java.util.Set ;
2016-03-25 19:18:17 +01:00
import java.util.SortedMap ;
import java.util.SortedSet ;
import java.util.TreeMap ;
import java.util.TreeSet ;
2016-01-11 11:29:26 +01:00
import org.gcube.accounting.analytics.Filter ;
import org.gcube.accounting.analytics.Info ;
2016-03-25 19:18:17 +01:00
import org.gcube.accounting.analytics.NumberedFilter ;
2016-01-11 11:29:26 +01:00
import org.gcube.accounting.analytics.TemporalConstraint ;
2016-02-09 18:40:59 +01:00
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode ;
2016-03-25 19:18:17 +01:00
import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum ;
import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException ;
import org.gcube.accounting.analytics.exception.ValueException ;
2016-01-11 11:29:26 +01:00
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery ;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration ;
2016-03-25 19:18:17 +01:00
import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery ;
2016-02-09 18:40:59 +01:00
import org.gcube.accounting.datamodel.BasicUsageRecord ;
import org.gcube.accounting.datamodel.UsageRecord ;
2016-02-25 17:56:47 +01:00
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration ;
2016-02-12 17:15:46 +01:00
import org.gcube.common.scope.api.ScopeProvider ;
2016-01-11 11:29:26 +01:00
import org.gcube.documentstore.records.AggregatedRecord ;
2016-06-28 17:18:03 +02:00
import org.gcube.documentstore.records.Record ;
import org.gcube.documentstore.records.RecordUtility ;
2016-02-09 18:40:59 +01:00
import org.json.JSONException ;
import org.json.JSONObject ;
2016-01-11 11:29:26 +01:00
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import com.couchbase.client.java.Bucket ;
import com.couchbase.client.java.Cluster ;
import com.couchbase.client.java.CouchbaseCluster ;
2016-03-25 19:18:17 +01:00
import com.couchbase.client.java.document.json.JsonArray ;
2016-02-09 18:40:59 +01:00
import com.couchbase.client.java.document.json.JsonObject ;
2016-01-11 16:41:52 +01:00
import com.couchbase.client.java.env.CouchbaseEnvironment ;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment ;
import com.couchbase.client.java.query.N1qlQueryResult ;
import com.couchbase.client.java.query.N1qlQueryRow ;
2016-02-09 18:40:59 +01:00
import com.couchbase.client.java.query.dsl.Expression ;
2016-03-25 19:18:17 +01:00
import com.couchbase.client.java.query.dsl.Sort ;
2016-06-28 17:18:03 +02:00
import com.couchbase.client.java.query.dsl.path.GroupByPath ;
import com.couchbase.client.java.query.dsl.path.OffsetPath ;
2016-03-25 19:18:17 +01:00
import com.couchbase.client.java.view.ViewQuery ;
import com.couchbase.client.java.view.ViewResult ;
import com.couchbase.client.java.view.ViewRow ;
2016-01-11 11:29:26 +01:00
/ * *
* @author Luca Frosini ( ISTI - CNR ) http : //www.lucafrosini.com/
2016-03-25 19:18:17 +01:00
*
2016-01-11 11:29:26 +01:00
* /
2016-03-25 19:18:17 +01:00
public class AccountingPersistenceQueryCouchBase implements
2016-06-28 17:18:03 +02:00
AccountingPersistenceBackendQuery {
2016-03-25 19:18:17 +01:00
private static final Logger logger = LoggerFactory
. getLogger ( AccountingPersistenceQueryCouchBase . class ) ;
2016-01-11 11:29:26 +01:00
2016-06-28 17:18:03 +02:00
2016-02-25 17:56:47 +01:00
public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration . URL_PROPERTY_KEY ;
2016-03-25 19:18:17 +01:00
// public static final String USERNAME_PROPERTY_KEY =
// AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY;
2016-02-25 17:56:47 +01:00
public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration . PASSWORD_PROPERTY_KEY ;
2016-03-25 19:18:17 +01:00
2016-06-28 17:18:03 +02:00
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY = " AggregatedStorageUsageRecord " ;
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY = " AggregatedServiceUsageRecord " ;
public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY = " AggregatedPortletUsageRecord " ;
public static final String BUCKET_JOB_NAME_PROPERTY_KEY = " AggregatedJobUsageRecord " ;
public static final String BUCKET_TASK_NAME_PROPERTY_KEY = " AggregatedTaskUsageRecord " ;
public static final long ENV_TIME_OUT = 180000 ;
2016-01-11 16:41:52 +01:00
/* The environment configuration */
2016-03-25 19:18:17 +01:00
protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment
2016-06-28 17:18:03 +02:00
. builder ( ) . maxRequestLifetime ( ENV_TIME_OUT ) . queryTimeout ( ENV_TIME_OUT ) . queryEnabled ( true ) . build ( ) ;
2016-03-25 19:18:17 +01:00
2016-01-11 11:29:26 +01:00
protected Cluster cluster ;
2016-06-28 17:18:03 +02:00
/* One Bucket for type*/
protected Bucket bucketStorage ;
protected String bucketNameStorage ;
protected Bucket bucketService ;
protected String bucketNameService ;
protected Bucket bucketPortlet ;
protected String bucketNamePortlet ;
protected Bucket bucketJob ;
protected String bucketNameJob ;
protected Bucket bucketTask ;
protected String bucketNameTask ;
private Map < String , Bucket > connectionMap ;
2016-01-11 11:29:26 +01:00
/ * *
* { @inheritDoc }
* /
@Override
2016-03-25 19:18:17 +01:00
public void prepareConnection (
AccountingPersistenceBackendQueryConfiguration configuration )
2016-06-28 17:18:03 +02:00
throws Exception {
2016-01-11 11:29:26 +01:00
String url = configuration . getProperty ( URL_PROPERTY_KEY ) ;
2016-06-28 17:18:03 +02:00
2016-01-11 11:29:26 +01:00
String password = configuration . getProperty ( PASSWORD_PROPERTY_KEY ) ;
2016-01-11 16:41:52 +01:00
cluster = CouchbaseCluster . create ( ENV , url ) ;
2016-06-28 17:18:03 +02:00
logger . trace ( " env " + ENV . toString ( ) ) ;
bucketNameStorage = configuration . getProperty ( BUCKET_STORAGE_NAME_PROPERTY_KEY ) ;
bucketNameService = configuration . getProperty ( BUCKET_SERVICE_NAME_PROPERTY_KEY ) ;
bucketNameJob = configuration . getProperty ( BUCKET_JOB_NAME_PROPERTY_KEY ) ;
bucketNamePortlet = configuration . getProperty ( BUCKET_PORTLET_NAME_PROPERTY_KEY ) ;
bucketNameTask = configuration . getProperty ( BUCKET_TASK_NAME_PROPERTY_KEY ) ;
connectionMap = new HashMap < String , Bucket > ( ) ;
bucketStorage = cluster . openBucket ( bucketNameStorage , password ) ;
connectionMap . put ( BUCKET_STORAGE_NAME_PROPERTY_KEY , bucketStorage ) ;
bucketService = cluster . openBucket ( bucketNameService , password ) ;
connectionMap . put ( BUCKET_SERVICE_NAME_PROPERTY_KEY , bucketService ) ;
bucketJob = cluster . openBucket ( bucketNameJob , password ) ;
connectionMap . put ( BUCKET_JOB_NAME_PROPERTY_KEY , bucketJob ) ;
bucketPortlet = cluster . openBucket ( bucketNamePortlet , password ) ;
connectionMap . put ( BUCKET_PORTLET_NAME_PROPERTY_KEY , bucketPortlet ) ;
bucketTask = cluster . openBucket ( bucketNameTask , password ) ;
connectionMap . put ( BUCKET_TASK_NAME_PROPERTY_KEY , bucketTask ) ;
logger . trace ( " Open cluster Service Bucket Url: " + url + " Pwd: " + configuration . getProperty ( PASSWORD_PROPERTY_KEY ) +
" BucketName " + configuration . getProperty ( BUCKET_SERVICE_NAME_PROPERTY_KEY ) ) ;
2016-01-11 11:29:26 +01:00
}
2016-03-25 19:18:17 +01:00
2016-01-11 11:29:26 +01:00
/ * *
* { @inheritDoc }
* /
@Override
public void close ( ) throws Exception {
cluster . disconnect ( ) ;
}
2016-03-25 19:18:17 +01:00
protected Calendar getCalendar ( JSONObject obj ,
AggregationMode aggregationMode ) throws NumberFormatException ,
JSONException {
2016-02-09 18:40:59 +01:00
long millis ;
2016-03-25 19:18:17 +01:00
if ( obj . has ( AggregatedRecord . START_TIME ) ) {
2016-02-09 18:40:59 +01:00
millis = new Long ( obj . getString ( AggregatedRecord . START_TIME ) ) ;
2016-03-25 19:18:17 +01:00
logger . trace (
" The result {} was from an aggregated record. Using {} " ,
obj . toString ( ) , AggregatedRecord . START_TIME ) ;
} else {
2016-02-09 18:40:59 +01:00
millis = new Long ( obj . getString ( UsageRecord . CREATION_TIME ) ) ;
2016-03-25 19:18:17 +01:00
logger . trace ( " The result {} was from single record. Using {} " ,
obj . toString ( ) , UsageRecord . CREATION_TIME ) ;
2016-02-09 18:40:59 +01:00
}
2016-03-25 19:18:17 +01:00
Calendar calendar = TemporalConstraint . getAlignedCalendar ( millis ,
aggregationMode ) ;
logger . trace ( " {} has been aligned to {} " , millis ,
calendar . getTimeInMillis ( ) ) ;
2016-02-09 18:40:59 +01:00
return calendar ;
}
2016-03-25 19:18:17 +01:00
2016-06-28 17:18:03 +02:00
protected Map < Calendar , Info > selectQuery (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , List < Filter > filters )
throws Exception {
String currentScope = ScopeProvider . instance . get ( ) ;
String recordType = clz . newInstance ( ) . getRecordType ( ) ;
Expression expression = x ( BasicUsageRecord . SCOPE ) . eq ( s ( currentScope ) ) ;
expression = expression . and ( x ( BasicUsageRecord . RECORD_TYPE ) . eq (
s ( recordType ) ) ) ;
long startTime = temporalConstraint . getAlignedStartTime ( )
. getTimeInMillis ( ) ;
expression = expression . and ( x ( AggregatedRecord . START_TIME )
. gt ( startTime ) . or (
x ( AggregatedRecord . CREATION_TIME ) . gt ( startTime ) ) ) ;
long endTime = temporalConstraint . getAlignedEndTime ( ) . getTimeInMillis ( ) ;
expression = expression . and ( x ( AggregatedRecord . END_TIME ) . lt ( endTime ) )
. or ( x ( AggregatedRecord . CREATION_TIME ) . lt ( endTime ) ) ;
AggregationMode aggregationMode = temporalConstraint
. getAggregationMode ( ) ;
// TODO Aggregate Results
if ( filters ! = null ) {
for ( Filter filter : filters ) {
expression = expression . and ( x ( filter . getKey ( ) ) . eq (
s ( filter . getValue ( ) ) ) ) ;
}
}
GroupByPath groupByPath = select ( " * " ) . from ( connectionMap . get ( clz . getSimpleName ( ) ) . name ( ) )
. where ( expression ) ;
Map < Calendar , Info > map = new HashMap < Calendar , Info > ( ) ;
// using the DSL
/ * e . g .
select sum ( dataVolume ) as Volume ,
sum ( operationCount ) as Operation ,
consumerId
from accounting_storage_test
group by order by Volume desc
* /
// N1qlQueryResult result = connectionMap.get(clz.getSimpleName()).query(select("sum(dataVolume) as Volume ,sum(operationCount) as Operation , consumerId").from("accounting_storage").groupBy("consumerId").limit(5));
//logger.info("result"+result.toString());
N1qlQueryResult result = connectionMap . get ( clz . getSimpleName ( ) ) . query ( groupByPath ) ;
if ( ! result . finalSuccess ( ) ) {
logger . debug ( " {} failed : {} " ,
N1qlQueryResult . class . getSimpleName ( ) , result . errors ( ) ) ;
return map ;
}
List < N1qlQueryRow > rows = result . allRows ( ) ;
for ( N1qlQueryRow row : rows ) {
try {
logger . trace ( " Row : {} " , row . toString ( ) ) ;
JsonObject jsonObject = row . value ( ) . getObject ( clz . getSimpleName ( ) ) ;
logger . trace ( " JsonObject : {} " , row . toString ( ) ) ;
String recordString = jsonObject . toMap ( ) . toString ( ) ;
logger . trace ( " Record String : {} " , recordString ) ;
Record record = RecordUtility . getRecord ( recordString ) ;
JSONObject obj = new JSONObject ( jsonObject . toString ( ) ) ;
Calendar calendar = getCalendar ( obj , aggregationMode ) ;
if ( map . containsKey ( calendar ) ) {
Info info = map . get ( calendar ) ;
JSONObject value = info . getValue ( ) ;
jsonObject . toMap ( ) ;
} else {
map . put ( calendar , new Info ( calendar , obj ) ) ;
}
} catch ( Exception e ) {
logger . warn ( " Unable to eleborate result for {} " , row . toString ( ) ) ;
}
logger . trace ( " \ n \ n \ n " ) ;
}
return map ;
}
2016-03-25 19:18:17 +01:00
protected Calendar getCalendarFromArray ( JsonArray array )
throws JSONException {
boolean startFound = false ;
Calendar calendar = Calendar
. getInstance ( TemporalConstraint . DEFAULT_TIME_ZONE ) ;
int count = 0 ;
CalendarEnum [ ] calendarValues = CalendarEnum . values ( ) ;
for ( int i = 0 ; i < array . size ( ) ; i + + ) {
try {
int value = array . getInt ( i ) ;
int calendarValue = calendarValues [ count ] . getCalendarValue ( ) ;
if ( calendarValue = = Calendar . MONTH ) {
value - - ;
}
calendar . set ( calendarValue , value ) ;
count + + ;
startFound = true ;
} catch ( Exception e ) {
/ *
logger . trace ( " The provide value is not an int. {} " , array
. get ( i ) . toString ( ) ) ;
2016-06-28 17:18:03 +02:00
* /
2016-03-25 19:18:17 +01:00
if ( startFound ) {
break ;
}
}
}
for ( int j = count ; j < calendarValues . length ; j + + ) {
if ( calendarValues [ j ] . getCalendarValue ( ) = = Calendar . DAY_OF_MONTH ) {
calendar . set ( calendarValues [ j ] . getCalendarValue ( ) , 1 ) ;
} else {
calendar . set ( calendarValues [ j ] . getCalendarValue ( ) , 0 ) ;
}
}
return calendar ;
}
protected JsonArray getRangeKey ( long time , AggregationMode aggregationMode ,
boolean wildCard , boolean endKey ) throws JSONException {
JsonArray array = JsonArray . create ( ) ;
Calendar calendar = Calendar . getInstance ( ) ;
calendar . setTimeInMillis ( time ) ;
CalendarEnum [ ] values = CalendarEnum . values ( ) ;
if ( endKey ) {
calendar . add ( values [ aggregationMode . ordinal ( ) ] . getCalendarValue ( ) ,
1 ) ;
}
for ( int i = 0 ; i < = aggregationMode . ordinal ( ) ; i + + ) {
int value = calendar . get ( values [ i ] . getCalendarValue ( ) ) ;
if ( values [ i ] . getCalendarValue ( ) = = Calendar . MONTH ) {
value = value + 1 ;
}
array . add ( value ) ;
}
if ( wildCard ) {
array . add ( " {} " ) ;
}
return array ;
}
2016-06-28 17:18:03 +02:00
protected static final String MAP_REDUCE__DESIGN = " " ;
2016-03-25 19:18:17 +01:00
protected static final String MAP_REDUCE_ALL = " all " ;
2016-06-28 17:18:03 +02:00
2016-01-11 11:29:26 +01:00
/ * *
2016-03-25 19:18:17 +01:00
* Used in the name of map reduce to separate keys used as filter
2016-01-11 11:29:26 +01:00
* /
2016-03-25 19:18:17 +01:00
protected static final String KEYS_SEPARATOR = " __ " ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
protected String getDesignDocId (
Class < ? extends AggregatedRecord < ? , ? > > recordClass )
2016-06-28 17:18:03 +02:00
throws InstantiationException , IllegalAccessException {
2016-03-25 19:18:17 +01:00
return String . format ( " %s%s " , MAP_REDUCE__DESIGN , recordClass
. newInstance ( ) . getRecordType ( ) ) ;
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
public static String getMapReduceFunctionName (
Collection < String > collection ) {
String reduceFunction = MAP_REDUCE_ALL ;
2016-06-28 17:18:03 +02:00
if ( ! collection . isEmpty ( ) ) {
reduceFunction = null ;
for ( String property : collection ) {
if ( reduceFunction = = null ) {
reduceFunction = property ;
} else {
reduceFunction = reduceFunction + KEYS_SEPARATOR + property ;
}
2016-03-25 19:18:17 +01:00
}
}
return reduceFunction ;
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
protected SortedMap < Calendar , Info > mapReduceQuery (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
2016-06-28 17:18:03 +02:00
TemporalConstraint temporalConstraint , List < Filter > filters )
throws Exception {
2016-03-25 19:18:17 +01:00
2016-02-12 17:15:46 +01:00
String currentScope = ScopeProvider . instance . get ( ) ;
2016-03-25 19:18:17 +01:00
JsonArray startKey = JsonArray . create ( ) ;
startKey . add ( currentScope ) ;
JsonArray endKey = JsonArray . create ( ) ;
endKey . add ( currentScope ) ;
AggregationMode aggregationMode = temporalConstraint
. getAggregationMode ( ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
JsonArray temporalStartKey = getRangeKey (
temporalConstraint . getStartTime ( ) ,
aggregationMode , false , false ) ;
JsonArray temporalEndKey = getRangeKey (
temporalConstraint . getEndTime ( ) ,
2016-06-28 17:18:03 +02:00
aggregationMode , false , false ) ;
2016-03-25 19:18:17 +01:00
Set < String > recordKeysSet = AccountingPersistenceQuery
. getQuerableKeys ( clz . newInstance ( ) ) ;
Collection < String > keys = new TreeSet < > ( ) ;
2016-06-28 17:18:03 +02:00
//JsonArray filterStartKey = JsonArray.create();
//JsonArray filterEndKey = JsonArray.create();
2016-03-25 19:18:17 +01:00
if ( filters ! = null & & filters . size ( ) ! = 0 ) {
2016-06-28 17:18:03 +02:00
// Sorting filter for call a mapreduce
Collections . sort ( filters , new Comparator < Filter > ( ) {
@Override
public int compare ( Filter filter1 , Filter filter2 )
{
return filter1 . getKey ( ) . compareTo ( filter2 . getKey ( ) ) ;
}
} ) ;
logger . trace ( " filter " + filters . toString ( ) ) ;
2016-03-25 19:18:17 +01:00
for ( Filter filter : filters ) {
String filterKey = filter . getKey ( ) ;
String filterValue = filter . getValue ( ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
if ( filterKey ! = null & & filterKey . compareTo ( " " ) ! = 0
& & recordKeysSet . contains ( filterKey ) ) {
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
if ( filterValue ! = null & & filterValue . compareTo ( " " ) ! = 0 ) {
if ( keys . contains ( filterKey ) ) {
throw new DuplicatedKeyFilterException (
" Only one value per Filter key is allowed " ) ;
}
2016-06-28 17:18:03 +02:00
startKey . add ( filterValue ) ;
endKey . add ( filterValue ) ;
//filterStartKey.add(filterValue);
//filterEndKey.add(filterValue);
2016-03-25 19:18:17 +01:00
keys . add ( filterKey ) ;
} else {
throw new KeyException (
2016-06-28 17:18:03 +02:00
String . format ( " Invalid %s : %s " ,
Filter . class . getSimpleName ( ) , filter . toString ( ) ) ) ;
2016-03-25 19:18:17 +01:00
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
} else {
throw new ValueException ( String . format ( " Invalid %s : %s " ,
Filter . class . getSimpleName ( ) , filter . toString ( ) ) ) ;
}
2016-02-09 18:40:59 +01:00
}
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
// +1 because mode start from 0
// +1 because of scope at the beginning
int scopeDateGroupLevel = aggregationMode . ordinal ( ) + 1 + 1 ;
int groupLevel = scopeDateGroupLevel ;
if ( filters ! = null ) {
groupLevel + = keys . size ( ) ;
}
2016-02-09 18:40:59 +01:00
2016-03-25 19:18:17 +01:00
String designDocId = getDesignDocId ( clz ) ;
2016-06-28 17:18:03 +02:00
for ( Object temporal : temporalStartKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) )
startKey . add ( temporal ) ;
}
int count = 1 ;
for ( Object temporal : temporalEndKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) ) {
//couchbase exclude last value
if ( count = = temporalEndKey . size ( ) )
temporal = ( int ) temporal + 1 ;
endKey . add ( temporal ) ;
}
count + + ;
}
2016-03-25 19:18:17 +01:00
2016-06-28 17:18:03 +02:00
logger . trace ( " startKey:{} " + startKey ) ;
//startKey.add(temporalStartKey.toList());
//endKey.add(temporalEndKey.toList());
2016-03-25 19:18:17 +01:00
String viewName = getMapReduceFunctionName ( keys ) ;
ViewQuery query = ViewQuery . from ( designDocId , viewName ) ;
2016-06-28 17:18:03 +02:00
query . inclusiveEnd ( ) ;
2016-03-25 19:18:17 +01:00
query . groupLevel ( groupLevel ) ;
query . startKey ( startKey ) ;
query . endKey ( endKey ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
query . descending ( false ) ;
2016-06-28 17:18:03 +02:00
logger . trace ( " Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ " Group Level : {}, Start Key : {}, End Key : {}, "
+ " temporalStartKey :{}, temporalEndKey :{} " ,
clz . getSimpleName ( ) , designDocId , viewName , groupLevel , startKey , endKey , temporalStartKey . toString ( ) , temporalEndKey . toString ( ) ) ;
2016-03-25 19:18:17 +01:00
SortedMap < Calendar , Info > infos = new TreeMap < > ( ) ;
ViewResult viewResult ;
try {
2016-06-28 17:18:03 +02:00
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
2016-03-25 19:18:17 +01:00
} catch ( Exception e ) {
2016-06-28 17:18:03 +02:00
logger . error ( e . getLocalizedMessage ( ) ) ;
2016-03-25 19:18:17 +01:00
throw e ;
2016-01-11 16:41:52 +01:00
}
2016-03-25 19:18:17 +01:00
for ( ViewRow row : viewResult ) {
JsonArray array = ( JsonArray ) row . key ( ) ;
Calendar calendar = getCalendarFromArray ( array ) ;
2016-01-11 11:29:26 +01:00
2016-03-25 19:18:17 +01:00
JsonObject value = ( JsonObject ) row . value ( ) ;
2016-06-28 17:18:03 +02:00
JSONObject obj = new JSONObject ( value . toString ( ) ) ;
2016-03-25 19:18:17 +01:00
Info info = new Info ( calendar , obj ) ;
infos . put ( calendar , info ) ;
}
2016-06-28 17:18:03 +02:00
if ( infos . isEmpty ( ) ) {
//exec a map reduce for found name key
query = ViewQuery . from ( designDocId , viewName ) ;
query . groupLevel ( groupLevel ) ;
query . descending ( false ) ;
try {
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
} catch ( Exception e ) {
logger . error ( e . getLocalizedMessage ( ) ) ;
throw e ;
}
ViewRow row = viewResult . allRows ( ) . get ( 0 ) ;
JsonArray array = getRangeKey (
temporalConstraint . getStartTime ( ) ,
aggregationMode , false , false ) ;
Calendar calendar = getCalendarFromArray ( array ) ;
JsonObject value = ( JsonObject ) row . value ( ) ;
JSONObject objJson = new JSONObject ( value . toString ( ) ) ;
JSONObject objJsontemplate = new JSONObject ( ) ;
Iterator < ? > iterateJson = objJson . keys ( ) ;
while ( iterateJson . hasNext ( ) ) {
String key = ( String ) iterateJson . next ( ) ;
objJsontemplate . put ( key , 0 ) ;
}
//generate an example object for json
Info info = new Info ( calendar , objJsontemplate ) ;
infos . put ( calendar , info ) ;
//break;
}
2016-03-25 19:18:17 +01:00
return infos ;
}
2016-06-28 17:18:03 +02:00
2016-01-11 11:29:26 +01:00
@Override
2016-03-25 19:18:17 +01:00
public SortedMap < Calendar , Info > getTimeSeries (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
2016-06-28 17:18:03 +02:00
TemporalConstraint temporalConstraint , List < Filter > filters )
throws Exception {
SortedMap < Calendar , Info > map = mapReduceQuery ( clz , temporalConstraint , filters ) ;
return map ;
2016-01-11 11:29:26 +01:00
}
@Override
2016-03-25 19:18:17 +01:00
public SortedMap < NumberedFilter , SortedMap < Calendar , Info > > getTopValues (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
2016-06-28 17:18:03 +02:00
TemporalConstraint temporalConstraint , List < Filter > filters ,
String topKey , String orderingProperty ) throws Exception {
Comparator < NumberedFilter > comparator = new Comparator < NumberedFilter > ( ) {
@Override
public int compare ( NumberedFilter o1 , NumberedFilter o2 ) {
return - o1 . compareTo ( o2 ) ;
}
} ;
2016-03-29 10:58:33 +02:00
SortedMap < NumberedFilter , SortedMap < Calendar , Info > > ret =
2016-06-28 17:18:03 +02:00
new TreeMap < > ( comparator ) ;
2016-03-29 10:58:33 +02:00
SortedSet < NumberedFilter > top = getNextPossibleValues ( clz ,
temporalConstraint , filters , topKey , orderingProperty ) ;
2016-06-28 17:18:03 +02:00
2016-03-29 10:58:33 +02:00
for ( NumberedFilter nf : top ) {
filters . add ( nf ) ;
SortedMap < Calendar , Info > map =
mapReduceQuery ( clz , temporalConstraint , filters ) ;
2016-06-28 17:18:03 +02:00
2016-03-29 10:58:33 +02:00
ret . put ( nf , map ) ;
2016-06-28 17:18:03 +02:00
2016-03-29 10:58:33 +02:00
filters . remove ( nf ) ;
}
return ret ;
2016-02-09 18:40:59 +01:00
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
protected String getQualifiedProperty ( String property ) {
2016-06-28 17:18:03 +02:00
//DEVELOPING
return ( property ) ;
//return String.format("%s.%s", bucketName, property);
}
//Use for proprerty into a specify bucket
protected String getSpecializedProperty ( Class < ? extends AggregatedRecord < ? , ? > > clz , String property ) {
return String . format ( " %s.%s " , connectionMap . get ( clz . getSimpleName ( ) ) . name ( ) , property ) ;
2016-03-25 19:18:17 +01:00
}
2016-02-09 18:40:59 +01:00
@Override
2016-03-25 19:18:17 +01:00
public SortedSet < NumberedFilter > getNextPossibleValues (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
2016-06-28 17:18:03 +02:00
TemporalConstraint temporalConstraint , List < Filter > filters ,
String key , String orderingProperty ) throws Exception {
2016-03-25 19:18:17 +01:00
String currentScope = ScopeProvider . instance . get ( ) ;
String recordType = clz . newInstance ( ) . getRecordType ( ) ;
if ( orderingProperty = = null ) {
orderingProperty = AccountingPersistenceQuery .
getDefaultOrderingProperties ( clz ) ;
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
Collection < Expression > selectExpressions = new ArrayList < > ( ) ;
2016-06-28 17:18:03 +02:00
//add select expression
selectExpressions . add ( x ( " SUM( " + getSpecializedProperty ( clz , orderingProperty ) + " ) " ) .
2016-03-25 19:18:17 +01:00
as ( orderingProperty ) ) ;
2016-06-28 17:18:03 +02:00
selectExpressions . add ( x ( getSpecializedProperty ( clz , key ) ) . as ( key ) ) ;
//add where expression
2016-03-25 19:18:17 +01:00
Expression whereExpression =
2016-06-28 17:18:03 +02:00
x ( getSpecializedProperty ( clz , BasicUsageRecord . SCOPE ) ) .
2016-03-25 19:18:17 +01:00
eq ( s ( currentScope ) ) ;
2016-06-28 17:18:03 +02:00
long startTime = temporalConstraint . getAlignedStartTime ( ) . getTimeInMillis ( ) ;
2016-03-25 19:18:17 +01:00
whereExpression = whereExpression . and (
2016-06-28 17:18:03 +02:00
x ( getSpecializedProperty ( clz , AggregatedRecord . START_TIME ) ) . gt ( startTime )
) ;
2016-03-25 19:18:17 +01:00
long endTime = temporalConstraint . getAlignedEndTime ( ) . getTimeInMillis ( ) ;
whereExpression = whereExpression . and (
2016-06-28 17:18:03 +02:00
x ( getSpecializedProperty ( clz , AggregatedRecord . END_TIME ) ) . lt ( endTime )
) ;
2016-03-25 19:18:17 +01:00
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
Set < String > recordKeysSet = AccountingPersistenceQuery
. getQuerableKeys ( clz . newInstance ( ) ) ;
2016-06-28 17:18:03 +02:00
//list filter used for remove duplicate filter
2016-03-25 19:18:17 +01:00
Collection < String > keys = new TreeSet < > ( ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
if ( filters ! = null & & filters . size ( ) ! = 0 ) {
for ( Filter filter : filters ) {
String filterKey = filter . getKey ( ) ;
String filterValue = filter . getValue ( ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
if ( filterKey ! = null & & filterKey . compareTo ( " " ) ! = 0
& & recordKeysSet . contains ( filterKey ) ) {
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
if ( filterValue ! = null & & filterValue . compareTo ( " " ) ! = 0 ) {
if ( keys . contains ( filterKey ) ) {
throw new DuplicatedKeyFilterException (
" Only one value per Filter key is allowed " ) ;
}
2016-06-28 17:18:03 +02:00
whereExpression =
whereExpression . and (
x ( getSpecializedProperty ( clz , filterKey ) ) . eq ( s ( filterValue ) ) ) ;
2016-03-25 19:18:17 +01:00
keys . add ( filterKey ) ;
} else {
throw new KeyException (
2016-06-28 17:18:03 +02:00
String . format ( " Invalid %s : %s " ,
Filter . class . getSimpleName ( ) , filter . toString ( ) ) ) ;
2016-03-25 19:18:17 +01:00
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
} else {
throw new ValueException ( String . format ( " Invalid %s : %s " ,
Filter . class . getSimpleName ( ) , filter . toString ( ) ) ) ;
}
}
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
Expression [ ] selectExpressionArray =
new Expression [ selectExpressions . size ( ) ] ;
selectExpressions . toArray ( selectExpressionArray ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
Sort sort = Sort . desc ( orderingProperty ) ;
2016-06-28 17:18:03 +02:00
OffsetPath path = select ( selectExpressionArray ) . from ( connectionMap . get ( clz . getSimpleName ( ) ) . name ( ) )
2016-03-30 09:43:06 +02:00
. where ( whereExpression ) . groupBy ( key ) . orderBy ( sort ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
2016-06-28 17:18:03 +02:00
logger . debug ( path . toString ( ) ) ;
2016-03-25 19:18:17 +01:00
Comparator < NumberedFilter > comparator = new Comparator < NumberedFilter > ( ) {
@Override
public int compare ( NumberedFilter o1 , NumberedFilter o2 ) {
return - o1 . compareTo ( o2 ) ;
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
} ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
SortedSet < NumberedFilter > ret = new TreeSet < > ( comparator ) ;
2016-06-28 17:18:03 +02:00
N1qlQueryResult result = connectionMap . get ( clz . getSimpleName ( ) ) . query ( path ) ;
2016-03-25 19:18:17 +01:00
if ( ! result . finalSuccess ( ) ) {
logger . debug ( " {} failed : {} " ,
N1qlQueryResult . class . getSimpleName ( ) , result . errors ( ) ) ;
throw new Exception ( " Query Failed : \ n " + result . errors ( ) ) ;
}
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
List < N1qlQueryRow > rows = result . allRows ( ) ;
for ( N1qlQueryRow row : rows ) {
try {
JsonObject jsonObject = row . value ( ) ;
2016-06-28 17:18:03 +02:00
//logger.trace("JsonObject : {}", row.value()+" key"+key);
2016-03-25 19:18:17 +01:00
2016-06-28 17:18:03 +02:00
//verify for a not null value
String value = jsonObject . getString ( key ) ;
2016-03-25 19:18:17 +01:00
Number n = jsonObject . getDouble ( orderingProperty ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
NumberedFilter numberedFilter =
2016-03-30 09:43:06 +02:00
new NumberedFilter ( key , value , n , orderingProperty ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
ret . add ( numberedFilter ) ;
2016-06-28 17:18:03 +02:00
2016-03-25 19:18:17 +01:00
} catch ( Exception e ) {
logger . warn ( " Unable to eleborate result for {} " , row . toString ( ) ) ;
2016-06-28 17:18:03 +02:00
//logger.warn("Error:"+e.getLocalizedMessage());
2016-03-25 19:18:17 +01:00
}
2016-06-28 17:18:03 +02:00
//logger.trace("\n\n");
}
return ret ;
}
@SuppressWarnings ( " deprecation " )
@Override
public SortedSet < NumberedFilter > getFilterValues (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , List < Filter > filters ,
String key ) throws Exception {
String currentScope = ScopeProvider . instance . get ( ) ;
JsonArray startKey = JsonArray . create ( ) ;
startKey . add ( currentScope ) ;
int scopeDateGroupLevel = 2 ;
int groupLevel = scopeDateGroupLevel ;
String designDocId = getDesignDocId ( clz ) + " Value " ;
String viewName = key ;
logger . trace ( " designDocId:{} view:{} " , designDocId , key ) ;
logger . trace ( " startKey:{} " , startKey ) ;
logger . trace ( " groupLevel " + groupLevel ) ;
ViewQuery query = ViewQuery . from ( designDocId , viewName ) ;
query . inclusiveEnd ( ) ;
query . groupLevel ( groupLevel ) ;
query . startKey ( startKey ) ;
query . descending ( false ) ;
String orderingProperty = AccountingPersistenceQuery .
getDefaultOrderingProperties ( clz ) ;
ViewResult viewResult ;
try {
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
} catch ( Exception e ) {
logger . error ( " error executing the query " , e ) ;
throw e ;
}
Comparator < NumberedFilter > comparator = new Comparator < NumberedFilter > ( ) {
@Override
public int compare ( NumberedFilter o1 , NumberedFilter o2 ) {
if ( o1 . getValue ( ) = = null )
o1 . setValue ( " " ) ;
if ( o2 . getValue ( ) = = null )
o2 . setValue ( " " ) ;
return o1 . getValue ( ) . compareTo ( o2 . getValue ( ) ) ;
}
} ;
SortedSet < NumberedFilter > ret = new TreeSet < > ( comparator ) ;
for ( ViewRow row : viewResult ) {
String value = ( String ) row . value ( ) ;
NumberedFilter numberedFilter =
new NumberedFilter ( key , value , 0 , orderingProperty ) ;
ret . add ( numberedFilter ) ;
2016-03-25 19:18:17 +01:00
}
2016-06-28 17:18:03 +02:00
logger . trace ( " returning {} values " , ret . size ( ) ) ;
2016-03-25 19:18:17 +01:00
return ret ;
2016-06-28 17:18:03 +02:00
2016-01-11 11:29:26 +01:00
}
2016-06-28 17:18:03 +02:00
@SuppressWarnings ( " deprecation " )
@Override
public JSONObject getUsageValue (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , Filter applicant ) throws Exception {
String currentScope = ScopeProvider . instance . get ( ) ;
JsonArray startKey = JsonArray . create ( ) ;
startKey . add ( currentScope ) ;
JsonArray endKey = JsonArray . create ( ) ;
endKey . add ( currentScope ) ;
AggregationMode aggregationMode = temporalConstraint
. getAggregationMode ( ) ;
JsonArray temporalStartKey = getRangeKey (
temporalConstraint . getStartTime ( ) ,
aggregationMode , false , false ) ;
JsonArray temporalEndKey = getRangeKey (
temporalConstraint . getEndTime ( ) ,
aggregationMode , false , false ) ;
startKey . add ( applicant . getValue ( ) ) ;
for ( Object temporal : temporalStartKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) )
startKey . add ( temporal ) ;
}
endKey . add ( applicant . getValue ( ) ) ;
int count = 1 ;
for ( Object temporal : temporalEndKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) ) {
//couchbase exclude last value
if ( count = = temporalEndKey . size ( ) )
temporal = ( int ) temporal + 1 ;
endKey . add ( temporal ) ;
}
count + + ;
}
// +1 because mode start from 0
// +1 because have afilter value from 1
// +1 because of scope at the beginning
int scopeDateGroupLevel = aggregationMode . ordinal ( ) + 1 + 1 + 1 ;
int groupLevel = scopeDateGroupLevel ;
String designDocId = getDesignDocId ( clz ) ;
String viewName = applicant . getKey ( ) ;
ViewQuery query = ViewQuery . from ( designDocId , viewName ) ;
query . inclusiveEnd ( ) ;
query . groupLevel ( groupLevel ) ;
query . startKey ( startKey ) ;
query . endKey ( endKey ) ;
query . descending ( false ) ;
logger . trace ( " Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ " Group Level : {}, Start Key : {}, End Key : {}, "
+ " temporalStartKey :{}, temporalEndKey :{} " ,
clz . getSimpleName ( ) , designDocId , viewName , groupLevel , startKey , endKey , temporalStartKey . toString ( ) , temporalEndKey . toString ( ) ) ;
ViewResult viewResult ;
try {
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
} catch ( Exception e ) {
logger . error ( e . getLocalizedMessage ( ) ) ;
throw e ;
}
Map < String , Float > map = new HashMap < String , Float > ( ) ;
for ( ViewRow row : viewResult ) {
JsonObject jsnobject = ( JsonObject ) row . value ( ) ;
JSONObject objJson = new JSONObject ( jsnobject . toString ( ) ) ;
Iterator < ? > iterateJosn = objJson . keys ( ) ;
while ( iterateJosn . hasNext ( ) ) {
String key = ( String ) iterateJosn . next ( ) ;
Float valuetmp = Float . parseFloat ( objJson . get ( key ) . toString ( ) ) ;
if ( key . equals ( " operationCount " ) | | key . equals ( " dataVolume " ) ) {
if ( map . containsKey ( key ) ) {
map . put ( key , valuetmp + map . get ( key ) ) ;
}
else
map . put ( key , valuetmp ) ;
}
}
}
JSONObject result = new JSONObject ( map ) ;
return result ;
}
2016-01-11 11:29:26 +01:00
}