diff --git a/.classpath b/.classpath
index e43402f..add55f2 100644
--- a/.classpath
+++ b/.classpath
@@ -30,6 +30,7 @@
+
diff --git a/.project b/.project
index 936c10a..664a398 100644
--- a/.project
+++ b/.project
@@ -5,6 +5,11 @@
+
+ org.eclipse.wst.common.project.facet.core.builder
+
+
+
org.eclipse.jdt.core.javabuilder
@@ -15,9 +20,17 @@
+
+ org.eclipse.wst.validation.validationbuilder
+
+
+
+ org.eclipse.jem.workbench.JavaEMFNature
+ org.eclipse.wst.common.modulecore.ModuleCoreNature
org.eclipse.jdt.core.javanature
org.eclipse.m2e.core.maven2Nature
+ org.eclipse.wst.common.project.facet.core.nature
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
index ec4300d..443e085 100644
--- a/.settings/org.eclipse.jdt.core.prefs
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -1,5 +1,8 @@
eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.compliance=1.7
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.7
diff --git a/.settings/org.eclipse.wst.common.component b/.settings/org.eclipse.wst.common.component
new file mode 100644
index 0000000..99215a1
--- /dev/null
+++ b/.settings/org.eclipse.wst.common.component
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/.settings/org.eclipse.wst.common.project.facet.core.xml b/.settings/org.eclipse.wst.common.project.facet.core.xml
new file mode 100644
index 0000000..1b22d70
--- /dev/null
+++ b/.settings/org.eclipse.wst.common.project.facet.core.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/distro/README b/distro/README
index ed2b585..f84d4a0 100644
--- a/distro/README
+++ b/distro/README
@@ -21,11 +21,13 @@ Authors
--------------------------------------------------
* Luca Frosini (luca.frosini-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
+* Alessandro Pieve (alessandro.pieve-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
Maintainers
-----------
* Luca Frosini (luca.frosini-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
+* Alessandro Pieve (alessandro.pieve-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
Download information
diff --git a/distro/changelog.xml b/distro/changelog.xml
index 27e47c7..8f870f9 100644
--- a/distro/changelog.xml
+++ b/distro/changelog.xml
@@ -1,5 +1,5 @@
- First Release
+ First Release #1665
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 5f31125..1986a7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
com.couchbase.client
java-client
- 2.2.3
+ 2.2.7
diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
index d378532..4ffd73a 100644
--- a/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
+++ b/src/main/java/org/gcube/accounting/analytics/persistence/couchbase/AccountingPersistenceQueryCouchBase.java
@@ -11,8 +11,12 @@ import java.security.KeyException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -35,6 +39,8 @@ import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.records.AggregatedRecord;
+import org.gcube.documentstore.records.Record;
+import org.gcube.documentstore.records.RecordUtility;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
@@ -51,7 +57,8 @@ import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow;
import com.couchbase.client.java.query.dsl.Expression;
import com.couchbase.client.java.query.dsl.Sort;
-import com.couchbase.client.java.query.dsl.path.LimitPath;
+import com.couchbase.client.java.query.dsl.path.GroupByPath;
+import com.couchbase.client.java.query.dsl.path.OffsetPath;
import com.couchbase.client.java.view.ViewQuery;
import com.couchbase.client.java.view.ViewResult;
import com.couchbase.client.java.view.ViewRow;
@@ -61,24 +68,52 @@ import com.couchbase.client.java.view.ViewRow;
*
*/
public class AccountingPersistenceQueryCouchBase implements
- AccountingPersistenceBackendQuery {
+AccountingPersistenceBackendQuery {
private static final Logger logger = LoggerFactory
.getLogger(AccountingPersistenceQueryCouchBase.class);
+
public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY;
// public static final String USERNAME_PROPERTY_KEY =
// AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY;
public static final String PASSWORD_PROPERTY_KEY = AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY;
- public static final String BUCKET_NAME_PROPERTY_KEY = "bucketName";
+ public static final String BUCKET_STORAGE_NAME_PROPERTY_KEY="AggregatedStorageUsageRecord";
+ public static final String BUCKET_SERVICE_NAME_PROPERTY_KEY="AggregatedServiceUsageRecord";
+
+ public static final String BUCKET_PORTLET_NAME_PROPERTY_KEY="AggregatedPortletUsageRecord";
+ public static final String BUCKET_JOB_NAME_PROPERTY_KEY="AggregatedJobUsageRecord";
+ public static final String BUCKET_TASK_NAME_PROPERTY_KEY="AggregatedTaskUsageRecord";
+
+
+ public static final long ENV_TIME_OUT=180000;
/* The environment configuration */
protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment
- .builder().queryEnabled(true).build();
+ .builder().maxRequestLifetime(ENV_TIME_OUT).queryTimeout(ENV_TIME_OUT).queryEnabled(true).build();
+
protected Cluster cluster;
- protected Bucket bucket;
- protected String bucketName;
+ /* One Bucket for type*/
+ protected Bucket bucketStorage;
+ protected String bucketNameStorage;
+
+ protected Bucket bucketService;
+ protected String bucketNameService;
+
+ protected Bucket bucketPortlet;
+ protected String bucketNamePortlet;
+
+ protected Bucket bucketJob;
+ protected String bucketNameJob;
+
+ protected Bucket bucketTask;
+ protected String bucketNameTask;
+
+
+
+
+ private Map connectionMap;
/**
* {@inheritDoc}
@@ -86,14 +121,51 @@ public class AccountingPersistenceQueryCouchBase implements
@Override
public void prepareConnection(
AccountingPersistenceBackendQueryConfiguration configuration)
- throws Exception {
+ throws Exception {
+
+
+
+
+
String url = configuration.getProperty(URL_PROPERTY_KEY);
- // String username = configuration.getProperty(USERNAME_PROPERTY_KEY);
+
+
+
String password = configuration.getProperty(PASSWORD_PROPERTY_KEY);
cluster = CouchbaseCluster.create(ENV, url);
- bucketName = configuration.getProperty(BUCKET_NAME_PROPERTY_KEY);
- bucket = cluster.openBucket(bucketName, password);
+
+ logger.trace("env"+ENV.toString());
+
+ bucketNameStorage = configuration.getProperty(BUCKET_STORAGE_NAME_PROPERTY_KEY);
+ bucketNameService = configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY);
+ bucketNameJob = configuration.getProperty(BUCKET_JOB_NAME_PROPERTY_KEY);
+ bucketNamePortlet = configuration.getProperty(BUCKET_PORTLET_NAME_PROPERTY_KEY);
+ bucketNameTask = configuration.getProperty(BUCKET_TASK_NAME_PROPERTY_KEY);
+
+ connectionMap = new HashMap();
+
+ bucketStorage = cluster.openBucket(bucketNameStorage, password);
+
+ connectionMap.put(BUCKET_STORAGE_NAME_PROPERTY_KEY, bucketStorage);
+
+ bucketService = cluster.openBucket(bucketNameService, password);
+ connectionMap.put(BUCKET_SERVICE_NAME_PROPERTY_KEY, bucketService);
+
+ bucketJob= cluster.openBucket(bucketNameJob, password);
+ connectionMap.put(BUCKET_JOB_NAME_PROPERTY_KEY, bucketJob);
+
+ bucketPortlet= cluster.openBucket(bucketNamePortlet, password);
+ connectionMap.put(BUCKET_PORTLET_NAME_PROPERTY_KEY, bucketPortlet);
+
+ bucketTask= cluster.openBucket(bucketNameTask, password);
+ connectionMap.put(BUCKET_TASK_NAME_PROPERTY_KEY, bucketTask);
+
+
+ logger.trace("Open cluster Service Bucket Url:"+url+" Pwd:"+configuration.getProperty(PASSWORD_PROPERTY_KEY)+
+ " BucketName "+configuration.getProperty(BUCKET_SERVICE_NAME_PROPERTY_KEY));
+
+
}
/**
@@ -125,80 +197,100 @@ public class AccountingPersistenceQueryCouchBase implements
return calendar;
}
-// private Map selectQuery(
-// Class extends AggregatedRecord, ?>> clz,
-// TemporalConstraint temporalConstraint, List filters)
-// throws Exception {
-// // String currentScope = BasicUsageRecord.getScopeFromToken();
-// String currentScope = ScopeProvider.instance.get();
-// String recordType = clz.newInstance().getRecordType();
-//
-// Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope));
-// expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(
-// s(recordType)));
-//
-// long startTime = temporalConstraint.getAlignedStartTime()
-// .getTimeInMillis();
-// expression = expression.and(x(AggregatedRecord.START_TIME)
-// .gt(startTime).or(
-// x(AggregatedRecord.CREATION_TIME).gt(startTime)));
-//
-// long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
-// expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime))
-// .or(x(AggregatedRecord.CREATION_TIME).lt(endTime));
-//
-// AggregationMode aggregationMode = temporalConstraint
-// .getAggregationMode();
-// // TODO Aggregate Results
-//
-// if (filters != null) {
-// for (Filter filter : filters) {
-// expression = expression.and(x(filter.getKey()).eq(
-// s(filter.getValue())));
-// }
-// }
-//
-// GroupByPath groupByPath = select("*").from(bucketName)
-// .where(expression);
-//
-// Map map = new HashMap();
-//
-// N1qlQueryResult result = bucket.query(groupByPath);
-// if (!result.finalSuccess()) {
-// logger.debug("{} failed : {}",
-// N1qlQueryResult.class.getSimpleName(), result.errors());
-// return map;
-// }
-//
-// List rows = result.allRows();
-//
-// for (N1qlQueryRow row : rows) {
-// try {
-// logger.trace("Row : {}", row.toString());
-// JsonObject jsonObject = row.value().getObject(bucketName);
-// logger.trace("JsonObject : {}", row.toString());
-// String recordString = jsonObject.toMap().toString();
-// logger.trace("Record String : {}", recordString);
-// Record record = RecordUtility.getRecord(recordString);
-//
-// JSONObject obj = new JSONObject(jsonObject.toString());
-// Calendar calendar = getCalendar(obj, aggregationMode);
-// if (map.containsKey(calendar)) {
-// Info info = map.get(calendar);
-// JSONObject value = info.getValue();
-// jsonObject.toMap();
-// } else {
-// map.put(calendar, new Info(calendar, obj));
-// }
-// } catch (Exception e) {
-// logger.warn("Unable to eleborate result for {}", row.toString());
-// }
-//
-// logger.trace("\n\n\n");
-// }
-//
-// return map;
-// }
+ protected Map selectQuery(
+ Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, List filters)
+ throws Exception {
+ String currentScope = ScopeProvider.instance.get();
+
+ String recordType = clz.newInstance().getRecordType();
+
+ Expression expression = x(BasicUsageRecord.SCOPE).eq(s(currentScope));
+ expression = expression.and(x(BasicUsageRecord.RECORD_TYPE).eq(
+ s(recordType)));
+
+ long startTime = temporalConstraint.getAlignedStartTime()
+ .getTimeInMillis();
+ expression = expression.and(x(AggregatedRecord.START_TIME)
+ .gt(startTime).or(
+ x(AggregatedRecord.CREATION_TIME).gt(startTime)));
+
+ long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
+ expression = expression.and(x(AggregatedRecord.END_TIME).lt(endTime))
+ .or(x(AggregatedRecord.CREATION_TIME).lt(endTime));
+
+ AggregationMode aggregationMode = temporalConstraint
+ .getAggregationMode();
+ // TODO Aggregate Results
+
+ if (filters != null) {
+ for (Filter filter : filters) {
+ expression = expression.and(x(filter.getKey()).eq(
+ s(filter.getValue())));
+ }
+ }
+
+ GroupByPath groupByPath = select("*").from(connectionMap.get(clz.getSimpleName()).name())
+ .where(expression);
+
+
+
+
+ Map map = new HashMap();
+
+ // using the DSL
+ /*e.g.
+ select sum(dataVolume) as Volume ,
+ sum(operationCount) as Operation ,
+ consumerId
+ from accounting_storage_test
+ group by order by Volume desc
+ */
+ // N1qlQueryResult result = connectionMap.get(clz.getSimpleName()).query(select("sum(dataVolume) as Volume ,sum(operationCount) as Operation , consumerId").from("accounting_storage").groupBy("consumerId").limit(5));
+
+
+
+ //logger.info("result"+result.toString());
+ N1qlQueryResult result = connectionMap.get(clz.getSimpleName()).query(groupByPath);
+
+ if (!result.finalSuccess()) {
+ logger.debug("{} failed : {}",
+ N1qlQueryResult.class.getSimpleName(), result.errors());
+ return map;
+ }
+
+ List rows = result.allRows();
+
+ for (N1qlQueryRow row : rows) {
+ try {
+ logger.trace("Row : {}", row.toString());
+ JsonObject jsonObject = row.value().getObject(clz.getSimpleName());
+ logger.trace("JsonObject : {}", row.toString());
+ String recordString = jsonObject.toMap().toString();
+ logger.trace("Record String : {}", recordString);
+ Record record = RecordUtility.getRecord(recordString);
+
+ JSONObject obj = new JSONObject(jsonObject.toString());
+ Calendar calendar = getCalendar(obj, aggregationMode);
+ if (map.containsKey(calendar)) {
+ Info info = map.get(calendar);
+ JSONObject value = info.getValue();
+
+
+
+ jsonObject.toMap();
+ } else {
+ map.put(calendar, new Info(calendar, obj));
+ }
+ } catch (Exception e) {
+ logger.warn("Unable to eleborate result for {}", row.toString());
+ }
+
+ logger.trace("\n\n\n");
+ }
+
+ return map;
+ }
protected Calendar getCalendarFromArray(JsonArray array)
throws JSONException {
@@ -221,7 +313,7 @@ public class AccountingPersistenceQueryCouchBase implements
/*
logger.trace("The provide value is not an int. {}", array
.get(i).toString());
- */
+ */
if (startFound) {
break;
}
@@ -270,41 +362,44 @@ public class AccountingPersistenceQueryCouchBase implements
return array;
}
-
- protected static final String MAP_REDUCE__DESIGN = "_design/";
+
+
+ protected static final String MAP_REDUCE__DESIGN = "";
protected static final String MAP_REDUCE_ALL = "all";
-
+
/**
* Used in the name of map reduce to separate keys used as filter
*/
protected static final String KEYS_SEPARATOR = "__";
-
+
protected String getDesignDocId(
Class extends AggregatedRecord,?>> recordClass)
- throws InstantiationException, IllegalAccessException {
+ throws InstantiationException, IllegalAccessException {
return String.format("%s%s", MAP_REDUCE__DESIGN, recordClass
.newInstance().getRecordType());
}
-
+
public static String getMapReduceFunctionName(
Collection collection) {
String reduceFunction = MAP_REDUCE_ALL;
- for (String property : collection) {
- if (reduceFunction == null) {
- reduceFunction = property;
- } else {
- reduceFunction = reduceFunction + KEYS_SEPARATOR + property;
+ if (!collection.isEmpty()){
+ reduceFunction = null;
+ for (String property : collection) {
+ if (reduceFunction == null) {
+ reduceFunction = property;
+ } else {
+ reduceFunction = reduceFunction + KEYS_SEPARATOR + property;
+ }
}
}
return reduceFunction;
}
-
+
protected SortedMap mapReduceQuery(
Class extends AggregatedRecord, ?>> clz,
- TemporalConstraint temporalConstraint, List filters)
- throws Exception {
+ TemporalConstraint temporalConstraint, List filters)
+ throws Exception {
- // String currentScope = BasicUsageRecord.getScopeFromToken();
String currentScope = ScopeProvider.instance.get();
JsonArray startKey = JsonArray.create();
@@ -314,49 +409,63 @@ public class AccountingPersistenceQueryCouchBase implements
AggregationMode aggregationMode = temporalConstraint
.getAggregationMode();
-
+
JsonArray temporalStartKey = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(
temporalConstraint.getEndTime(),
- aggregationMode, true, false);
+ aggregationMode, false, false);
+
-
Set recordKeysSet = AccountingPersistenceQuery
.getQuerableKeys(clz.newInstance());
Collection keys = new TreeSet<>();
-
- JsonArray filterStartKey = JsonArray.create();
- JsonArray filterEndKey = JsonArray.create();
+
+ //JsonArray filterStartKey = JsonArray.create();
+ //JsonArray filterEndKey = JsonArray.create();
+
if (filters != null && filters.size() != 0) {
+ // Sorting filter for call a mapreduce
+ Collections.sort(filters, new Comparator() {
+ @Override
+ public int compare(Filter filter1, Filter filter2)
+ {
+
+ return filter1.getKey().compareTo(filter2.getKey());
+ }
+ });
+ logger.trace("filter"+filters.toString());
+
+
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
-
+
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
-
+
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
- filterStartKey.add(filterValue);
- filterEndKey.add(filterValue);
-
+ startKey.add(filterValue);
+ endKey.add(filterValue);
+ //filterStartKey.add(filterValue);
+ //filterEndKey.add(filterValue);
keys.add(filterKey);
} else {
throw new KeyException(
- String.format("Invalid %s : %s",
- Filter.class.getSimpleName(), filter.toString()));
+ String.format("Invalid %s : %s",
+ Filter.class.getSimpleName(), filter.toString()));
}
-
+
} else {
throw new ValueException(String.format("Invalid %s : %s",
Filter.class.getSimpleName(), filter.toString()));
@@ -364,7 +473,7 @@ public class AccountingPersistenceQueryCouchBase implements
}
}
-
+
// +1 because mode start from 0
// +1 because of scope at the beginning
int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1;
@@ -375,34 +484,57 @@ public class AccountingPersistenceQueryCouchBase implements
String designDocId = getDesignDocId(clz);
+ for (Object temporal: temporalStartKey.toList()){
+ if (!temporal.toString().isEmpty())
+ startKey.add(temporal);
+ }
+ int count =1;
+ for (Object temporal: temporalEndKey.toList()){
+ if (!temporal.toString().isEmpty()){
+ //couchbase exclude last value
+ if (count==temporalEndKey.size())
+ temporal=(int)temporal+1;
+ endKey.add(temporal);
+ }
+ count++;
+ }
- startKey.add(filterStartKey.toList());
- startKey.add(temporalStartKey.toList());
-
- endKey.add(filterEndKey.toList());
- endKey.add(temporalEndKey.toList());
+ logger.trace("startKey:{}"+startKey);
+ //startKey.add(temporalStartKey.toList());
+ //endKey.add(temporalEndKey.toList());
String viewName = getMapReduceFunctionName(keys);
ViewQuery query = ViewQuery.from(designDocId, viewName);
- query.group(true);
+
+ query.inclusiveEnd();
+
query.groupLevel(groupLevel);
-
query.startKey(startKey);
query.endKey(endKey);
+
+
query.descending(false);
- logger.trace("Design Doc ID : {}, View Name : {}, "
- + "Group Level : {}, Start Key : {}, End Key : {}",
- designDocId, viewName, groupLevel, startKey, endKey);
+
+
+
+ logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ + "Group Level : {}, Start Key : {}, End Key : {},"
+ + "temporalStartKey :{}, temporalEndKey :{}",
+ clz.getSimpleName(),designDocId, viewName, groupLevel, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
+
SortedMap infos = new TreeMap<>();
ViewResult viewResult;
try {
- viewResult = bucket.query(query);
+ //execute query in a specify bucket
+ viewResult = connectionMap.get(clz.getSimpleName()).query(query);
+
} catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
throw e;
}
@@ -411,100 +543,121 @@ public class AccountingPersistenceQueryCouchBase implements
Calendar calendar = getCalendarFromArray(array);
JsonObject value = (JsonObject) row.value();
- JSONObject obj = new JSONObject(value.toString());
-
+ JSONObject obj = new JSONObject(value.toString());
Info info = new Info(calendar, obj);
infos.put(calendar, info);
}
+ if (infos.isEmpty()){
+
+ //exec a map reduce for found name key
+ query = ViewQuery.from(designDocId, viewName);
+ query.groupLevel(groupLevel);
+ query.descending(false);
+ try {
+ //execute query in a specify bucket
+ viewResult = connectionMap.get(clz.getSimpleName()).query(query);
+
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
+ throw e;
+ }
+
+ ViewRow row=viewResult.allRows().get(0);
+
+ JsonArray array = getRangeKey(
+ temporalConstraint.getStartTime(),
+ aggregationMode, false, false);
+ Calendar calendar = getCalendarFromArray(array);
+
+ JsonObject value = (JsonObject) row.value();
+
+ JSONObject objJson = new JSONObject(value.toString());
+ JSONObject objJsontemplate = new JSONObject();
+ Iterator> iterateJson = objJson.keys();
+ while( iterateJson.hasNext() ) {
+ String key = (String)iterateJson.next();
+ objJsontemplate.put(key, 0);
+ }
+ //generate an example object for json
+ Info info = new Info(calendar, objJsontemplate);
+ infos.put(calendar, info);
+ //break;
+ }
+
+
return infos;
}
-
+
@Override
public SortedMap getTimeSeries(
Class extends AggregatedRecord, ?>> clz,
- TemporalConstraint temporalConstraint, List filters)
- throws Exception {
- return mapReduceQuery(clz, temporalConstraint, filters);
+ TemporalConstraint temporalConstraint, List filters)
+ throws Exception {
+ SortedMap map = mapReduceQuery(clz, temporalConstraint, filters);
+
+
+ return map;
}
@Override
public SortedMap> getTopValues(
Class extends AggregatedRecord, ?>> clz,
- TemporalConstraint temporalConstraint, List filters,
- String topKey, String orderingProperty) throws Exception {
-
+ TemporalConstraint temporalConstraint, List filters,
+ String topKey, String orderingProperty) throws Exception {
+
+
+
+ Comparator comparator = new Comparator() {
+
+ @Override
+ public int compare(NumberedFilter o1, NumberedFilter o2) {
+ return - o1.compareTo(o2);
+ }
+
+ };
+
SortedMap> ret =
- new TreeMap<>();
-
+ new TreeMap<>(comparator);
+
SortedSet top = getNextPossibleValues(clz,
temporalConstraint, filters, topKey, orderingProperty);
-
+
for(NumberedFilter nf : top){
filters.add(nf);
-
SortedMap map =
mapReduceQuery(clz, temporalConstraint, filters);
-
+
ret.put(nf, map);
-
+
filters.remove(nf);
}
-
return ret;
}
-
+
+
protected String getQualifiedProperty(String property){
- return String.format("%s.%s", bucketName, property);
+ //DEVELOPING
+ return (property);
+ //return String.format("%s.%s", bucketName, property);
+ }
+
+ //Use for proprerty into a specify bucket
+ protected String getSpecializedProperty(Class extends AggregatedRecord, ?>> clz,String property){
+
+ return String.format("%s.%s", connectionMap.get(clz.getSimpleName()).name(), property);
+
}
- @SuppressWarnings("deprecation")
@Override
public SortedSet getNextPossibleValues(
Class extends AggregatedRecord, ?>> clz,
- TemporalConstraint temporalConstraint, List filters,
- String key, String orderingProperty) throws Exception {
-
- /*
- SELECT
- SUM(accounting.dataVolume) as dataVolume,
- // SUM(accounting.operationCount) as operationCount,
-
- // Filter List e.g.
- accounting.consumerId as consumerId,
- accounting.serviceClass as serviceClass,
-
- // topKey e.g
- accounting.serviceName as serviceName
-
- FROM accounting
- WHERE
- accounting.scope="/gcube/devsec" AND
- (accounting.recordType="StorageUsageRecord" OR
- accounting.usageRecordType="StorageUsageRecord") AND
-
- AND
- (accounting.startTime >= temporalConstraint.startTime AND
- accounting.endTime >= temporalConstraint.endTime)
-
+ TemporalConstraint temporalConstraint, List filters,
+ String key, String orderingProperty) throws Exception {
+
+
- // Filter List e.g
- AND
- accounting.consumerId="luca.frosini" AND
- accounting.serviceClass="VREManagement"
-
- // topKey
- GROUP BY accounting.serviceName
-
- ORDER BY dataVolume DESC
- // ORDER BY operationsCount DESC
-
- LIMIT 10
- */
-
-
- // String currentScope = BasicUsageRecord.getScopeFromToken();
String currentScope = ScopeProvider.instance.get();
String recordType = clz.newInstance().getRecordType();
@@ -512,66 +665,68 @@ public class AccountingPersistenceQueryCouchBase implements
orderingProperty = AccountingPersistenceQuery.
getDefaultOrderingProperties(clz);
}
-
+
Collection selectExpressions = new ArrayList<>();
-
- selectExpressions.add(x("SUM(" + getQualifiedProperty(orderingProperty) + ")").
+
+ //add select expression
+ selectExpressions.add(x("SUM(" + getSpecializedProperty(clz,orderingProperty) + ")").
as(orderingProperty));
-
- selectExpressions.add(x(getQualifiedProperty(key)).as(key));
-
+ selectExpressions.add(x(getSpecializedProperty(clz,key)).as(key));
+
+ //add where expression
Expression whereExpression =
- x(getQualifiedProperty(BasicUsageRecord.SCOPE)).
+ x(getSpecializedProperty(clz,BasicUsageRecord.SCOPE)).
eq(s(currentScope));
+
+ long startTime = temporalConstraint.getAlignedStartTime().getTimeInMillis();
whereExpression = whereExpression.and(
- x(getQualifiedProperty(BasicUsageRecord.RECORD_TYPE)).
- eq(s(recordType)).or(
- x(getQualifiedProperty(BasicUsageRecord.USAGE_RECORD_TYPE)).
- eq(s(recordType)))
- );
-
- long startTime = temporalConstraint.getAlignedStartTime()
- .getTimeInMillis();
- whereExpression = whereExpression.and(
- x(getQualifiedProperty(AggregatedRecord.START_TIME)).gt(startTime).or(
- x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).gt(startTime)));
-
+ x(getSpecializedProperty(clz,AggregatedRecord.START_TIME)).gt(startTime)
+
+ );
+
long endTime = temporalConstraint.getAlignedEndTime().getTimeInMillis();
whereExpression = whereExpression.and(
- x(getQualifiedProperty(AggregatedRecord.END_TIME)).lt(endTime)).or(
- x(getQualifiedProperty(AggregatedRecord.CREATION_TIME)).lt(endTime));
+ x(getSpecializedProperty(clz,AggregatedRecord.END_TIME)).lt(endTime)
+ );
+
+
Set recordKeysSet = AccountingPersistenceQuery
.getQuerableKeys(clz.newInstance());
-
+
+
+ //list filter used for remove duplicate filter
Collection keys = new TreeSet<>();
-
+
+
+
+
if (filters != null && filters.size() != 0) {
for (Filter filter : filters) {
String filterKey = filter.getKey();
String filterValue = filter.getValue();
-
+
if (filterKey != null && filterKey.compareTo("") != 0
&& recordKeysSet.contains(filterKey)) {
-
+
if (filterValue != null && filterValue.compareTo("") != 0) {
if (keys.contains(filterKey)) {
throw new DuplicatedKeyFilterException(
"Only one value per Filter key is allowed");
}
-
- whereExpression.and(
- x(getQualifiedProperty(filterKey)).eq(filterValue));
-
+ whereExpression =
+ whereExpression.and(
+ x(getSpecializedProperty(clz,filterKey)).eq(s(filterValue)));
+
keys.add(filterKey);
} else {
throw new KeyException(
- String.format("Invalid %s : %s",
- Filter.class.getSimpleName(), filter.toString()));
+ String.format("Invalid %s : %s",
+ Filter.class.getSimpleName(), filter.toString()));
}
-
+
} else {
throw new ValueException(String.format("Invalid %s : %s",
Filter.class.getSimpleName(), filter.toString()));
@@ -580,18 +735,19 @@ public class AccountingPersistenceQueryCouchBase implements
}
}
-
+
Expression[] selectExpressionArray =
new Expression[selectExpressions.size()];
selectExpressions.toArray(selectExpressionArray);
-
+
Sort sort = Sort.desc(orderingProperty);
-
- LimitPath path = select(selectExpressionArray).from(bucketName)
+ OffsetPath path = select(selectExpressionArray).from(connectionMap.get(clz.getSimpleName()).name())
.where(whereExpression).groupBy(key).orderBy(sort);
+
- logger.debug(path.toString());
+ logger.debug(path.toString());
+
Comparator comparator = new Comparator() {
@@ -599,43 +755,227 @@ public class AccountingPersistenceQueryCouchBase implements
public int compare(NumberedFilter o1, NumberedFilter o2) {
return - o1.compareTo(o2);
}
-
+
};
-
+
SortedSet ret = new TreeSet<>(comparator);
- N1qlQueryResult result = bucket.query(path);
+ N1qlQueryResult result = connectionMap.get(clz.getSimpleName()).query(path);
+
+
+
if (!result.finalSuccess()) {
logger.debug("{} failed : {}",
N1qlQueryResult.class.getSimpleName(), result.errors());
throw new Exception("Query Failed :\n" + result.errors());
}
-
+
List rows = result.allRows();
for (N1qlQueryRow row : rows) {
try {
- logger.trace("Row : {}", row.toString());
JsonObject jsonObject = row.value();
- logger.trace("JsonObject : {}", row.toString());
-
- String value = jsonObject.getString(key);
+ //logger.trace("JsonObject : {}", row.value()+" key"+key);
+ //verify for a not null value
+ String value = jsonObject.getString(key);
Number n = jsonObject.getDouble(orderingProperty);
-
+
NumberedFilter numberedFilter =
new NumberedFilter(key, value, n, orderingProperty);
-
+
ret.add(numberedFilter);
-
+
} catch (Exception e) {
logger.warn("Unable to eleborate result for {}", row.toString());
+ //logger.warn("Error:"+e.getLocalizedMessage());
}
- logger.trace("\n\n\n");
+ //logger.trace("\n\n");
+ }
+
+ return ret;
+ }
+
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public SortedSet getFilterValues(
+ Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, List filters,
+ String key) throws Exception {
+
+
+ String currentScope = ScopeProvider.instance.get();
+
+ JsonArray startKey = JsonArray.create();
+ startKey.add(currentScope);
+
+ int scopeDateGroupLevel = 2;
+ int groupLevel = scopeDateGroupLevel;
+
+ String designDocId = getDesignDocId(clz)+"Value";
+
+ String viewName = key;
+ logger.trace("designDocId:{} view:{} ",designDocId,key);
+ logger.trace("startKey:{}",startKey);
+ logger.trace("groupLevel"+groupLevel);
+ ViewQuery query = ViewQuery.from(designDocId, viewName);
+
+ query.inclusiveEnd();
+
+ query.groupLevel(groupLevel);
+ query.startKey(startKey);
+ query.descending(false);
+
+ String orderingProperty = AccountingPersistenceQuery.
+ getDefaultOrderingProperties(clz);
+
+ ViewResult viewResult;
+ try {
+ //execute query in a specify bucket
+ viewResult = connectionMap.get(clz.getSimpleName()).query(query);
+
+ } catch (Exception e) {
+ logger.error("error executing the query",e);
+ throw e;
+ }
+
+ Comparator comparator = new Comparator() {
+
+ @Override
+ public int compare(NumberedFilter o1, NumberedFilter o2) {
+ if (o1.getValue()==null)
+ o1.setValue("");
+ if (o2.getValue()==null)
+ o2.setValue("");
+ return o1.getValue().compareTo(o2.getValue());
+ }
+
+ };
+ SortedSet ret = new TreeSet<>(comparator);
+
+ for (ViewRow row : viewResult) {
+ String value =(String) row.value();
+ NumberedFilter numberedFilter =
+ new NumberedFilter(key, value, 0, orderingProperty);
+ ret.add(numberedFilter);
}
+ logger.trace("returning {} values",ret.size());
+
return ret;
+
+
+
+
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public JSONObject getUsageValue(
+ Class extends AggregatedRecord, ?>> clz,
+ TemporalConstraint temporalConstraint, Filter applicant) throws Exception {
+
+
+ String currentScope = ScopeProvider.instance.get();
+
+ JsonArray startKey = JsonArray.create();
+ startKey.add(currentScope);
+
+ JsonArray endKey = JsonArray.create();
+ endKey.add(currentScope);
+
+
+ AggregationMode aggregationMode = temporalConstraint
+ .getAggregationMode();
+
+ JsonArray temporalStartKey = getRangeKey(
+ temporalConstraint.getStartTime(),
+ aggregationMode, false, false);
+
+ JsonArray temporalEndKey = getRangeKey(
+ temporalConstraint.getEndTime(),
+ aggregationMode, false, false);
+
+ startKey.add(applicant.getValue());
+
+
+ for (Object temporal: temporalStartKey.toList()){
+ if (!temporal.toString().isEmpty())
+ startKey.add(temporal);
+ }
+
+ endKey.add(applicant.getValue());
+ int count =1;
+ for (Object temporal: temporalEndKey.toList()){
+ if (!temporal.toString().isEmpty()){
+ //couchbase exclude last value
+ if (count==temporalEndKey.size())
+ temporal=(int)temporal+1;
+ endKey.add(temporal);
+ }
+ count++;
+ }
+
+ // +1 because mode start from 0
+ // +1 because have afilter value from 1
+ // +1 because of scope at the beginning
+ int scopeDateGroupLevel = aggregationMode.ordinal() + 1 + 1 +1;
+ int groupLevel = scopeDateGroupLevel;
+
+ String designDocId = getDesignDocId(clz);
+
+ String viewName = applicant.getKey();
+
+ ViewQuery query = ViewQuery.from(designDocId, viewName);
+
+ query.inclusiveEnd();
+ query.groupLevel(groupLevel);
+ query.startKey(startKey);
+ query.endKey(endKey);
+ query.descending(false);
+
+ logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ + "Group Level : {}, Start Key : {}, End Key : {},"
+ + "temporalStartKey :{}, temporalEndKey :{}",
+ clz.getSimpleName(),designDocId, viewName, groupLevel, startKey, endKey,temporalStartKey.toString(), temporalEndKey.toString());
+
+ ViewResult viewResult;
+ try {
+ //execute query in a specify bucket
+ viewResult = connectionMap.get(clz.getSimpleName()).query(query);
+
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
+ throw e;
+ }
+
+ Map map = new HashMap();
+
+ for (ViewRow row : viewResult) {
+
+ JsonObject jsnobject = (JsonObject) row.value();
+ JSONObject objJson = new JSONObject(jsnobject.toString());
+
+ Iterator> iterateJosn = objJson.keys();
+ while( iterateJosn.hasNext() ) {
+ String key = (String)iterateJosn.next();
+ Float valuetmp=Float.parseFloat(objJson.get(key).toString());
+
+ if (key.equals("operationCount") || key.equals("dataVolume")){
+ if (map.containsKey(key)) {
+ map.put(key, valuetmp + map.get(key));
+ }
+ else
+ map.put(key, valuetmp);
+ }
+ }
+ }
+ JSONObject result= new JSONObject(map);
+ return result;
+ }
+
+
+
}
diff --git a/src/main/resources/logback-test.xml b/src/main/resources/logback-test.xml
new file mode 100644
index 0000000..4f36cc8
--- /dev/null
+++ b/src/main/resources/logback-test.xml
@@ -0,0 +1,16 @@
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{0}: %msg%n
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file