/** * */ package org.gcube.accounting.analytics.persistence.couchbase; import static com.couchbase.client.java.query.Select.select; import static com.couchbase.client.java.query.dsl.Expression.s; import static com.couchbase.client.java.query.dsl.Expression.x; import java.security.KeyException; import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import org.gcube.accounting.analytics.Filter; import org.gcube.accounting.analytics.Info; import org.gcube.accounting.analytics.NumberedFilter; import org.gcube.accounting.analytics.TemporalConstraint; import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode; import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum; import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException; import org.gcube.accounting.analytics.exception.ValueException; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery; import org.gcube.accounting.datamodel.BasicUsageRecord; import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.documentstore.records.AggregatedRecord; import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.document.json.JsonArray; import com.couchbase.client.java.document.json.JsonObject; import com.couchbase.client.java.env.CouchbaseEnvironment; import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import com.couchbase.client.java.query.N1qlQueryResult; import com.couchbase.client.java.query.N1qlQueryRow; import com.couchbase.client.java.query.dsl.Expression; import com.couchbase.client.java.query.dsl.Sort; import com.couchbase.client.java.query.dsl.path.LimitPath; import com.couchbase.client.java.view.ViewQuery; import com.couchbase.client.java.view.ViewResult; import com.couchbase.client.java.view.ViewRow; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * */ public class AccountingPersistenceQueryCouchBase implements AccountingPersistenceBackendQuery { private static final Logger logger = LoggerFactory .getLogger(AccountingPersistenceQueryCouchBase.class); public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY; // public static final String USERNAME_PROPERTY_KEY = // AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY; public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY; public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName"; /* The environment configuration */ protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment .builder().queryEnabled(true).build(); protected Cluster cluster; protected Bucket bucket; protected String bucketName; /** * {@inheritDoc} */ @Override public void prepareConnection( AccountingPersistenceBackendQueryConfiguration configuration) throws Exception { String url = configuration.getProperty(URL_PROPERTY_KEY); // String username = configuration.getProperty(USERNAME_PROPERTY_KEY); String password = configuration.getProperty(PASSWORD_PROPERTY_KEY); cluster = CouchbaseCluster.create(ENV, url); bucketName = configuration.getProperty(BUCKET_NAME_PROPERTY_KEY); bucket = cluster.openBucket(bucketName, password); } /** * {@inheritDoc} */ @Override public void close() throws Exception { cluster.disconnect(); } protected Calendar getCalendar(JSONObject obj, AggregationMode aggregationMode) throws NumberFormatException, JSONException { long millis; if (obj.has(AggregatedRecord.START_TIME)) { millis = new Long(obj.getString(AggregatedRecord.START_TIME)); logger.trace( "The result {} was from an aggregated record. Using {}", obj.toString(), AggregatedRecord.START_TIME); } else { millis = new Long(obj.getString(UsageRecord.CREATION_TIME)); logger.trace("The result {} was from single record. Using {}", obj.toString(), UsageRecord.CREATION_TIME); } Calendar calendar = TemporalConstraint.getAlignedCalendar(millis, aggregationMode); logger.trace("{} has been aligned to {}", millis, calendar.getTimeInMillis()); return calendar; } // private Map selectQuery( // Class> clz, // TemporalConstraint temporalConstraint, List filters) // throws Exception { // // String currentScope = BasicUsageRecord.getScopeFromToken(); // String currentScope = ScopeProvider.instance.get(); // String recordType = clz.newInstance().getRecordType(); // // Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope)); // expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq( // s(recordType))); // // long startTime = temporalConstraint.getAlignedStartTime() // .getTimeInMillis(); // expression = expression.and(x(AggregatedRecord.START_TIME) // .gt(startTime).or( // x(AggregatedRecord.CREATION_TIME).gt(startTime))); // // long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis(); // expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime)) // .or(x(AggregatedRecord.CREATION_TIME).lt(endTime)); // // AggregationMode aggregationMode = temporalConstraint // .getAggregationMode(); // // TODO Aggregate Results // // if (filters != null) { // for (Filter filter : filters) { // expression = expression.and(x(filter.getKey()).eq( // s(filter.getValue()))); // } // } // // GroupByPath groupByPath = select("*").from(bucketName) // .where(expression); // // Map map = new HashMap(); // // N1qlQueryResult result = bucket.query(groupByPath); // if (!result.finalSuccess()) { // logger.debug("{} failed : {}", // N1qlQueryResult.class.getSimpleName(), result.errors()); // return map; // } // // List rows = result.allRows(); // // for (N1qlQueryRow row : rows) { // try { // logger.trace("Row : {}", row.toString()); // JsonObject jsonObject = row.value().getObject(bucketName); // logger.trace("JsonObject : {}", row.toString()); // String recordString = jsonObject.toMap().toString(); // logger.trace("Record String : {}", recordString); // Record record = RecordUtility.getRecord(recordString); // // JSONObject obj = new JSONObject(jsonObject.toString()); // Calendar calendar = getCalendar(obj, aggregationMode); // if (map.containsKey(calendar)) { // Info info = map.get(calendar); // JSONObject value = info.getValue(); // jsonObject.toMap(); // } else { // map.put(calendar, new Info(calendar, obj)); // } // } catch (Exception e) { // logger.warn("Unable to eleborate result for {}", row.toString()); // } // // logger.trace("\n\n\n"); // } // // return map; // } protected Calendar getCalendarFromArray(JsonArray array) throws JSONException { boolean startFound = false; Calendar calendar = Calendar .getInstance(TemporalConstraint.DEFAULT_TIME_ZONE); int count = 0; CalendarEnum[] calendarValues = CalendarEnum.values(); for (int i = 0; i < array.size(); i++) { try { int value = array.getInt(i); int calendarValue = calendarValues[count].getCalendarValue(); if (calendarValue == Calendar.MONTH) { value--; } calendar.set(calendarValue, value); count++; startFound = true; } catch (Exception e) { /* logger.trace("The provide value is not an int. {}", array .get(i).toString()); */ if (startFound) { break; } } } for (int j = count; j < calendarValues.length; j++) { if (calendarValues[j].getCalendarValue() == Calendar.DAY_OF_MONTH) { calendar.set(calendarValues[j].getCalendarValue(), 1); } else { calendar.set(calendarValues[j].getCalendarValue(), 0); } } return calendar; } protected JsonArray getRangeKey(long time, AggregationMode aggregationMode, boolean wildCard, boolean endKey) throws JSONException { JsonArray array = JsonArray.create(); Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); CalendarEnum[] values = CalendarEnum.values(); if (endKey) { calendar.add(values[aggregationMode.ordinal()].getCalendarValue(), 1); } for (int i = 0; i <= aggregationMode.ordinal(); i++) { int value = calendar.get(values[i].getCalendarValue()); if (values[i].getCalendarValue() == Calendar.MONTH) { value = value + 1; } array.add(value); } if (wildCard) { array.add("{}"); } return array; } protected static final String MAP_REDUCE__DESIGN = "_design/"; protected static final String MAP_REDUCE_ALL = "all"; /** * Used in the name of map reduce to separate keys used as filter */ protected static final String KEYS_SEPARATOR = "__"; protected String getDesignDocId( Class> recordClass) throws InstantiationException, IllegalAccessException { return String.format("%s%s", MAP_REDUCE__DESIGN, recordClass .newInstance().getRecordType()); } public static String getMapReduceFunctionName( Collection collection) { String reduceFunction = MAP_REDUCE_ALL; for (String property : collection) { if (reduceFunction == null) { reduceFunction = property; } else { reduceFunction = reduceFunction + KEYS_SEPARATOR + property; } } return reduceFunction; } protected SortedMap mapReduceQuery( Class> clz, TemporalConstraint temporalConstraint, List filters) throws Exception { // String currentScope = BasicUsageRecord.getScopeFromToken(); String currentScope = ScopeProvider.instance.get(); JsonArray startKey = JsonArray.create(); startKey.add(currentScope); JsonArray endKey = JsonArray.create(); endKey.add(currentScope); AggregationMode aggregationMode = temporalConstraint .getAggregationMode(); JsonArray temporalStartKey = getRangeKey( temporalConstraint.getStartTime(), aggregationMode, false, false); JsonArray temporalEndKey = getRangeKey( temporalConstraint.getEndTime(), aggregationMode, true, false); Set recordKeysSet = AccountingPersistenceQuery .getQuerableKeys(clz.newInstance()); Collection keys = new TreeSet<>(); JsonArray filterStartKey = JsonArray.create(); JsonArray filterEndKey = JsonArray.create(); if (filters != null && filters.size() != 0) { for (Filter filter : filters) { String filterKey = filter.getKey(); String filterValue = filter.getValue(); if (filterKey != null && filterKey.compareTo("") != 0 && recordKeysSet.contains(filterKey)) { if (filterValue != null && filterValue.compareTo("") != 0) { if (keys.contains(filterKey)) { throw new DuplicatedKeyFilterException( "Only one value per Filter key is allowed"); } filterStartKey.add(filterValue); filterEndKey.add(filterValue); keys.add(filterKey); } else { throw new KeyException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } else { throw new ValueException(String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } } // +1 because mode start from 0 // +1 because of scope at the beginning int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1; int groupLevel = scopeDateGroupLevel; if (filters != null) { groupLevel += keys.size(); } String designDocId = getDesignDocId(clz); startKey.add(filterStartKey.toList()); startKey.add(temporalStartKey.toList()); endKey.add(filterEndKey.toList()); endKey.add(temporalEndKey.toList()); String viewName = getMapReduceFunctionName(keys); ViewQuery query = ViewQuery.from(designDocId, viewName); query.group(true); query.groupLevel(groupLevel); query.startKey(startKey); query.endKey(endKey); query.descending(false); logger.trace("Design Doc ID : {}, View Name : {}, " + "Group Level : {}, Start Key : {}, End Key : {}", designDocId, viewName, groupLevel, startKey, endKey); SortedMap infos = new TreeMap<>(); ViewResult viewResult; try { viewResult = bucket.query(query); } catch (Exception e) { throw e; } for (ViewRow row : viewResult) { JsonArray array = (JsonArray) row.key(); Calendar calendar = getCalendarFromArray(array); JsonObject value = (JsonObject) row.value(); JSONObject obj = new JSONObject(value.toString()); Info info = new Info(calendar, obj); infos.put(calendar, info); } return infos; } @Override public SortedMap getTimeSeries( Class> clz, TemporalConstraint temporalConstraint, List filters) throws Exception { return mapReduceQuery(clz, temporalConstraint, filters); } @Override public SortedMap> getTopValues( Class> clz, TemporalConstraint temporalConstraint, List filters, String topKey, String orderingProperty) throws Exception { // TODO Auto-generated method stub return null; } protected String getQualifiedProperty(String property){ return String.format("%s.%s", bucketName, property); } @SuppressWarnings("deprecation") @Override public SortedSet getNextPossibleValues( Class> clz, TemporalConstraint temporalConstraint, List filters, String topKey, String orderingProperty) throws Exception { /* SELECT SUM(accounting.dataVolume) as dataVolume, // SUM(accounting.operationCount) as operationCount, // Filter List e.g. accounting.consumerId as consumerId, accounting.serviceClass as serviceClass, // topKey e.g accounting.serviceName as serviceName FROM accounting WHERE accounting.scope="/gcube/devsec" AND (accounting.recordType="StorageUsageRecord" OR accounting.usageRecordType="StorageUsageRecord") AND AND (accounting.startTime >= temporalConstraint.startTime AND accounting.endTime >= temporalConstraint.endTime) // Filter List e.g AND accounting.consumerId="luca.frosini" AND accounting.serviceClass="VREManagement" // topKey GROUP BY accounting.serviceName ORDER BY dataVolume DESC // ORDER BY operationsCount DESC LIMIT 10 */ // String currentScope = BasicUsageRecord.getScopeFromToken(); String currentScope = ScopeProvider.instance.get(); String recordType = clz.newInstance().getRecordType(); if(orderingProperty==null){ orderingProperty = AccountingPersistenceQuery. getDefaultOrderingProperties(clz); } Collection selectExpressions = new ArrayList<>(); selectExpressions.add(x("SUM(" + getQualifiedProperty(orderingProperty) + ")"). as(orderingProperty)); selectExpressions.add(x(getQualifiedProperty(topKey)).as(topKey)); Expression whereExpression = x(getQualifiedProperty(BasicUsageRecord.SCOPE)). eq(s(currentScope)); whereExpression = whereExpression.and( x(getQualifiedProperty(BasicUsageRecord.RECORD_TYPE)). eq(s(recordType)).or( x(getQualifiedProperty(BasicUsageRecord.USAGE_RECORD_TYPE)). eq(s(recordType))) ); long startTime = temporalConstraint.getAlignedStartTime() .getTimeInMillis(); whereExpression = whereExpression.and( x(getQualifiedProperty(AggregatedRecord.START_TIME)).gt(startTime).or( x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).gt(startTime))); long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis(); whereExpression = whereExpression.and( x(getQualifiedProperty(AggregatedRecord.END_TIME)).lt(endTime)).or( x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).lt(endTime)); Set recordKeysSet = AccountingPersistenceQuery .getQuerableKeys(clz.newInstance()); Collection keys = new TreeSet<>(); if (filters != null && filters.size() != 0) { for (Filter filter : filters) { String filterKey = filter.getKey(); String filterValue = filter.getValue(); if (filterKey != null && filterKey.compareTo("") != 0 && recordKeysSet.contains(filterKey)) { if (filterValue != null && filterValue.compareTo("") != 0) { if (keys.contains(filterKey)) { throw new DuplicatedKeyFilterException( "Only one value per Filter key is allowed"); } whereExpression.and( x(getQualifiedProperty(filterKey)).eq(filterValue)); keys.add(filterKey); } else { throw new KeyException( String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } else { throw new ValueException(String.format("Invalid %s : %s", Filter.class.getSimpleName(), filter.toString())); } } } Expression[] selectExpressionArray = new Expression[selectExpressions.size()]; selectExpressions.toArray(selectExpressionArray); Sort sort = Sort.desc(orderingProperty); LimitPath path = select(selectExpressionArray).from(bucketName) .where(whereExpression).groupBy(topKey).orderBy(sort); logger.debug(path.toString()); Comparator comparator = new Comparator() { @Override public int compare(NumberedFilter o1, NumberedFilter o2) { return - o1.compareTo(o2); } }; SortedSet ret = new TreeSet<>(comparator); N1qlQueryResult result = bucket.query(path); if (!result.finalSuccess()) { logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors()); throw new Exception("Query Failed :\n" + result.errors()); } List rows = result.allRows(); for (N1qlQueryRow row : rows) { try { logger.trace("Row : {}", row.toString()); JsonObject jsonObject = row.value(); logger.trace("JsonObject : {}", row.toString()); String value = jsonObject.getString(topKey); Number n = jsonObject.getDouble(orderingProperty); NumberedFilter numberedFilter = new NumberedFilter(topKey, value, n, orderingProperty); ret.add(numberedFilter); } catch (Exception e) { logger.warn("Unable to eleborate result for {}", row.toString()); } logger.trace("\n\n\n"); } return ret; } }