You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.

1678 lines
53 KiB
Java

/**
*
*/
package org.gcube.accounting.analytics.persistence.couchbase;
import static com.couchbase.client.java.query.Select.select;
import static com.couchbase.client.java.query.dsl.Expression.s;
import static com.couchbase.client.java.query.dsl.Expression.x;
import java.security.KeyException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum;
import org.gcube.accounting.analytics.UsageServiceValue;
import org.gcube.accounting.analytics.UsageStorageValue;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException;
import org.gcube.accounting.analytics.exception.ValueException;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.bucket.BucketManager;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow;
import com.couchbase.client.java.query.dsl.Expression;
import com.couchbase.client.java.query.dsl.Sort;
import com.couchbase.client.java.query.dsl.path.GroupByPath;
import com.couchbase.client.java.query.dsl.path.OffsetPath;
import com.couchbase.client.java.view.DesignDocument;
import com.couchbase.client.java.view.ViewQuery;
import com.couchbase.client.java.view.ViewResult;
import com.couchbase.client.java.view.ViewRow;
/**
* @author Luca Frosini (ISTI - CNR)
* @author Alessandro Pieve (ISTI - CNR) alessandro.pieve@isti.cnr.it
*
*/
public class AccountingPersistenceQueryCouchBase implements
AccountingPersistenceBackendQuery {
private static final Logger logger = LoggerFactory
.getLogger(AccountingPersistenceQueryCouchBase.class);
public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY;
public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY;
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord";
public static final String BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY="AggregatedStorageStatusRecord";
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord";
public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY="AggregatedPortletUsageRecord";
public static final String BUCKET_JOB_NAME_PROPERTY_KEY="AggregatedJobUsageRecord";
public static final String BUCKET_TASK_NAME_PROPERTY_KEY="AggregatedTaskUsageRecord";
public static final String DESIGN_DOC_ID_LIST_USAGE="ListUsage";
public static final long ENV_TIME_OUT=240000;
public static final long QUERY_TIME_OUT=360000;
/* The environment configuration */
protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment
.builder()
.connectTimeout(QUERY_TIME_OUT)
.maxRequestLifetime(QUERY_TIME_OUT)
.queryTimeout(QUERY_TIME_OUT).build();
protected Cluster cluster;
/* One Bucket for type*/
protected Bucket bucketStorage;
protected String bucketNameStorage;
protected Bucket bucketStorageStatus;
protected String bucketNameStorageStatus;
protected Bucket bucketService;
protected String bucketNameService;
protected Bucket bucketPortlet;
protected String bucketNamePortlet;
protected Bucket bucketJob;
protected String bucketNameJob;
protected Bucket bucketTask;
protected String bucketNameTask;
private Map <String, Bucket> connectionMap;
protected static final String MAP_REDUCE__DESIGN = "";
protected static final String MAP_REDUCE_ALL = "all";
// Used in the name of map reduce to separate keys used as filter
protected static final String KEYS_SEPARATOR = "__";
protected static final String DESIGN_DOC_ID="top_";
/**
* {@inheritDoc}
*/
@Override
public void prepareConnection(
AccountingPersistenceBackendQueryConfiguration configuration)
throws Exception {
String url = configuration.getProperty(URL_PROPERTY_KEY);
String password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
cluster = CouchbaseCluster.create(ENV, url);
bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY);
bucketNameStorageStatus = configuration.getProperty(BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY);
bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY);
bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY);
bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY);
bucketNameTask = configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY);
connectionMap = new HashMap<String, Bucket>();
bucketStorage = cluster.openBucket(bucketNameStorage, password);
connectionMap.put(BUCKET_STORAGE_NAME_PROPERTY_KEY, bucketStorage);
bucketStorageStatus = cluster.openBucket( bucketNameStorageStatus,password);
connectionMap.put(BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY, bucketStorageStatus);
logger.debug("connectionMap"+connectionMap.toString());
bucketService = cluster.openBucket(bucketNameService, password);
connectionMap.put(BUCKET_SERVICE_NAME_PROPERTY_KEY, bucketService);
bucketJob= cluster.openBucket(bucketNameJob, password);
connectionMap.put(BUCKET_JOB_NAME_PROPERTY_KEY, bucketJob);
bucketPortlet= cluster.openBucket(bucketNamePortlet, password);
connectionMap.put(BUCKET_PORTLET_NAME_PROPERTY_KEY, bucketPortlet);
bucketTask= cluster.openBucket(bucketNameTask, password);
connectionMap.put(BUCKET_TASK_NAME_PROPERTY_KEY, bucketTask);
logger.trace("Open cluster Service Bucket Url:"+url+" BucketName:"+configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY));
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws Exception {
cluster.disconnect();
}
protected Calendar getCalendar(JSONObject obj,
AggregationMode aggregationMode) throws NumberFormatException,
JSONException {
long millis;
if (obj.has(AggregatedRecord.START_TIME)) {
millis = new Long(obj.getString(AggregatedRecord.START_TIME));
logger.trace(
"The result {} was from an aggregated record. Using {}",
obj.toString(), AggregatedRecord.START_TIME);
} else {
millis = new Long(obj.getString(UsageRecord.CREATION_TIME));
logger.trace("The result {} was from single record. Using {}",
obj.toString(), UsageRecord.CREATION_TIME);
}
Calendar calendar = TemporalConstraint.getAlignedCalendar(millis,
aggregationMode);
logger.trace("{} has been aligned to {}", millis,
calendar.getTimeInMillis());
return calendar;
}
/**
* IS NOT USED
* @param clz
* @param temporalConstraint
* @param filters
* @return
* @throws Exception
*/
@Deprecated
protected Map<Calendar, Info> selectQuery(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters)
throws Exception {
String currentScope = ScopeProvider.instance.get();
String recordType = clz.newInstance().getRecordType();
Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope));
expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(
s(recordType)));
long startTime = temporalConstraint.getAlignedStartTime()
.getTimeInMillis();
expression = expression.and(x(AggregatedRecord.START_TIME)
.gt(startTime).or(
x(AggregatedRecord.CREATION_TIME).gt(startTime)));
long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime))
.or(x(AggregatedRecord.CREATION_TIME).lt(endTime));
AggregationMode aggregationMode = temporalConstraint
.getAggregationMode();
// TODO Aggregate Results
if (filters != null) {
for (Filter filter : filters) {
expression = expression.and(x(filter.getKey()).eq(
s(filter.getValue())));
}
}
GroupByPath groupByPath = select("*").from(connectionMap.get(clz.getSimpleName()).name())
.where(expression);
Map<Calendar, Info> map = new HashMap<Calendar, Info>();
N1qlQueryResult result = connectionMap.get(clz.getSimpleName()).query(groupByPath);
if (!result.finalSuccess()) {
logger.debug("{} failed : {}",
N1qlQueryResult.class.getSimpleName(), result.errors());
return map;
}
List<N1qlQueryRow> rows = result.allRows();
for (N1qlQueryRow row : rows) {
try {
logger.trace("Row : {}", row.toString());
JsonObject jsonObject = row.value().getObject(clz.getSimpleName());
logger.trace("JsonObject : {}", row.toString());
String recordString = jsonObject.toMap().toString();
logger.trace("Record String : {}", recordString);
Record record = RecordUtility.getRecord(recordString);
JSONObject obj = new JSONObject(jsonObject.toString());
Calendar calendar = getCalendar(obj, aggregationMode);
if (map.containsKey(calendar)) {
Info info = map.get(calendar);
JSONObject value = info.getValue();
jsonObject.toMap();
} else {
map.put(calendar, new Info(calendar, obj));
}
} catch (Exception e) {
logger.warn("Unable to eleborate result for {}", row.toString());
}
logger.trace("\n\n\n");
}
return map;
}
protected Calendar getCalendarFromArray(JsonArray array)
throws JSONException {
boolean startFound = false;
Calendar calendar = Calendar
.getInstance(TemporalConstraint.DEFAULT_TIME_ZONE);
int count = 0;
CalendarEnum[] calendarValues = CalendarEnum.values();
for (int i = 0; i < array.size(); i++) {
try {
int value = array.getInt(i);
int calendarValue = calendarValues[count].getCalendarValue();
if (calendarValue == Calendar.MONTH) {
value--;
}
calendar.set(calendarValue, value);
count++;
startFound = true;
} catch (Exception e) {
//logger.trace("The provide value is not an int. {}", array.get(i).toString());
if (startFound) {
break;
}
}
}
for (int j = count; j < calendarValues.length; j++) {
if (calendarValues[j].getCalendarValue() == Calendar.DAY_OF_MONTH) {
calendar.set(calendarValues[j].getCalendarValue(), 1);
} else {
calendar.set(calendarValues[j].getCalendarValue(), 0);
}
}
return calendar;
}
protected JsonArray getRangeKey(long time, AggregationMode aggregationMode,
boolean wildCard, boolean endKey) throws JSONException {
JsonArray array = JsonArray.create();
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
CalendarEnum[] values = CalendarEnum.values();
if (endKey) {
calendar.add(values[aggregationMode.ordinal()].getCalendarValue(),1);
}
for (int i = 0; i <= aggregationMode.ordinal(); i++) {
int value = calendar.get(values[i].getCalendarValue());
if (values[i].getCalendarValue() == Calendar.MONTH) {
value = value + 1;
}
array.add(value);
}
if (wildCard) {
array.add("{}");
}
return array;
}
//OLD METHOD OF DIVISION MAP_REDUCE DESIGN
@Deprecated
protected String getDesignDocId(
Class<? extends AggregatedRecord<?,?>> recordClass)
throws InstantiationException, IllegalAccessException {
return String.format("%s%s", MAP_REDUCE__DESIGN, recordClass
.newInstance().getRecordType());
}
/**
* New division of designDocId and map-reduce
* @param recordClass
* @param keys
* @return
* @throws InstantiationException
* @throws IllegalAccessException
*/
protected String getDesignDocIdSpecific(
Class<? extends AggregatedRecord<?,?>> recordClass,
Collection<String> keys)
throws InstantiationException, IllegalAccessException {
String specific="all";
if (!keys.isEmpty()){
specific = keys.iterator().next();
}
String getDesigndocIdSpecific=specific;
//logger.trace("Use a designDocIDSpecific:{}",getDesigndocIdSpecific);
return getDesigndocIdSpecific;
}
/**
* generate a name of map-reduce view
* @param collection
* @return
*/
public static String getMapReduceFunctionName(
Collection<String> collection) {
String reduceFunction = MAP_REDUCE_ALL;
if (!collection.isEmpty()){
reduceFunction = null;
for (String property : collection) {
if (reduceFunction == null) {
reduceFunction = property;
} else {
reduceFunction = reduceFunction + KEYS_SEPARATOR + property;
}
}
}
return reduceFunction;
}
/**EXPERIMENTAL
* generate a name of design doc id for a top
* @param collection
* @return
*/
public static String getDesignDocIdName(
Collection<String> collection) {
String reduceFunction = MAP_REDUCE_ALL;
if (!collection.isEmpty()){
reduceFunction = null;
String property = collection.iterator().next();
reduceFunction = property;
}
return reduceFunction;
}
/*
* EXPERIMENTAL
*
* This method is used for call a map-reduce ListUsage
*the filter on which you want to know its use
*and the parameters you want to know have been used
*Example:
*Input:
* request interval (start date end-date)
* filter consumerId: alessandro.pieve
* Used parameters: Service Class
*Return:
* List of service class used by alessandro in the required period
*/
public SortedMap<String,Integer> getListUsage(Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,String context,List<String> parameters)throws Exception{
//TODO
String currentScope=null;
if (context==null)
currentScope = ScopeProvider.instance.get();
else
currentScope = context;
JsonArray startKey = JsonArray.create();
startKey.add(currentScope);
JsonArray endKey = JsonArray.create();
endKey.add(currentScope);
AggregationMode aggregationMode = temporalConstraint.getAggregationMode();
JsonArray temporalStartKey = getRangeKey(temporalConstraint.getStartTime(),aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false);
Set<String> recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
Collection<String> keys = new TreeSet<>();
if (filters != null && filters.size() != 0) {
// Sorting filter for call a mapreduce
Collections.sort(filters, new Comparator<Filter>() {
@Override
public int compare(Filter filter1, Filter filter2)
{
int result =filter1.getKey().compareTo(filter2.getKey());
return result;
}
});
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
startKey.add(filterValue);
endKey.add(filterValue);
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
/*
int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1;
int groupLevel = scopeDateGroupLevel;
if (filters != null) {
groupLevel += keys.size();
}
*/
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKey.add(temporal);
}
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase exclude last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKey.add(temporal);
}
count++;
}
String viewName = getMapReduceFunctionName(keys);
if (parameters!=null){
for (String name:parameters){
viewName+=KEYS_SEPARATOR+name;
}
}
//TODO DA COMPLETARE con in piu' alle chiavi anche la lista parametri passata
ViewQuery query = ViewQuery.from(DESIGN_DOC_ID_LIST_USAGE, viewName);
query.inclusiveEnd();
//query.groupLevel(groupLevel);
query.reduce(false);
query.startKey(startKey);
query.endKey(endKey);
query.descending(false);
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
clz.getSimpleName(),DESIGN_DOC_ID_LIST_USAGE, viewName, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
SortedMap<String, Integer> infos = new TreeMap<>();
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
for (ViewRow row : viewResult) {
JsonArray array = (JsonArray) row.key();
Calendar calendar = getCalendarFromArray(array);
JsonObject value = (JsonObject) row.value();
JSONObject obj = new JSONObject(value.toString());
Info info = new Info(calendar, obj);
//infos.put(info);
}
//TODO complete the request from user/service usage into period
return null;
}
protected SortedMap<Calendar, Info> mapReduceQuery(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,String context,Boolean valueEmpty,Boolean noScope)
throws Exception {
String currentScope=null;
if (context==null)
currentScope = ScopeProvider.instance.get();
else
currentScope = context;
JsonArray startKey = JsonArray.create();
JsonArray endKey = JsonArray.create();
//no scope call a map reduce without scope in startkey and endkey
if (!noScope){
startKey.add(currentScope);
endKey.add(currentScope);
}
AggregationMode aggregationMode = temporalConstraint
.getAggregationMode();
JsonArray temporalStartKey = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(
temporalConstraint.getEndTime(),
aggregationMode, false, false);
Set<String> recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
Collection<String> keys = new TreeSet<>();
if (filters != null && filters.size() != 0) {
// Sorting filter for call a mapreduce
Collections.sort(filters, new Comparator<Filter>() {
@Override
public int compare(Filter filter1, Filter filter2)
{
int result =filter1.getKey().compareTo(filter2.getKey());
return result;
}
});
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
startKey.add(filterValue);
endKey.add(filterValue);
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
// +1 because mode start from 0
// +1 because of scope at the beginning
int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1;
int groupLevel = scopeDateGroupLevel;
if (filters != null) {
groupLevel += keys.size();
}
//String designDocId = getDesignDocId(clz);
String designDocId = getDesignDocIdSpecific(clz,keys);
if (noScope){
designDocId="noContext";
groupLevel=groupLevel-1;
}
//logger.trace("designDocIdNew :{}",designDocId);
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKey.add(temporal);
}
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase exclude last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKey.add(temporal);
}
count++;
}
String viewName = getMapReduceFunctionName(keys);
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(groupLevel);
query.startKey(startKey);
query.endKey(endKey);
query.descending(false);
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
clz.getSimpleName(),designDocId, viewName, groupLevel, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
SortedMap<Calendar, Info> infos = new TreeMap<>();
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
for (ViewRow row : viewResult) {
JsonArray array = (JsonArray) row.key();
Calendar calendar = getCalendarFromArray(array);
JsonObject value = (JsonObject) row.value();
JSONObject obj = new JSONObject(value.toString());
Info info = new Info(calendar, obj);
infos.put(calendar, info);
}
logger.trace("valueEmpty not permitted:{}",valueEmpty);
//infos not empity is permitted only for getTimeSeries and Top
if (valueEmpty){
if (infos.isEmpty()){
logger.trace("infos is empity");
query = ViewQuery.from(designDocId, viewName);
query.groupLevel(groupLevel);
query.descending(false);
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.warn("not execute query",e.getLocalizedMessage());
//throw e;
}
try {
if(viewResult.totalRows()!=0){
ViewRow row=viewResult.allRows().get(0);
JsonArray array = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
Calendar calendar = getCalendarFromArray(array);
JsonObject value = (JsonObject) row.value();
JSONObject objJson= new JSONObject(value.toString());
JSONObject objJsontemplate=new JSONObject();
Iterator<?> iterateJson = objJson.keys();
while( iterateJson.hasNext() ) {
String key = (String)iterateJson.next();
objJsontemplate.put(key, 0);
}
//generate an example object for json
Info info = new Info(calendar, objJsontemplate);
infos.put(calendar, info);
}
} catch (Exception e) {
logger.warn("error :{}",e.getLocalizedMessage());
}
}
}
logger.trace("infos:{}",infos.toString());
return infos;
}
@Override
public SortedMap<Calendar, Info> getTimeSeries(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters)
throws Exception {
SortedMap<Calendar, Info> map = mapReduceQuery(clz, temporalConstraint, filters,null,true,false);
return map;
}
@Override
public SortedMap<Calendar, Info> getNoContextTimeSeries(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters)
throws Exception {
SortedMap<Calendar, Info> map = mapReduceQuery(clz, temporalConstraint, filters,null,true,true);
return map;
}
/**
* Used for calculate a top value
* @param clz
* @param temporalConstraint
* @param filters
* @param topKey
* @param orderingProperty
* @return
* @throws Exception
*/
@Override
public SortedMap<NumberedFilter, SortedMap<Calendar, Info>> getTopValues(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,
String topKey, String orderingProperty) throws Exception {
Comparator<NumberedFilter> comparator = new Comparator<NumberedFilter>() {
@Override
public int compare(NumberedFilter o1, NumberedFilter o2) {
int result= - o1.compareTo(o2);
if (result==0 ){
result= o1.compareTo((Filter) o2);
}
return result;
}
};
SortedMap<NumberedFilter, SortedMap<Calendar, Info>> ret =
new TreeMap<>(comparator);
SortedSet<NumberedFilter> top=null;
//top = getNextPossibleValues(clz,temporalConstraint, filters, topKey, orderingProperty);
if (usingNextPossibleValuesWithMap(clz,topKey,filters)){
logger.trace("getNextPossibleValues using map");
top = getNextPossibleValuesWithMap(clz,temporalConstraint, filters, topKey, orderingProperty);
}else{
logger.trace("getNextPossibleValues using query");
top = getNextPossibleValues(clz,temporalConstraint, filters, topKey, orderingProperty);
}
logger.trace("getNextPossibleValues:{}",top.toString());
for(NumberedFilter nf : top){
filters.add(nf);
SortedMap<Calendar, Info> map =
mapReduceQuery(clz, temporalConstraint, filters,null,true,false);
ret.put(nf, map);
filters.remove(nf);
}
return ret;
}
/**
* SPERIMENTAL
* Used for verify if have exist map for calculate a top
* @param clz
* @param topKey
* @return
*/
protected boolean usingNextPossibleValuesWithMap(Class<? extends AggregatedRecord<?, ?>> clz, String topKey,List<Filter> filters){
logger.debug("usingNextPossibleValuesWithMap init");
Collection<String> keys = new TreeSet<>();
Set<String> recordKeysSet;
try {
recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
keys.add(topKey);
if (filters != null && filters.size() != 0) {
// Sorting filter for call a mapreduce
Collections.sort(filters, new Comparator<Filter>() {
@Override
public int compare(Filter filter1, Filter filter2)
{
int result =filter1.getKey().compareTo(filter2.getKey());
return result;
}
});
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
}catch (Exception e1) {
logger.warn("usingNextPossibleValuesWithMap -exception with filter:{}",filters.toString() );
return false;
}
logger.debug("usingNextPossibleValuesWithMap complete key and name");
String viewName=getMapReduceFunctionName(keys);
String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys);
BucketManager bucketManager = connectionMap.get(clz.getSimpleName()).bucketManager();
//logger.debug("----"+bucketManager.getDesignDocument(designDocId));
if (bucketManager.getDesignDocument(designDocId)!=null){
logger.debug("usingNextPossibleValuesWithMap");
}
else{
logger.debug("usingNextPossibleValuesWithQuery");
return false;
}
/*
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(1);
ViewResult viewResult;
try {
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
logger.debug("usingNextPossibleValuesWithMap viewResult:{}",viewResult.toString());
} catch (Exception e) {
return false;
}
*/
return true;
}
/**
* Calculate a next possible value with map
* (faster but with greater demand for resources)
* @param clz
* @param temporalConstraint
* @param filters
* @param key
* @param orderingProperty
* @return
* @throws Exception
*/
public SortedSet<NumberedFilter> getNextPossibleValuesWithMap(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,
String key, String orderingProperty) throws Exception {
logger.debug("getNextPossibleValuesWithMap init");
String currentScope = ScopeProvider.instance.get();
if(orderingProperty==null){
orderingProperty = AccountingPersistenceQuery.
getDefaultOrderingProperties(clz);
}
JsonArray startKey = JsonArray.create();
startKey.add(currentScope);
JsonArray endKey = JsonArray.create();
endKey.add(currentScope);
AggregationMode aggregationMode = temporalConstraint
.getAggregationMode();
JsonArray temporalStartKey = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(
temporalConstraint.getEndTime(),
aggregationMode, false, false);
Set<String> recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
Collection<String> keys = new TreeSet<>();
keys.add(key);
/*BEGIN DELETE ONLY FOR TEST*/
Collection<Expression> selectExpressions = new ArrayList<>();
selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz,orderingProperty) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,orderingProperty)+" ELSE 1 END )").
as(orderingProperty));
selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz,key) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,key)+" ELSE 'UNKNOWN' END )").as(key));
Expression whereExpression =
x(getSpecializedProperty(clz,BasicUsageRecord.SCOPE)).
eq(s(currentScope));
long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis();
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.START_TIME)).gt(startTime)
);
long endTime = temporalConstraint.getEndTime();
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.END_TIME)).lt(endTime)
);
Expression[] selectExpressionArray = new Expression[selectExpressions.size()];
selectExpressions.toArray(selectExpressionArray);
Sort sort = Sort.desc(orderingProperty);
OffsetPath path = select(selectExpressionArray).from(connectionMap.get(clz.getSimpleName()).name())
.where(whereExpression).groupBy(key).orderBy(sort);
/*END DELETE ONLY FOR TEST*/
if (filters != null && filters.size() != 0) {
// Sorting filter for call a mapreduce
Collections.sort(filters, new Comparator<Filter>() {
@Override
public int compare(Filter filter1, Filter filter2)
{
int result =filter1.getKey().compareTo(filter2.getKey());
return result;
}
});
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
startKey.add(filterValue);
endKey.add(filterValue);
whereExpression = whereExpression.and(x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
logger.debug("Alternative Query for top:"+path.toString());
int groupLevel=1;
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKey.add(temporal);
}
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase exclude last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKey.add(temporal);
}
count++;
}
String viewName = getMapReduceFunctionName(keys);
String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys);
logger.trace("keys:{}",keys.toString());
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(groupLevel);
query.startKey(startKey);
query.endKey(endKey);
query.descending(false);
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
clz.getSimpleName(),designDocId, viewName, groupLevel, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
Comparator<NumberedFilter> comparator = new Comparator<NumberedFilter>() {
@Override
public int compare(NumberedFilter o1, NumberedFilter o2) {
int compareResult = -o1.compareTo(o2);
if(compareResult==0){
compareResult=1;
}
return compareResult;
}
};
SortedSet<NumberedFilter> ret = new TreeSet<>(comparator);
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
ViewRow row =viewResult.allRows().get(0);
JsonObject value = (JsonObject) row.value();
JSONObject objectValueTop = new JSONObject(value.toString());
Iterator<?> iterateJosn = objectValueTop.keys();
while( iterateJosn.hasNext() ) {
String keyTop = (String)iterateJosn.next();
Number n = (Number) objectValueTop.get(keyTop);
if (n==null)
n=0;
NumberedFilter numberedFilter = new NumberedFilter(key, keyTop, n, orderingProperty);
ret.add(numberedFilter);
}
return ret;
}
/**
* Calculate a next possible value with query
* (more slow but with fewer resources )
* @param clz
* @param temporalConstraint
* @param filters
* @param key
* @param orderingProperty
* @return
* @throws Exception
*/
@Override
public SortedSet<NumberedFilter> getNextPossibleValues(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,
String key, String orderingProperty) throws Exception {
String currentScope = ScopeProvider.instance.get();
//String recordType = clz.newInstance().getRecordType();
if(orderingProperty==null){
orderingProperty = AccountingPersistenceQuery.
getDefaultOrderingProperties(clz);
}
Collection<Expression> selectExpressions = new ArrayList<>();
//add select expression and check if exist
selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz,orderingProperty) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,orderingProperty)+" ELSE 1 END )").
as(orderingProperty));
selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz,key) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,key)+" ELSE 'UNKNOWN' END )").as(key));
//add where expression
Expression whereExpression =
x(getSpecializedProperty(clz,BasicUsageRecord.SCOPE)).
eq(s(currentScope));
long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis();
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.START_TIME)).gt(startTime)
);
long endTime = temporalConstraint.getEndTime();
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.END_TIME)).lt(endTime)
);
Set<String> recordKeysSet = AccountingPersistenceQuery
.getQuerableKeys(clz.newInstance());
//list filter used for remove duplicate filter
Collection<String> keys = new TreeSet<>();
if (filters != null && filters.size() != 0) {
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
whereExpression = whereExpression.and(x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
Expression[] selectExpressionArray = new Expression[selectExpressions.size()];
selectExpressions.toArray(selectExpressionArray);
Sort sort = Sort.desc(orderingProperty);
OffsetPath path = select(selectExpressionArray).from(connectionMap.get(clz.getSimpleName()).name())
.where(whereExpression).groupBy(key).orderBy(sort);
logger.debug("Query for top:"+path.toString());
Comparator<NumberedFilter> comparator = new Comparator<NumberedFilter>() {
@Override
public int compare(NumberedFilter o1, NumberedFilter o2) {
int compareResult = -o1.compareTo(o2);
if(compareResult==0){
compareResult=1;
}
return compareResult;
}
};
SortedSet<NumberedFilter> ret = new TreeSet<>(comparator);
N1qlQueryResult result = connectionMap.get(clz.getSimpleName()).query(path);
if (!result.finalSuccess()) {
logger.debug("{} failed : {}",
N1qlQueryResult.class.getSimpleName(), result.errors());
throw new Exception("Query Failed :\n" + result.errors());
}
List<N1qlQueryRow> rows = result.allRows();
for (N1qlQueryRow row : rows) {
try {
JsonObject jsonObject = row.value();
//logger.trace("JsonObject : {}", row.value()+" key"+key);
//logger.warn("pre"+jsonObject.toString()+" key :"+key);
//verify for a not null value
String value = jsonObject.getString(key);
Number n = jsonObject.getDouble(orderingProperty);
if (n==null)
n=0;
//logger.trace("pre:{}, key:{}, value:{}, n:{},orderingProperty:{}",jsonObject.toString(),key, value, n, orderingProperty);
NumberedFilter numberedFilter =
new NumberedFilter(key, value, n, orderingProperty);
ret.add(numberedFilter);
} catch (Exception e) {
logger.warn("Unable to eleborate result for {}", row.toString());
e.printStackTrace();
}
}
return ret;
}
/**
* Return a list of context time series
* (used for portlet accounting context)
* @param clz
* @param temporalConstraint
* @param filters
* @param contexts
* @return
* @throws Exception
*/
@Override
public SortedMap<Filter, SortedMap<Calendar, Info>> getContextTimeSeries(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,List<String> contexts)
throws Exception {
logger.trace("getContextTimeSeries for contexts:{}",contexts.toString());
SortedSet<Filter> listContexts = new TreeSet();
for (String context:contexts){
Filter contextLabel= new Filter("context",context);
listContexts.add(contextLabel);
}
SortedMap<Filter, SortedMap<Calendar, Info>> ret = new TreeMap<>();
for(Filter nf : listContexts){
logger.debug("detail time series :{}",nf.toString());
SortedMap<Calendar, Info> map =
mapReduceQuery(clz, temporalConstraint, filters,nf.getValue(),false,false);
if (!map.isEmpty()){
ret.put(nf, map);
}
filters.remove(nf);
}
return ret;
}
protected String getQualifiedProperty(String property){
//DEVELOPING
return (property);
}
//Use for property into a specify bucket
protected String getSpecializedProperty(Class<? extends AggregatedRecord<?, ?>> clz,String property){
return String.format("%s.%s", connectionMap.get(clz.getSimpleName()).name(), property);
}
/**
* Used for list a possible values for each filter
* @param clz
* @param temporalConstraint
* @param filters
* @param key
* @return
* @throws Exception
*/
@SuppressWarnings("deprecation")
@Override
public SortedSet<NumberedFilter> getFilterValues(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,
String key) throws Exception {
String currentScope = ScopeProvider.instance.get();
JsonArray startKey = JsonArray.create();
startKey.add(currentScope);
int scopeDateGroupLevel = 2;
int groupLevel = scopeDateGroupLevel;
//NO ADD A SPECIFIY DESIGN DOC ID FAMILY
String designDocId = getDesignDocId(clz)+"Value";
String viewName = key;
logger.trace("designDocId:{} view:{} startKey:{} groupLevel:{}",designDocId,key,startKey,groupLevel);
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(groupLevel);
query.startKey(startKey);
query.descending(false);
String orderingProperty = AccountingPersistenceQuery.
getDefaultOrderingProperties(clz);
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error("error executing the query",e);
throw e;
}
Comparator<NumberedFilter> comparator = new Comparator<NumberedFilter>() {
@Override
public int compare(NumberedFilter o1, NumberedFilter o2) {
if (o1.getValue()==null)
o1.setValue("");
if (o2.getValue()==null)
o2.setValue("");
return o1.getValue().compareTo(o2.getValue());
}
};
SortedSet<NumberedFilter> ret = new TreeSet<>(comparator);
for (ViewRow row : viewResult) {
String value =(String) row.value();
NumberedFilter numberedFilter =
new NumberedFilter(key, value, 0, orderingProperty);
ret.add(numberedFilter);
}
logger.trace("returning {} values",ret.size());
return ret;
}
/**
* SPERIMENTAL now is not used
* @param clz
* @param temporalConstraint
* @param applicant
* @return
* @throws Exception
*/
@SuppressWarnings("deprecation")
@Override
public JSONObject getUsageValue(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, Filter applicant) throws Exception {
String currentScope = ScopeProvider.instance.get();
JsonArray startKey = JsonArray.create();
startKey.add(currentScope);
JsonArray endKey = JsonArray.create();
endKey.add(currentScope);
AggregationMode aggregationMode = temporalConstraint
.getAggregationMode();
JsonArray temporalStartKey = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(
temporalConstraint.getEndTime(),
aggregationMode, false, false);
startKey.add(applicant.getValue());
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKey.add(temporal);
}
endKey.add(applicant.getValue());
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase exclude last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKey.add(temporal);
}
count++;
}
// +1 because mode start from 0
// +1 because have afilter value from 1
// +1 because of scope at the beginning
int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1 +1;
int groupLevel = scopeDateGroupLevel;
Collection<String> keys = new TreeSet<>();
keys.add(applicant.getKey());
//ADD A SPECIFIY DESIGN DOC ID FAMILY
String designDocId = getDesignDocIdSpecific(clz,keys);
String viewName = applicant.getKey();
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(groupLevel);
query.startKey(startKey);
query.endKey(endKey);
query.descending(false);
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
clz.getSimpleName(),designDocId, viewName, groupLevel, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
Map<String, Float> map = new HashMap<String, Float>();
for (ViewRow row : viewResult) {
JsonObject jsnobject = (JsonObject) row.value();
JSONObject objJson = new JSONObject(jsnobject.toString());
Iterator<?> iterateJosn = objJson.keys();
while( iterateJosn.hasNext() ) {
String key = (String)iterateJosn.next();
Float valuetmp=Float.parseFloat(objJson.get(key).toString());
if (key.equals("operationCount") || key.equals("dataVolume")){
if (map.containsKey(key)) {
map.put(key, valuetmp + map.get(key));
}
else
map.put(key, valuetmp);
}
}
}
JSONObject result= new JSONObject(map);
return result;
}
/**
* SPERIMENTAL
* Used for calculate a usage value for each element of list
* (QUOTA)
* @param listUsage
* @return
* @throws Exception
*/
@Override
public List<UsageValue> getUsageValueQuotaTotal(
List<UsageValue> listUsage)
throws Exception {
logger.debug("getUsageValueQuotaTotal init with list:{}",listUsage);
//String currentScope = ScopeProvider.instance.get();
String keyOrderingProperty = null;
for (UsageValue totalFilters:listUsage){
logger.debug("-----------------/ init for totalFilters");
/*
UsageValue totalfilter=
new UsageServiceValue(context,"lucio.lelii",AggregatedServiceUsageRecord.class,temporalConstraint,filters);
*/
String currentScope = totalFilters.getContext();
Collection<String> keys= new TreeSet<>();
keys.add("consumerId");
String designDocId = getDesignDocIdSpecific(totalFilters.getClz(), keys);
JsonArray temporalStartKey=null;
JsonArray temporalEndKey=null;
logger.trace("temporalConstraint:{}",totalFilters);
TemporalConstraint temporalConstraint=totalFilters.getTemporalConstraint();
if (temporalConstraint==(null)){
//used for no temporalConstraint
logger.trace("Not found temporalConstraint");
Calendar startTime = Calendar.getInstance();
startTime.set(1970, Calendar.JANUARY, 1);
Calendar endTime = Calendar.getInstance();
temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
//if type=storage then bucket =accounting_storage_status and designDocId = quotaTotal
//logger.trace("totalFilters.getClass().getSimpleName():{}",totalFilters.getClass().getSimpleName());
//logger.trace("totalFilters.getClz().getSimpleName():{}",totalFilters.getClz().getSimpleName());
if (totalFilters.getClz().getSimpleName().equals(AggregatedStorageStatusRecord.class.getSimpleName())){
//TODO sarebbe possibile invece con il nome verificare che getClz e' un istanza di AggregatedStorageStatusRecord?
designDocId = "QuotaTotal";
}
}
else if (totalFilters.getClz().getSimpleName().equals(AggregatedStorageStatusRecord.class.getSimpleName())){
logger.trace("AggregatedStorageStatusRecord with temporalConstraint");
designDocId = "Quota";
}
AggregationMode aggregationMode = temporalConstraint.getAggregationMode();
temporalStartKey = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
temporalEndKey = getRangeKey(
temporalConstraint.getEndTime(),
aggregationMode, false, false);
Double totalQuota=0.00;
//int i = 0;
//int countFilters=0;
if (totalFilters instanceof UsageServiceValue){
UsageServiceValue totalFiltersService = (UsageServiceValue)totalFilters;
//countFilters =totalFiltersService.getFiltersValue().size();
}
//do {
String viewNameTmp=null;
JsonArray startKeyTmp=JsonArray.create();
startKeyTmp.add(currentScope);
JsonArray endKeyTmp=JsonArray.create();
endKeyTmp.add(currentScope);
int groupLevelTmp= 2;
//FiltersValue singleFilter=null;
viewNameTmp=AggregatedServiceUsageRecord.CONSUMER_ID;
startKeyTmp.add(totalFilters.getIdentifier());
endKeyTmp.add(totalFilters.getIdentifier());
//FiltersValue singleFilter=null;
if (totalFilters instanceof UsageServiceValue){
//logger.debug("******UsageServiceValue");
UsageServiceValue totalFiltersService = (UsageServiceValue)totalFilters;
//singleFilter=totalFiltersService.getFiltersValue().get(i);
for (Filter filter:totalFiltersService.getFilters()){
viewNameTmp=viewNameTmp+"__"+filter.getKey();
startKeyTmp.add(filter.getValue());
endKeyTmp.add(filter.getValue());
groupLevelTmp++;
}
}
//not defined temporal constraint
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKeyTmp.add(temporal);
}
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase excludes last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKeyTmp.add(temporal);
}
count++;
}
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
totalFilters.getClz().getSimpleName(),designDocId, viewNameTmp, groupLevelTmp, startKeyTmp, endKeyTmp,temporalStartKey.toString(), temporalEndKey.toString());
logger.trace("connectionMap:{}",connectionMap.toString());
ViewQuery query = ViewQuery.from(designDocId, viewNameTmp);
query.inclusiveEnd();
query.groupLevel(groupLevelTmp);
query.startKey(startKeyTmp);
query.endKey(endKeyTmp);
query.descending(false);
ViewResult viewResult;
try {
viewResult = connectionMap.get(totalFilters.getClz().getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
Map<String, Float> map = new HashMap<String, Float>();
for (ViewRow row : viewResult) {
logger.trace("ViewRow row:{}",row.toString());
JsonObject jsnobject = (JsonObject) row.value();
JSONObject objJson = new JSONObject(jsnobject.toString());
Iterator<?> iterateJosn = objJson.keys();
while( iterateJosn.hasNext() ) {
String key = (String)iterateJosn.next();
Float valuetmp=Float.parseFloat(objJson.get(key).toString());
//logger.debug("totalFilters.getClass().getSimpleName():{} UsageStorageValue.class.getSimpleName():{}", totalFilters.getClass().getSimpleName(),UsageStorageValue.class.getSimpleName());
if (key.equals("operationCount") || key.equals("dataVolume")){
if (map.containsKey(key)) {
map.put(key, valuetmp + map.get(key));
// TODO verify a better method
if (totalFilters instanceof UsageStorageValue){
logger.debug("storageUsageRecord -designDocId:{}",designDocId);
if (key.equals("dataVolume")){
keyOrderingProperty=key;
totalQuota+=totalFilters.getD()+valuetmp.doubleValue();
}
}
else{
logger.debug("?UsageRecord -designDocId:{}",designDocId);
keyOrderingProperty=key;
totalQuota+=totalFilters.getD()+valuetmp.doubleValue();
}
}
else{
map.put(key, valuetmp);
// TODO verify a better method
if (totalFilters instanceof UsageStorageValue){
logger.debug("storageUsageRecord -designDocId:{}",designDocId);
if (key.equals("dataVolume")){
keyOrderingProperty=key;
totalQuota+=valuetmp.doubleValue();
}
}
else{
logger.debug("?UsageRecord -designDocId:{}",designDocId);
keyOrderingProperty=key;
totalQuota+=valuetmp.doubleValue();
}
}
}
}
}
//i++;
//} while (i <countFilters );
//convert usage from byte to Mb
if (totalFilters instanceof UsageStorageValue){
totalQuota=totalQuota/1024/1024;
totalQuota=Math.round(totalQuota * 100.0) / 100.0;
}
totalFilters.setOrderingProperty(keyOrderingProperty);
if (totalQuota.isNaN()){
totalQuota=0.0;
}
totalFilters.setD(totalQuota);
}
return listUsage;
}
}