/**
 *
 */
package org.gcube.accounting.analytics.persistence.couchbase;

import static com.couchbase.client.java.query.Select.select;
import static com.couchbase.client.java.query.dsl.Expression.s;
import static com.couchbase.client.java.query.dsl.Expression.x;

import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.persistence.AccountingPersistence;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow;
import com.couchbase.client.java.query.dsl.Expression;
import com.couchbase.client.java.query.dsl.path.GroupByPath;

/**
 * Couchbase (N1QL) implementation of {@link AccountingPersistenceBackendQuery}.
 *
 * <p>The connection is opened by {@link #prepareConnection} from the URL,
 * password and bucket name found in the supplied configuration and is
 * released by {@link #close()}.</p>
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 *
 */
public class AccountingPersistenceQueryCouchBase extends AccountingPersistenceBackendQuery {

    private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryCouchBase.class);

    public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY;
    //public static final String USERNAME_PROPERTY_KEY = AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY;
    public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY;
    public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName";

    /* The environment configuration (shared by every instance of this class) */
    protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment.builder()
            .queryEnabled(true)
            .build();

    /** Cluster handle; {@code null} until {@link #prepareConnection} succeeds and after {@link #close()}. */
    protected Cluster cluster;
    /** Bucket the queries are executed against. */
    protected Bucket bucket;
    /** Name of {@link #bucket}; also used as the FROM target and as the row key of {@code SELECT *} results. */
    protected String bucketName;

    /**
     * {@inheritDoc}
     *
     * <p>Reads {@link #URL_PROPERTY_KEY}, {@link #PASSWORD_PROPERTY_KEY} and
     * {@link #BUCKET_NAME_PROPERTY_KEY} from the configuration, connects to
     * the cluster and opens the bucket. If the bucket cannot be opened the
     * cluster connection is released before the failure is propagated.</p>
     */
    @Override
    protected void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception {
        // Read every property before connecting so a configuration failure
        // cannot leave a half-open cluster behind.
        String url = configuration.getProperty(URL_PROPERTY_KEY);
        // String username = configuration.getProperty(USERNAME_PROPERTY_KEY);
        String password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
        bucketName = configuration.getProperty(BUCKET_NAME_PROPERTY_KEY);

        cluster = CouchbaseCluster.create(ENV, url);
        try {
            bucket = cluster.openBucket(bucketName, password);
        } catch (RuntimeException e) {
            // Do not leak the cluster connection when the bucket is unavailable.
            cluster.disconnect();
            cluster = null;
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>Disconnects from the cluster. Calling this method when no connection
     * is open (never prepared, failed, or already closed) is a no-op.</p>
     */
    @Override
    public void close() throws Exception {
        if (cluster != null) {
            cluster.disconnect();
            cluster = null;
            bucket = null;
        }
    }

    /**
     * Computes the time slot a query result belongs to.
     *
     * <p>{@link AggregatedRecord#START_TIME} is used when the document has
     * it, otherwise {@link UsageRecord#CREATION_TIME}; the value is then
     * aligned through {@link TemporalConstraint#getAlignedCalendar}.</p>
     *
     * @param obj the document returned by the query
     * @param aggregationMode the granularity the timestamp is aligned to
     * @return the aligned calendar
     * @throws NumberFormatException declared for backward compatibility
     * @throws JSONException if the expected timestamp is missing or is not
     *         convertible to a long
     */
    protected Calendar getCalendar(JSONObject obj, AggregationMode aggregationMode)
            throws NumberFormatException, JSONException {
        long millis;
        // getLong accepts both a JSON number and a numeric string, whereas
        // getString rejects numbers in strict org.json versions.
        if (obj.has(AggregatedRecord.START_TIME)) {
            millis = obj.getLong(AggregatedRecord.START_TIME);
            logger.trace("The result {} was from an aggregated record. Using {}",
                    obj, AggregatedRecord.START_TIME);
        } else {
            millis = obj.getLong(UsageRecord.CREATION_TIME);
            logger.trace("The result {} was from single record. Using {}",
                    obj, UsageRecord.CREATION_TIME);
        }

        Calendar calendar = TemporalConstraint.getAlignedCalendar(millis, aggregationMode);
        logger.trace("{} has been aligned to {}", millis, calendar.getTimeInMillis());
        return calendar;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Selects every document of the bucket that belongs to the current
     * scope, has the record type of {@code recordClass}, falls inside the
     * aligned time window of {@code temporalConstraint} and matches all the
     * given filters. The result maps each aligned time slot to an
     * {@link Info} built from the first row found for that slot.</p>
     *
     * <p>NOTE(review): the parameter and return types are raw here; they
     * presumably mirror generic signatures of the parent class - confirm.</p>
     *
     * @param recordClass the record class; it is instantiated to obtain the record type
     * @param temporalConstraint the time window and the aggregation mode
     * @param filters optional key/value equality filters, may be {@code null}
     * @return a map from aligned {@link Calendar} to {@link Info}
     * @throws Exception if the record class cannot be instantiated or the query fails
     */
    @Override
    protected Map reallyQuery(
            @SuppressWarnings("rawtypes") Class recordClass,
            TemporalConstraint temporalConstraint, List filters)
            throws Exception {

        //String currentScope = BasicUsageRecord.getScopeFromToken();
        String currentScope = ScopeProvider.instance.get();
        String recordType = ((Record) recordClass.newInstance()).getRecordType();

        Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope));
        expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(s(recordType)));

        // The DSL concatenates operands without adding parentheses and N1QL
        // binds AND tighter than OR, so each OR group must be wrapped
        // explicitly; otherwise the scope/type constraints could be bypassed
        // by the creationTime alternative.
        long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis();
        expression = expression.and(Expression.par(
                x(AggregatedRecord.START_TIME).gt(startTime)
                        .or(x(AggregatedRecord.CREATION_TIME).gt(startTime))));

        long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
        expression = expression.and(Expression.par(
                x(AggregatedRecord.END_TIME).lt(endTime)
                        .or(x(AggregatedRecord.CREATION_TIME).lt(endTime))));

        AggregationMode aggregationMode = temporalConstraint.getAggregationMode();

        if (filters != null) {
            for (Object element : filters) {
                Filter filter = (Filter) element;
                expression = expression.and(x(filter.getKey()).eq(s(filter.getValue())));
            }
        }

        GroupByPath groupByPath = select("*").from(bucketName).where(expression);

        N1qlQueryResult result = bucket.query(groupByPath);
        List<N1qlQueryRow> rows = result.allRows();

        Map<Calendar, Info> map = new HashMap<Calendar, Info>();
        for (N1qlQueryRow row : rows) {
            logger.debug("{}", row);

            // SELECT * nests every document under the bucket name.
            JsonObject jsonObject = row.value().getObject(bucketName);

            // NOTE(review): the resulting record is not used yet; the call is
            // kept because it may reject rows that cannot be turned into a
            // record - confirm before removing.
            RecordUtility.getRecord(jsonObject.toMap().toString());

            JSONObject obj = new JSONObject(jsonObject.toString());
            Calendar calendar = getCalendar(obj, aggregationMode);

            // TODO rows falling into an already populated time slot are not
            // merged into the existing Info yet: the first row wins.
            if (!map.containsKey(calendar)) {
                map.put(calendar, new Info(calendar, obj));
            }
        }

        return map;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Not implemented yet: always returns {@code null}.</p>
     */
    @Override
    public Set getKeys(Class recordClass) throws Exception {
        // TODO Auto-generated method stub
        return null;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Not implemented yet: always returns {@code null}.</p>
     */
    @Override
    public Set getPossibleValuesForKey(
            Class recordClass, String key) throws Exception {
        // TODO Auto-generated method stub
        // SELECT DISTINCT
        return null;
    }

    /* (non-Javadoc)
     * @see org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery#getPossibleValuesForKey(java.lang.Class, java.lang.String, int)
     *
     * Not implemented yet: always returns null.
     */
    @Override
    public Set getPossibleValuesForKey(
            Class recordClass, String key, int limit) throws Exception {
        // TODO Auto-generated method stub
        return null;
    }

}