diff --git a/pom.xml b/pom.xml
index fae78c3..8a72f76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,7 @@
org.gcube.accounting
accounting-analytics-persistence-couchbase
- 1.2.0-SNAPSHOT
+ 1.3.0-SNAPSHOT
Accounting Analytics Persistence CouchBase
Accounting Analytics Persistence CouchBase Implementation
diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
index e9eb54c..659fcd9 100644
--- a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
+++ b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
@@ -31,6 +31,7 @@ import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum;
import org.gcube.accounting.analytics.UsageServiceValue;
+import org.gcube.accounting.analytics.UsageStorageValue;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException;
import org.gcube.accounting.analytics.exception.ValueException;
@@ -40,6 +41,7 @@ import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
+import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.records.AggregatedRecord;
@@ -69,6 +71,7 @@ import com.couchbase.client.java.view.ViewRow;
/**
* @author Luca Frosini (ISTI - CNR)
+ * @author Alessandro Pieve (ISTI - CNR) alessandro.pieve@isti.cnr.it
*
*/
public class AccountingPersistenceQueryCouchBase implements
@@ -81,13 +84,17 @@ AccountingPersistenceBackendQuery {
public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY;
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord";
+
+ public static final String BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY="AggregatedStorageStatusRecord";
+
+
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord";
public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY="AggregatedPortletUsageRecord";
public static final String BUCKET_JOB_NAME_PROPERTY_KEY="AggregatedJobUsageRecord";
public static final String BUCKET_TASK_NAME_PROPERTY_KEY="AggregatedTaskUsageRecord";
- public static final String SEPARATOR_DESIGN_DOC="_";
+ public static final String DESIGN_DOC_ID_LIST_USAGE="ListUsage";
public static final long ENV_TIME_OUT=180000;
/* The environment configuration */
@@ -100,6 +107,11 @@ AccountingPersistenceBackendQuery {
protected Bucket bucketStorage;
protected String bucketNameStorage;
+
+ protected Bucket bucketStorageStatus;
+ protected String bucketNameStorageStatus;
+
+
protected Bucket bucketService;
protected String bucketNameService;
@@ -114,6 +126,13 @@ AccountingPersistenceBackendQuery {
private Map connectionMap;
+ protected static final String MAP_REDUCE__DESIGN = "";
+ protected static final String MAP_REDUCE_ALL = "all";
+ // Used in the name of map reduce to separate keys used as filter
+ protected static final String KEYS_SEPARATOR = "__";
+
+ protected static final String DESIGN_DOC_ID="top_";
+
/**
* {@inheritDoc}
*/
@@ -127,6 +146,9 @@ AccountingPersistenceBackendQuery {
cluster = CouchbaseCluster.create(ENV, url);
bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY);
+
+ bucketNameStorageStatus = configuration.getProperty(BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY);
+
bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY);
bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY);
bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY);
@@ -135,9 +157,13 @@ AccountingPersistenceBackendQuery {
connectionMap = new HashMap();
bucketStorage = cluster.openBucket(bucketNameStorage, password);
-
connectionMap.put(BUCKET_STORAGE_NAME_PROPERTY_KEY, bucketStorage);
+ bucketStorageStatus = cluster.openBucket( bucketNameStorageStatus,password);
+ connectionMap.put(BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY, bucketStorageStatus);
+
+ logger.debug("connectionMap"+connectionMap.toString());
+
bucketService = cluster.openBucket(bucketNameService, password);
connectionMap.put(BUCKET_SERVICE_NAME_PROPERTY_KEY, bucketService);
@@ -181,6 +207,16 @@ AccountingPersistenceBackendQuery {
return calendar;
}
+
+ /**
+ * IS NOT USED
+ * @param clz
+ * @param temporalConstraint
+ * @param filters
+ * @return
+ * @throws Exception
+ */
+ @Deprecated
protected Map selectQuery(
Class extends AggregatedRecord, ?>> clz,
TemporalConstraint temporalConstraint, List filters)
@@ -309,13 +345,9 @@ AccountingPersistenceBackendQuery {
}
- protected static final String MAP_REDUCE__DESIGN = "";
- protected static final String MAP_REDUCE_ALL = "all";
- /**
- * Used in the name of map reduce to separate keys used as filter
- */
- protected static final String KEYS_SEPARATOR = "__";
+ //OLD METHOD OF DIVISION MAP_REDUCE DESIGN
+ @Deprecated
protected String getDesignDocId(
Class extends AggregatedRecord,?>> recordClass)
throws InstantiationException, IllegalAccessException {
@@ -324,7 +356,7 @@ AccountingPersistenceBackendQuery {
}
/**
- *
+ * New division of designDocId and map-reduce
* @param recordClass
* @param keys
* @return
@@ -344,6 +376,11 @@ AccountingPersistenceBackendQuery {
return getDesigndocIdSpecific;
}
+ /**
+ * generate a name of map-reduce view
+ * @param collection
+ * @return
+ */
public static String getMapReduceFunctionName(
Collection collection) {
String reduceFunction = MAP_REDUCE_ALL;
@@ -360,6 +397,163 @@ AccountingPersistenceBackendQuery {
return reduceFunction;
}
+ /**EXPERIMENTAL
+ * generate a name of design doc id for a top
+ * @param collection
+ * @return
+ */
+ public static String getDesignDocIdName(
+ Collection collection) {
+ String reduceFunction = MAP_REDUCE_ALL;
+ if (!collection.isEmpty()){
+ reduceFunction = null;
+ String property = collection.iterator().next();
+ reduceFunction = property;
+
+
+ }
+ return reduceFunction;
+ }
+
+ /*
+ * EXPERIMENTAL
+ *
+ * This method is used for call a map-reduce ListUsage
+ *the filter on which you want to know its use
+ *and the parameters you want to know have been used
+ *Example:
+ *Input:
+ * request interval (start date end-date)
+ * filter consumerId: alessandro.pieve
+ * Used parameters: Service Class
+ *Return:
+ * List of service class used by alessandro in the required period
+ */
+ public SortedMap getListUsage(Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, List filters,String context,List parameters)throws Exception{
+ //TODO
+ String currentScope=null;
+ if (context==null)
+ currentScope = ScopeProvider.instance.get();
+ else
+ currentScope = context;
+ JsonArray startKey = JsonArray.create();
+ startKey.add(currentScope);
+ JsonArray endKey = JsonArray.create();
+ endKey.add(currentScope);
+
+ AggregationMode aggregationMode = temporalConstraint.getAggregationMode();
+ JsonArray temporalStartKey = getRangeKey(temporalConstraint.getStartTime(),aggregationMode, false, false);
+ JsonArray temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false);
+
+ Set recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
+
+ Collection keys = new TreeSet<>();
+
+ if (filters != null && filters.size() != 0) {
+ // Sorting filter for call a mapreduce
+ Collections.sort(filters, new Comparator() {
+ @Override
+ public int compare(Filter filter1, Filter filter2)
+ {
+ int result =filter1.getKey().compareTo(filter2.getKey());
+ return result;
+ }
+ });
+ for (Filter filter : filters) {
+ String filterKey = filter.getKey();
+ String filterValue = filter.getValue();
+ if (filterKey != null && filterKey.compareTo("") != 0
+ && recordKeysSet.contains(filterKey)) {
+ if (filterValue != null && filterValue.compareTo("") != 0) {
+ if (keys.contains(filterKey)) {
+ throw new DuplicatedKeyFilterException(
+ "Only one value per Filter key is allowed");
+ }
+ startKey.add(filterValue);
+ endKey.add(filterValue);
+ keys.add(filterKey);
+ } else {
+ throw new KeyException(
+ String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
+ }
+
+ } else {
+ throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
+ }
+ }
+ }
+ /*
+ int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1;
+ int groupLevel = scopeDateGroupLevel;
+ if (filters != null) {
+ groupLevel += keys.size();
+ }
+ */
+ for (Object temporal: temporalStartKey.toList()){
+ if (!temporal.toString().isEmpty())
+ startKey.add(temporal);
+ }
+ int count =1;
+ for (Object temporal: temporalEndKey.toList()){
+ if (!temporal.toString().isEmpty()){
+ //couchbase exclude last value
+ if (count==temporalEndKey.size())
+ temporal=(int)temporal+1;
+ endKey.add(temporal);
+ }
+ count++;
+ }
+ String viewName = getMapReduceFunctionName(keys);
+ if (parameters!=null){
+ for (String name:parameters){
+ viewName+=KEYS_SEPARATOR+name;
+ }
+ }
+ //TODO DA COMPLETARE con in piu' alle chiavi anche la lista parametri passata
+
+ ViewQuery query = ViewQuery.from(DESIGN_DOC_ID_LIST_USAGE, viewName);
+ query.inclusiveEnd();
+ //query.groupLevel(groupLevel);
+ query.reduce(false);
+ query.startKey(startKey);
+ query.endKey(endKey);
+ query.descending(false);
+
+ logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ + "Start Key : {}, End Key : {},"
+ + "temporalStartKey :{}, temporalEndKey :{}",
+ clz.getSimpleName(),DESIGN_DOC_ID_LIST_USAGE, viewName, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
+ SortedMap infos = new TreeMap<>();
+ ViewResult viewResult;
+ try {
+ //execute query in a specify bucket
+ viewResult = connectionMap.get(clz.getSimpleName()).query(query);
+
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
+ throw e;
+ }
+
+ for (ViewRow row : viewResult) {
+
+ JsonArray array = (JsonArray) row.key();
+ Calendar calendar = getCalendarFromArray(array);
+ JsonObject value = (JsonObject) row.value();
+ JSONObject obj = new JSONObject(value.toString());
+ Info info = new Info(calendar, obj);
+ //infos.put(info);
+
+ }
+ //TODO DA Completare per la richiesta utente e servizi/altro utilizzati nel periodo
+
+
+ return null;
+
+
+ }
+
+
protected SortedMap mapReduceQuery(
Class extends AggregatedRecord, ?>> clz,
TemporalConstraint temporalConstraint, List filters,String context)
@@ -385,9 +579,7 @@ AccountingPersistenceBackendQuery {
temporalConstraint.getEndTime(),
aggregationMode, false, false);
-
- Set recordKeysSet = AccountingPersistenceQuery
- .getQuerableKeys(clz.newInstance());
+ Set recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
Collection keys = new TreeSet<>();
@@ -407,27 +599,22 @@ AccountingPersistenceBackendQuery {
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
-
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
-
startKey.add(filterValue);
endKey.add(filterValue);
keys.add(filterKey);
} else {
throw new KeyException(
- String.format("Invalid %s : %s",
- Filter.class.getSimpleName(), filter.toString()));
+ String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
- throw new ValueException(String.format("Invalid %s : %s",
- Filter.class.getSimpleName(), filter.toString()));
+ throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
-
}
}
@@ -438,7 +625,6 @@ AccountingPersistenceBackendQuery {
if (filters != null) {
groupLevel += keys.size();
}
-
//String designDocId = getDesignDocId(clz);
String designDocId = getDesignDocIdSpecific(clz,keys);
//logger.trace("designDocIdNew :{}",designDocId);
@@ -541,7 +727,16 @@ AccountingPersistenceBackendQuery {
return map;
}
-
+ /**
+ * Used for calculate a top value
+ * @param clz
+ * @param temporalConstraint
+ * @param filters
+ * @param topKey
+ * @param orderingProperty
+ * @return
+ * @throws Exception
+ */
@Override
public SortedMap> getTopValues(
Class extends AggregatedRecord, ?>> clz,
@@ -563,56 +758,296 @@ AccountingPersistenceBackendQuery {
SortedMap> ret =
new TreeMap<>(comparator);
- SortedSet top = getNextPossibleValues(clz,
- temporalConstraint, filters, topKey, orderingProperty);
+ SortedSet top=null;
+
+
+ //top = getNextPossibleValues(clz,temporalConstraint, filters, topKey, orderingProperty);
+
+ if (usingNextPossibleValuesWithMap(clz,topKey,filters)){
+ logger.trace("getNextPossibleValues using map");
+ top = getNextPossibleValuesWithMap(clz,temporalConstraint, filters, topKey, orderingProperty);
+ }else{
+ logger.trace("getNextPossibleValues using query");
+ top = getNextPossibleValues(clz,temporalConstraint, filters, topKey, orderingProperty);
+ }
+ logger.trace("getNextPossibleValues:{}",top.toString());
for(NumberedFilter nf : top){
filters.add(nf);
SortedMap map =
mapReduceQuery(clz, temporalConstraint, filters,null);
ret.put(nf, map);
-
filters.remove(nf);
}
+
return ret;
}
- @Override
- public SortedMap> getContextTimeSeries(
+ /**
+ * SPERIMENTAL
+ * Used for verify if have exist map for calculate a top
+ * @param clz
+ * @param topKey
+ * @return
+ */
+ protected boolean usingNextPossibleValuesWithMap(Class extends AggregatedRecord, ?>> clz, String topKey,List filters){
+
+
+ Collection keys = new TreeSet<>();
+ Set recordKeysSet;
+ try {
+ recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
+ keys.add(topKey);
+ if (filters != null && filters.size() != 0) {
+ // Sorting filter for call a mapreduce
+ Collections.sort(filters, new Comparator() {
+ @Override
+ public int compare(Filter filter1, Filter filter2)
+ {
+ int result =filter1.getKey().compareTo(filter2.getKey());
+ return result;
+ }
+ });
+ for (Filter filter : filters) {
+ String filterKey = filter.getKey();
+ String filterValue = filter.getValue();
+
+ if (filterKey != null && filterKey.compareTo("") != 0
+ && recordKeysSet.contains(filterKey)) {
+ if (filterValue != null && filterValue.compareTo("") != 0) {
+ if (keys.contains(filterKey)) {
+ throw new DuplicatedKeyFilterException(
+ "Only one value per Filter key is allowed");
+ }
+ keys.add(filterKey);
+ } else {
+ throw new KeyException(
+ String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
+ }
+
+ } else {
+ throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
+ }
+ }
+ }
+ }catch (Exception e1) {
+ logger.warn("usingNextPossibleValuesWithMap -exception with filter:{}",filters.toString() );
+ return false;
+ }
+
+ String viewName=getMapReduceFunctionName(keys);
+ String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys);
+ ViewQuery query = ViewQuery.from(designDocId, viewName);
+ logger.debug("usingNextPossibleValuesWithMap using designDocId:{}, viewName:{}",designDocId,viewName);
+ ViewResult viewResult;
+ try {
+ viewResult = connectionMap.get(clz.getSimpleName()).query(query);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+
+ /**
+ * Calculate a next possible value with map
+ * (faster but with greater demand for resources)
+ * @param clz
+ * @param temporalConstraint
+ * @param filters
+ * @param key
+ * @param orderingProperty
+ * @return
+ * @throws Exception
+ */
+ public SortedSet getNextPossibleValuesWithMap(
Class extends AggregatedRecord, ?>> clz,
- TemporalConstraint temporalConstraint, List filters,List contexts)
- throws Exception {
- logger.trace("getContextTimeSeries for contexts:{}",contexts.toString());
- SortedSet listContexts = new TreeSet();
- for (String context:contexts){
- Filter contextLabel= new Filter("context",context);
- listContexts.add(contextLabel);
+ TemporalConstraint temporalConstraint, List filters,
+ String key, String orderingProperty) throws Exception {
+
+ logger.debug("getNextPossibleValuesWithMap init");
+ String currentScope = ScopeProvider.instance.get();
+
+ if(orderingProperty==null){
+ orderingProperty = AccountingPersistenceQuery.
+ getDefaultOrderingProperties(clz);
}
- SortedMap> ret = new TreeMap<>();
- for(Filter nf : listContexts){
- logger.debug("detail time series :{}",nf.toString());
- SortedMap map =
- mapReduceQuery(clz, temporalConstraint, filters,nf.getValue());
- ret.put(nf, map);
- filters.remove(nf);
+
+
+ JsonArray startKey = JsonArray.create();
+ startKey.add(currentScope);
+ JsonArray endKey = JsonArray.create();
+ endKey.add(currentScope);
+
+ AggregationMode aggregationMode = temporalConstraint
+ .getAggregationMode();
+
+ JsonArray temporalStartKey = getRangeKey(
+ temporalConstraint.getStartTime(),
+ aggregationMode, false, false);
+
+ JsonArray temporalEndKey = getRangeKey(
+ temporalConstraint.getEndTime(),
+ aggregationMode, false, false);
+
+ Set recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
+
+ Collection keys = new TreeSet<>();
+
+ keys.add(key);
+
+ /*BEGIN DELETE ONLY FOR TEST*/
+ Collection selectExpressions = new ArrayList<>();
+ selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz,orderingProperty) +
+ " IS NOT NULL THEN "+getSpecializedProperty(clz,orderingProperty)+" ELSE 1 END )").
+ as(orderingProperty));
+ selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz,key) +
+ " IS NOT NULL THEN "+getSpecializedProperty(clz,key)+" ELSE 'UNKNOWN' END )").as(key));
+ Expression whereExpression =
+ x(getSpecializedProperty(clz,BasicUsageRecord.SCOPE)).
+ eq(s(currentScope));
+ long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis();
+ whereExpression = whereExpression.and(
+ x(getSpecializedProperty(clz,AggregatedRecord.START_TIME)).gt(startTime)
+ );
+ long endTime = temporalConstraint.getEndTime();
+ whereExpression = whereExpression.and(
+ x(getSpecializedProperty(clz,AggregatedRecord.END_TIME)).lt(endTime)
+ );
+ Expression[] selectExpressionArray = new Expression[selectExpressions.size()];
+ selectExpressions.toArray(selectExpressionArray);
+ Sort sort = Sort.desc(orderingProperty);
+ OffsetPath path = select(selectExpressionArray).from(connectionMap.get(clz.getSimpleName()).name())
+ .where(whereExpression).groupBy(key).orderBy(sort);
+
+
+
+
+
+ /*END DELETE ONLY FOR TEST*/
+ if (filters != null && filters.size() != 0) {
+ // Sorting filter for call a mapreduce
+ Collections.sort(filters, new Comparator() {
+ @Override
+ public int compare(Filter filter1, Filter filter2)
+ {
+ int result =filter1.getKey().compareTo(filter2.getKey());
+ return result;
+ }
+ });
+ for (Filter filter : filters) {
+ String filterKey = filter.getKey();
+ String filterValue = filter.getValue();
+
+ if (filterKey != null && filterKey.compareTo("") != 0
+ && recordKeysSet.contains(filterKey)) {
+ if (filterValue != null && filterValue.compareTo("") != 0) {
+ if (keys.contains(filterKey)) {
+ throw new DuplicatedKeyFilterException(
+ "Only one value per Filter key is allowed");
+ }
+ startKey.add(filterValue);
+ endKey.add(filterValue);
+ whereExpression = whereExpression.and(x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
+ keys.add(filterKey);
+ } else {
+ throw new KeyException(
+ String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
+ }
+
+ } else {
+ throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
+ }
+ }
+ }
+
+ logger.debug("Alternative Query for top:"+path.toString());
+
+
+ int groupLevel=1;
+
+ for (Object temporal: temporalStartKey.toList()){
+ if (!temporal.toString().isEmpty())
+ startKey.add(temporal);
+ }
+ int count =1;
+ for (Object temporal: temporalEndKey.toList()){
+ if (!temporal.toString().isEmpty()){
+ //couchbase exclude last value
+ if (count==temporalEndKey.size())
+ temporal=(int)temporal+1;
+ endKey.add(temporal);
+ }
+ count++;
+ }
+ String viewName = getMapReduceFunctionName(keys);
+ String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys);
+ logger.trace("keys:{}",keys.toString());
+ ViewQuery query = ViewQuery.from(designDocId, viewName);
+ query.inclusiveEnd();
+ query.groupLevel(groupLevel);
+ query.startKey(startKey);
+ query.endKey(endKey);
+ query.descending(false);
+ logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ + "Group Level : {}, Start Key : {}, End Key : {},"
+ + "temporalStartKey :{}, temporalEndKey :{}",
+ clz.getSimpleName(),designDocId, viewName, groupLevel, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
+
+ Comparator comparator = new Comparator() {
+ @Override
+ public int compare(NumberedFilter o1, NumberedFilter o2) {
+ int compareResult = -o1.compareTo(o2);
+ if(compareResult==0){
+ compareResult=1;
+ }
+ return compareResult;
+ }
+ };
+ SortedSet ret = new TreeSet<>(comparator);
+
+
+ ViewResult viewResult;
+ try {
+ //execute query in a specify bucket
+ viewResult = connectionMap.get(clz.getSimpleName()).query(query);
+
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
+ throw e;
+ }
+
+ ViewRow row =viewResult.allRows().get(0);
+
+ JsonObject value = (JsonObject) row.value();
+ JSONObject objectValueTop = new JSONObject(value.toString());
+ Iterator> iterateJosn = objectValueTop.keys();
+ while( iterateJosn.hasNext() ) {
+ String keyTop = (String)iterateJosn.next();
+ Number n = (Number) objectValueTop.get(keyTop);
+ if (n==null)
+ n=0;
+ NumberedFilter numberedFilter = new NumberedFilter(key, keyTop, n, orderingProperty);
+ ret.add(numberedFilter);
}
return ret;
+
}
- protected String getQualifiedProperty(String property){
- //DEVELOPING
- return (property);
-
- }
-
- //Use for property into a specify bucket
- protected String getSpecializedProperty(Class extends AggregatedRecord, ?>> clz,String property){
-
- return String.format("%s.%s", connectionMap.get(clz.getSimpleName()).name(), property);
- }
+ /**
+ * Calculate a next possible value with query
+ * (more slow but with fewer resources )
+ * @param clz
+ * @param temporalConstraint
+ * @param filters
+ * @param key
+ * @param orderingProperty
+ * @return
+ * @throws Exception
+ */
@Override
public SortedSet getNextPossibleValues(
Class extends AggregatedRecord, ?>> clz,
@@ -628,13 +1063,10 @@ AccountingPersistenceBackendQuery {
}
Collection selectExpressions = new ArrayList<>();
- //add select expression
- //selectExpressions.add(x("SUM(" + getSpecializedProperty(clz,orderingProperty) + ")").as(orderingProperty));
//add select expression and check if exist
selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz,orderingProperty) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,orderingProperty)+" ELSE 1 END )").
as(orderingProperty));
- //selectExpressions.add(x(getSpecializedProperty(clz,key)).as(key));
selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz,key) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,key)+" ELSE 'UNKNOWN' END )").as(key));
@@ -647,7 +1079,6 @@ AccountingPersistenceBackendQuery {
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.START_TIME)).gt(startTime)
);
- //long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
long endTime = temporalConstraint.getEndTime();
whereExpression = whereExpression.and(
@@ -674,26 +1105,20 @@ AccountingPersistenceBackendQuery {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
- whereExpression =
- whereExpression.and(
- x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
-
+ whereExpression = whereExpression.and(x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
keys.add(filterKey);
} else {
throw new KeyException(
- String.format("Invalid %s : %s",
- Filter.class.getSimpleName(), filter.toString()));
+ String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
- throw new ValueException(String.format("Invalid %s : %s",
- Filter.class.getSimpleName(), filter.toString()));
+ throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
- Expression[] selectExpressionArray =
- new Expression[selectExpressions.size()];
+ Expression[] selectExpressionArray = new Expression[selectExpressions.size()];
selectExpressions.toArray(selectExpressionArray);
Sort sort = Sort.desc(orderingProperty);
@@ -701,6 +1126,9 @@ AccountingPersistenceBackendQuery {
.where(whereExpression).groupBy(key).orderBy(sort);
logger.debug("Query for top:"+path.toString());
+
+
+
Comparator comparator = new Comparator() {
@Override
public int compare(NumberedFilter o1, NumberedFilter o2) {
@@ -733,6 +1161,7 @@ AccountingPersistenceBackendQuery {
if (n==null)
n=0;
//logger.trace("pre:{}, key:{}, value:{}, n:{},orderingProperty:{}",jsonObject.toString(),key, value, n, orderingProperty);
+
NumberedFilter numberedFilter =
new NumberedFilter(key, value, n, orderingProperty);
@@ -748,6 +1177,59 @@ AccountingPersistenceBackendQuery {
return ret;
}
+ /**
+ * Return a list of context time series
+ * (used for portlet accounting context)
+ * @param clz
+ * @param temporalConstraint
+ * @param filters
+ * @param contexts
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public SortedMap> getContextTimeSeries(
+ Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, List filters,List contexts)
+ throws Exception {
+ logger.trace("getContextTimeSeries for contexts:{}",contexts.toString());
+ SortedSet listContexts = new TreeSet();
+ for (String context:contexts){
+ Filter contextLabel= new Filter("context",context);
+ listContexts.add(contextLabel);
+ }
+ SortedMap> ret = new TreeMap<>();
+ for(Filter nf : listContexts){
+ logger.debug("detail time series :{}",nf.toString());
+ SortedMap map =
+ mapReduceQuery(clz, temporalConstraint, filters,nf.getValue());
+ ret.put(nf, map);
+ filters.remove(nf);
+ }
+ return ret;
+ }
+
+ protected String getQualifiedProperty(String property){
+ //DEVELOPING
+ return (property);
+
+ }
+
+ //Use for property into a specify bucket
+ protected String getSpecializedProperty(Class extends AggregatedRecord, ?>> clz,String property){
+
+ return String.format("%s.%s", connectionMap.get(clz.getSimpleName()).name(), property);
+ }
+
+ /**
+ * Used for list a possible values for each filter
+ * @param clz
+ * @param temporalConstraint
+ * @param filters
+ * @param key
+ * @return
+ * @throws Exception
+ */
@SuppressWarnings("deprecation")
@Override
public SortedSet getFilterValues(
@@ -811,6 +1293,16 @@ AccountingPersistenceBackendQuery {
return ret;
}
+
+
+ /**
+ * SPERIMENTAL now is not used
+ * @param clz
+ * @param temporalConstraint
+ * @param applicant
+ * @return
+ * @throws Exception
+ */
@SuppressWarnings("deprecation")
@Override
public JSONObject getUsageValue(
@@ -865,12 +1357,10 @@ AccountingPersistenceBackendQuery {
Collection keys = new TreeSet<>();
keys.add(applicant.getKey());
- //String designDocId = getDesignDocId(clz);
//ADD A SPECIFIY DESIGN DOC ID FAMILY
String designDocId = getDesignDocIdSpecific(clz,keys);
String viewName = applicant.getKey();
-
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(groupLevel);
@@ -919,17 +1409,45 @@ AccountingPersistenceBackendQuery {
}
/**
- *
+ * SPERIMENTAL
+ * Used for calculate a usage value for each element of list
+ * (QUOTA)
+ * @param listUsage
+ * @return
+ * @throws Exception
*/
@Override
public List getUsageValueQuotaTotal(
List listUsage)
throws Exception {
+ logger.debug("getUsageValueQuotaTotal init with list:{}",listUsage);
+
//String currentScope = ScopeProvider.instance.get();
String keyOrderingProperty = null;
for (UsageValue totalFilters:listUsage){
+ logger.debug("----------------- init for totalFilters");
+ /*
+ gli arriva una lista di usage value da verificare...
+ gli usage value possono essere di diversi macro tipi:
+ storage
+ storage senza limite temporale (richiesta da per una quota totale )
+ utilizzo identifier come consumer id
+ utilizzo accounting_storage_status come bucket !
+ utilizzo quotaTotal come design
+ ed il nome della view sara composto da consumerid ed eventuali filtri (al momento non previsti)
+ storage con limiti temporali (richiesto per una quota parziale)
+ utilizzo identifier come consumer id
+ utilizzo accounting_storage_status come bucket !
+ utilizzo quota come design
+ ed il nome della view sara composto da consumerid ed eventuali filtri (al momento non previsti)
+ service
+ lista service con limite temporali
+ */
+
+
+
String currentScope = totalFilters.getContext();
Collection keys= new TreeSet<>();
@@ -938,8 +1456,11 @@ AccountingPersistenceBackendQuery {
JsonArray temporalStartKey=null;
JsonArray temporalEndKey=null;
+
+ logger.trace("temporalConstraint:{}",totalFilters);
TemporalConstraint temporalConstraint=totalFilters.getTemporalConstraint();
if (temporalConstraint==(null)){
+ //used for no temporalConstraint
logger.trace("Not found temporalConstraint");
Calendar startTime = Calendar.getInstance();
startTime.set(1970, Calendar.JANUARY, 1);
@@ -947,6 +1468,18 @@ AccountingPersistenceBackendQuery {
temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
+ //if type=storage then bucket =accounting_storage_status and designDocId = quotaTotal
+ //logger.trace("totalFilters.getClass().getSimpleName():{}",totalFilters.getClass().getSimpleName());
+ //logger.trace("totalFilters.getClz().getSimpleName():{}",totalFilters.getClz().getSimpleName());
+ if (totalFilters.getClz().getSimpleName().equals(AggregatedStorageStatusRecord.class.getSimpleName())){
+ //TODO sarebbe possibile invece con il nome verificare che getClz e' un istanza di AggregatedStorageStatusRecord?
+ designDocId = "QuotaTotal";
+ }
+
+ }
+ else if (totalFilters.getClz().getSimpleName().equals(AggregatedStorageStatusRecord.class.getSimpleName())){
+ logger.trace("AggregatedStorageStatusRecord with temporalConstraint");
+ designDocId = "Quota";
}
AggregationMode aggregationMode = temporalConstraint.getAggregationMode();
@@ -1008,7 +1541,7 @@ AccountingPersistenceBackendQuery {
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
totalFilters.getClz().getSimpleName(),designDocId, viewNameTmp, groupLevelTmp, startKeyTmp, endKeyTmp,temporalStartKey.toString(), temporalEndKey.toString());
-
+ logger.trace("connectionMap:{}",connectionMap.toString());
ViewQuery query = ViewQuery.from(designDocId, viewNameTmp);
query.inclusiveEnd();
query.groupLevel(groupLevelTmp);
@@ -1027,7 +1560,7 @@ AccountingPersistenceBackendQuery {
Map map = new HashMap();
for (ViewRow row : viewResult) {
-
+ logger.trace("ViewRow row:{}",row.toString());
JsonObject jsnobject = (JsonObject) row.value();
JSONObject objJson = new JSONObject(jsnobject.toString());
@@ -1037,13 +1570,16 @@ AccountingPersistenceBackendQuery {
Float valuetmp=Float.parseFloat(objJson.get(key).toString());
+ //logger.debug("totalFilters.getClass().getSimpleName():{} UsageStorageValue.class.getSimpleName():{}", totalFilters.getClass().getSimpleName(),UsageStorageValue.class.getSimpleName());
+
if (key.equals("operationCount") || key.equals("dataVolume")){
if (map.containsKey(key)) {
map.put(key, valuetmp + map.get(key));
// TODO verify a better method
- if (designDocId.equals("StorageUsageRecord")){
+ if (totalFilters.getClass().getSimpleName().equals(UsageStorageValue.class.getSimpleName())){
+ logger.debug("storageUsageRecord -designDocId:{}",designDocId);
if (key.equals("dataVolume")){
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
singleFilter.setOrderingProperty(key);
@@ -1054,6 +1590,7 @@ AccountingPersistenceBackendQuery {
}
}
else{
+ logger.debug("?UsageRecord -designDocId:{}",designDocId);
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
singleFilter.setOrderingProperty(key);
singleFilter.setD(singleFilter.getD()+valuetmp.doubleValue());
@@ -1067,7 +1604,8 @@ AccountingPersistenceBackendQuery {
map.put(key, valuetmp);
// TODO verify a better method
- if (designDocId.equals("StorageUsageRecord")){
+ if (totalFilters.getClass().getSimpleName().equals(UsageStorageValue.class.getSimpleName())){
+ logger.debug("storageUsageRecord -designDocId:{}",designDocId);
if (key.equals("dataVolume")){
keyOrderingProperty=key;
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
@@ -1078,6 +1616,7 @@ AccountingPersistenceBackendQuery {
}
}
else{
+ logger.debug("?UsageRecord -designDocId:{}",designDocId);
keyOrderingProperty=key;
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
singleFilter.setOrderingProperty(key);
@@ -1094,7 +1633,13 @@ AccountingPersistenceBackendQuery {
totalFilters.setOrderingProperty(keyOrderingProperty);
totalFilters.setD(totalQuota);
}
+
+
+
return listUsage;
}
+
+
+
}
diff --git a/src/test/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBaseTest.java b/src/test/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBaseTest.java
index b67f74e..5568bfe 100644
--- a/src/test/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBaseTest.java
+++ b/src/test/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBaseTest.java
@@ -18,14 +18,15 @@ import org.gcube.accounting.analytics.FiltersValue;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
+import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.UsageServiceValue;
import org.gcube.accounting.analytics.UsageStorageValue;
import org.gcube.accounting.analytics.UsageValue;
-import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.aggregation.AggregatedJobUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
+import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
import org.gcube.common.scope.api.ScopeProvider;
@@ -257,11 +258,8 @@ public class AccountingPersistenceQueryCouchBaseTest {
filters1.add(new Filter("serviceClass", "VREManagement"));
simpleFilter1.setFiltersValue(filters1);
filterPackageQuota.add(simpleFilter1);
-
UsageValue totalfilter=new UsageServiceValue(context,"lucio.lelii",AggregatedServiceUsageRecord.class,temporalConstraint,filterPackageQuota);
-
-
/*ask quota for user lucio lelii
* SERVICE
* */
@@ -286,10 +284,7 @@ public class AccountingPersistenceQueryCouchBaseTest {
UsageValue totalfilter1=new UsageServiceValue("gcube","lucio.lelii",AggregatedServiceUsageRecord.class,temporalConstraint,filterPackageQuota1);
//totalfilter1.setTotalFilters(filterPackageQuota1);
-
-
-
-
+
/*ask quota for alessandro pieve
*STORAGE
**/
@@ -305,17 +300,46 @@ public class AccountingPersistenceQueryCouchBaseTest {
//TotalFilters totalfilter2=new TotalFilters("alessandro.pieve",AggregatedStorageUsageRecord.class,temporalConstraint,filterPackageQuota2);
- List filtersList=new ArrayList();
- //UsageValue totalfilter2=new UsageStorageValue("alessandro.pieve",AggregatedStorageUsageRecord.class,temporalConstraint);
+
- UsageValue totalfilter2=new UsageStorageValue(context,"alessandro.pieve",AggregatedStorageUsageRecord.class);
+ /****
+ *Example call storage status for each consumer id (quota total used )
+ */
+ //Call quota total for consumerID
+ UsageValue totalfilterStorageStatus=new UsageStorageValue(context,"name.surname",AggregatedStorageStatusRecord.class);
+ UsageValue totalfilterStorageStatus_2=new UsageStorageValue(context,"lucio.lelii",AggregatedStorageStatusRecord.class);
+ UsageValue totalfilterStorageStatus_3=new UsageStorageValue(context,"alessandro.pieve",AggregatedStorageStatusRecord.class);
+ UsageValue totalfilterStorageStatus_4=new UsageStorageValue(context,"giancarlo.panichi",AggregatedStorageStatusRecord.class);
+
+
+
+ /****
+ *Example call storage status for each consumer id( quota into period)
+ */
+ //get temporalConstraintStorage
+ Calendar startTimeStorage = Calendar.getInstance();
+ startTimeStorage.set(2015, Calendar.MAY, 1);
+ Calendar endTimeStorage = Calendar.getInstance();
+ endTimeStorage.set(2016, Calendar.DECEMBER, 18);
+ TemporalConstraint temporalConstraintStorage =new TemporalConstraint(startTimeStorage.getTimeInMillis(),
+ endTimeStorage.getTimeInMillis(), AggregationMode.DAILY);
+ UsageValue totalfilterStorageStatusPeriod=new UsageStorageValue(context,"name.surname",AggregatedStorageStatusRecord.class,temporalConstraintStorage);
+
+
+
+
+ //richiedo la lista di dati usati totali
List listTotalFilter=new ArrayList();
-// listTotalFilter.add(totalfilter);
-listTotalFilter.add(totalfilter1);
+ // listTotalFilter.add(totalfilter);
+ listTotalFilter.add(totalfilter1);
+ listTotalFilter.add(totalfilterStorageStatus);
+ listTotalFilter.add(totalfilterStorageStatus_2);
+ listTotalFilter.add(totalfilterStorageStatus_3);
+ listTotalFilter.add(totalfilterStorageStatus_4);
+ listTotalFilter.add(totalfilterStorageStatusPeriod);
- listTotalFilter.add(totalfilter2);
logger.info("filterPackageQuota:"+listTotalFilter);
List filterValue =
@@ -343,13 +367,13 @@ listTotalFilter.add(totalfilter1);
@Test
public void testTopService() throws Exception {
Calendar startTime = Calendar.getInstance();
- startTime.set(2016, Calendar.SEPTEMBER, 27);
+ startTime.set(2016, Calendar.AUGUST, 27);
Calendar endTime = Calendar.getInstance();
endTime.set(2016, Calendar.SEPTEMBER, 28,23,59);
List filters = new ArrayList();
-
+ filters.add(new Filter(AggregatedServiceUsageRecord.SERVICE_CLASS, "Common"));
TemporalConstraint temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
@@ -360,12 +384,47 @@ listTotalFilter.add(totalfilter1);
SortedMap> set =
accountingPersistenceQueryCouchBase.getTopValues(
clz, temporalConstraint, filters,
- AggregatedServiceUsageRecord.CALLED_METHOD, null);
+ AggregatedServiceUsageRecord.HOST, null);
logger.debug("Result final{}", set);
}
+ @Test
+ public void testTopStorage() throws Exception {
+ Calendar startTime = Calendar.getInstance();
+ startTime.set(2016, Calendar.AUGUST, 27);
+ Calendar endTime = Calendar.getInstance();
+ endTime.set(2016, Calendar.SEPTEMBER, 28,23,59);
+
+
+ List filters = new ArrayList();
+
+ TemporalConstraint temporalConstraint =
+ new TemporalConstraint(startTime.getTimeInMillis(),
+ endTime.getTimeInMillis(), AggregationMode.DAILY);
+
+ Class clz =
+ AggregatedStorageUsageRecord.class;
+
+ SortedMap> set =
+ accountingPersistenceQueryCouchBase.getTopValues(
+ clz, temporalConstraint, filters,
+ AggregatedStorageUsageRecord.CONSUMER_ID, null);
+
+ logger.debug("Result final{}", set);
+
+ }
+
+
+
+
+
+
+
+
+
+
@Test
public void getQuerableKeyService() throws Exception{
SortedSet keys;
@@ -504,5 +563,25 @@ listTotalFilter.add(totalfilter1);
}
+ @Test
+ public void getListUsage() throws Exception{
+ Calendar startTime = Calendar.getInstance();
+ startTime.set(2015, Calendar.SEPTEMBER, 1);
+ Calendar endTime = Calendar.getInstance();
+ endTime.set(2016, Calendar.OCTOBER, 20,23,59);
+ List filters = new ArrayList();
+ filters.add(new Filter(AggregatedServiceUsageRecord.CONSUMER_ID, "valentina.marioli"));
+ TemporalConstraint temporalConstraint =
+ new TemporalConstraint(startTime.getTimeInMillis(),
+ endTime.getTimeInMillis(), AggregationMode.DAILY);
+ String context="/gcube/devNext";
+ List parameters=new ArrayList();
+ parameters.add("serviceClass");
+ parameters.add("serviceName");
+ Class clz =
+ AggregatedServiceUsageRecord.class;
+ SortedMap result= accountingPersistenceQueryCouchBase.getListUsage(clz,temporalConstraint,
+ filters,context,parameters);
+ }
}