diff --git a/pom.xml b/pom.xml
index 9f6eb4f..38a2ba7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,19 +27,19 @@
org.gcube.data.publishing
document-store-lib
- [1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)
- provided
-
-
- org.gcube.accounting
- accounting-analytics
- [1.2.0-SNAPSHOT, 2.0.0-SNAPSHOT)
+ [1.1.0-SNAPSHOT,2.0.0-SNAPSHOT)
provided
org.gcube.accounting
accounting-lib
- [2.1.0-SNAPSHOT, 3.0.0-SNAPSHOT)
+ [2.2.0-SNAPSHOT, 3.0.0-SNAPSHOT)
+ provided
+
+
+ org.gcube.accounting
+ accounting-analytics
+ [2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT)
provided
diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
index df02248..0a6e901 100644
--- a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
+++ b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
@@ -7,26 +7,34 @@ import static com.couchbase.client.java.query.Select.select;
import static com.couchbase.client.java.query.dsl.Expression.s;
import static com.couchbase.client.java.query.dsl.Expression.x;
+import java.security.KeyException;
+import java.util.ArrayList;
import java.util.Calendar;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
+import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
+import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum;
+import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException;
+import org.gcube.accounting.analytics.exception.ValueException;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
+import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
-import org.gcube.accounting.persistence.AccountingPersistence;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.records.AggregatedRecord;
-import org.gcube.documentstore.records.Record;
-import org.gcube.documentstore.records.RecordUtility;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
@@ -35,33 +43,39 @@ import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
+import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow;
import com.couchbase.client.java.query.dsl.Expression;
-import com.couchbase.client.java.query.dsl.path.GroupByPath;
+import com.couchbase.client.java.query.dsl.Sort;
+import com.couchbase.client.java.query.dsl.path.LimitPath;
+import com.couchbase.client.java.view.ViewQuery;
+import com.couchbase.client.java.view.ViewResult;
+import com.couchbase.client.java.view.ViewRow;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
- *
+ *
*/
-public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBackendQuery {
+public class AccountingPersistenceQueryCouchBase implements
+ AccountingPersistenceBackendQuery {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(AccountingPersistenceQueryCouchBase.class);
- private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryCouchBase.class);
-
public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY;
- //public static final String USERNAME_PROPERTY_KEY = AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY;
+ // public static final String USERNAME_PROPERTY_KEY =
+ // AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY;
public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY;
public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName";
-
+
/* The environment configuration */
- protected static final CouchbaseEnvironment ENV =
- DefaultCouchbaseEnvironment.builder()
- .queryEnabled(true)
- .build();
-
+ protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment
+ .builder().queryEnabled(true).build();
+
protected Cluster cluster;
protected Bucket bucket;
protected String bucketName;
@@ -70,7 +84,9 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa
* {@inheritDoc}
*/
@Override
- protected void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception {
+ public void prepareConnection(
+ AccountingPersistenceBackendQueryConfiguration configuration)
+ throws Exception {
String url = configuration.getProperty(URL_PROPERTY_KEY);
// String username = configuration.getProperty(USERNAME_PROPERTY_KEY);
String password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
@@ -79,7 +95,7 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa
bucketName = configuration.getProperty(BUCKET_NAME_PROPERTY_KEY);
bucket = cluster.openBucket(bucketName, password);
}
-
+
/**
* {@inheritDoc}
*/
@@ -88,114 +104,521 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa
cluster.disconnect();
}
- protected Calendar getCalendar(JSONObject obj, AggregationMode aggregationMode) throws NumberFormatException, JSONException{
+ protected Calendar getCalendar(JSONObject obj,
+ AggregationMode aggregationMode) throws NumberFormatException,
+ JSONException {
long millis;
- if(obj.has(AggregatedRecord.START_TIME)){
+ if (obj.has(AggregatedRecord.START_TIME)) {
millis = new Long(obj.getString(AggregatedRecord.START_TIME));
- logger.trace("The result {} was from an aggregated record. Using {}", obj.toString(), AggregatedRecord.START_TIME);
- }else{
+ logger.trace(
+ "The result {} was from an aggregated record. Using {}",
+ obj.toString(), AggregatedRecord.START_TIME);
+ } else {
millis = new Long(obj.getString(UsageRecord.CREATION_TIME));
- logger.trace("The result {} was from single record. Using {}", obj.toString(), UsageRecord.CREATION_TIME);
+ logger.trace("The result {} was from single record. Using {}",
+ obj.toString(), UsageRecord.CREATION_TIME);
}
- Calendar calendar = TemporalConstraint.getAlignedCalendar(millis, aggregationMode);
- logger.trace("{} has been aligned to {}", millis, calendar.getTimeInMillis());
+ Calendar calendar = TemporalConstraint.getAlignedCalendar(millis,
+ aggregationMode);
+ logger.trace("{} has been aligned to {}", millis,
+ calendar.getTimeInMillis());
return calendar;
}
+
+// private Map selectQuery(
+// Class extends AggregatedRecord, ?>> clz,
+// TemporalConstraint temporalConstraint, List filters)
+// throws Exception {
+// // String currentScope = BasicUsageRecord.getScopeFromToken();
+// String currentScope = ScopeProvider.instance.get();
+// String recordType = clz.newInstance().getRecordType();
+//
+// Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope));
+// expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(
+// s(recordType)));
+//
+// long startTime = temporalConstraint.getAlignedStartTime()
+// .getTimeInMillis();
+// expression = expression.and(x(AggregatedRecord.START_TIME)
+// .gt(startTime).or(
+// x(AggregatedRecord.CREATION_TIME).gt(startTime)));
+//
+// long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
+// expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime))
+// .or(x(AggregatedRecord.CREATION_TIME).lt(endTime));
+//
+// AggregationMode aggregationMode = temporalConstraint
+// .getAggregationMode();
+// // TODO Aggregate Results
+//
+// if (filters != null) {
+// for (Filter filter : filters) {
+// expression = expression.and(x(filter.getKey()).eq(
+// s(filter.getValue())));
+// }
+// }
+//
+// GroupByPath groupByPath = select("*").from(bucketName)
+// .where(expression);
+//
+// Map map = new HashMap();
+//
+// N1qlQueryResult result = bucket.query(groupByPath);
+// if (!result.finalSuccess()) {
+// logger.debug("{} failed : {}",
+// N1qlQueryResult.class.getSimpleName(), result.errors());
+// return map;
+// }
+//
+// List rows = result.allRows();
+//
+// for (N1qlQueryRow row : rows) {
+// try {
+// logger.trace("Row : {}", row.toString());
+// JsonObject jsonObject = row.value().getObject(bucketName);
+// logger.trace("JsonObject : {}", row.toString());
+// String recordString = jsonObject.toMap().toString();
+// logger.trace("Record String : {}", recordString);
+// Record record = RecordUtility.getRecord(recordString);
+//
+// JSONObject obj = new JSONObject(jsonObject.toString());
+// Calendar calendar = getCalendar(obj, aggregationMode);
+// if (map.containsKey(calendar)) {
+// Info info = map.get(calendar);
+// JSONObject value = info.getValue();
+// jsonObject.toMap();
+// } else {
+// map.put(calendar, new Info(calendar, obj));
+// }
+// } catch (Exception e) {
+// logger.warn("Unable to eleborate result for {}", row.toString());
+// }
+//
+// logger.trace("\n\n\n");
+// }
+//
+// return map;
+// }
+
+ protected Calendar getCalendarFromArray(JsonArray array)
+ throws JSONException {
+ boolean startFound = false;
+ Calendar calendar = Calendar
+ .getInstance(TemporalConstraint.DEFAULT_TIME_ZONE);
+ int count = 0;
+ CalendarEnum[] calendarValues = CalendarEnum.values();
+ for (int i = 0; i < array.size(); i++) {
+ try {
+ int value = array.getInt(i);
+ int calendarValue = calendarValues[count].getCalendarValue();
+ if (calendarValue == Calendar.MONTH) {
+ value--;
+ }
+ calendar.set(calendarValue, value);
+ count++;
+ startFound = true;
+ } catch (Exception e) {
+ /*
+ logger.trace("The provide value is not an int. {}", array
+ .get(i).toString());
+ */
+ if (startFound) {
+ break;
+ }
+
+ }
+ }
+
+ for (int j = count; j < calendarValues.length; j++) {
+ if (calendarValues[j].getCalendarValue() == Calendar.DAY_OF_MONTH) {
+ calendar.set(calendarValues[j].getCalendarValue(), 1);
+ } else {
+ calendar.set(calendarValues[j].getCalendarValue(), 0);
+ }
+ }
+
+ return calendar;
+
+ }
+
+ protected JsonArray getRangeKey(long time, AggregationMode aggregationMode,
+ boolean wildCard, boolean endKey) throws JSONException {
+
+ JsonArray array = JsonArray.create();
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(time);
+ CalendarEnum[] values = CalendarEnum.values();
+
+ if (endKey) {
+ calendar.add(values[aggregationMode.ordinal()].getCalendarValue(),
+ 1);
+ }
+
+ for (int i = 0; i <= aggregationMode.ordinal(); i++) {
+ int value = calendar.get(values[i].getCalendarValue());
+
+ if (values[i].getCalendarValue() == Calendar.MONTH) {
+ value = value + 1;
+ }
+
+ array.add(value);
+ }
+
+ if (wildCard) {
+ array.add("{}");
+ }
+
+ return array;
+ }
+
+ protected static final String MAP_REDUCE__DESIGN = "_design/";
+ protected static final String MAP_REDUCE_ALL = "all";
/**
- * {@inheritDoc}
+ * Used in the name of map reduce to separate keys used as filter
*/
+ protected static final String KEYS_SEPARATOR = "__";
+
+ protected String getDesignDocId(
+ Class extends AggregatedRecord,?>> recordClass)
+ throws InstantiationException, IllegalAccessException {
+ return String.format("%s%s", MAP_REDUCE__DESIGN, recordClass
+ .newInstance().getRecordType());
+ }
+
+ public static String getMapReduceFunctionName(
+ Collection collection) {
+ String reduceFunction = MAP_REDUCE_ALL;
+ for (String property : collection) {
+ if (reduceFunction == null) {
+ reduceFunction = property;
+ } else {
+ reduceFunction = reduceFunction + KEYS_SEPARATOR + property;
+ }
+ }
+ return reduceFunction;
+ }
+
+ protected SortedMap mapReduceQuery(
+ Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, List filters)
+ throws Exception {
+
+ // String currentScope = BasicUsageRecord.getScopeFromToken();
+ String currentScope = ScopeProvider.instance.get();
+
+ JsonArray startKey = JsonArray.create();
+ startKey.add(currentScope);
+ JsonArray endKey = JsonArray.create();
+ endKey.add(currentScope);
+
+ AggregationMode aggregationMode = temporalConstraint
+ .getAggregationMode();
+
+ JsonArray temporalStartKey = getRangeKey(
+ temporalConstraint.getStartTime(),
+ aggregationMode, false, false);
+
+ JsonArray temporalEndKey = getRangeKey(
+ temporalConstraint.getEndTime(),
+ aggregationMode, true, false);
+
+
+ Set recordKeysSet = AccountingPersistenceQuery
+ .getQuerableKeys(clz.newInstance());
+
+ Collection keys = new TreeSet<>();
+
+ JsonArray filterStartKey = JsonArray.create();
+ JsonArray filterEndKey = JsonArray.create();
+
+ if (filters != null && filters.size() != 0) {
+ for (Filter filter : filters) {
+
+ String filterKey = filter.getKey();
+ String filterValue = filter.getValue();
+
+ if (filterKey != null && filterKey.compareTo("") != 0
+ && recordKeysSet.contains(filterKey)) {
+
+ if (filterValue != null && filterValue.compareTo("") != 0) {
+ if (keys.contains(filterKey)) {
+ throw new DuplicatedKeyFilterException(
+ "Only one value per Filter key is allowed");
+ }
+
+ filterStartKey.add(filterValue);
+ filterEndKey.add(filterValue);
+
+ keys.add(filterKey);
+ } else {
+ throw new KeyException(
+ String.format("Invalid %s : %s",
+ Filter.class.getSimpleName(), filter.toString()));
+ }
+
+ } else {
+ throw new ValueException(String.format("Invalid %s : %s",
+ Filter.class.getSimpleName(), filter.toString()));
+ }
+
+ }
+ }
+
+ // +1 because mode start from 0
+ // +1 because of scope at the beginning
+ int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1;
+ int groupLevel = scopeDateGroupLevel;
+ if (filters != null) {
+ groupLevel += keys.size();
+ }
+
+ String designDocId = getDesignDocId(clz);
+
+
+ startKey.add(filterStartKey.toList());
+ startKey.add(temporalStartKey.toList());
+
+ endKey.add(filterEndKey.toList());
+ endKey.add(temporalEndKey.toList());
+
+
+ String viewName = getMapReduceFunctionName(keys);
+
+ ViewQuery query = ViewQuery.from(designDocId, viewName);
+ query.group(true);
+ query.groupLevel(groupLevel);
+
+ query.startKey(startKey);
+ query.endKey(endKey);
+ query.descending(false);
+
+ logger.trace("Design Doc ID : {}, View Name : {}, "
+ + "Group Level : {}, Start Key : {}, End Key : {}",
+ designDocId, viewName, groupLevel, startKey, endKey);
+
+ SortedMap infos = new TreeMap<>();
+
+ ViewResult viewResult;
+ try {
+ viewResult = bucket.query(query);
+ } catch (Exception e) {
+ throw e;
+ }
+
+ for (ViewRow row : viewResult) {
+ JsonArray array = (JsonArray) row.key();
+ Calendar calendar = getCalendarFromArray(array);
+
+ JsonObject value = (JsonObject) row.value();
+ JSONObject obj = new JSONObject(value.toString());
+
+ Info info = new Info(calendar, obj);
+ infos.put(calendar, info);
+
+ }
+
+ return infos;
+ }
+
@Override
- protected Map reallyQuery(
- @SuppressWarnings("rawtypes") Class extends AggregatedRecord> recordClass,
+ public SortedMap getTimeSeries(
+ Class extends AggregatedRecord, ?>> clz,
TemporalConstraint temporalConstraint, List filters)
throws Exception {
-
- //String currentScope = BasicUsageRecord.getScopeFromToken();
- String currentScope = ScopeProvider.instance.get();
- String recordType = recordClass.newInstance().getRecordType();
-
- Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope));
- expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(s(recordType)));
-
- long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis();
- expression = expression.and(
- x(AggregatedRecord.START_TIME).gt(startTime).
- or(x(AggregatedRecord.CREATION_TIME).gt(startTime)));
-
- long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
- expression = expression.and(
- x(AggregatedRecord.END_TIME).lt(endTime)).
- or(x(AggregatedRecord.CREATION_TIME).lt(endTime));
-
- AggregationMode aggregationMode = temporalConstraint.getAggregationMode();
-
- if(filters!=null){
- for(Filter filter : filters){
- expression = expression.and(x(filter.getKey()).eq(s(filter.getValue())));
- }
- }
-
- GroupByPath groupByPath = select("*").from(bucketName).where(expression);
-
- N1qlQueryResult result = bucket.query(groupByPath);
- List rows = result.allRows();
- Map map = new HashMap();
-
- for(N1qlQueryRow row : rows){
- logger.debug(row.toString());
- JsonObject jsonObject = row.value().getObject(bucketName);
- Record record = RecordUtility.getRecord(jsonObject.toMap().toString());
-
- JSONObject obj = new JSONObject(jsonObject.toString());
- Calendar calendar = getCalendar(obj, aggregationMode);
- if(map.containsKey(calendar)){
- Info info = map.get(calendar);
- JSONObject value = info.getValue();
- jsonObject.toMap();
-
- }else{
- map.put(calendar, new Info(calendar, obj));
- }
- }
-
- return map;
+ return mapReduceQuery(clz, temporalConstraint, filters);
}
-
- /**
- * {@inheritDoc}
- */
@Override
- public Set getKeys(Class extends AggregatedRecord> recordClass)
- throws Exception {
+ public SortedMap> getTopValues(
+ Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, List filters,
+ String topKey, String orderingProperty) throws Exception {
// TODO Auto-generated method stub
return null;
}
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Set getPossibleValuesForKey(
- Class extends AggregatedRecord> recordClass, String key)
- throws Exception {
- // TODO Auto-generated method stub
- // SELECT DISTINCT
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery#getPossibleValuesForKey(java.lang.Class, java.lang.String, int)
- */
- @Override
- public Set getPossibleValuesForKey(
- Class extends AggregatedRecord> recordClass, String key, int limit)
- throws Exception {
- // TODO Auto-generated method stub
- return null;
- }
-
+ protected String getQualifiedProperty(String property){
+ return String.format("%s.%s", bucketName, property);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public SortedSet getNextPossibleValues(
+ Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, List filters,
+ String topKey, String orderingProperty) throws Exception {
+
+ /*
+ SELECT
+ SUM(accounting.dataVolume) as dataVolume,
+ // SUM(accounting.operationCount) as operationCount,
+
+ // Filter List e.g.
+ accounting.consumerId as consumerId,
+ accounting.serviceClass as serviceClass,
+
+ // topKey e.g
+ accounting.serviceName as serviceName
+
+ FROM accounting
+ WHERE
+ accounting.scope="/gcube/devsec" AND
+ (accounting.recordType="StorageUsageRecord" OR
+ accounting.usageRecordType="StorageUsageRecord") AND
+
+ AND
+ (accounting.startTime >= temporalConstraint.startTime AND
+ accounting.endTime >= temporalConstraint.endTime)
+
+
+ // Filter List e.g
+ AND
+ accounting.consumerId="luca.frosini" AND
+ accounting.serviceClass="VREManagement"
+
+ // topKey
+ GROUP BY accounting.serviceName
+
+ ORDER BY dataVolume DESC
+ // ORDER BY operationsCount DESC
+
+ LIMIT 10
+ */
+
+
+ // String currentScope = BasicUsageRecord.getScopeFromToken();
+ String currentScope = ScopeProvider.instance.get();
+ String recordType = clz.newInstance().getRecordType();
+
+ if(orderingProperty==null){
+ orderingProperty = AccountingPersistenceQuery.
+ getDefaultOrderingProperties(clz);
+ }
+
+ Collection selectExpressions = new ArrayList<>();
+
+ selectExpressions.add(x("SUM(" + getQualifiedProperty(orderingProperty) + ")").
+ as(orderingProperty));
+
+ selectExpressions.add(x(getQualifiedProperty(topKey)).as(topKey));
+
+ Expression whereExpression =
+ x(getQualifiedProperty(BasicUsageRecord.SCOPE)).
+ eq(s(currentScope));
+
+ whereExpression = whereExpression.and(
+ x(getQualifiedProperty(BasicUsageRecord.RECORD_TYPE)).
+ eq(s(recordType)).or(
+ x(getQualifiedProperty(BasicUsageRecord.USAGE_RECORD_TYPE)).
+ eq(s(recordType)))
+ );
+
+ long startTime = temporalConstraint.getAlignedStartTime()
+ .getTimeInMillis();
+ whereExpression = whereExpression.and(
+ x(getQualifiedProperty(AggregatedRecord.START_TIME)).gt(startTime).or(
+ x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).gt(startTime)));
+
+ long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
+ whereExpression = whereExpression.and(
+ x(getQualifiedProperty(AggregatedRecord.END_TIME)).lt(endTime)).or(
+ x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).lt(endTime));
+
+ Set recordKeysSet = AccountingPersistenceQuery
+ .getQuerableKeys(clz.newInstance());
+
+ Collection keys = new TreeSet<>();
+
+ if (filters != null && filters.size() != 0) {
+ for (Filter filter : filters) {
+
+ String filterKey = filter.getKey();
+ String filterValue = filter.getValue();
+
+ if (filterKey != null && filterKey.compareTo("") != 0
+ && recordKeysSet.contains(filterKey)) {
+
+ if (filterValue != null && filterValue.compareTo("") != 0) {
+ if (keys.contains(filterKey)) {
+ throw new DuplicatedKeyFilterException(
+ "Only one value per Filter key is allowed");
+ }
+
+ whereExpression.and(
+ x(getQualifiedProperty(filterKey)).eq(filterValue));
+
+ keys.add(filterKey);
+ } else {
+ throw new KeyException(
+ String.format("Invalid %s : %s",
+ Filter.class.getSimpleName(), filter.toString()));
+ }
+
+ } else {
+ throw new ValueException(String.format("Invalid %s : %s",
+ Filter.class.getSimpleName(), filter.toString()));
+ }
+
+ }
+ }
+
+
+ Expression[] selectExpressionArray =
+ new Expression[selectExpressions.size()];
+ selectExpressions.toArray(selectExpressionArray);
+
+ Sort sort = Sort.desc(orderingProperty);
+
+ LimitPath path = select(selectExpressionArray).from(bucketName)
+ .where(whereExpression).groupBy(topKey).orderBy(sort);
+
+ logger.debug(path.toString());
+
+
+ Comparator comparator = new Comparator() {
+
+ @Override
+ public int compare(NumberedFilter o1, NumberedFilter o2) {
+ return - o1.compareTo(o2);
+ }
+
+ };
+
+ SortedSet ret = new TreeSet<>(comparator);
+
+ N1qlQueryResult result = bucket.query(path);
+ if (!result.finalSuccess()) {
+ logger.debug("{} failed : {}",
+ N1qlQueryResult.class.getSimpleName(), result.errors());
+ throw new Exception("Query Failed :\n" + result.errors());
+ }
+
+ List rows = result.allRows();
+
+ for (N1qlQueryRow row : rows) {
+ try {
+ logger.trace("Row : {}", row.toString());
+ JsonObject jsonObject = row.value();
+ logger.trace("JsonObject : {}", row.toString());
+
+ String value = jsonObject.getString(topKey);
+
+ Number n = jsonObject.getDouble(orderingProperty);
+
+ NumberedFilter numberedFilter =
+ new NumberedFilter(topKey, value, n, orderingProperty);
+
+ ret.add(numberedFilter);
+
+ } catch (Exception e) {
+ logger.warn("Unable to eleborate result for {}", row.toString());
+ }
+
+ logger.trace("\n\n\n");
+ }
+
+ return ret;
+ }
+
}