/** * */ package org.gcube.accounting.analytics.persistence.couchbase; import static com.couchbase.client.java.query.Select.select; import static com.couchbase.client.java.query.dsl.Expression.s; import static com.couchbase.client.java.query.dsl.Expression.x; import java.security.KeyException; import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.gcube.accounting.analytics.Filter; import org.gcube.accounting.analytics.Info; import org.gcube.accounting.analytics.NumberedFilter; import org.gcube.accounting.analytics.TemporalConstraint; import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode; import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum; import org.gcube.accounting.analytics.UsageServiceValue; import org.gcube.accounting.analytics.UsageStorageValue; import org.gcube.accounting.analytics.UsageValue; import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException; import org.gcube.accounting.analytics.exception.ValueException; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery; import org.gcube.accounting.datamodel.BasicUsageRecord; import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.bucket.BucketManager; import com.couchbase.client.java.document.JsonDocument; import com.couchbase.client.java.document.json.JsonArray; import com.couchbase.client.java.document.json.JsonObject; import com.couchbase.client.java.env.CouchbaseEnvironment; import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import com.couchbase.client.java.query.N1qlQueryResult; import com.couchbase.client.java.query.N1qlQueryRow; import com.couchbase.client.java.query.dsl.Expression; import com.couchbase.client.java.query.dsl.Sort; import com.couchbase.client.java.query.dsl.path.OffsetPath; import com.couchbase.client.java.view.OnError; import com.couchbase.client.java.view.View; import com.couchbase.client.java.view.ViewQuery; import com.couchbase.client.java.view.ViewResult; import com.couchbase.client.java.view.ViewRow; /** * @author Luca Frosini (ISTI - CNR) * @author Alessandro Pieve (ISTI - CNR) alessandro.pieve@isti.cnr.it * */ public class AccountingPersistenceQueryCouchBase implements AccountingPersistenceBackendQuery { private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryCouchBase.class); public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY; public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY; /* The environment configuration */ protected static final CouchbaseEnvironment ENV; public static final long MAX_REQUEST_LIFE_TIME = TimeUnit.MINUTES.toMillis(2); public static final long KEEP_ALIVE_INTERVAL = TimeUnit.HOURS.toMillis(1); public static final long AUTO_RELEASE_AFTER = TimeUnit.HOURS.toMillis(1); public static final long VIEW_TIMEOUT_BUCKET = TimeUnit.MINUTES.toMillis(2); public static final long CONNECTION_TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(15); public static final long CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(15); protected AccountingPersistenceBackendQueryConfiguration configuration; protected Cluster cluster; protected Map connectionMap; public static final String DESIGN_DOC_ID_LIST_USAGE = "ListUsage"; protected static final String MAP_REDUCE__DESIGN = ""; protected static final String MAP_REDUCE_ALL = "all"; // Used in the name of map reduce to separate keys used as filter protected static final String KEYS_SEPARATOR = "__"; protected static final String DESIGN_DOC_ID = "top_"; static { ENV = DefaultCouchbaseEnvironment.builder().connectTimeout(CONNECTION_TIMEOUT) .maxRequestLifetime(MAX_REQUEST_LIFE_TIME).queryTimeout(CONNECTION_TIMEOUT) .viewTimeout(VIEW_TIMEOUT_BUCKET).keepAliveInterval(KEEP_ALIVE_INTERVAL).kvTimeout(5000) .autoreleaseAfter(AUTO_RELEASE_AFTER).build(); // One Record per package is enough RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage()); } @Override public boolean isConnectionActive() throws Exception { return !connectionMap.values().iterator().next().isClosed(); } @Override public void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception { String url = configuration.getProperty(URL_PROPERTY_KEY); cluster = CouchbaseCluster.create(ENV, url); this.configuration = configuration; this.connectionMap = new HashMap<>(); } protected Bucket getBucket(Class clz) throws Exception { UsageRecord instance = clz.newInstance(); String recordType = instance.getRecordType(); return getBucket(recordType); } protected static final String AGGREGATED_PREFIX = "Aggregated"; protected Bucket getBucket(String recordType) throws Exception { if(recordType.startsWith(AGGREGATED_PREFIX)) { recordType = recordType.replace(AGGREGATED_PREFIX, ""); } Bucket bucket = connectionMap.get(recordType); if(bucket == null) { logger.debug("Trying to get the Bucket for {}", recordType); String bucketName = configuration.getProperty(recordType); logger.debug("Bucket for {} is {}. Going to open it.", recordType, bucketName); bucket = cluster.openBucket(bucketName, configuration.getProperty(PASSWORD_PROPERTY_KEY)); this.connectionMap.put(recordType, bucket); } return bucket; } /** * {@inheritDoc} */ @Override public void close() throws Exception { cluster.disconnect(); } protected Calendar getCalendar(JSONObject obj, AggregationMode aggregationMode) throws NumberFormatException, JSONException { long millis; if(obj.has(AggregatedRecord.START_TIME)) { millis = new Long(obj.getString(AggregatedRecord.START_TIME)); logger.trace("The result {} was from an aggregated record. Using {}", obj.toString(), AggregatedRecord.START_TIME); } else { millis = new Long(obj.getString(UsageRecord.CREATION_TIME)); logger.trace("The result {} was from single record. Using {}", obj.toString(), UsageRecord.CREATION_TIME); } Calendar calendar = TemporalConstraint.getAlignedCalendar(millis, aggregationMode); logger.trace("{} has been aligned to {}", millis, calendar.getTimeInMillis()); return calendar; } /* * * IS NOT USED * * @param clz * @param temporalConstraint * @param filters * @return * @throws Exception * / @Deprecated protected Map selectQuery(Class> clz, TemporalConstraint temporalConstraint, List filters) throws Exception { String currentScope = ScopeProvider.instance.get(); String recordType = clz.newInstance().getRecordType(); Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope)); expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(s(recordType))); long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis(); expression = expression .and(x(AggregatedRecord.START_TIME).gt(startTime).or(x(AggregatedRecord.CREATION_TIME).gt(startTime))); long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis(); expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime)) .or(x(AggregatedRecord.CREATION_TIME).lt(endTime)); AggregationMode aggregationMode = temporalConstraint.getAggregationMode(); // TODO Aggregate Results if (filters != null) { for (Filter filter : filters) { expression = expression.and(x(filter.getKey()).eq(s(filter.getValue()))); } } @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); GroupByPath groupByPath = select("*").from(bucket.name()).where(expression); Map map = new HashMap(); N1qlQueryResult result = bucket.query(groupByPath); if (!result.finalSuccess()) { logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors()); return map; } List rows = result.allRows(); for (N1qlQueryRow row : rows) { try { logger.trace("Row : {}", row.toString()); JsonObject jsonObject = row.value().getObject(clz.getSimpleName()); logger.trace("JsonObject : {}", row.toString()); String recordString = jsonObject.toMap().toString(); logger.trace("Record String : {}", recordString); Record record = RecordUtility.getRecord(recordString); JSONObject obj = new JSONObject(jsonObject.toString()); Calendar calendar = getCalendar(obj, aggregationMode); if (map.containsKey(calendar)) { Info info = map.get(calendar); JSONObject value = info.getValue(); jsonObject.toMap(); } else { map.put(calendar, new Info(calendar, obj)); } } catch (Exception e) { logger.warn("Unable to eleborate result for {}", row.toString()); } logger.trace("\n\n\n"); } return map; } */ protected Calendar getCalendarFromArray(JsonArray array) throws JSONException { boolean startFound = false; Calendar calendar = Calendar.getInstance(TemporalConstraint.DEFAULT_TIME_ZONE); int count = 0; CalendarEnum[] calendarValues = CalendarEnum.values(); for(int i = 0; i < array.size(); i++) { try { int value = array.getInt(i); int calendarValue = calendarValues[count].getCalendarValue(); if(calendarValue == Calendar.MONTH) { value--; } calendar.set(calendarValue, value); count++; startFound = true; } catch(Exception e) { // logger.trace("The provide value is not an int. {}", // array.get(i).toString()); if(startFound) { break; } } } for(int j = count; j < calendarValues.length; j++) { if(calendarValues[j].getCalendarValue() == Calendar.DAY_OF_MONTH) { calendar.set(calendarValues[j].getCalendarValue(), 1); } else { calendar.set(calendarValues[j].getCalendarValue(), 0); } } return calendar; } protected JsonArray getRangeKey(long time, AggregationMode aggregationMode, boolean wildCard, boolean endKey) throws JSONException { JsonArray array = JsonArray.create(); Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); CalendarEnum[] values = CalendarEnum.values(); if(endKey) { calendar.add(values[aggregationMode.ordinal()].getCalendarValue(), 1); } for(int i = 0; i <= aggregationMode.ordinal(); i++) { int value = calendar.get(values[i].getCalendarValue()); if(values[i].getCalendarValue() == Calendar.MONTH) { value = value + 1; } array.add(value); } if(wildCard) { array.add("{}"); } return array; } // OLD METHOD OF DIVISION MAP_REDUCE DESIGN @Deprecated protected String getDesignDocId(Class> recordClass) throws InstantiationException, IllegalAccessException { return String.format("%s%s", MAP_REDUCE__DESIGN, recordClass.newInstance().getRecordType()); } protected String getDesignDocIdSpecific(Class> recordClass, Collection keys) throws InstantiationException, IllegalAccessException { String specific = "all"; if(!keys.isEmpty()) { specific = keys.iterator().next(); } String getDesigndocIdSpecific = specific; // logger.trace("Use a designDocIDSpecific:{}",getDesigndocIdSpecific); return getDesigndocIdSpecific; } public static String getMapReduceFunctionName(Collection collection) { String reduceFunction = MAP_REDUCE_ALL; if(!collection.isEmpty()) { reduceFunction = null; for(String property : collection) { if(reduceFunction == null) { reduceFunction = property; } else { reduceFunction = reduceFunction + KEYS_SEPARATOR + property; } } } return reduceFunction; } public static String getMapReduceFunctionNameTopMap(String top, Collection collection) { logger.debug("top:{}", top); logger.debug("collection:{}", collection.toString()); String reduceFunction = MAP_REDUCE_ALL; if(!collection.isEmpty()) { reduceFunction = top; for(String property : collection) { if(!property.equals(top)) { if(reduceFunction == null) { reduceFunction = property; } else { reduceFunction = reduceFunction + KEYS_SEPARATOR + property; } } } } return reduceFunction; } /** * EXPERIMENTAL DEPRECATED generate a name of design doc id for a top * * @param collection * @return String */ public static String getDesignDocIdName(Collection collection) { String reduceFunction = MAP_REDUCE_ALL; if(!collection.isEmpty()) { reduceFunction = null; String property = collection.iterator().next(); reduceFunction = property; } return reduceFunction; } /* * EXPERIMENTAL * * This method is used for call a map-reduce ListUsage the filter on which * you want to know its use and the parameters you want to know have been * used Example: Input: request interval (start date end-date) filter * consumerId: alessandro.pieve Used parameters: Service Class Return: List * of service class used by alessandro in the required period */ /* * public SortedMap getListUsage(Class> clz, TemporalConstraint temporalConstraint, * List filters,String context,List parameters)throws * Exception{ //TODO String currentScope=null; if (context==null) * currentScope = ScopeProvider.instance.get(); else currentScope = context; * JsonArray startKey = JsonArray.create(); startKey.add(currentScope); * JsonArray endKey = JsonArray.create(); endKey.add(currentScope); * * AggregationMode aggregationMode = * temporalConstraint.getAggregationMode(); JsonArray temporalStartKey = * getRangeKey(temporalConstraint.getStartTime(),aggregationMode, false, * false); JsonArray temporalEndKey = * getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, * false); * * Set recordKeysSet = * AccountingPersistenceQuery.getQuerableKeys(clz.newInstance()); * * Collection keys = new TreeSet<>(); * * if (filters != null && filters.size() != 0) { // Sorting filter for call * a mapreduce Collections.sort(filters, new Comparator() { * * @Override public int compare(Filter filter1, Filter filter2) { int result * =filter1.getKey().compareTo(filter2.getKey()); return result; } }); for * (Filter filter : filters) { String filterKey = filter.getKey(); String * filterValue = filter.getValue(); if (filterKey != null && * filterKey.compareTo("") != 0 && recordKeysSet.contains(filterKey)) { if * (filterValue != null && filterValue.compareTo("") != 0) { if * (keys.contains(filterKey)) { throw new DuplicatedKeyFilterException( * "Only one value per Filter key is allowed"); } startKey.add(filterValue); * endKey.add(filterValue); keys.add(filterKey); } else { throw new * KeyException( * String.format("Invalid %s : %s",Filter.class.getSimpleName(), * filter.toString())); } * * } else { throw new * ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName * (), filter.toString())); } } } * * for (Object temporal: temporalStartKey.toList()){ if * (!temporal.toString().isEmpty()) startKey.add(temporal); } int count =1; * for (Object temporal: temporalEndKey.toList()){ if * (!temporal.toString().isEmpty()){ //couchbase exclude last value if * (count==temporalEndKey.size()) temporal=(int)temporal+1; * endKey.add(temporal); } count++; } String viewName = * getMapReduceFunctionName(keys); if (parameters!=null){ for (String * name:parameters){ viewName+=KEYS_SEPARATOR+name; } } //TODO DA COMPLETARE * con in piu' alle chiavi anche la lista parametri passata * * ViewQuery query = ViewQuery.from(DESIGN_DOC_ID_LIST_USAGE, viewName); * query.inclusiveEnd(); //query.groupLevel(groupLevel); * query.reduce(false); query.startKey(startKey); query.endKey(endKey); * query.descending(false); * * logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, " + * "Start Key : {}, End Key : {}," + * "temporalStartKey :{}, temporalEndKey :{}", * clz.getSimpleName(),DESIGN_DOC_ID_LIST_USAGE, viewName, startKey, * endKey,temporalStartKey.toString(), temporalEndKey.toString()); * SortedMap infos = new TreeMap<>(); ViewResult * viewResult; try { //execute query in a specify bucket viewResult = * connectionMap.get(clz.getSimpleName()).query(query); * * } catch (Exception e) { logger.error(e.getLocalizedMessage()); throw e; } * * for (ViewRow row : viewResult) { * * JsonArray array = (JsonArray) row.key(); Calendar calendar = * getCalendarFromArray(array); JsonObject value = (JsonObject) row.value(); * JSONObject obj = new JSONObject(value.toString()); Info info = new * Info(calendar, obj); //infos.put(info); * * } //TODO complete the request from user/service usage into period return * null; * * * } */ protected SortedMap mapReduceQuery(Class> clz, TemporalConstraint temporalConstraint, List filters, String context, Boolean valueEmpty, Boolean noScope) throws Exception { String currentScope = AccountingPersistenceBackendQuery.getScopeToQuery(); if(context != null) { currentScope = context; } JsonArray startKey = JsonArray.create(); JsonArray endKey = JsonArray.create(); // no scope call a map reduce without scope in startkey and endkey if(!noScope) { startKey.add(currentScope); endKey.add(currentScope); } AggregationMode aggregationMode = temporalConstraint.getAggregationMode(); JsonArray temporalStartKey = getRangeKey(temporalConstraint.getStartTime(), aggregationMode, false, false); JsonArray temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false); Set recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz); Collection keys = new TreeSet<>(); if(filters != null && filters.size() != 0) { // Sorting filter for call a mapreduce Collections.sort(filters, new Comparator() { @Override public int compare(Filter filter1, Filter filter2) { int result = filter1.getKey().compareTo(filter2.getKey()); return result; } }); for(Filter filter : filters) { String filterKey = filter.getKey(); String filterValue = filter.getValue(); if(filterKey != null && filterKey.compareTo("") != 0 && recordKeysSet.contains(filterKey)) { if(filterValue != null && filterValue.compareTo("") != 0) { if(keys.contains(filterKey)) { throw new DuplicatedKeyFilterException("Only one value per Filter key is allowed"); } startKey.add(filterValue); endKey.add(filterValue); keys.add(filterKey); } else { throw new KeyException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } else { throw new ValueException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } } // +1 because mode start from 0 // +1 because of scope at the beginning int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1; int groupLevel = scopeDateGroupLevel; if(filters != null) { groupLevel += keys.size(); } // String designDocId = getDesignDocId(clz); String designDocId = getDesignDocIdSpecific(clz, keys); if(noScope) { designDocId = "noContext"; groupLevel = groupLevel - 1; } // logger.trace("designDocIdNew :{}",designDocId); for(Object temporal : temporalStartKey.toList()) { if(!temporal.toString().isEmpty()) startKey.add(temporal); } int count = 1; for(Object temporal : temporalEndKey.toList()) { if(!temporal.toString().isEmpty()) { // couchbase exclude last value if(count == temporalEndKey.size()) temporal = (int) temporal + 1; endKey.add(temporal); } count++; } String viewName = getMapReduceFunctionName(keys); ViewQuery query = ViewQuery.from(designDocId, viewName); query.inclusiveEnd(); query.groupLevel(groupLevel); query.startKey(startKey); query.endKey(endKey); query.descending(false); logger.trace( "Bucket :{}, Design Doc ID : {}, View Name : {}, " + "Group Level : {}, Start Key : {}, End Key : {}," + "temporalStartKey :{}, temporalEndKey :{}", clz.getSimpleName(), designDocId, viewName, groupLevel, startKey, endKey, temporalStartKey.toString(), temporalEndKey.toString()); SortedMap infos = new TreeMap<>(); @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); ViewResult viewResult; try { // execute query in a specify bucket viewResult = bucket.query(query); } catch(Exception e) { logger.error(e.getLocalizedMessage()); throw e; } for(ViewRow row : viewResult) { JsonArray array = (JsonArray) row.key(); Calendar calendar = getCalendarFromArray(array); JsonObject value = (JsonObject) row.value(); JSONObject obj = new JSONObject(value.toString()); Info info = new Info(calendar, obj); infos.put(calendar, info); } logger.trace("valueEmpty not permitted:{}", valueEmpty); // infos not empity is permitted only for getTimeSeries and Top if(valueEmpty) { if(infos.isEmpty()) { logger.trace("infos is empity"); query = ViewQuery.from(designDocId, viewName); query.groupLevel(groupLevel); query.descending(false); try { // execute query in a specify bucket viewResult = bucket.query(query); } catch(Exception e) { logger.warn("not execute query", e.getLocalizedMessage()); // throw e; } try { if(viewResult.totalRows() != 0) { ViewRow row = viewResult.allRows().get(0); JsonArray array = getRangeKey(temporalConstraint.getStartTime(), aggregationMode, false, false); Calendar calendar = getCalendarFromArray(array); JsonObject value = (JsonObject) row.value(); JSONObject objJson = new JSONObject(value.toString()); JSONObject objJsontemplate = new JSONObject(); Iterator iterateJson = objJson.keys(); while(iterateJson.hasNext()) { String key = (String) iterateJson.next(); objJsontemplate.put(key, 0); } // generate an example object for json Info info = new Info(calendar, objJsontemplate); infos.put(calendar, info); } } catch(Exception e) { logger.warn("error :{}", e.getLocalizedMessage()); } } } logger.trace("infos:{}", infos.toString()); return infos; } @Override public SortedMap getTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters) throws Exception { SortedMap map = mapReduceQuery(clz, temporalConstraint, filters, null, true, false); return map; } @Override public SortedMap getNoContextTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters) throws Exception { SortedMap map = mapReduceQuery(clz, temporalConstraint, filters, null, true, true); return map; } @Override public SortedMap> getTopValues(Class> clz, TemporalConstraint temporalConstraint, List filters, String topKey, String orderingProperty) throws Exception { Comparator comparator = new Comparator() { @Override public int compare(NumberedFilter o1, NumberedFilter o2) { int result = -o1.compareTo(o2); if(result == 0) { result = o1.compareTo((Filter) o2); } return result; } }; SortedMap> ret = new TreeMap<>(comparator); SortedSet top = null; // top = getNextPossibleValues(clz,temporalConstraint, filters, topKey, // orderingProperty); if(usingNextPossibleValuesWithMap(clz, topKey, filters)) { logger.trace("getNextPossibleValues using map"); top = getNextPossibleValuesWithMap(clz, temporalConstraint, filters, topKey, orderingProperty); } else { logger.trace("getNextPossibleValues using query"); top = getNextPossibleValues(clz, temporalConstraint, filters, topKey, orderingProperty); } logger.trace("getNextPossibleValues:{}", top.toString()); for(NumberedFilter nf : top) { filters.add(nf); SortedMap map = mapReduceQuery(clz, temporalConstraint, filters, null, true, false); ret.put(nf, map); filters.remove(nf); } return ret; } /** * SPERIMENTAL Used for verify if have exist map for calculate a top * * @param clz * @param topKey * @return boolean * @throws Exception */ protected boolean usingNextPossibleValuesWithMap(Class> clz, String topKey, List filters) throws Exception { logger.debug("usingNextPossibleValuesWithMap init"); Collection keys = new TreeSet<>(); Set recordKeysSet; try { recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz); keys.add(topKey); if(filters != null && filters.size() != 0) { // Sorting filter for call a mapreduce Collections.sort(filters, new Comparator() { @Override public int compare(Filter filter1, Filter filter2) { int result = filter1.getKey().compareTo(filter2.getKey()); return result; } }); for(Filter filter : filters) { String filterKey = filter.getKey(); String filterValue = filter.getValue(); if(filterKey != null && filterKey.compareTo("") != 0 && recordKeysSet.contains(filterKey)) { if(filterValue != null && filterValue.compareTo("") != 0) { if(keys.contains(filterKey)) { throw new DuplicatedKeyFilterException("Only one value per Filter key is allowed"); } keys.add(filterKey); } else { throw new KeyException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } else { throw new ValueException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } } } catch(Exception e1) { logger.warn("usingNextPossibleValuesWithMap -exception with filter:{}", filters.toString()); return false; } logger.debug("usingNextPossibleValuesWithMap complete key and name"); String viewName = getMapReduceFunctionNameTopMap(topKey, keys); // String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys); String designDocId = DESIGN_DOC_ID + topKey; @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); BucketManager bucketManager = bucket.bucketManager(); // logger.debug("----"+bucketManager.getDesignDocument(designDocId)); if(bucketManager.getDesignDocument(designDocId) != null) { logger.debug("usingNextPossibleValuesWithMap designDocId exist:{}-and viewname:{}-", designDocId, viewName); for(View view : bucketManager.getDesignDocument(designDocId).views()) { logger.debug("found:{}- ", view.name()); if(view.name().equals(viewName)) { logger.debug("usingNextPossibleValuesWithMap viewname exist"); return true; } } } else { logger.debug("usingNextPossibleValuesWithQuery"); return false; } /* * ViewQuery query = ViewQuery.from(designDocId, viewName); * query.inclusiveEnd(); query.groupLevel(1); ViewResult viewResult; try * { viewResult = connectionMap.get(clz.getSimpleName()).query(query); * logger.debug("usingNextPossibleValuesWithMap viewResult:{}" * ,viewResult.toString()); } catch (Exception e) { return false; } */ return false; } /** * Calculate a next possible value with map (faster but with greater demand * for resources) * * @param clz * @param temporalConstraint * @param filters * @param key * @param orderingProperty * @return SortedSet * @throws Exception */ protected SortedSet getNextPossibleValuesWithMap(Class> clz, TemporalConstraint temporalConstraint, List filters, String key, String orderingProperty) throws Exception { logger.debug("getNextPossibleValuesWithMap init"); String currentScope = AccountingPersistenceBackendQuery.getScopeToQuery(); if(orderingProperty == null) { orderingProperty = AccountingPersistenceQuery.getDefaultOrderingProperties(clz); } JsonArray startKey = JsonArray.create(); startKey.add(currentScope); JsonArray endKey = JsonArray.create(); endKey.add(currentScope); AggregationMode aggregationMode = temporalConstraint.getAggregationMode(); JsonArray temporalStartKey = getRangeKey(temporalConstraint.getStartTime(), aggregationMode, false, false); JsonArray temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false); Set recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz); Collection keys = new TreeSet<>(); keys.add(key); /* BEGIN DELETE ONLY FOR TEST */ Collection selectExpressions = new ArrayList<>(); selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz, orderingProperty) + " IS NOT NULL THEN " + getSpecializedProperty(clz, orderingProperty) + " ELSE 1 END )").as(orderingProperty)); selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz, key) + " IS NOT NULL THEN " + getSpecializedProperty(clz, key) + " ELSE 'UNKNOWN' END )").as(key)); Expression whereExpression = x(getSpecializedProperty(clz, BasicUsageRecord.SCOPE)).eq(s(currentScope)); long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis(); whereExpression = whereExpression .and(x(getSpecializedProperty(clz, AggregatedRecord.START_TIME)).gt(startTime)); long endTime = temporalConstraint.getEndTime(); whereExpression = whereExpression.and(x(getSpecializedProperty(clz, AggregatedRecord.END_TIME)).lt(endTime)); Expression[] selectExpressionArray = new Expression[selectExpressions.size()]; selectExpressions.toArray(selectExpressionArray); Sort sort = Sort.desc(orderingProperty); @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); OffsetPath path = select(selectExpressionArray).from(bucket.name()).where(whereExpression).groupBy(key) .orderBy(sort); /* END DELETE ONLY FOR TEST */ if(filters != null && filters.size() != 0) { // Sorting filter for call a mapreduce Collections.sort(filters, new Comparator() { @Override public int compare(Filter filter1, Filter filter2) { int result = filter1.getKey().compareTo(filter2.getKey()); return result; } }); for(Filter filter : filters) { String filterKey = filter.getKey(); String filterValue = filter.getValue(); if(filterKey != null && filterKey.compareTo("") != 0 && recordKeysSet.contains(filterKey)) { if(filterValue != null && filterValue.compareTo("") != 0) { if(keys.contains(filterKey)) { throw new DuplicatedKeyFilterException("Only one value per Filter key is allowed"); } startKey.add(filterValue); endKey.add(filterValue); whereExpression = whereExpression .and(x(getSpecializedProperty(clz, filterKey)).eq(s(filterValue))); keys.add(filterKey); } else { throw new KeyException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } else { throw new ValueException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } } logger.debug("Alternative Query for top:" + path.toString()); int groupLevel = 1; for(Object temporal : temporalStartKey.toList()) { if(!temporal.toString().isEmpty()) startKey.add(temporal); } int count = 1; for(Object temporal : temporalEndKey.toList()) { if(!temporal.toString().isEmpty()) { // couchbase exclude last value if(count == temporalEndKey.size()) temporal = (int) temporal + 1; endKey.add(temporal); } count++; } String viewName = getMapReduceFunctionNameTopMap(key, keys); // String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys); String designDocId = DESIGN_DOC_ID + key; logger.trace("keys:{}", keys.toString()); ViewQuery query = ViewQuery.from(designDocId, viewName); query.inclusiveEnd(); query.groupLevel(groupLevel); query.startKey(startKey); query.endKey(endKey); query.descending(false); logger.trace( "Bucket :{}, Design Doc ID : {}, View Name : {}, " + "Group Level : {}, Start Key : {}, End Key : {}," + "temporalStartKey :{}, temporalEndKey :{}", clz.getSimpleName(), designDocId, viewName, groupLevel, startKey, endKey, temporalStartKey.toString(), temporalEndKey.toString()); Comparator comparator = new Comparator() { @Override public int compare(NumberedFilter o1, NumberedFilter o2) { int compareResult = -o1.compareTo(o2); if(compareResult == 0) { compareResult = 1; } return compareResult; } }; SortedSet ret = new TreeSet<>(comparator); ViewResult viewResult; try { // execute query in a specify bucket viewResult = bucket.query(query); } catch(Exception e) { logger.error(e.getLocalizedMessage()); throw e; } ViewRow row = viewResult.allRows().get(0); JsonObject value = (JsonObject) row.value(); JSONObject objectValueTop = new JSONObject(value.toString()); Iterator iterateJosn = objectValueTop.keys(); while(iterateJosn.hasNext()) { String keyTop = (String) iterateJosn.next(); Number n = (Number) objectValueTop.get(keyTop); if(n == null) n = 0; NumberedFilter numberedFilter = new NumberedFilter(key, keyTop, n, orderingProperty); ret.add(numberedFilter); } return ret; } /** * Calculate a next possible value with query (more slow but with fewer * resources ) * * @param clz * @param temporalConstraint * @param filters * @param key * @param orderingProperty * @return SortedSet * @throws Exception */ private SortedSet getNextPossibleValues(Class> clz, TemporalConstraint temporalConstraint, List filters, String key, String orderingProperty) throws Exception { String currentScope = AccountingPersistenceBackendQuery.getScopeToQuery(); if(orderingProperty == null) { orderingProperty = AccountingPersistenceQuery.getDefaultOrderingProperties(clz); } Collection selectExpressions = new ArrayList<>(); // add select expression and check if exist selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz, orderingProperty) + " IS NOT NULL THEN " + getSpecializedProperty(clz, orderingProperty) + " ELSE 1 END )").as(orderingProperty)); selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz, key) + " IS NOT NULL THEN " + getSpecializedProperty(clz, key) + " ELSE 'UNKNOWN' END )").as(key)); // add where expression Expression whereExpression = x(getSpecializedProperty(clz, BasicUsageRecord.SCOPE)).eq(s(currentScope)); long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis(); whereExpression = whereExpression .and(x(getSpecializedProperty(clz, AggregatedRecord.START_TIME)).gt(startTime)); long endTime = temporalConstraint.getEndTime(); whereExpression = whereExpression.and(x(getSpecializedProperty(clz, AggregatedRecord.END_TIME)).lt(endTime)); Set recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz); // list filter used for remove duplicate filter Collection keys = new TreeSet<>(); if(filters != null && filters.size() != 0) { for(Filter filter : filters) { String filterKey = filter.getKey(); String filterValue = filter.getValue(); if(filterKey != null && filterKey.compareTo("") != 0 && recordKeysSet.contains(filterKey)) { if(filterValue != null && filterValue.compareTo("") != 0) { if(keys.contains(filterKey)) { throw new DuplicatedKeyFilterException("Only one value per Filter key is allowed"); } whereExpression = whereExpression .and(x(getSpecializedProperty(clz, filterKey)).eq(s(filterValue))); keys.add(filterKey); } else { throw new KeyException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } else { throw new ValueException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } } Expression[] selectExpressionArray = new Expression[selectExpressions.size()]; selectExpressions.toArray(selectExpressionArray); Sort sort = Sort.desc(orderingProperty); @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); OffsetPath path = select(selectExpressionArray).from(bucket.name()).where(whereExpression).groupBy(key) .orderBy(sort); logger.debug("Query for top:" + path.toString()); Comparator comparator = new Comparator() { @Override public int compare(NumberedFilter o1, NumberedFilter o2) { int compareResult = -o1.compareTo(o2); if(compareResult == 0) { compareResult = 1; } return compareResult; } }; SortedSet ret = new TreeSet<>(comparator); N1qlQueryResult result = bucket.query(path); if(!result.finalSuccess()) { logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors()); throw new Exception("Query Failed :\n" + result.errors()); } List rows = result.allRows(); for(N1qlQueryRow row : rows) { try { JsonObject jsonObject = row.value(); // logger.trace("JsonObject : {}", row.value()+" key"+key); // logger.warn("pre"+jsonObject.toString()+" key :"+key); // verify for a not null value String value = jsonObject.getString(key); Number n = jsonObject.getDouble(orderingProperty); if(n == null) n = 0; // logger.trace("pre:{}, key:{}, value:{}, // n:{},orderingProperty:{}",jsonObject.toString(),key, value, // n, orderingProperty); NumberedFilter numberedFilter = new NumberedFilter(key, value, n, orderingProperty); ret.add(numberedFilter); } catch(Exception e) { logger.warn("Unable to eleborate result for {}", row.toString()); e.printStackTrace(); } } return ret; } /** * Return a list of context time series (used for portlet accounting * context) * * @param clz * @param temporalConstraint * @param filters * @param contexts * @return SortedMap * @throws Exception */ @Override public SortedMap> getContextTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters, List contexts) throws Exception { logger.trace("getContextTimeSeries for contexts:{}", contexts.toString()); SortedSet listContexts = new TreeSet(); for(String context : contexts) { Filter contextLabel = new Filter("context", context); listContexts.add(contextLabel); } SortedMap> ret = new TreeMap<>(); for(Filter nf : listContexts) { logger.debug("detail time series :{}", nf.toString()); SortedMap map = mapReduceQuery(clz, temporalConstraint, filters, nf.getValue(), false, false); if(!map.isEmpty()) { ret.put(nf, map); } filters.remove(nf); } return ret; } protected String getQualifiedProperty(String property) { // DEVELOPING return (property); } // Use for property into a specify bucket protected String getSpecializedProperty(Class> clz, String property) throws Exception { @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); return String.format("%s.%s", bucket.name(), property); } @Override @Deprecated public SortedSet getFilterValues(Class> clz, TemporalConstraint temporalConstraint, List filters, String key) throws Exception { return getFilterValues(clz, temporalConstraint, filters, key, null); } /** * Used for list a possible values for each filter * * @param clz * @param temporalConstraint * @param filters * @param key * @return SortedSet * @throws Exception */ @Override public SortedSet getFilterValues(Class> clz, TemporalConstraint temporalConstraint, List filters, String key, Integer limit) throws Exception { String currentScope = AccountingPersistenceBackendQuery.getScopeToQuery(); JsonArray startKey = JsonArray.create(); startKey.add(currentScope); int scopeDateGroupLevel = 2; int groupLevel = scopeDateGroupLevel; // NO ADD A SPECIFIY DESIGN DOC ID FAMILY String designDocId = getDesignDocId(clz) + "Value"; String viewName = key; logger.trace("designDocId:{} view:{} startKey:{} groupLevel:{}", designDocId, key, startKey, groupLevel); ViewQuery query = ViewQuery.from(designDocId, viewName); query.inclusiveEnd(); query.groupLevel(groupLevel); query.startKey(startKey); query.descending(false); if(limit != null) { query.limit(limit); } String orderingProperty = AccountingPersistenceQuery.getDefaultOrderingProperties(clz); @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); ViewResult viewResult; try { // execute query in a specify bucket viewResult = bucket.query(query); } catch(Exception e) { logger.error("error executing the query", e); throw e; } Comparator comparator = new Comparator() { @Override public int compare(NumberedFilter o1, NumberedFilter o2) { if(o1.getValue() == null) o1.setValue(""); if(o2.getValue() == null) o2.setValue(""); return o1.getValue().compareTo(o2.getValue()); } }; SortedSet ret = new TreeSet<>(comparator); for(ViewRow row : viewResult) { String value = (String) row.value(); NumberedFilter numberedFilter = new NumberedFilter(key, value, 0, orderingProperty); ret.add(numberedFilter); } logger.trace("returning {} values", ret.size()); return ret; } /** * SPERIMENTAL now is not used * * @param clz * @param temporalConstraint * @param applicant * @return JSONObject * @throws Exception */ public JSONObject getUsageValue(Class> clz, TemporalConstraint temporalConstraint, Filter applicant) throws Exception { String currentScope = AccountingPersistenceBackendQuery.getScopeToQuery(); JsonArray startKey = JsonArray.create(); startKey.add(currentScope); JsonArray endKey = JsonArray.create(); endKey.add(currentScope); AggregationMode aggregationMode = temporalConstraint.getAggregationMode(); JsonArray temporalStartKey = getRangeKey(temporalConstraint.getStartTime(), aggregationMode, false, false); JsonArray temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false); startKey.add(applicant.getValue()); for(Object temporal : temporalStartKey.toList()) { if(!temporal.toString().isEmpty()) startKey.add(temporal); } endKey.add(applicant.getValue()); int count = 1; for(Object temporal : temporalEndKey.toList()) { if(!temporal.toString().isEmpty()) { // couchbase exclude last value if(count == temporalEndKey.size()) temporal = (int) temporal + 1; endKey.add(temporal); } count++; } // +1 because mode start from 0 // +1 because have afilter value from 1 // +1 because of scope at the beginning int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1 + 1; int groupLevel = scopeDateGroupLevel; Collection keys = new TreeSet<>(); keys.add(applicant.getKey()); // ADD A SPECIFIY DESIGN DOC ID FAMILY String designDocId = getDesignDocIdSpecific(clz, keys); String viewName = applicant.getKey(); ViewQuery query = ViewQuery.from(designDocId, viewName); query.inclusiveEnd(); query.groupLevel(groupLevel); query.startKey(startKey); query.endKey(endKey); query.descending(false); logger.trace( "Bucket :{}, Design Doc ID : {}, View Name : {}, " + "Group Level : {}, Start Key : {}, End Key : {}," + "temporalStartKey :{}, temporalEndKey :{}", clz.getSimpleName(), designDocId, viewName, groupLevel, startKey, endKey, temporalStartKey.toString(), temporalEndKey.toString()); @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); ViewResult viewResult; try { // execute query in a specify bucket viewResult = bucket.query(query); } catch(Exception e) { logger.error(e.getLocalizedMessage()); throw e; } Map map = new HashMap(); for(ViewRow row : viewResult) { JsonObject jsnobject = (JsonObject) row.value(); JSONObject objJson = new JSONObject(jsnobject.toString()); Iterator iterateJosn = objJson.keys(); while(iterateJosn.hasNext()) { String key = (String) iterateJosn.next(); Float valuetmp = Float.parseFloat(objJson.get(key).toString()); if(key.equals("operationCount") || key.equals("dataVolume")) { if(map.containsKey(key)) { map.put(key, valuetmp + map.get(key)); } else map.put(key, valuetmp); } } } JSONObject result = new JSONObject(map); return result; } /** * * Used for calculate a usage value for each element of list (QUOTA) * * @param listUsage * @return List * @throws Exception */ @Override public List getUsageValueQuotaTotal(List listUsage) throws Exception { logger.debug("getUsageValueQuotaTotal init with list:{}", listUsage); String keyOrderingProperty = null; for(UsageValue totalFilters : listUsage) { String currentScope = totalFilters.getContext(); Collection keys = new TreeSet<>(); keys.add("consumerId"); String designDocId = getDesignDocIdSpecific(totalFilters.getClz(), keys); JsonArray temporalStartKey = null; JsonArray temporalEndKey = null; logger.trace("temporalConstraint:{}", totalFilters); TemporalConstraint temporalConstraint = totalFilters.getTemporalConstraint(); if(temporalConstraint == (null)) { // used for no temporalConstraint logger.trace("Not found temporalConstraint"); Calendar startTime = Calendar.getInstance(); startTime.set(1970, Calendar.JANUARY, 1); Calendar endTime = Calendar.getInstance(); temporalConstraint = new TemporalConstraint(startTime.getTimeInMillis(), endTime.getTimeInMillis(), AggregationMode.DAILY); if(totalFilters instanceof UsageStorageValue) { designDocId = "QuotaTotalSeparated"; } } else if(totalFilters.getClz().getSimpleName() .equals(AggregatedStorageStatusRecord.class.getSimpleName())) { logger.trace("AggregatedStorageStatusRecord with temporalConstraint"); designDocId = "Quota"; } AggregationMode aggregationMode = temporalConstraint.getAggregationMode(); temporalStartKey = getRangeKey(temporalConstraint.getStartTime(), aggregationMode, false, false); temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false); Double totalQuota = 0.00; /* if (totalFilters instanceof UsageServiceValue) { UsageServiceValue totalFiltersService = (UsageServiceValue) totalFilters; } */ // do { String viewNameTmp = null; JsonArray startKeyTmp = JsonArray.create(); startKeyTmp.add(currentScope); JsonArray endKeyTmp = JsonArray.create(); endKeyTmp.add(currentScope); int groupLevelTmp = 2; // FiltersValue singleFilter=null; viewNameTmp = AggregatedServiceUsageRecord.CONSUMER_ID; startKeyTmp.add(totalFilters.getIdentifier()); endKeyTmp.add(totalFilters.getIdentifier()); if(totalFilters instanceof UsageServiceValue) { // logger.debug("******UsageServiceValue"); UsageServiceValue totalFiltersService = (UsageServiceValue) totalFilters; // singleFilter=totalFiltersService.getFiltersValue().get(i); for(Filter filter : totalFiltersService.getFilters()) { viewNameTmp = viewNameTmp + "__" + filter.getKey(); startKeyTmp.add(filter.getValue()); endKeyTmp.add(filter.getValue()); groupLevelTmp++; } } // not defined temporal constraint for(Object temporal : temporalStartKey.toList()) { if(!temporal.toString().isEmpty()) startKeyTmp.add(temporal); } int count = 1; for(Object temporal : temporalEndKey.toList()) { if(!temporal.toString().isEmpty()) { // couchbase excludes last value if(count == temporalEndKey.size()) temporal = (int) temporal + 1; endKeyTmp.add(temporal); } count++; } logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, " + "Group Level : {}, Start Key : {}, End Key : {}," + "temporalStartKey :{}, temporalEndKey :{}", totalFilters.getClz().getSimpleName(), designDocId, viewNameTmp, groupLevelTmp, startKeyTmp, endKeyTmp, temporalStartKey.toString(), temporalEndKey.toString()); ViewQuery query = ViewQuery.from(designDocId, viewNameTmp); query.inclusiveEnd(); query.groupLevel(groupLevelTmp); query.startKey(startKeyTmp); query.endKey(endKeyTmp); query.descending(false); query.onError(OnError.STOP); logger.trace("query row:{}", query.toString()); @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) totalFilters.getClz()); ViewResult viewResult; try { viewResult = bucket.query(query); } catch(Exception e) { logger.error(e.getLocalizedMessage()); throw e; } logger.trace("viewResult row:{}", viewResult.toString()); Map map = new HashMap(); for(ViewRow row : viewResult) { logger.trace("ViewRow row:{}", row.toString()); JsonObject jsnobject = (JsonObject) row.value(); JSONObject objJson = new JSONObject(jsnobject.toString()); Iterator iterateJosn = objJson.keys(); while(iterateJosn.hasNext()) { String key = (String) iterateJosn.next(); if(totalFilters instanceof UsageStorageValue) { JSONArray valueStorage = (JSONArray) objJson.get(key); logger.debug("--storageUsageRecord -key:{} value:{}", key, valueStorage.get(0)); totalQuota += Float.parseFloat(valueStorage.get(0).toString()); keyOrderingProperty = "dataVolume"; } else { if(key.equals("operationCount") || key.equals("dataVolume")) { Float valuetmp = Float.parseFloat(objJson.get(key).toString()); if(map.containsKey(key)) { map.put(key, valuetmp + map.get(key)); logger.debug("?UsageRecord -designDocId:{}", designDocId); keyOrderingProperty = key; totalQuota += totalFilters.getD() + valuetmp.doubleValue(); } else { map.put(key, valuetmp); logger.debug("?UsageRecord -designDocId:{}", designDocId); keyOrderingProperty = key; totalQuota += valuetmp.doubleValue(); } } } } } // convert usage from byte to Mb if(totalFilters instanceof UsageStorageValue) { totalQuota = totalQuota / 1024 / 1024; totalQuota = Math.round(totalQuota * 100.0) / 100.0; } totalFilters.setOrderingProperty(keyOrderingProperty); if(totalQuota.isNaN()) { totalQuota = 0.0; } totalFilters.setD(totalQuota); } return listUsage; } @Override public Record getRecord(String recordId, String recordType) throws Exception { try { Bucket bucket = getBucket(recordType); JsonDocument recordJson = bucket.get(recordId); String recordJsonString = recordJson.content().toString(); return RecordUtility.getRecord(recordJsonString); } catch(Exception e) { return null; } } /** * Used for storage status aka tab Space (into portlet accounting) and * popolate a list combobox used * * @throws Exception */ @Override public SortedSet getSpaceProvidersIds() throws Exception { String currentScope = AccountingPersistenceBackendQuery.getScopeToQuery(); JsonArray startKey = JsonArray.create(); startKey.add(currentScope); int scopeDateGroupLevel = 2; int groupLevel = scopeDateGroupLevel; String designDocId = "StorageStatusRecordValue"; String viewName = "providerId"; logger.trace("designDocId:{} view:{} startKey:{} groupLevel:{}", designDocId, viewName, startKey, groupLevel); ViewQuery query = ViewQuery.from(designDocId, viewName); query.inclusiveEnd(); query.groupLevel(groupLevel); query.startKey(startKey); query.descending(false); SortedSet ret = new TreeSet(); Bucket bucket = getBucket(AggregatedStorageStatusRecord.class); ViewResult viewResult; try { // execute query in a specify bucket viewResult = bucket.query(query); } catch(Exception e) { logger.error("error executing the query", e); throw e; } for(ViewRow row : viewResult) { String value = (String) row.value(); ret.add(value); } return ret; } /** * used for accounting portlet section storage status * * @param clz * @param temporalConstraint * @param filters * @param providersId * @return SortedMap * @throws JSONException */ public SortedMap> getSpaceTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters, List providersId) throws Exception { String currentScope = AccountingPersistenceBackendQuery.getScopeToQuery(); JsonArray startKey = JsonArray.create(); JsonArray endKey = JsonArray.create(); startKey.add(currentScope); endKey.add(currentScope); AggregationMode aggregationMode = temporalConstraint.getAggregationMode(); JsonArray temporalStartKey = getRangeKey(temporalConstraint.getStartTime(), aggregationMode, false, false); JsonArray temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false); Collection keys = new TreeSet<>(); String designDocId = "StorageStatusUsage"; String viewName; int groupLevel = 5; if(temporalConstraint.getAggregationMode().equals(AggregationMode.MONTHLY)) { groupLevel = 4; } if(temporalConstraint.getAggregationMode().equals(AggregationMode.YEARLY)) { groupLevel = 3; } viewName = temporalConstraint.getAggregationMode().name().toLowerCase(); if(filters != null && filters.size() != 0) { // Sorting filter for call a mapreduce Collections.sort(filters, new Comparator() { @Override public int compare(Filter filter1, Filter filter2) { int result = filter1.getKey().compareTo(filter2.getKey()); return result; } }); for(Filter filter : filters) { String filterKey = filter.getKey(); String filterValue = filter.getValue(); if(filterKey != null && filterKey.compareTo("") != 0) { if(filterValue != null && filterValue.compareTo("") != 0) { if(keys.contains(filterKey)) { throw new DuplicatedKeyFilterException("Only one value per Filter key is allowed"); } startKey.add(filterValue); endKey.add(filterValue); keys.add(filterKey); viewName = viewName + "_" + filterKey; if(filterKey != "consumerId") { groupLevel = groupLevel + 1; } } else { throw new KeyException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } else { throw new ValueException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } } for(Object temporal : temporalStartKey.toList()) { if(!temporal.toString().isEmpty()) startKey.add(temporal); } int count = 1; for(Object temporal : temporalEndKey.toList()) { if(!temporal.toString().isEmpty()) { // couchbase exclude last value if(count == temporalEndKey.size()) temporal = (int) temporal + 1; endKey.add(temporal); } count++; } ViewQuery query = ViewQuery.from(designDocId, viewName); query.inclusiveEnd(); query.groupLevel(groupLevel); query.startKey(startKey); query.endKey(endKey); query.descending(false); logger.trace( "Bucket :{}, Design Doc ID : {}, View Name : {}, " + "Group Level : {}, Start Key : {}, End Key : {}," + "temporalStartKey :{}, temporalEndKey :{}", clz.getSimpleName(), designDocId, viewName, groupLevel, startKey, endKey, temporalStartKey.toString(), temporalEndKey.toString()); @SuppressWarnings("unchecked") Bucket bucket = getBucket((Class) clz); ViewResult viewResult; try { // execute query in a specify bucket viewResult = bucket.query(query); } catch(Exception e) { logger.error(e.getLocalizedMessage()); throw e; } SortedMap> ret = new TreeMap<>(); for(ViewRow row : viewResult) { JsonArray array = (JsonArray) row.key(); Calendar calendar = getCalendarFromArray(array); JsonObject value = (JsonObject) row.value(); JSONObject obj = new JSONObject(value.toString()); // logger.trace("row: {}, value: {}, obj: // {}",row.toString(),value.toString(),obj.toString()); for(Iterator iterator = obj.keys(); iterator.hasNext();) { String key = (String) iterator.next(); String[] tmp = key.split("-"); String providerId = tmp[0]; if(providersId.contains(providerId)) { Long valueProvider = Long.parseLong(obj.get(key).toString().split(",")[0].replace("[", "")); // convert into kb valueProvider = valueProvider / 1024; Filter filter = new Filter("providerId", providerId); if(!ret.containsKey(filter)) { SortedMap infos = new TreeMap<>(); infos.put(calendar, valueProvider); ret.put(filter, infos); } else { SortedMap singleValue = ret.get(filter); if(!singleValue.containsKey(calendar)) { // if not exist put singleValue.put(calendar, valueProvider); } else { // if exist, update singleValue.put(calendar, singleValue.get(calendar) + valueProvider); } } } } } logger.trace("return ret:{}", ret.toString()); return ret; } }