@ -11,8 +11,12 @@ import java.security.KeyException;
import java.util.ArrayList ;
import java.util.Calendar ;
import java.util.Collection ;
import java.util.Collections ;
import java.util.Comparator ;
import java.util.HashMap ;
import java.util.Iterator ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.SortedMap ;
import java.util.SortedSet ;
@ -35,6 +39,8 @@ import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration ;
import org.gcube.common.scope.api.ScopeProvider ;
import org.gcube.documentstore.records.AggregatedRecord ;
import org.gcube.documentstore.records.Record ;
import org.gcube.documentstore.records.RecordUtility ;
import org.json.JSONException ;
import org.json.JSONObject ;
import org.slf4j.Logger ;
@ -51,7 +57,8 @@ import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow ;
import com.couchbase.client.java.query.dsl.Expression ;
import com.couchbase.client.java.query.dsl.Sort ;
import com.couchbase.client.java.query.dsl.path.LimitPath ;
import com.couchbase.client.java.query.dsl.path.GroupByPath ;
import com.couchbase.client.java.query.dsl.path.OffsetPath ;
import com.couchbase.client.java.view.ViewQuery ;
import com.couchbase.client.java.view.ViewResult ;
import com.couchbase.client.java.view.ViewRow ;
@ -66,19 +73,47 @@ public class AccountingPersistenceQueryCouchBase implements
private static final Logger logger = LoggerFactory
. getLogger ( AccountingPersistenceQueryCouchBase . class ) ;
public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration . URL_PROPERTY_KEY ;
// public static final String USERNAME_PROPERTY_KEY =
// AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY;
public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration . PASSWORD_PROPERTY_KEY ;
public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName" ;
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY = "AggregatedStorageUsageRecord" ;
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY = "AggregatedServiceUsageRecord" ;
public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY = "AggregatedPortletUsageRecord" ;
public static final String BUCKET_JOB_NAME_PROPERTY_KEY = "AggregatedJobUsageRecord" ;
public static final String BUCKET_TASK_NAME_PROPERTY_KEY = "AggregatedTaskUsageRecord" ;
public static final long ENV_TIME_OUT = 180000 ;
/* The environment configuration */
protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment
. builder ( ) . queryEnabled ( true ) . build ( ) ;
. builder ( ) . maxRequestLifetime ( ENV_TIME_OUT ) . queryTimeout ( ENV_TIME_OUT ) . queryEnabled ( true ) . build ( ) ;
protected Cluster cluster ;
protected Bucket bucket ;
protected String bucketName ;
/* One Bucket for type*/
protected Bucket bucketStorage ;
protected String bucketNameStorage ;
protected Bucket bucketService ;
protected String bucketNameService ;
protected Bucket bucketPortlet ;
protected String bucketNamePortlet ;
protected Bucket bucketJob ;
protected String bucketNameJob ;
protected Bucket bucketTask ;
protected String bucketNameTask ;
private Map < String , Bucket > connectionMap ;
/ * *
* { @inheritDoc }
@ -87,13 +122,50 @@ public class AccountingPersistenceQueryCouchBase implements
public void prepareConnection (
AccountingPersistenceBackendQueryConfiguration configuration )
throws Exception {
String url = configuration . getProperty ( URL_PROPERTY_KEY ) ;
// String username = configuration.getProperty(USERNAME_PROPERTY_KEY);
String password = configuration . getProperty ( PASSWORD_PROPERTY_KEY ) ;
cluster = CouchbaseCluster . create ( ENV , url ) ;
bucketName = configuration . getProperty ( BUCKET_NAME_PROPERTY_KEY ) ;
bucket = cluster . openBucket ( bucketName , password ) ;
logger . trace ( "env" + ENV . toString ( ) ) ;
bucketNameStorage = configuration . getProperty ( BUCKET_STORAGE_NAME_PROPERTY_KEY ) ;
bucketNameService = configuration . getProperty ( BUCKET_SERVICE_NAME_PROPERTY_KEY ) ;
bucketNameJob = configuration . getProperty ( BUCKET_JOB_NAME_PROPERTY_KEY ) ;
bucketNamePortlet = configuration . getProperty ( BUCKET_PORTLET_NAME_PROPERTY_KEY ) ;
bucketNameTask = configuration . getProperty ( BUCKET_TASK_NAME_PROPERTY_KEY ) ;
connectionMap = new HashMap < String , Bucket > ( ) ;
bucketStorage = cluster . openBucket ( bucketNameStorage , password ) ;
connectionMap . put ( BUCKET_STORAGE_NAME_PROPERTY_KEY , bucketStorage ) ;
bucketService = cluster . openBucket ( bucketNameService , password ) ;
connectionMap . put ( BUCKET_SERVICE_NAME_PROPERTY_KEY , bucketService ) ;
bucketJob = cluster . openBucket ( bucketNameJob , password ) ;
connectionMap . put ( BUCKET_JOB_NAME_PROPERTY_KEY , bucketJob ) ;
bucketPortlet = cluster . openBucket ( bucketNamePortlet , password ) ;
connectionMap . put ( BUCKET_PORTLET_NAME_PROPERTY_KEY , bucketPortlet ) ;
bucketTask = cluster . openBucket ( bucketNameTask , password ) ;
connectionMap . put ( BUCKET_TASK_NAME_PROPERTY_KEY , bucketTask ) ;
logger . trace ( "Open cluster Service Bucket Url:" + url + " Pwd:" + configuration . getProperty ( PASSWORD_PROPERTY_KEY ) +
" BucketName " + configuration . getProperty ( BUCKET_SERVICE_NAME_PROPERTY_KEY ) ) ;
}
/ * *
@ -125,80 +197,100 @@ public class AccountingPersistenceQueryCouchBase implements
return calendar ;
}
// private Map<Calendar, Info> selectQuery(
// Class<? extends AggregatedRecord<?, ?>> clz,
// TemporalConstraint temporalConstraint, List<Filter> filters)
// throws Exception {
// // String currentScope = BasicUsageRecord.getScopeFromToken();
// String currentScope = ScopeProvider.instance.get();
// String recordType = clz.newInstance().getRecordType();
//
// Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope));
// expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(
// s(recordType)));
//
// long startTime = temporalConstraint.getAlignedStartTime()
// .getTimeInMillis();
// expression = expression.and(x(AggregatedRecord.START_TIME)
// .gt(startTime).or(
// x(AggregatedRecord.CREATION_TIME).gt(startTime)));
//
// long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
// expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime))
// .or(x(AggregatedRecord.CREATION_TIME).lt(endTime));
//
// AggregationMode aggregationMode = temporalConstraint
// .getAggregationMode();
// // TODO Aggregate Results
//
// if (filters != null) {
// for (Filter filter : filters) {
// expression = expression.and(x(filter.getKey()).eq(
// s(filter.getValue())));
// }
// }
//
// GroupByPath groupByPath = select("*").from(bucketName)
// .where(expression);
//
// Map<Calendar, Info> map = new HashMap<Calendar, Info>();
//
// N1qlQueryResult result = bucket.query(groupByPath);
// if (!result.finalSuccess()) {
// logger.debug("{} failed : {}",
// N1qlQueryResult.class.getSimpleName(), result.errors());
// return map;
// }
//
// List<N1qlQueryRow> rows = result.allRows();
//
// for (N1qlQueryRow row : rows) {
// try {
// logger.trace("Row : {}", row.toString());
// JsonObject jsonObject = row.value().getObject(bucketName);
// logger.trace("JsonObject : {}", row.toString());
// String recordString = jsonObject.toMap().toString();
// logger.trace("Record String : {}", recordString);
// Record record = RecordUtility.getRecord(recordString);
//
// JSONObject obj = new JSONObject(jsonObject.toString());
// Calendar calendar = getCalendar(obj, aggregationMode);
// if (map.containsKey(calendar)) {
// Info info = map.get(calendar);
// JSONObject value = info.getValue();
// jsonObject.toMap();
// } else {
// map.put(calendar, new Info(calendar, obj));
// }
// } catch (Exception e) {
// logger.warn("Unable to eleborate result for {}", row.toString());
// }
//
// logger.trace("\n\n\n");
// }
//
// return map;
// }
protected Map < Calendar , Info > selectQuery (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , List < Filter > filters )
throws Exception {
String currentScope = ScopeProvider . instance . get ( ) ;
String recordType = clz . newInstance ( ) . getRecordType ( ) ;
Expression expression = x ( BasicUsageRecord . SCOPE ) . eq ( s ( currentScope ) ) ;
expression = expression . and ( x ( BasicUsageRecord . RECORD_TYPE ) . eq (
s ( recordType ) ) ) ;
long startTime = temporalConstraint . getAlignedStartTime ( )
. getTimeInMillis ( ) ;
expression = expression . and ( x ( AggregatedRecord . START_TIME )
. gt ( startTime ) . or (
x ( AggregatedRecord . CREATION_TIME ) . gt ( startTime ) ) ) ;
long endTime = temporalConstraint . getAlignedEndTime ( ) . getTimeInMillis ( ) ;
expression = expression . and ( x ( AggregatedRecord . END_TIME ) . lt ( endTime ) )
. or ( x ( AggregatedRecord . CREATION_TIME ) . lt ( endTime ) ) ;
AggregationMode aggregationMode = temporalConstraint
. getAggregationMode ( ) ;
// TODO Aggregate Results
if ( filters ! = null ) {
for ( Filter filter : filters ) {
expression = expression . and ( x ( filter . getKey ( ) ) . eq (
s ( filter . getValue ( ) ) ) ) ;
}
}
GroupByPath groupByPath = select ( "*" ) . from ( connectionMap . get ( clz . getSimpleName ( ) ) . name ( ) )
. where ( expression ) ;
Map < Calendar , Info > map = new HashMap < Calendar , Info > ( ) ;
// using the DSL
/ * e . g .
select sum ( dataVolume ) as Volume ,
sum ( operationCount ) as Operation ,
consumerId
from accounting_storage_test
group by order by Volume desc
* /
// N1qlQueryResult result = connectionMap.get(clz.getSimpleName()).query(select("sum(dataVolume) as Volume ,sum(operationCount) as Operation , consumerId").from("accounting_storage").groupBy("consumerId").limit(5));
//logger.info("result"+result.toString());
N1qlQueryResult result = connectionMap . get ( clz . getSimpleName ( ) ) . query ( groupByPath ) ;
if ( ! result . finalSuccess ( ) ) {
logger . debug ( "{} failed : {}" ,
N1qlQueryResult . class . getSimpleName ( ) , result . errors ( ) ) ;
return map ;
}
List < N1qlQueryRow > rows = result . allRows ( ) ;
for ( N1qlQueryRow row : rows ) {
try {
logger . trace ( "Row : {}" , row . toString ( ) ) ;
JsonObject jsonObject = row . value ( ) . getObject ( clz . getSimpleName ( ) ) ;
logger . trace ( "JsonObject : {}" , row . toString ( ) ) ;
String recordString = jsonObject . toMap ( ) . toString ( ) ;
logger . trace ( "Record String : {}" , recordString ) ;
Record record = RecordUtility . getRecord ( recordString ) ;
JSONObject obj = new JSONObject ( jsonObject . toString ( ) ) ;
Calendar calendar = getCalendar ( obj , aggregationMode ) ;
if ( map . containsKey ( calendar ) ) {
Info info = map . get ( calendar ) ;
JSONObject value = info . getValue ( ) ;
jsonObject . toMap ( ) ;
} else {
map . put ( calendar , new Info ( calendar , obj ) ) ;
}
} catch ( Exception e ) {
logger . warn ( "Unable to eleborate result for {}" , row . toString ( ) ) ;
}
logger . trace ( "\n\n\n" ) ;
}
return map ;
}
protected Calendar getCalendarFromArray ( JsonArray array )
throws JSONException {
@ -271,7 +363,8 @@ public class AccountingPersistenceQueryCouchBase implements
return array ;
}
protected static final String MAP_REDUCE__DESIGN = "_design/" ;
protected static final String MAP_REDUCE__DESIGN = "" ;
protected static final String MAP_REDUCE_ALL = "all" ;
/ * *
@ -289,6 +382,8 @@ public class AccountingPersistenceQueryCouchBase implements
public static String getMapReduceFunctionName (
Collection < String > collection ) {
String reduceFunction = MAP_REDUCE_ALL ;
if ( ! collection . isEmpty ( ) ) {
reduceFunction = null ;
for ( String property : collection ) {
if ( reduceFunction = = null ) {
reduceFunction = property ;
@ -296,6 +391,7 @@ public class AccountingPersistenceQueryCouchBase implements
reduceFunction = reduceFunction + KEYS_SEPARATOR + property ;
}
}
}
return reduceFunction ;
}
@ -304,7 +400,6 @@ public class AccountingPersistenceQueryCouchBase implements
TemporalConstraint temporalConstraint , List < Filter > filters )
throws Exception {
// String currentScope = BasicUsageRecord.getScopeFromToken();
String currentScope = ScopeProvider . instance . get ( ) ;
JsonArray startKey = JsonArray . create ( ) ;
@ -321,7 +416,7 @@ public class AccountingPersistenceQueryCouchBase implements
JsonArray temporalEndKey = getRangeKey (
temporalConstraint . getEndTime ( ) ,
aggregationMode , tru e, false ) ;
aggregationMode , fals e, false ) ;
Set < String > recordKeysSet = AccountingPersistenceQuery
@ -329,10 +424,23 @@ public class AccountingPersistenceQueryCouchBase implements
Collection < String > keys = new TreeSet < > ( ) ;
JsonArray filterStartKey = JsonArray . create ( ) ;
JsonArray filterEndKey = JsonArray . create ( ) ;
//JsonArray filterStartKey = JsonArray.create();
//JsonArray filterEndKey = JsonArray.create();
if ( filters ! = null & & filters . size ( ) ! = 0 ) {
// Sorting filter for call a mapreduce
Collections . sort ( filters , new Comparator < Filter > ( ) {
@Override
public int compare ( Filter filter1 , Filter filter2 )
{
return filter1 . getKey ( ) . compareTo ( filter2 . getKey ( ) ) ;
}
} ) ;
logger . trace ( "filter" + filters . toString ( ) ) ;
for ( Filter filter : filters ) {
String filterKey = filter . getKey ( ) ;
@ -347,9 +455,10 @@ public class AccountingPersistenceQueryCouchBase implements
"Only one value per Filter key is allowed" ) ;
}
filterStartKey . add ( filterValue ) ;
filterEndKey . add ( filterValue ) ;
startKey . add ( filterValue ) ;
endKey . add ( filterValue ) ;
//filterStartKey.add(filterValue);
//filterEndKey.add(filterValue);
keys . add ( filterKey ) ;
} else {
throw new KeyException (
@ -375,34 +484,57 @@ public class AccountingPersistenceQueryCouchBase implements
String designDocId = getDesignDocId ( clz ) ;
for ( Object temporal : temporalStartKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) )
startKey . add ( temporal ) ;
}
int count = 1 ;
for ( Object temporal : temporalEndKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) ) {
//couchbase exclude last value
if ( count = = temporalEndKey . size ( ) )
temporal = ( int ) temporal + 1 ;
endKey . add ( temporal ) ;
}
count + + ;
}
startKey . add ( filterStartKey . toList ( ) ) ;
startKey . add ( temporalStartKey . toList ( ) ) ;
endKey . add ( filterEndKey . toList ( ) ) ;
endKey . add ( temporalEndKey . toList ( ) ) ;
logger . trace ( "startKey:{}" + startKey ) ;
//startKey.add(temporalStartKey.toList());
//endKey.add(temporalEndKey.toList());
String viewName = getMapReduceFunctionName ( keys ) ;
ViewQuery query = ViewQuery . from ( designDocId , viewName ) ;
query . group ( true ) ;
query . groupLevel ( groupLevel ) ;
query . inclusiveEnd ( ) ;
query . groupLevel ( groupLevel ) ;
query . startKey ( startKey ) ;
query . endKey ( endKey ) ;
query . descending ( false ) ;
logger . trace ( "Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {}" ,
designDocId , viewName , groupLevel , startKey , endKey ) ;
logger . trace ( "Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}" ,
clz . getSimpleName ( ) , designDocId , viewName , groupLevel , startKey , endKey , temporalStartKey . toString ( ) , temporalEndKey . toString ( ) ) ;
SortedMap < Calendar , Info > infos = new TreeMap < > ( ) ;
ViewResult viewResult ;
try {
viewResult = bucket . query ( query ) ;
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
} catch ( Exception e ) {
logger . error ( e . getLocalizedMessage ( ) ) ;
throw e ;
}
@ -412,12 +544,49 @@ public class AccountingPersistenceQueryCouchBase implements
JsonObject value = ( JsonObject ) row . value ( ) ;
JSONObject obj = new JSONObject ( value . toString ( ) ) ;
Info info = new Info ( calendar , obj ) ;
infos . put ( calendar , info ) ;
}
if ( infos . isEmpty ( ) ) {
//exec a map reduce for found name key
query = ViewQuery . from ( designDocId , viewName ) ;
query . groupLevel ( groupLevel ) ;
query . descending ( false ) ;
try {
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
} catch ( Exception e ) {
logger . error ( e . getLocalizedMessage ( ) ) ;
throw e ;
}
ViewRow row = viewResult . allRows ( ) . get ( 0 ) ;
JsonArray array = getRangeKey (
temporalConstraint . getStartTime ( ) ,
aggregationMode , false , false ) ;
Calendar calendar = getCalendarFromArray ( array ) ;
JsonObject value = ( JsonObject ) row . value ( ) ;
JSONObject objJson = new JSONObject ( value . toString ( ) ) ;
JSONObject objJsontemplate = new JSONObject ( ) ;
Iterator < ? > iterateJson = objJson . keys ( ) ;
while ( iterateJson . hasNext ( ) ) {
String key = ( String ) iterateJson . next ( ) ;
objJsontemplate . put ( key , 0 ) ;
}
//generate an example object for json
Info info = new Info ( calendar , objJsontemplate ) ;
infos . put ( calendar , info ) ;
//break;
}
return infos ;
}
@ -426,7 +595,10 @@ public class AccountingPersistenceQueryCouchBase implements
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , List < Filter > filters )
throws Exception {
return mapReduceQuery ( clz , temporalConstraint , filters ) ;
SortedMap < Calendar , Info > map = mapReduceQuery ( clz , temporalConstraint , filters ) ;
return map ;
}
@Override
@ -435,15 +607,25 @@ public class AccountingPersistenceQueryCouchBase implements
TemporalConstraint temporalConstraint , List < Filter > filters ,
String topKey , String orderingProperty ) throws Exception {
Comparator < NumberedFilter > comparator = new Comparator < NumberedFilter > ( ) {
@Override
public int compare ( NumberedFilter o1 , NumberedFilter o2 ) {
return - o1 . compareTo ( o2 ) ;
}
} ;
SortedMap < NumberedFilter , SortedMap < Calendar , Info > > ret =
new TreeMap < > ( ) ;
new TreeMap < > ( comparator ) ;
SortedSet < NumberedFilter > top = getNextPossibleValues ( clz ,
temporalConstraint , filters , topKey , orderingProperty ) ;
for ( NumberedFilter nf : top ) {
filters . add ( nf ) ;
SortedMap < Calendar , Info > map =
mapReduceQuery ( clz , temporalConstraint , filters ) ;
@ -451,60 +633,31 @@ public class AccountingPersistenceQueryCouchBase implements
filters . remove ( nf ) ;
}
return ret ;
}
protected String getQualifiedProperty ( String property ) {
return String . format ( "%s.%s" , bucketName , property ) ;
//DEVELOPING
return ( property ) ;
//return String.format("%s.%s", bucketName, property);
}
//Use for proprerty into a specify bucket
protected String getSpecializedProperty ( Class < ? extends AggregatedRecord < ? , ? > > clz , String property ) {
return String . format ( "%s.%s" , connectionMap . get ( clz . getSimpleName ( ) ) . name ( ) , property ) ;
}
@SuppressWarnings ( "deprecation" )
@Override
public SortedSet < NumberedFilter > getNextPossibleValues (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , List < Filter > filters ,
String key , String orderingProperty ) throws Exception {
/ *
SELECT
SUM ( accounting . dataVolume ) as dataVolume ,
// SUM(accounting.operationCount) as operationCount,
// Filter List e.g.
accounting . consumerId as consumerId ,
accounting . serviceClass as serviceClass ,
// topKey e.g
accounting . serviceName as serviceName
FROM accounting
WHERE
accounting . scope = "/gcube/devsec" AND
( accounting . recordType = "StorageUsageRecord" OR
accounting . usageRecordType = "StorageUsageRecord" ) AND
AND
( accounting . startTime > = temporalConstraint . startTime AND
accounting . endTime > = temporalConstraint . endTime )
// Filter List e.g
AND
accounting . consumerId = "luca.frosini" AND
accounting . serviceClass = "VREManagement"
// topKey
GROUP BY accounting . serviceName
ORDER BY dataVolume DESC
// ORDER BY operationsCount DESC
LIMIT 10
* /
// String currentScope = BasicUsageRecord.getScopeFromToken();
String currentScope = ScopeProvider . instance . get ( ) ;
String recordType = clz . newInstance ( ) . getRecordType ( ) ;
@ -515,38 +668,40 @@ public class AccountingPersistenceQueryCouchBase implements
Collection < Expression > selectExpressions = new ArrayList < > ( ) ;
selectExpressions . add ( x ( "SUM(" + getQualifiedProperty ( orderingProperty ) + ")" ) .
//add select expression
selectExpressions . add ( x ( "SUM(" + getSpecializedProperty ( clz , orderingProperty ) + ")" ) .
as ( orderingProperty ) ) ;
selectExpressions . add ( x ( getSpecializedProperty ( clz , key ) ) . as ( key ) ) ;
selectExpressions . add ( x ( getQualifiedProperty ( key ) ) . as ( key ) ) ;
//add where expression
Expression whereExpression =
x ( get QualifiedProperty( BasicUsageRecord . SCOPE ) ) .
x ( get SpecializedProperty( clz , BasicUsageRecord . SCOPE ) ) .
eq ( s ( currentScope ) ) ;
whereExpression = whereExpression . and (
x ( getQualifiedProperty ( BasicUsageRecord . RECORD_TYPE ) ) .
eq ( s ( recordType ) ) . or (
x ( getQualifiedProperty ( BasicUsageRecord . USAGE_RECORD_TYPE ) ) .
eq ( s ( recordType ) ) )
) ;
long startTime = temporalConstraint . getAlignedStartTime ( )
. getTimeInMillis ( ) ;
long startTime = temporalConstraint . getAlignedStartTime ( ) . getTimeInMillis ( ) ;
whereExpression = whereExpression . and (
x ( getQualifiedProperty ( AggregatedRecord . START_TIME ) ) . gt ( startTime ) . or (
x ( getQualifiedProperty ( AggregatedRecord . CREATION_TIME ) ) . gt ( startTime ) ) ) ;
x ( getSpecializedProperty ( clz , AggregatedRecord . START_TIME ) ) . gt ( startTime )
) ;
long endTime = temporalConstraint . getAlignedEndTime ( ) . getTimeInMillis ( ) ;
whereExpression = whereExpression . and (
x ( getQualifiedProperty ( AggregatedRecord . END_TIME ) ) . lt ( endTime ) ) . or (
x ( getQualifiedProperty ( AggregatedRecord . CREATION_TIME ) ) . lt ( endTime ) ) ;
x ( getSpecializedProperty ( clz , AggregatedRecord . END_TIME ) ) . lt ( endTime )
) ;
Set < String > recordKeysSet = AccountingPersistenceQuery
. getQuerableKeys ( clz . newInstance ( ) ) ;
//list filter used for remove duplicate filter
Collection < String > keys = new TreeSet < > ( ) ;
if ( filters ! = null & & filters . size ( ) ! = 0 ) {
for ( Filter filter : filters ) {
@ -561,9 +716,9 @@ public class AccountingPersistenceQueryCouchBase implements
throw new DuplicatedKeyFilterException (
"Only one value per Filter key is allowed" ) ;
}
whereExpression =
whereExpression . and (
x ( getQualifiedProperty ( filterKey ) ) . eq ( filterValue) ) ;
x ( getSpecializedProperty ( clz , filterKey ) ) . eq ( s( filterValue) ) ) ;
keys . add ( filterKey ) ;
} else {
@ -586,10 +741,11 @@ public class AccountingPersistenceQueryCouchBase implements
selectExpressions . toArray ( selectExpressionArray ) ;
Sort sort = Sort . desc ( orderingProperty ) ;
LimitPath path = select ( selectExpressionArray ) . from ( bucketName )
OffsetPath path = select ( selectExpressionArray ) . from ( connectionMap . get ( clz . getSimpleName ( ) ) . name ( ) )
. where ( whereExpression ) . groupBy ( key ) . orderBy ( sort ) ;
logger . debug ( path . toString ( ) ) ;
@ -604,7 +760,10 @@ public class AccountingPersistenceQueryCouchBase implements
SortedSet < NumberedFilter > ret = new TreeSet < > ( comparator ) ;
N1qlQueryResult result = bucket . query ( path ) ;
N1qlQueryResult result = connectionMap . get ( clz . getSimpleName ( ) ) . query ( path ) ;
if ( ! result . finalSuccess ( ) ) {
logger . debug ( "{} failed : {}" ,
N1qlQueryResult . class . getSimpleName ( ) , result . errors ( ) ) ;
@ -615,12 +774,11 @@ public class AccountingPersistenceQueryCouchBase implements
for ( N1qlQueryRow row : rows ) {
try {
logger . trace ( "Row : {}" , row . toString ( ) ) ;
JsonObject jsonObject = row . value ( ) ;
logger . trace ( "JsonObject : {}" , row . toString ( ) ) ;
//logger.trace("JsonObject : {}", row.value()+" key"+key);
//verify for a not null value
String value = jsonObject . getString ( key ) ;
Number n = jsonObject . getDouble ( orderingProperty ) ;
NumberedFilter numberedFilter =
@ -630,12 +788,194 @@ public class AccountingPersistenceQueryCouchBase implements
} catch ( Exception e ) {
logger . warn ( "Unable to eleborate result for {}" , row . toString ( ) ) ;
//logger.warn("Error:"+e.getLocalizedMessage());
}
logger . trace ( "\n\n\n" ) ;
//logger.trace("\n\n");
}
return ret ;
}
@SuppressWarnings ( "deprecation" )
@Override
public SortedSet < NumberedFilter > getFilterValues (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , List < Filter > filters ,
String key ) throws Exception {
String currentScope = ScopeProvider . instance . get ( ) ;
JsonArray startKey = JsonArray . create ( ) ;
startKey . add ( currentScope ) ;
int scopeDateGroupLevel = 2 ;
int groupLevel = scopeDateGroupLevel ;
String designDocId = getDesignDocId ( clz ) + "Value" ;
String viewName = key ;
logger . trace ( "designDocId:{} view:{} " , designDocId , key ) ;
logger . trace ( "startKey:{}" , startKey ) ;
logger . trace ( "groupLevel" + groupLevel ) ;
ViewQuery query = ViewQuery . from ( designDocId , viewName ) ;
query . inclusiveEnd ( ) ;
query . groupLevel ( groupLevel ) ;
query . startKey ( startKey ) ;
query . descending ( false ) ;
String orderingProperty = AccountingPersistenceQuery .
getDefaultOrderingProperties ( clz ) ;
ViewResult viewResult ;
try {
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
} catch ( Exception e ) {
logger . error ( "error executing the query" , e ) ;
throw e ;
}
Comparator < NumberedFilter > comparator = new Comparator < NumberedFilter > ( ) {
@Override
public int compare ( NumberedFilter o1 , NumberedFilter o2 ) {
if ( o1 . getValue ( ) = = null )
o1 . setValue ( "" ) ;
if ( o2 . getValue ( ) = = null )
o2 . setValue ( "" ) ;
return o1 . getValue ( ) . compareTo ( o2 . getValue ( ) ) ;
}
} ;
SortedSet < NumberedFilter > ret = new TreeSet < > ( comparator ) ;
for ( ViewRow row : viewResult ) {
String value = ( String ) row . value ( ) ;
NumberedFilter numberedFilter =
new NumberedFilter ( key , value , 0 , orderingProperty ) ;
ret . add ( numberedFilter ) ;
}
logger . trace ( "returning {} values" , ret . size ( ) ) ;
return ret ;
}
@SuppressWarnings ( "deprecation" )
@Override
public JSONObject getUsageValue (
Class < ? extends AggregatedRecord < ? , ? > > clz ,
TemporalConstraint temporalConstraint , Filter applicant ) throws Exception {
String currentScope = ScopeProvider . instance . get ( ) ;
JsonArray startKey = JsonArray . create ( ) ;
startKey . add ( currentScope ) ;
JsonArray endKey = JsonArray . create ( ) ;
endKey . add ( currentScope ) ;
AggregationMode aggregationMode = temporalConstraint
. getAggregationMode ( ) ;
JsonArray temporalStartKey = getRangeKey (
temporalConstraint . getStartTime ( ) ,
aggregationMode , false , false ) ;
JsonArray temporalEndKey = getRangeKey (
temporalConstraint . getEndTime ( ) ,
aggregationMode , false , false ) ;
startKey . add ( applicant . getValue ( ) ) ;
for ( Object temporal : temporalStartKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) )
startKey . add ( temporal ) ;
}
endKey . add ( applicant . getValue ( ) ) ;
int count = 1 ;
for ( Object temporal : temporalEndKey . toList ( ) ) {
if ( ! temporal . toString ( ) . isEmpty ( ) ) {
//couchbase exclude last value
if ( count = = temporalEndKey . size ( ) )
temporal = ( int ) temporal + 1 ;
endKey . add ( temporal ) ;
}
count + + ;
}
// +1 because mode start from 0
// +1 because have afilter value from 1
// +1 because of scope at the beginning
int scopeDateGroupLevel = aggregationMode . ordinal ( ) + 1 + 1 + 1 ;
int groupLevel = scopeDateGroupLevel ;
String designDocId = getDesignDocId ( clz ) ;
String viewName = applicant . getKey ( ) ;
ViewQuery query = ViewQuery . from ( designDocId , viewName ) ;
query . inclusiveEnd ( ) ;
query . groupLevel ( groupLevel ) ;
query . startKey ( startKey ) ;
query . endKey ( endKey ) ;
query . descending ( false ) ;
logger . trace ( "Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}" ,
clz . getSimpleName ( ) , designDocId , viewName , groupLevel , startKey , endKey , temporalStartKey . toString ( ) , temporalEndKey . toString ( ) ) ;
ViewResult viewResult ;
try {
//execute query in a specify bucket
viewResult = connectionMap . get ( clz . getSimpleName ( ) ) . query ( query ) ;
} catch ( Exception e ) {
logger . error ( e . getLocalizedMessage ( ) ) ;
throw e ;
}
Map < String , Float > map = new HashMap < String , Float > ( ) ;
for ( ViewRow row : viewResult ) {
JsonObject jsnobject = ( JsonObject ) row . value ( ) ;
JSONObject objJson = new JSONObject ( jsnobject . toString ( ) ) ;
Iterator < ? > iterateJosn = objJson . keys ( ) ;
while ( iterateJosn . hasNext ( ) ) {
String key = ( String ) iterateJosn . next ( ) ;
Float valuetmp = Float . parseFloat ( objJson . get ( key ) . toString ( ) ) ;
if ( key . equals ( "operationCount" ) | | key . equals ( "dataVolume" ) ) {
if ( map . containsKey ( key ) ) {
map . put ( key , valuetmp + map . get ( key ) ) ;
}
else
map . put ( key , valuetmp ) ;
}
}
}
JSONObject result = new JSONObject ( map ) ;
return result ;
}
}