Calculate top from map and if not exist use a query

Add a new bucket storage status and new method for calculate usage value quota "getUsageValueQuotaTotal"

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-analytics-persistence-couchbase@141629 82a268e6-3cf1-43bd-a215-b396298e98cf
feature/19115
Alessandro Pieve 7 years ago
parent 3ecf8938e5
commit eb4d8e3330

@ -8,7 +8,7 @@
</parent>
<groupId>org.gcube.accounting</groupId>
<artifactId>accounting-analytics-persistence-couchbase</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
<name>Accounting Analytics Persistence CouchBase</name>
<description>Accounting Analytics Persistence CouchBase Implementation</description>

@ -31,6 +31,7 @@ import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum;
import org.gcube.accounting.analytics.UsageServiceValue;
import org.gcube.accounting.analytics.UsageStorageValue;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException;
import org.gcube.accounting.analytics.exception.ValueException;
@ -40,6 +41,7 @@ import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.records.AggregatedRecord;
@ -69,6 +71,7 @@ import com.couchbase.client.java.view.ViewRow;
/**
* @author Luca Frosini (ISTI - CNR)
* @author Alessandro Pieve (ISTI - CNR) alessandro.pieve@isti.cnr.it
*
*/
public class AccountingPersistenceQueryCouchBase implements
@ -81,13 +84,17 @@ AccountingPersistenceBackendQuery {
public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY;
public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord";
public static final String BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY="AggregatedStorageStatusRecord";
public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord";
public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY="AggregatedPortletUsageRecord";
public static final String BUCKET_JOB_NAME_PROPERTY_KEY="AggregatedJobUsageRecord";
public static final String BUCKET_TASK_NAME_PROPERTY_KEY="AggregatedTaskUsageRecord";
public static final String SEPARATOR_DESIGN_DOC="_";
public static final String DESIGN_DOC_ID_LIST_USAGE="ListUsage";
public static final long ENV_TIME_OUT=180000;
/* The environment configuration */
@ -100,6 +107,11 @@ AccountingPersistenceBackendQuery {
protected Bucket bucketStorage;
protected String bucketNameStorage;
protected Bucket bucketStorageStatus;
protected String bucketNameStorageStatus;
protected Bucket bucketService;
protected String bucketNameService;
@ -114,6 +126,13 @@ AccountingPersistenceBackendQuery {
private Map <String, Bucket> connectionMap;
protected static final String MAP_REDUCE__DESIGN = "";
protected static final String MAP_REDUCE_ALL = "all";
// Used in the name of map reduce to separate keys used as filter
protected static final String KEYS_SEPARATOR = "__";
protected static final String DESIGN_DOC_ID="top_";
/**
* {@inheritDoc}
*/
@ -127,6 +146,9 @@ AccountingPersistenceBackendQuery {
cluster = CouchbaseCluster.create(ENV, url);
bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY);
bucketNameStorageStatus = configuration.getProperty(BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY);
bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY);
bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY);
bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY);
@ -135,9 +157,13 @@ AccountingPersistenceBackendQuery {
connectionMap = new HashMap<String, Bucket>();
bucketStorage = cluster.openBucket(bucketNameStorage, password);
connectionMap.put(BUCKET_STORAGE_NAME_PROPERTY_KEY, bucketStorage);
bucketStorageStatus = cluster.openBucket( bucketNameStorageStatus,password);
connectionMap.put(BUCKET_STORAGESTATUS_NAME_PROPERTY_KEY, bucketStorageStatus);
logger.debug("connectionMap"+connectionMap.toString());
bucketService = cluster.openBucket(bucketNameService, password);
connectionMap.put(BUCKET_SERVICE_NAME_PROPERTY_KEY, bucketService);
@ -181,6 +207,16 @@ AccountingPersistenceBackendQuery {
return calendar;
}
/**
* IS NOT USED
* @param clz
* @param temporalConstraint
* @param filters
* @return
* @throws Exception
*/
@Deprecated
protected Map<Calendar, Info> selectQuery(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters)
@ -309,13 +345,9 @@ AccountingPersistenceBackendQuery {
}
protected static final String MAP_REDUCE__DESIGN = "";
protected static final String MAP_REDUCE_ALL = "all";
/**
* Used in the name of map reduce to separate keys used as filter
*/
protected static final String KEYS_SEPARATOR = "__";
//OLD METHOD OF DIVISION MAP_REDUCE DESIGN
@Deprecated
protected String getDesignDocId(
Class<? extends AggregatedRecord<?,?>> recordClass)
throws InstantiationException, IllegalAccessException {
@ -324,7 +356,7 @@ AccountingPersistenceBackendQuery {
}
/**
*
* New division of designDocId and map-reduce
* @param recordClass
* @param keys
* @return
@ -344,6 +376,11 @@ AccountingPersistenceBackendQuery {
return getDesigndocIdSpecific;
}
/**
* generate a name of map-reduce view
* @param collection
* @return
*/
public static String getMapReduceFunctionName(
Collection<String> collection) {
String reduceFunction = MAP_REDUCE_ALL;
@ -360,6 +397,163 @@ AccountingPersistenceBackendQuery {
return reduceFunction;
}
/**EXPERIMENTAL
* generate a name of design doc id for a top
* @param collection
* @return
*/
public static String getDesignDocIdName(
Collection<String> collection) {
String reduceFunction = MAP_REDUCE_ALL;
if (!collection.isEmpty()){
reduceFunction = null;
String property = collection.iterator().next();
reduceFunction = property;
}
return reduceFunction;
}
/*
* EXPERIMENTAL
*
* This method is used for call a map-reduce ListUsage
*the filter on which you want to know its use
*and the parameters you want to know have been used
*Example:
*Input:
* request interval (start date end-date)
* filter consumerId: alessandro.pieve
* Used parameters: Service Class
*Return:
* List of service class used by alessandro in the required period
*/
public SortedMap<String,Integer> getListUsage(Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,String context,List<String> parameters)throws Exception{
//TODO
String currentScope=null;
if (context==null)
currentScope = ScopeProvider.instance.get();
else
currentScope = context;
JsonArray startKey = JsonArray.create();
startKey.add(currentScope);
JsonArray endKey = JsonArray.create();
endKey.add(currentScope);
AggregationMode aggregationMode = temporalConstraint.getAggregationMode();
JsonArray temporalStartKey = getRangeKey(temporalConstraint.getStartTime(),aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(temporalConstraint.getEndTime(), aggregationMode, false, false);
Set<String> recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
Collection<String> keys = new TreeSet<>();
if (filters != null && filters.size() != 0) {
// Sorting filter for call a mapreduce
Collections.sort(filters, new Comparator<Filter>() {
@Override
public int compare(Filter filter1, Filter filter2)
{
int result =filter1.getKey().compareTo(filter2.getKey());
return result;
}
});
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
startKey.add(filterValue);
endKey.add(filterValue);
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
/*
int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1;
int groupLevel = scopeDateGroupLevel;
if (filters != null) {
groupLevel += keys.size();
}
*/
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKey.add(temporal);
}
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase exclude last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKey.add(temporal);
}
count++;
}
String viewName = getMapReduceFunctionName(keys);
if (parameters!=null){
for (String name:parameters){
viewName+=KEYS_SEPARATOR+name;
}
}
//TODO DA COMPLETARE con in piu' alle chiavi anche la lista parametri passata
ViewQuery query = ViewQuery.from(DESIGN_DOC_ID_LIST_USAGE, viewName);
query.inclusiveEnd();
//query.groupLevel(groupLevel);
query.reduce(false);
query.startKey(startKey);
query.endKey(endKey);
query.descending(false);
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
clz.getSimpleName(),DESIGN_DOC_ID_LIST_USAGE, viewName, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
SortedMap<String, Integer> infos = new TreeMap<>();
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
for (ViewRow row : viewResult) {
JsonArray array = (JsonArray) row.key();
Calendar calendar = getCalendarFromArray(array);
JsonObject value = (JsonObject) row.value();
JSONObject obj = new JSONObject(value.toString());
Info info = new Info(calendar, obj);
//infos.put(info);
}
//TODO DA Completare per la richiesta utente e servizi/altro utilizzati nel periodo
return null;
}
protected SortedMap<Calendar, Info> mapReduceQuery(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,String context)
@ -385,9 +579,7 @@ AccountingPersistenceBackendQuery {
temporalConstraint.getEndTime(),
aggregationMode, false, false);
Set<String> recordKeysSet = AccountingPersistenceQuery
.getQuerableKeys(clz.newInstance());
Set<String> recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
Collection<String> keys = new TreeSet<>();
@ -407,27 +599,22 @@ AccountingPersistenceBackendQuery {
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
startKey.add(filterValue);
endKey.add(filterValue);
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",
Filter.class.getSimpleName(), filter.toString()));
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",
Filter.class.getSimpleName(), filter.toString()));
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
@ -438,7 +625,6 @@ AccountingPersistenceBackendQuery {
if (filters != null) {
groupLevel += keys.size();
}
//String designDocId = getDesignDocId(clz);
String designDocId = getDesignDocIdSpecific(clz,keys);
//logger.trace("designDocIdNew :{}",designDocId);
@ -541,7 +727,16 @@ AccountingPersistenceBackendQuery {
return map;
}
/**
* Used for calculate a top value
* @param clz
* @param temporalConstraint
* @param filters
* @param topKey
* @param orderingProperty
* @return
* @throws Exception
*/
@Override
public SortedMap<NumberedFilter, SortedMap<Calendar, Info>> getTopValues(
Class<? extends AggregatedRecord<?, ?>> clz,
@ -563,56 +758,296 @@ AccountingPersistenceBackendQuery {
SortedMap<NumberedFilter, SortedMap<Calendar, Info>> ret =
new TreeMap<>(comparator);
SortedSet<NumberedFilter> top = getNextPossibleValues(clz,
temporalConstraint, filters, topKey, orderingProperty);
SortedSet<NumberedFilter> top=null;
//top = getNextPossibleValues(clz,temporalConstraint, filters, topKey, orderingProperty);
if (usingNextPossibleValuesWithMap(clz,topKey,filters)){
logger.trace("getNextPossibleValues using map");
top = getNextPossibleValuesWithMap(clz,temporalConstraint, filters, topKey, orderingProperty);
}else{
logger.trace("getNextPossibleValues using query");
top = getNextPossibleValues(clz,temporalConstraint, filters, topKey, orderingProperty);
}
logger.trace("getNextPossibleValues:{}",top.toString());
for(NumberedFilter nf : top){
filters.add(nf);
SortedMap<Calendar, Info> map =
mapReduceQuery(clz, temporalConstraint, filters,null);
ret.put(nf, map);
filters.remove(nf);
}
return ret;
}
@Override
public SortedMap<Filter, SortedMap<Calendar, Info>> getContextTimeSeries(
/**
* SPERIMENTAL
* Used for verify if have exist map for calculate a top
* @param clz
* @param topKey
* @return
*/
protected boolean usingNextPossibleValuesWithMap(Class<? extends AggregatedRecord<?, ?>> clz, String topKey,List<Filter> filters){
Collection<String> keys = new TreeSet<>();
Set<String> recordKeysSet;
try {
recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
keys.add(topKey);
if (filters != null && filters.size() != 0) {
// Sorting filter for call a mapreduce
Collections.sort(filters, new Comparator<Filter>() {
@Override
public int compare(Filter filter1, Filter filter2)
{
int result =filter1.getKey().compareTo(filter2.getKey());
return result;
}
});
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
}catch (Exception e1) {
logger.warn("usingNextPossibleValuesWithMap -exception with filter:{}",filters.toString() );
return false;
}
String viewName=getMapReduceFunctionName(keys);
String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys);
ViewQuery query = ViewQuery.from(designDocId, viewName);
logger.debug("usingNextPossibleValuesWithMap using designDocId:{}, viewName:{}",designDocId,viewName);
ViewResult viewResult;
try {
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
return false;
}
return true;
}
/**
* Calculate a next possible value with map
* (faster but with greater demand for resources)
* @param clz
* @param temporalConstraint
* @param filters
* @param key
* @param orderingProperty
* @return
* @throws Exception
*/
public SortedSet<NumberedFilter> getNextPossibleValuesWithMap(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,List<String> contexts)
throws Exception {
logger.trace("getContextTimeSeries for contexts:{}",contexts.toString());
SortedSet<Filter> listContexts = new TreeSet();
for (String context:contexts){
Filter contextLabel= new Filter("context",context);
listContexts.add(contextLabel);
TemporalConstraint temporalConstraint, List<Filter> filters,
String key, String orderingProperty) throws Exception {
logger.debug("getNextPossibleValuesWithMap init");
String currentScope = ScopeProvider.instance.get();
if(orderingProperty==null){
orderingProperty = AccountingPersistenceQuery.
getDefaultOrderingProperties(clz);
}
SortedMap<Filter, SortedMap<Calendar, Info>> ret = new TreeMap<>();
for(Filter nf : listContexts){
logger.debug("detail time series :{}",nf.toString());
SortedMap<Calendar, Info> map =
mapReduceQuery(clz, temporalConstraint, filters,nf.getValue());
ret.put(nf, map);
filters.remove(nf);
JsonArray startKey = JsonArray.create();
startKey.add(currentScope);
JsonArray endKey = JsonArray.create();
endKey.add(currentScope);
AggregationMode aggregationMode = temporalConstraint
.getAggregationMode();
JsonArray temporalStartKey = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(
temporalConstraint.getEndTime(),
aggregationMode, false, false);
Set<String> recordKeysSet = AccountingPersistenceQuery.getQuerableKeys(clz.newInstance());
Collection<String> keys = new TreeSet<>();
keys.add(key);
/*BEGIN DELETE ONLY FOR TEST*/
Collection<Expression> selectExpressions = new ArrayList<>();
selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz,orderingProperty) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,orderingProperty)+" ELSE 1 END )").
as(orderingProperty));
selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz,key) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,key)+" ELSE 'UNKNOWN' END )").as(key));
Expression whereExpression =
x(getSpecializedProperty(clz,BasicUsageRecord.SCOPE)).
eq(s(currentScope));
long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis();
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.START_TIME)).gt(startTime)
);
long endTime = temporalConstraint.getEndTime();
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.END_TIME)).lt(endTime)
);
Expression[] selectExpressionArray = new Expression[selectExpressions.size()];
selectExpressions.toArray(selectExpressionArray);
Sort sort = Sort.desc(orderingProperty);
OffsetPath path = select(selectExpressionArray).from(connectionMap.get(clz.getSimpleName()).name())
.where(whereExpression).groupBy(key).orderBy(sort);
/*END DELETE ONLY FOR TEST*/
if (filters != null && filters.size() != 0) {
// Sorting filter for call a mapreduce
Collections.sort(filters, new Comparator<Filter>() {
@Override
public int compare(Filter filter1, Filter filter2)
{
int result =filter1.getKey().compareTo(filter2.getKey());
return result;
}
});
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
startKey.add(filterValue);
endKey.add(filterValue);
whereExpression = whereExpression.and(x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
return ret;
}
logger.debug("Alternative Query for top:"+path.toString());
protected String getQualifiedProperty(String property){
//DEVELOPING
return (property);
}
int groupLevel=1;
//Use for property into a specify bucket
protected String getSpecializedProperty(Class<? extends AggregatedRecord<?, ?>> clz,String property){
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKey.add(temporal);
}
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase exclude last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKey.add(temporal);
}
count++;
}
String viewName = getMapReduceFunctionName(keys);
String designDocId =DESIGN_DOC_ID+getDesignDocIdName(keys);
logger.trace("keys:{}",keys.toString());
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(groupLevel);
query.startKey(startKey);
query.endKey(endKey);
query.descending(false);
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
clz.getSimpleName(),designDocId, viewName, groupLevel, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
Comparator<NumberedFilter> comparator = new Comparator<NumberedFilter>() {
@Override
public int compare(NumberedFilter o1, NumberedFilter o2) {
int compareResult = -o1.compareTo(o2);
if(compareResult==0){
compareResult=1;
}
return compareResult;
}
};
SortedSet<NumberedFilter> ret = new TreeSet<>(comparator);
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
ViewRow row =viewResult.allRows().get(0);
JsonObject value = (JsonObject) row.value();
JSONObject objectValueTop = new JSONObject(value.toString());
Iterator<?> iterateJosn = objectValueTop.keys();
while( iterateJosn.hasNext() ) {
String keyTop = (String)iterateJosn.next();
Number n = (Number) objectValueTop.get(keyTop);
if (n==null)
n=0;
NumberedFilter numberedFilter = new NumberedFilter(key, keyTop, n, orderingProperty);
ret.add(numberedFilter);
}
return ret;
return String.format("%s.%s", connectionMap.get(clz.getSimpleName()).name(), property);
}
/**
* Calculate a next possible value with query
* (more slow but with fewer resources )
* @param clz
* @param temporalConstraint
* @param filters
* @param key
* @param orderingProperty
* @return
* @throws Exception
*/
@Override
public SortedSet<NumberedFilter> getNextPossibleValues(
Class<? extends AggregatedRecord<?, ?>> clz,
@ -628,13 +1063,10 @@ AccountingPersistenceBackendQuery {
}
Collection<Expression> selectExpressions = new ArrayList<>();
//add select expression
//selectExpressions.add(x("SUM(" + getSpecializedProperty(clz,orderingProperty) + ")").as(orderingProperty));
//add select expression and check if exist
selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz,orderingProperty) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,orderingProperty)+" ELSE 1 END )").
as(orderingProperty));
//selectExpressions.add(x(getSpecializedProperty(clz,key)).as(key));
selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz,key) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,key)+" ELSE 'UNKNOWN' END )").as(key));
@ -647,7 +1079,6 @@ AccountingPersistenceBackendQuery {
whereExpression = whereExpression.and(
x(getSpecializedProperty(clz,AggregatedRecord.START_TIME)).gt(startTime)
);
//long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
long endTime = temporalConstraint.getEndTime();
whereExpression = whereExpression.and(
@ -674,26 +1105,20 @@ AccountingPersistenceBackendQuery {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
whereExpression =
whereExpression.and(
x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
whereExpression = whereExpression.and(x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
keys.add(filterKey);
} else {
throw new KeyException(
String.format("Invalid %s : %s",
Filter.class.getSimpleName(), filter.toString()));
String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
} else {
throw new ValueException(String.format("Invalid %s : %s",
Filter.class.getSimpleName(), filter.toString()));
throw new ValueException(String.format("Invalid %s : %s",Filter.class.getSimpleName(), filter.toString()));
}
}
}
Expression[] selectExpressionArray =
new Expression[selectExpressions.size()];
Expression[] selectExpressionArray = new Expression[selectExpressions.size()];
selectExpressions.toArray(selectExpressionArray);
Sort sort = Sort.desc(orderingProperty);
@ -701,6 +1126,9 @@ AccountingPersistenceBackendQuery {
.where(whereExpression).groupBy(key).orderBy(sort);
logger.debug("Query for top:"+path.toString());
Comparator<NumberedFilter> comparator = new Comparator<NumberedFilter>() {
@Override
public int compare(NumberedFilter o1, NumberedFilter o2) {
@ -733,6 +1161,7 @@ AccountingPersistenceBackendQuery {
if (n==null)
n=0;
//logger.trace("pre:{}, key:{}, value:{}, n:{},orderingProperty:{}",jsonObject.toString(),key, value, n, orderingProperty);
NumberedFilter numberedFilter =
new NumberedFilter(key, value, n, orderingProperty);
@ -748,6 +1177,59 @@ AccountingPersistenceBackendQuery {
return ret;
}
/**
* Return a list of context time series
* (used for portlet accounting context)
* @param clz
* @param temporalConstraint
* @param filters
* @param contexts
* @return
* @throws Exception
*/
@Override
public SortedMap<Filter, SortedMap<Calendar, Info>> getContextTimeSeries(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters,List<String> contexts)
throws Exception {
logger.trace("getContextTimeSeries for contexts:{}",contexts.toString());
SortedSet<Filter> listContexts = new TreeSet();
for (String context:contexts){
Filter contextLabel= new Filter("context",context);
listContexts.add(contextLabel);
}
SortedMap<Filter, SortedMap<Calendar, Info>> ret = new TreeMap<>();
for(Filter nf : listContexts){
logger.debug("detail time series :{}",nf.toString());
SortedMap<Calendar, Info> map =
mapReduceQuery(clz, temporalConstraint, filters,nf.getValue());
ret.put(nf, map);
filters.remove(nf);
}
return ret;
}
protected String getQualifiedProperty(String property){
//DEVELOPING
return (property);
}
//Use for property into a specify bucket
protected String getSpecializedProperty(Class<? extends AggregatedRecord<?, ?>> clz,String property){
return String.format("%s.%s", connectionMap.get(clz.getSimpleName()).name(), property);
}
/**
* Used for list a possible values for each filter
* @param clz
* @param temporalConstraint
* @param filters
* @param key
* @return
* @throws Exception
*/
@SuppressWarnings("deprecation")
@Override
public SortedSet<NumberedFilter> getFilterValues(
@ -811,6 +1293,16 @@ AccountingPersistenceBackendQuery {
return ret;
}
/**
* SPERIMENTAL now is not used
* @param clz
* @param temporalConstraint
* @param applicant
* @return
* @throws Exception
*/
@SuppressWarnings("deprecation")
@Override
public JSONObject getUsageValue(
@ -865,12 +1357,10 @@ AccountingPersistenceBackendQuery {
Collection<String> keys = new TreeSet<>();
keys.add(applicant.getKey());
//String designDocId = getDesignDocId(clz);
//ADD A SPECIFIY DESIGN DOC ID FAMILY
String designDocId = getDesignDocIdSpecific(clz,keys);
String viewName = applicant.getKey();
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.inclusiveEnd();
query.groupLevel(groupLevel);
@ -919,17 +1409,45 @@ AccountingPersistenceBackendQuery {
}
/**
*
* SPERIMENTAL
* Used for calculate a usage value for each element of list
* (QUOTA)
* @param listUsage
* @return
* @throws Exception
*/
@Override
public List<UsageValue> getUsageValueQuotaTotal(
List<UsageValue> listUsage)
throws Exception {
logger.debug("getUsageValueQuotaTotal init with list:{}",listUsage);
//String currentScope = ScopeProvider.instance.get();
String keyOrderingProperty = null;
for (UsageValue totalFilters:listUsage){
logger.debug("----------------- init for totalFilters");
/*
gli arriva una lista di usage value da verificare...
gli usage value possono essere di diversi macro tipi:
storage
storage senza limite temporale (richiesta da per una quota totale )
utilizzo identifier come consumer id
utilizzo accounting_storage_status come bucket !
utilizzo quotaTotal come design
ed il nome della view sara composto da consumerid ed eventuali filtri (al momento non previsti)
storage con limiti temporali (richiesto per una quota parziale)
utilizzo identifier come consumer id
utilizzo accounting_storage_status come bucket !
utilizzo quota come design
ed il nome della view sara composto da consumerid ed eventuali filtri (al momento non previsti)
service
lista service con limite temporali
*/
String currentScope = totalFilters.getContext();
Collection<String> keys= new TreeSet<>();
@ -938,8 +1456,11 @@ AccountingPersistenceBackendQuery {
JsonArray temporalStartKey=null;
JsonArray temporalEndKey=null;
logger.trace("temporalConstraint:{}",totalFilters);
TemporalConstraint temporalConstraint=totalFilters.getTemporalConstraint();
if (temporalConstraint==(null)){
//used for no temporalConstraint
logger.trace("Not found temporalConstraint");
Calendar startTime = Calendar.getInstance();
startTime.set(1970, Calendar.JANUARY, 1);
@ -947,6 +1468,18 @@ AccountingPersistenceBackendQuery {
temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
//if type=storage then bucket =accounting_storage_status and designDocId = quotaTotal
//logger.trace("totalFilters.getClass().getSimpleName():{}",totalFilters.getClass().getSimpleName());
//logger.trace("totalFilters.getClz().getSimpleName():{}",totalFilters.getClz().getSimpleName());
if (totalFilters.getClz().getSimpleName().equals(AggregatedStorageStatusRecord.class.getSimpleName())){
//TODO sarebbe possibile invece con il nome verificare che getClz e' un istanza di AggregatedStorageStatusRecord?
designDocId = "QuotaTotal";
}
}
else if (totalFilters.getClz().getSimpleName().equals(AggregatedStorageStatusRecord.class.getSimpleName())){
logger.trace("AggregatedStorageStatusRecord with temporalConstraint");
designDocId = "Quota";
}
AggregationMode aggregationMode = temporalConstraint.getAggregationMode();
@ -1008,7 +1541,7 @@ AccountingPersistenceBackendQuery {
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
totalFilters.getClz().getSimpleName(),designDocId, viewNameTmp, groupLevelTmp, startKeyTmp, endKeyTmp,temporalStartKey.toString(), temporalEndKey.toString());
logger.trace("connectionMap:{}",connectionMap.toString());
ViewQuery query = ViewQuery.from(designDocId, viewNameTmp);
query.inclusiveEnd();
query.groupLevel(groupLevelTmp);
@ -1027,7 +1560,7 @@ AccountingPersistenceBackendQuery {
Map<String, Float> map = new HashMap<String, Float>();
for (ViewRow row : viewResult) {
logger.trace("ViewRow row:{}",row.toString());
JsonObject jsnobject = (JsonObject) row.value();
JSONObject objJson = new JSONObject(jsnobject.toString());
@ -1037,13 +1570,16 @@ AccountingPersistenceBackendQuery {
Float valuetmp=Float.parseFloat(objJson.get(key).toString());
//logger.debug("totalFilters.getClass().getSimpleName():{} UsageStorageValue.class.getSimpleName():{}", totalFilters.getClass().getSimpleName(),UsageStorageValue.class.getSimpleName());
if (key.equals("operationCount") || key.equals("dataVolume")){
if (map.containsKey(key)) {
map.put(key, valuetmp + map.get(key));
// TODO verify a better method
if (designDocId.equals("StorageUsageRecord")){
if (totalFilters.getClass().getSimpleName().equals(UsageStorageValue.class.getSimpleName())){
logger.debug("storageUsageRecord -designDocId:{}",designDocId);
if (key.equals("dataVolume")){
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
singleFilter.setOrderingProperty(key);
@ -1054,6 +1590,7 @@ AccountingPersistenceBackendQuery {
}
}
else{
logger.debug("?UsageRecord -designDocId:{}",designDocId);
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
singleFilter.setOrderingProperty(key);
singleFilter.setD(singleFilter.getD()+valuetmp.doubleValue());
@ -1067,7 +1604,8 @@ AccountingPersistenceBackendQuery {
map.put(key, valuetmp);
// TODO verify a better method
if (designDocId.equals("StorageUsageRecord")){
if (totalFilters.getClass().getSimpleName().equals(UsageStorageValue.class.getSimpleName())){
logger.debug("storageUsageRecord -designDocId:{}",designDocId);
if (key.equals("dataVolume")){
keyOrderingProperty=key;
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
@ -1078,6 +1616,7 @@ AccountingPersistenceBackendQuery {
}
}
else{
logger.debug("?UsageRecord -designDocId:{}",designDocId);
keyOrderingProperty=key;
if (totalFilters.getClass().getSimpleName().equals(UsageServiceValue.class.getSimpleName())){
singleFilter.setOrderingProperty(key);
@ -1094,7 +1633,13 @@ AccountingPersistenceBackendQuery {
totalFilters.setOrderingProperty(keyOrderingProperty);
totalFilters.setD(totalQuota);
}
return listUsage;
}
}

@ -18,14 +18,15 @@ import org.gcube.accounting.analytics.FiltersValue;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.UsageServiceValue;
import org.gcube.accounting.analytics.UsageStorageValue;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.aggregation.AggregatedJobUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
import org.gcube.common.scope.api.ScopeProvider;
@ -257,11 +258,8 @@ public class AccountingPersistenceQueryCouchBaseTest {
filters1.add(new Filter("serviceClass", "VREManagement"));
simpleFilter1.setFiltersValue(filters1);
filterPackageQuota.add(simpleFilter1);
UsageValue totalfilter=new UsageServiceValue(context,"lucio.lelii",AggregatedServiceUsageRecord.class,temporalConstraint,filterPackageQuota);
/*ask quota for user lucio lelii
* SERVICE
* */
@ -286,10 +284,7 @@ public class AccountingPersistenceQueryCouchBaseTest {
UsageValue totalfilter1=new UsageServiceValue("gcube","lucio.lelii",AggregatedServiceUsageRecord.class,temporalConstraint,filterPackageQuota1);
//totalfilter1.setTotalFilters(filterPackageQuota1);
/*ask quota for alessandro pieve
*STORAGE
**/
@ -305,17 +300,46 @@ public class AccountingPersistenceQueryCouchBaseTest {
//TotalFilters totalfilter2=new TotalFilters("alessandro.pieve",AggregatedStorageUsageRecord.class,temporalConstraint,filterPackageQuota2);
List<FiltersValue> filtersList=new ArrayList<FiltersValue>();
//UsageValue totalfilter2=new UsageStorageValue("alessandro.pieve",AggregatedStorageUsageRecord.class,temporalConstraint);
/****
*Example call storage status for each consumer id (quota total used )
*/
//Call quota total for consumerID
UsageValue totalfilterStorageStatus=new UsageStorageValue(context,"name.surname",AggregatedStorageStatusRecord.class);
UsageValue totalfilterStorageStatus_2=new UsageStorageValue(context,"lucio.lelii",AggregatedStorageStatusRecord.class);
UsageValue totalfilterStorageStatus_3=new UsageStorageValue(context,"alessandro.pieve",AggregatedStorageStatusRecord.class);
UsageValue totalfilterStorageStatus_4=new UsageStorageValue(context,"giancarlo.panichi",AggregatedStorageStatusRecord.class);
UsageValue totalfilter2=new UsageStorageValue(context,"alessandro.pieve",AggregatedStorageUsageRecord.class);
/****
*Example call storage status for each consumer id( quota into period)
*/
//get temporalConstraintStorage
Calendar startTimeStorage = Calendar.getInstance();
startTimeStorage.set(2015, Calendar.MAY, 1);
Calendar endTimeStorage = Calendar.getInstance();
endTimeStorage.set(2016, Calendar.DECEMBER, 18);
TemporalConstraint temporalConstraintStorage =new TemporalConstraint(startTimeStorage.getTimeInMillis(),
endTimeStorage.getTimeInMillis(), AggregationMode.DAILY);
UsageValue totalfilterStorageStatusPeriod=new UsageStorageValue(context,"name.surname",AggregatedStorageStatusRecord.class,temporalConstraintStorage);
//richiedo la lista di dati usati totali
List<UsageValue> listTotalFilter=new ArrayList<UsageValue>();
// listTotalFilter.add(totalfilter);
listTotalFilter.add(totalfilter1);
// listTotalFilter.add(totalfilter);
listTotalFilter.add(totalfilter1);
listTotalFilter.add(totalfilterStorageStatus);
listTotalFilter.add(totalfilterStorageStatus_2);
listTotalFilter.add(totalfilterStorageStatus_3);
listTotalFilter.add(totalfilterStorageStatus_4);
listTotalFilter.add(totalfilterStorageStatusPeriod);
listTotalFilter.add(totalfilter2);
logger.info("filterPackageQuota:"+listTotalFilter);
List<UsageValue> filterValue =
@ -343,13 +367,13 @@ listTotalFilter.add(totalfilter1);
@Test
public void testTopService() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.set(2016, Calendar.SEPTEMBER, 27);
startTime.set(2016, Calendar.AUGUST, 27);
Calendar endTime = Calendar.getInstance();
endTime.set(2016, Calendar.SEPTEMBER, 28,23,59);
List<Filter> filters = new ArrayList<Filter>();
filters.add(new Filter(AggregatedServiceUsageRecord.SERVICE_CLASS, "Common"));
TemporalConstraint temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
@ -360,12 +384,47 @@ listTotalFilter.add(totalfilter1);
SortedMap<NumberedFilter, SortedMap<Calendar, Info>> set =
accountingPersistenceQueryCouchBase.getTopValues(
clz, temporalConstraint, filters,
AggregatedServiceUsageRecord.CALLED_METHOD, null);
AggregatedServiceUsageRecord.HOST, null);
logger.debug("Result final{}", set);
}
@Test
public void testTopStorage() throws Exception {
Calendar startTime = Calendar.getInstance();
startTime.set(2016, Calendar.AUGUST, 27);
Calendar endTime = Calendar.getInstance();
endTime.set(2016, Calendar.SEPTEMBER, 28,23,59);
List<Filter> filters = new ArrayList<Filter>();
TemporalConstraint temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
Class<AggregatedStorageUsageRecord> clz =
AggregatedStorageUsageRecord.class;
SortedMap<NumberedFilter, SortedMap<Calendar, Info>> set =
accountingPersistenceQueryCouchBase.getTopValues(
clz, temporalConstraint, filters,
AggregatedStorageUsageRecord.CONSUMER_ID, null);
logger.debug("Result final{}", set);
}
@Test
public void getQuerableKeyService() throws Exception{
SortedSet<String> keys;
@ -504,5 +563,25 @@ listTotalFilter.add(totalfilter1);
}
@Test
public void getListUsage() throws Exception{
Calendar startTime = Calendar.getInstance();
startTime.set(2015, Calendar.SEPTEMBER, 1);
Calendar endTime = Calendar.getInstance();
endTime.set(2016, Calendar.OCTOBER, 20,23,59);
List<Filter> filters = new ArrayList<Filter>();
filters.add(new Filter(AggregatedServiceUsageRecord.CONSUMER_ID, "valentina.marioli"));
TemporalConstraint temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
String context="/gcube/devNext";
List <String>parameters=new ArrayList<String>();
parameters.add("serviceClass");
parameters.add("serviceName");
Class<AggregatedServiceUsageRecord> clz =
AggregatedServiceUsageRecord.class;
SortedMap<String,Integer> result= accountingPersistenceQueryCouchBase.getListUsage(clz,temporalConstraint,
filters,context,parameters);
}
}