From 6dfa1805648072c2f52eea4f7365a5e23631d0d7 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Fri, 25 Mar 2016 18:18:17 +0000 Subject: [PATCH] refs #1665: Create Accounting Analytics Persistence CouchBase https://support.d4science.org/issues/1665 git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-analytics-persistence-couchbase@126315 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 16 +- .../AccountingPersistenceQueryCouchBase.java | 643 +++++++++++++++--- 2 files changed, 541 insertions(+), 118 deletions(-) diff --git a/pom.xml b/pom.xml index 9f6eb4f..38a2ba7 100644 --- a/pom.xml +++ b/pom.xml @@ -27,19 +27,19 @@ org.gcube.data.publishing document-store-lib - [1.0.0-SNAPSHOT,2.0.0-SNAPSHOT) - provided - - - org.gcube.accounting - accounting-analytics - [1.2.0-SNAPSHOT, 2.0.0-SNAPSHOT) + [1.1.0-SNAPSHOT,2.0.0-SNAPSHOT) provided org.gcube.accounting accounting-lib - [2.1.0-SNAPSHOT, 3.0.0-SNAPSHOT) + [2.2.0-SNAPSHOT, 3.0.0-SNAPSHOT) + provided + + + org.gcube.accounting + accounting-analytics + [2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT) provided diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java index df02248..0a6e901 100644 --- a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java +++ b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java @@ -7,26 +7,34 @@ import static com.couchbase.client.java.query.Select.select; import static com.couchbase.client.java.query.dsl.Expression.s; import static com.couchbase.client.java.query.dsl.Expression.x; +import java.security.KeyException; +import java.util.ArrayList; import java.util.Calendar; -import java.util.HashMap; +import java.util.Collection; +import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; import org.gcube.accounting.analytics.Filter; import org.gcube.accounting.analytics.Info; +import org.gcube.accounting.analytics.NumberedFilter; import org.gcube.accounting.analytics.TemporalConstraint; import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode; +import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum; +import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException; +import org.gcube.accounting.analytics.exception.ValueException; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; +import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery; import org.gcube.accounting.datamodel.BasicUsageRecord; import org.gcube.accounting.datamodel.UsageRecord; -import org.gcube.accounting.persistence.AccountingPersistence; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.documentstore.records.AggregatedRecord; -import org.gcube.documentstore.records.Record; -import org.gcube.documentstore.records.RecordUtility; import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; @@ -35,33 +43,39 @@ import org.slf4j.LoggerFactory; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; +import com.couchbase.client.java.document.json.JsonArray; import com.couchbase.client.java.document.json.JsonObject; import com.couchbase.client.java.env.CouchbaseEnvironment; import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import com.couchbase.client.java.query.N1qlQueryResult; import com.couchbase.client.java.query.N1qlQueryRow; import com.couchbase.client.java.query.dsl.Expression; -import com.couchbase.client.java.query.dsl.path.GroupByPath; +import com.couchbase.client.java.query.dsl.Sort; +import com.couchbase.client.java.query.dsl.path.LimitPath; +import com.couchbase.client.java.view.ViewQuery; +import com.couchbase.client.java.view.ViewResult; +import com.couchbase.client.java.view.ViewRow; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ - * + * */ -public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBackendQuery { +public class AccountingPersistenceQueryCouchBase implements + AccountingPersistenceBackendQuery { + + private static final Logger logger = LoggerFactory + .getLogger(AccountingPersistenceQueryCouchBase.class); - private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryCouchBase.class); - public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY; - //public static final String USERNAME_PROPERTY_KEY = AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY; + // public static final String USERNAME_PROPERTY_KEY = + // AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY; public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY; public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName"; - + /* The environment configuration */ - protected static final CouchbaseEnvironment ENV = - DefaultCouchbaseEnvironment.builder() - .queryEnabled(true) - .build(); - + protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment + .builder().queryEnabled(true).build(); + protected Cluster cluster; protected Bucket bucket; protected String bucketName; @@ -70,7 +84,9 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa * {@inheritDoc} */ @Override - protected void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception { + public void prepareConnection( + AccountingPersistenceBackendQueryConfiguration configuration) + throws Exception { String url = configuration.getProperty(URL_PROPERTY_KEY); // String username = configuration.getProperty(USERNAME_PROPERTY_KEY); String password = configuration.getProperty(PASSWORD_PROPERTY_KEY); @@ -79,7 +95,7 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa bucketName = configuration.getProperty(BUCKET_NAME_PROPERTY_KEY); bucket = cluster.openBucket(bucketName, password); } - + /** * {@inheritDoc} */ @@ -88,114 +104,521 @@ public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBa cluster.disconnect(); } - protected Calendar getCalendar(JSONObject obj, AggregationMode aggregationMode) throws NumberFormatException, JSONException{ + protected Calendar getCalendar(JSONObject obj, + AggregationMode aggregationMode) throws NumberFormatException, + JSONException { long millis; - if(obj.has(AggregatedRecord.START_TIME)){ + if (obj.has(AggregatedRecord.START_TIME)) { millis = new Long(obj.getString(AggregatedRecord.START_TIME)); - logger.trace("The result {} was from an aggregated record. Using {}", obj.toString(), AggregatedRecord.START_TIME); - }else{ + logger.trace( + "The result {} was from an aggregated record. Using {}", + obj.toString(), AggregatedRecord.START_TIME); + } else { millis = new Long(obj.getString(UsageRecord.CREATION_TIME)); - logger.trace("The result {} was from single record. Using {}", obj.toString(), UsageRecord.CREATION_TIME); + logger.trace("The result {} was from single record. Using {}", + obj.toString(), UsageRecord.CREATION_TIME); } - Calendar calendar = TemporalConstraint.getAlignedCalendar(millis, aggregationMode); - logger.trace("{} has been aligned to {}", millis, calendar.getTimeInMillis()); + Calendar calendar = TemporalConstraint.getAlignedCalendar(millis, + aggregationMode); + logger.trace("{} has been aligned to {}", millis, + calendar.getTimeInMillis()); return calendar; } + +// private Map selectQuery( +// Class> clz, +// TemporalConstraint temporalConstraint, List filters) +// throws Exception { +// // String currentScope = BasicUsageRecord.getScopeFromToken(); +// String currentScope = ScopeProvider.instance.get(); +// String recordType = clz.newInstance().getRecordType(); +// +// Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope)); +// expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq( +// s(recordType))); +// +// long startTime = temporalConstraint.getAlignedStartTime() +// .getTimeInMillis(); +// expression = expression.and(x(AggregatedRecord.START_TIME) +// .gt(startTime).or( +// x(AggregatedRecord.CREATION_TIME).gt(startTime))); +// +// long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis(); +// expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime)) +// .or(x(AggregatedRecord.CREATION_TIME).lt(endTime)); +// +// AggregationMode aggregationMode = temporalConstraint +// .getAggregationMode(); +// // TODO Aggregate Results +// +// if (filters != null) { +// for (Filter filter : filters) { +// expression = expression.and(x(filter.getKey()).eq( +// s(filter.getValue()))); +// } +// } +// +// GroupByPath groupByPath = select("*").from(bucketName) +// .where(expression); +// +// Map map = new HashMap(); +// +// N1qlQueryResult result = bucket.query(groupByPath); +// if (!result.finalSuccess()) { +// logger.debug("{} failed : {}", +// N1qlQueryResult.class.getSimpleName(), result.errors()); +// return map; +// } +// +// List rows = result.allRows(); +// +// for (N1qlQueryRow row : rows) { +// try { +// logger.trace("Row : {}", row.toString()); +// JsonObject jsonObject = row.value().getObject(bucketName); +// logger.trace("JsonObject : {}", row.toString()); +// String recordString = jsonObject.toMap().toString(); +// logger.trace("Record String : {}", recordString); +// Record record = RecordUtility.getRecord(recordString); +// +// JSONObject obj = new JSONObject(jsonObject.toString()); +// Calendar calendar = getCalendar(obj, aggregationMode); +// if (map.containsKey(calendar)) { +// Info info = map.get(calendar); +// JSONObject value = info.getValue(); +// jsonObject.toMap(); +// } else { +// map.put(calendar, new Info(calendar, obj)); +// } +// } catch (Exception e) { +// logger.warn("Unable to eleborate result for {}", row.toString()); +// } +// +// logger.trace("\n\n\n"); +// } +// +// return map; +// } + + protected Calendar getCalendarFromArray(JsonArray array) + throws JSONException { + boolean startFound = false; + Calendar calendar = Calendar + .getInstance(TemporalConstraint.DEFAULT_TIME_ZONE); + int count = 0; + CalendarEnum[] calendarValues = CalendarEnum.values(); + for (int i = 0; i < array.size(); i++) { + try { + int value = array.getInt(i); + int calendarValue = calendarValues[count].getCalendarValue(); + if (calendarValue == Calendar.MONTH) { + value--; + } + calendar.set(calendarValue, value); + count++; + startFound = true; + } catch (Exception e) { + /* + logger.trace("The provide value is not an int. {}", array + .get(i).toString()); + */ + if (startFound) { + break; + } + + } + } + + for (int j = count; j < calendarValues.length; j++) { + if (calendarValues[j].getCalendarValue() == Calendar.DAY_OF_MONTH) { + calendar.set(calendarValues[j].getCalendarValue(), 1); + } else { + calendar.set(calendarValues[j].getCalendarValue(), 0); + } + } + + return calendar; + + } + + protected JsonArray getRangeKey(long time, AggregationMode aggregationMode, + boolean wildCard, boolean endKey) throws JSONException { + + JsonArray array = JsonArray.create(); + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(time); + CalendarEnum[] values = CalendarEnum.values(); + + if (endKey) { + calendar.add(values[aggregationMode.ordinal()].getCalendarValue(), + 1); + } + + for (int i = 0; i <= aggregationMode.ordinal(); i++) { + int value = calendar.get(values[i].getCalendarValue()); + + if (values[i].getCalendarValue() == Calendar.MONTH) { + value = value + 1; + } + + array.add(value); + } + + if (wildCard) { + array.add("{}"); + } + + return array; + } + + protected static final String MAP_REDUCE__DESIGN = "_design/"; + protected static final String MAP_REDUCE_ALL = "all"; /** - * {@inheritDoc} + * Used in the name of map reduce to separate keys used as filter */ + protected static final String KEYS_SEPARATOR = "__"; + + protected String getDesignDocId( + Class> recordClass) + throws InstantiationException, IllegalAccessException { + return String.format("%s%s", MAP_REDUCE__DESIGN, recordClass + .newInstance().getRecordType()); + } + + public static String getMapReduceFunctionName( + Collection collection) { + String reduceFunction = MAP_REDUCE_ALL; + for (String property : collection) { + if (reduceFunction == null) { + reduceFunction = property; + } else { + reduceFunction = reduceFunction + KEYS_SEPARATOR + property; + } + } + return reduceFunction; + } + + protected SortedMap mapReduceQuery( + Class> clz, + TemporalConstraint temporalConstraint, List filters) + throws Exception { + + // String currentScope = BasicUsageRecord.getScopeFromToken(); + String currentScope = ScopeProvider.instance.get(); + + JsonArray startKey = JsonArray.create(); + startKey.add(currentScope); + JsonArray endKey = JsonArray.create(); + endKey.add(currentScope); + + AggregationMode aggregationMode = temporalConstraint + .getAggregationMode(); + + JsonArray temporalStartKey = getRangeKey( + temporalConstraint.getStartTime(), + aggregationMode, false, false); + + JsonArray temporalEndKey = getRangeKey( + temporalConstraint.getEndTime(), + aggregationMode, true, false); + + + Set recordKeysSet = AccountingPersistenceQuery + .getQuerableKeys(clz.newInstance()); + + Collection keys = new TreeSet<>(); + + JsonArray filterStartKey = JsonArray.create(); + JsonArray filterEndKey = JsonArray.create(); + + if (filters != null && filters.size() != 0) { + for (Filter filter : filters) { + + String filterKey = filter.getKey(); + String filterValue = filter.getValue(); + + if (filterKey != null && filterKey.compareTo("") != 0 + && recordKeysSet.contains(filterKey)) { + + if (filterValue != null && filterValue.compareTo("") != 0) { + if (keys.contains(filterKey)) { + throw new DuplicatedKeyFilterException( + "Only one value per Filter key is allowed"); + } + + filterStartKey.add(filterValue); + filterEndKey.add(filterValue); + + keys.add(filterKey); + } else { + throw new KeyException( + String.format("Invalid %s : %s", + Filter.class.getSimpleName(), filter.toString())); + } + + } else { + throw new ValueException(String.format("Invalid %s : %s", + Filter.class.getSimpleName(), filter.toString())); + } + + } + } + + // +1 because mode start from 0 + // +1 because of scope at the beginning + int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1; + int groupLevel = scopeDateGroupLevel; + if (filters != null) { + groupLevel += keys.size(); + } + + String designDocId = getDesignDocId(clz); + + + startKey.add(filterStartKey.toList()); + startKey.add(temporalStartKey.toList()); + + endKey.add(filterEndKey.toList()); + endKey.add(temporalEndKey.toList()); + + + String viewName = getMapReduceFunctionName(keys); + + ViewQuery query = ViewQuery.from(designDocId, viewName); + query.group(true); + query.groupLevel(groupLevel); + + query.startKey(startKey); + query.endKey(endKey); + query.descending(false); + + logger.trace("Design Doc ID : {}, View Name : {}, " + + "Group Level : {}, Start Key : {}, End Key : {}", + designDocId, viewName, groupLevel, startKey, endKey); + + SortedMap infos = new TreeMap<>(); + + ViewResult viewResult; + try { + viewResult = bucket.query(query); + } catch (Exception e) { + throw e; + } + + for (ViewRow row : viewResult) { + JsonArray array = (JsonArray) row.key(); + Calendar calendar = getCalendarFromArray(array); + + JsonObject value = (JsonObject) row.value(); + JSONObject obj = new JSONObject(value.toString()); + + Info info = new Info(calendar, obj); + infos.put(calendar, info); + + } + + return infos; + } + @Override - protected Map reallyQuery( - @SuppressWarnings("rawtypes") Class recordClass, + public SortedMap getTimeSeries( + Class> clz, TemporalConstraint temporalConstraint, List filters) throws Exception { - - //String currentScope = BasicUsageRecord.getScopeFromToken(); - String currentScope = ScopeProvider.instance.get(); - String recordType = recordClass.newInstance().getRecordType(); - - Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope)); - expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(s(recordType))); - - long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis(); - expression = expression.and( - x(AggregatedRecord.START_TIME).gt(startTime). - or(x(AggregatedRecord.CREATION_TIME).gt(startTime))); - - long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis(); - expression = expression.and( - x(AggregatedRecord.END_TIME).lt(endTime)). - or(x(AggregatedRecord.CREATION_TIME).lt(endTime)); - - AggregationMode aggregationMode = temporalConstraint.getAggregationMode(); - - if(filters!=null){ - for(Filter filter : filters){ - expression = expression.and(x(filter.getKey()).eq(s(filter.getValue()))); - } - } - - GroupByPath groupByPath = select("*").from(bucketName).where(expression); - - N1qlQueryResult result = bucket.query(groupByPath); - List rows = result.allRows(); - Map map = new HashMap(); - - for(N1qlQueryRow row : rows){ - logger.debug(row.toString()); - JsonObject jsonObject = row.value().getObject(bucketName); - Record record = RecordUtility.getRecord(jsonObject.toMap().toString()); - - JSONObject obj = new JSONObject(jsonObject.toString()); - Calendar calendar = getCalendar(obj, aggregationMode); - if(map.containsKey(calendar)){ - Info info = map.get(calendar); - JSONObject value = info.getValue(); - jsonObject.toMap(); - - }else{ - map.put(calendar, new Info(calendar, obj)); - } - } - - return map; + return mapReduceQuery(clz, temporalConstraint, filters); } - - /** - * {@inheritDoc} - */ @Override - public Set getKeys(Class recordClass) - throws Exception { + public SortedMap> getTopValues( + Class> clz, + TemporalConstraint temporalConstraint, List filters, + String topKey, String orderingProperty) throws Exception { // TODO Auto-generated method stub return null; } - - - /** - * {@inheritDoc} - */ - @Override - public Set getPossibleValuesForKey( - Class recordClass, String key) - throws Exception { - // TODO Auto-generated method stub - // SELECT DISTINCT - return null; - } - - /* (non-Javadoc) - * @see org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery#getPossibleValuesForKey(java.lang.Class, java.lang.String, int) - */ - @Override - public Set getPossibleValuesForKey( - Class recordClass, String key, int limit) - throws Exception { - // TODO Auto-generated method stub - return null; - } - + protected String getQualifiedProperty(String property){ + return String.format("%s.%s", bucketName, property); + } + + @SuppressWarnings("deprecation") + @Override + public SortedSet getNextPossibleValues( + Class> clz, + TemporalConstraint temporalConstraint, List filters, + String topKey, String orderingProperty) throws Exception { + + /* + SELECT + SUM(accounting.dataVolume) as dataVolume, + // SUM(accounting.operationCount) as operationCount, + + // Filter List e.g. + accounting.consumerId as consumerId, + accounting.serviceClass as serviceClass, + + // topKey e.g + accounting.serviceName as serviceName + + FROM accounting + WHERE + accounting.scope="/gcube/devsec" AND + (accounting.recordType="StorageUsageRecord" OR + accounting.usageRecordType="StorageUsageRecord") AND + + AND + (accounting.startTime >= temporalConstraint.startTime AND + accounting.endTime >= temporalConstraint.endTime) + + + // Filter List e.g + AND + accounting.consumerId="luca.frosini" AND + accounting.serviceClass="VREManagement" + + // topKey + GROUP BY accounting.serviceName + + ORDER BY dataVolume DESC + // ORDER BY operationsCount DESC + + LIMIT 10 + */ + + + // String currentScope = BasicUsageRecord.getScopeFromToken(); + String currentScope = ScopeProvider.instance.get(); + String recordType = clz.newInstance().getRecordType(); + + if(orderingProperty==null){ + orderingProperty = AccountingPersistenceQuery. + getDefaultOrderingProperties(clz); + } + + Collection selectExpressions = new ArrayList<>(); + + selectExpressions.add(x("SUM(" + getQualifiedProperty(orderingProperty) + ")"). + as(orderingProperty)); + + selectExpressions.add(x(getQualifiedProperty(topKey)).as(topKey)); + + Expression whereExpression = + x(getQualifiedProperty(BasicUsageRecord.SCOPE)). + eq(s(currentScope)); + + whereExpression = whereExpression.and( + x(getQualifiedProperty(BasicUsageRecord.RECORD_TYPE)). + eq(s(recordType)).or( + x(getQualifiedProperty(BasicUsageRecord.USAGE_RECORD_TYPE)). + eq(s(recordType))) + ); + + long startTime = temporalConstraint.getAlignedStartTime() + .getTimeInMillis(); + whereExpression = whereExpression.and( + x(getQualifiedProperty(AggregatedRecord.START_TIME)).gt(startTime).or( + x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).gt(startTime))); + + long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis(); + whereExpression = whereExpression.and( + x(getQualifiedProperty(AggregatedRecord.END_TIME)).lt(endTime)).or( + x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).lt(endTime)); + + Set recordKeysSet = AccountingPersistenceQuery + .getQuerableKeys(clz.newInstance()); + + Collection keys = new TreeSet<>(); + + if (filters != null && filters.size() != 0) { + for (Filter filter : filters) { + + String filterKey = filter.getKey(); + String filterValue = filter.getValue(); + + if (filterKey != null && filterKey.compareTo("") != 0 + && recordKeysSet.contains(filterKey)) { + + if (filterValue != null && filterValue.compareTo("") != 0) { + if (keys.contains(filterKey)) { + throw new DuplicatedKeyFilterException( + "Only one value per Filter key is allowed"); + } + + whereExpression.and( + x(getQualifiedProperty(filterKey)).eq(filterValue)); + + keys.add(filterKey); + } else { + throw new KeyException( + String.format("Invalid %s : %s", + Filter.class.getSimpleName(), filter.toString())); + } + + } else { + throw new ValueException(String.format("Invalid %s : %s", + Filter.class.getSimpleName(), filter.toString())); + } + + } + } + + + Expression[] selectExpressionArray = + new Expression[selectExpressions.size()]; + selectExpressions.toArray(selectExpressionArray); + + Sort sort = Sort.desc(orderingProperty); + + LimitPath path = select(selectExpressionArray).from(bucketName) + .where(whereExpression).groupBy(topKey).orderBy(sort); + + logger.debug(path.toString()); + + + Comparator comparator = new Comparator() { + + @Override + public int compare(NumberedFilter o1, NumberedFilter o2) { + return - o1.compareTo(o2); + } + + }; + + SortedSet ret = new TreeSet<>(comparator); + + N1qlQueryResult result = bucket.query(path); + if (!result.finalSuccess()) { + logger.debug("{} failed : {}", + N1qlQueryResult.class.getSimpleName(), result.errors()); + throw new Exception("Query Failed :\n" + result.errors()); + } + + List rows = result.allRows(); + + for (N1qlQueryRow row : rows) { + try { + logger.trace("Row : {}", row.toString()); + JsonObject jsonObject = row.value(); + logger.trace("JsonObject : {}", row.toString()); + + String value = jsonObject.getString(topKey); + + Number n = jsonObject.getDouble(orderingProperty); + + NumberedFilter numberedFilter = + new NumberedFilter(topKey, value, n, orderingProperty); + + ret.add(numberedFilter); + + } catch (Exception e) { + logger.warn("Unable to eleborate result for {}", row.toString()); + } + + logger.trace("\n\n\n"); + } + + return ret; + } + }