Alessandro Pieve 8 years ago
parent 8bd729d2b6
commit 75b9d24451

@ -24,6 +24,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Filters;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
@ -41,6 +42,7 @@ import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
@ -86,6 +88,7 @@ AccountingPersistenceBackendQuery {
public static final String BUCKET_JOB_NAME_PROPERTY_KEY="AggregatedJobUsageRecord";
public static final String BUCKET_TASK_NAME_PROPERTY_KEY="AggregatedTaskUsageRecord";
public static final String SEPARATOR_DESIGN_DOC="_";
public static final long ENV_TIME_OUT=180000;
/* The environment configuration */
@ -340,16 +343,32 @@ AccountingPersistenceBackendQuery {
protected String getDesignDocId(
Class<? extends AggregatedRecord<?,?>> recordClass)
throws InstantiationException, IllegalAccessException {
/*
String getDesigndocid=String.format("%s%s", MAP_REDUCE__DESIGN, recordClass
.newInstance().getRecordType());
logger.debug("use a designDocID"+getDesigndocid);
*/
//logger.debug("use a designDocID:{}",getDesigndocid);
return String.format("%s%s", MAP_REDUCE__DESIGN, recordClass
.newInstance().getRecordType());
}
protected String getDesignDocIdSpecific(
Class<? extends AggregatedRecord<?,?>> recordClass,
Collection<String> keys)
throws InstantiationException, IllegalAccessException {
String getDesigndocId=String.format("%s%s", MAP_REDUCE__DESIGN, recordClass
.newInstance().getRecordType());
String specific="all";
if (!keys.isEmpty()){
specific = keys.iterator().next();
}
String getDesigndocIdSpecific=getDesigndocId+SEPARATOR_DESIGN_DOC+specific;
logger.debug("use a designDocIDSpecific:{}",getDesigndocIdSpecific);
return getDesigndocIdSpecific;
}
public static String getMapReduceFunctionName(
Collection<String> collection) {
String reduceFunction = MAP_REDUCE_ALL;
@ -444,6 +463,7 @@ AccountingPersistenceBackendQuery {
}
String designDocId = getDesignDocId(clz);
String designDocIdNew = getDesignDocIdSpecific(clz,keys);
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
@ -583,7 +603,7 @@ AccountingPersistenceBackendQuery {
protected String getQualifiedProperty(String property){
//DEVELOPING
return (property);
}
//Use for property into a specify bucket
@ -612,14 +632,14 @@ AccountingPersistenceBackendQuery {
/*
selectExpressions.add(x("SUM(" + getSpecializedProperty(clz,orderingProperty) + ")").
as(orderingProperty));
*/
*/
//add select expression and check if exist
selectExpressions.add(x("SUM(CASE WHEN " + getSpecializedProperty(clz,orderingProperty) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,orderingProperty)+" ELSE 1 END )").
as(orderingProperty));
//selectExpressions.add(x(getSpecializedProperty(clz,key)).as(key));
selectExpressions.add(x("(CASE WHEN " + getSpecializedProperty(clz,key) +
" IS NOT NULL THEN "+getSpecializedProperty(clz,key)+" ELSE 'UNKNOWN' END )").as(key));
@ -906,6 +926,139 @@ AccountingPersistenceBackendQuery {
return result;
}
@Override
public List<Filters> getUsageValueQuota(
Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint,
List<Filters> filterPackageQuota) throws Exception {
String currentScope = ScopeProvider.instance.get();
AggregationMode aggregationMode = temporalConstraint
.getAggregationMode();
JsonArray temporalStartKey = getRangeKey(
temporalConstraint.getStartTime(),
aggregationMode, false, false);
JsonArray temporalEndKey = getRangeKey(
temporalConstraint.getEndTime(),
aggregationMode, false, false);
String designDocId = getDesignDocId(clz);
for (Filters singleFilter:filterPackageQuota){
String viewNameTmp=null;
JsonArray startKeyTmp=JsonArray.create();
startKeyTmp.add(currentScope);
JsonArray endKeyTmp=JsonArray.create();
endKeyTmp.add(currentScope);
int groupLevelTmp= 2;
for (Filter filter:singleFilter.getFilters()){
if (groupLevelTmp==2)
viewNameTmp=filter.getKey();
else
viewNameTmp=viewNameTmp+"__"+filter.getKey();
startKeyTmp.add(filter.getValue());
endKeyTmp.add(filter.getValue());
groupLevelTmp++;
}
for (Object temporal: temporalStartKey.toList()){
if (!temporal.toString().isEmpty())
startKeyTmp.add(temporal);
}
int count =1;
for (Object temporal: temporalEndKey.toList()){
if (!temporal.toString().isEmpty()){
//couchbase exclude last value
if (count==temporalEndKey.size())
temporal=(int)temporal+1;
endKeyTmp.add(temporal);
}
count++;
}
ViewQuery query = ViewQuery.from(designDocId, viewNameTmp);
query.inclusiveEnd();
query.groupLevel(groupLevelTmp);
query.startKey(startKeyTmp);
query.endKey(endKeyTmp);
query.descending(false);
logger.trace("Bucket :{}, Design Doc ID : {}, View Name : {}, "
+ "Group Level : {}, Start Key : {}, End Key : {},"
+ "temporalStartKey :{}, temporalEndKey :{}",
clz.getSimpleName(),designDocId, viewNameTmp, groupLevelTmp, startKeyTmp, endKeyTmp,temporalStartKey.toString(), temporalEndKey.toString());
ViewResult viewResult;
try {
//execute query in a specify bucket
viewResult = connectionMap.get(clz.getSimpleName()).query(query);
} catch (Exception e) {
logger.error(e.getLocalizedMessage());
throw e;
}
Map<String, Float> map = new HashMap<String, Float>();
for (ViewRow row : viewResult) {
JsonObject jsnobject = (JsonObject) row.value();
JSONObject objJson = new JSONObject(jsnobject.toString());
Iterator<?> iterateJosn = objJson.keys();
while( iterateJosn.hasNext() ) {
String key = (String)iterateJosn.next();
Float valuetmp=Float.parseFloat(objJson.get(key).toString());
if (key.equals("operationCount") || key.equals("dataVolume")){
if (map.containsKey(key)) {
map.put(key, valuetmp + map.get(key));
// TODO verify a better method
if (designDocId.equals("StorageUsageRecord")){
if (key.equals("dataVolume")){
singleFilter.setOrderingProperty(key);
singleFilter.setD(singleFilter.getD()+valuetmp.doubleValue());
}
}
else{
singleFilter.setOrderingProperty(key);
singleFilter.setD(singleFilter.getD()+valuetmp.doubleValue());
}
}
else{
map.put(key, valuetmp);
// TODO verify a better method
if (designDocId.equals("StorageUsageRecord")){
if (key.equals("dataVolume")){
singleFilter.setOrderingProperty(key);
singleFilter.setD(valuetmp.doubleValue());
}
}
else{
singleFilter.setOrderingProperty(key);
singleFilter.setD(valuetmp.doubleValue());
}
}
}
}
}
}
return filterPackageQuota;
}
}

@ -14,6 +14,7 @@ import java.util.SortedMap;
import java.util.SortedSet;
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Filters;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
@ -22,8 +23,10 @@ import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQu
import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.aggregation.AggregatedJobUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
import org.gcube.common.scope.api.ScopeProvider;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.After;
@ -122,13 +125,114 @@ public class AccountingPersistenceQueryCouchBaseTest {
clz, temporalConstraint, filters,
AggregatedJobUsageRecord.CONSUMER_ID, null);
JobUsageRecord record =new JobUsageRecord();
Set<String> result=record.getRequiredFields();
logger.debug("result"+result.toString());
logger.debug("Result final{}", set);
}
@Test
public void testTimeSeries() throws Exception{
Calendar startTime = Calendar.getInstance();
startTime.set(2015, Calendar.AUGUST, 20);
Calendar endTime = Calendar.getInstance();
endTime.set(2016, Calendar.SEPTEMBER, 29,23,59);
List<Filter> filters = new ArrayList<Filter>();
TemporalConstraint temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
Class<AggregatedJobUsageRecord> clz =
AggregatedJobUsageRecord.class;
SortedMap<Calendar, Info> set =
accountingPersistenceQueryCouchBase.getTimeSeries(
clz, temporalConstraint, filters);
logger.debug("Result final{}", set);
}
@Test
public void getUsageValue() throws Exception{
Calendar startTime = Calendar.getInstance();
startTime.set(2015, Calendar.MAY, 1);
Calendar endTime = Calendar.getInstance();
ScopeProvider.instance.set("/gcube");
Filter filter =
new Filter(AggregatedServiceUsageRecord.CONSUMER_ID, "alessandro.pieve");
TemporalConstraint temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
JSONObject filterValue =
accountingPersistenceQueryCouchBase.getUsageValue(AggregatedServiceUsageRecord.class,
temporalConstraint, filter);
logger.info("result:"+filterValue.toString());
}
@Test
public void getUsageValueQuota() throws Exception{
Calendar startTime = Calendar.getInstance();
startTime.set(2015, Calendar.MAY, 1);
Calendar endTime = Calendar.getInstance();
ScopeProvider.instance.set("/gcube");
TemporalConstraint temporalConstraint =
new TemporalConstraint(startTime.getTimeInMillis(),
endTime.getTimeInMillis(), AggregationMode.DAILY);
List<Filters> filterPackageQuota =new ArrayList<Filters>();
Filters simpleFilter = new Filters();
List<Filter> filters=new ArrayList<Filter>();
filters.add(new Filter(AggregatedServiceUsageRecord.CONSUMER_ID, "alessandro.pieve"));
filters.add(new Filter("serviceClass", "DataAccess"));
filters.add(new Filter("serviceName", "CkanConnector"));
simpleFilter.setFilters(filters);
filterPackageQuota.add(simpleFilter);
Filters simpleFilter1 = new Filters();
List<Filter> filters1=new ArrayList<Filter>();
filters1.add(new Filter(AggregatedServiceUsageRecord.CONSUMER_ID, "alessandro.pieve"));
filters1.add(new Filter("serviceClass", "VREManagement"));
simpleFilter1.setFilters(filters1);
filterPackageQuota.add(simpleFilter1);
logger.info("filterPackageQuota:"+filterPackageQuota);
List<Filters> filterValue =
accountingPersistenceQueryCouchBase.getUsageValueQuota(AggregatedServiceUsageRecord.class, temporalConstraint, filterPackageQuota);
logger.info("result:"+filterValue.toString());
}
@Test
public void getQuerableKeyJob() throws Exception{
SortedSet<String> keys;
@ -141,7 +245,7 @@ public class AccountingPersistenceQueryCouchBaseTest {
}
logger.debug("List FilterKeys:" + keys.toString());
}
@Test
public void testTopService() throws Exception {
Calendar startTime = Calendar.getInstance();
@ -179,7 +283,7 @@ public class AccountingPersistenceQueryCouchBaseTest {
}
logger.debug("List FilterKeys:" + keys.toString());
}
public static SortedMap<Calendar, Info> padMap(
SortedMap<Calendar, Info> unpaddedData,
TemporalConstraint temporalConstraint) throws Exception {