accounting-analytics/src/main/java/org/gcube/accounting/analytics/persistence/AccountingPersistenceQuery....

344 lines
10 KiB
Java

/**
*
*/
package org.gcube.accounting.analytics.persistence;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import javax.activity.InvalidActivityException;
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException;
import org.gcube.accounting.analytics.exception.KeyException;
import org.gcube.accounting.analytics.exception.ValueException;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.basetypes.AbstractStorageUsageRecord.DataType;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.com.fasterxml.jackson.databind.ObjectMapper;
import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class AccountingPersistenceQuery implements AccountingPersistenceBackendQuery {
private static final AccountingPersistenceQuery accountingPersistenceQuery;
public static final int DEFAULT_LIMIT_RESULT_NUMBER = 5;
static {
try {
accountingPersistenceQuery = new AccountingPersistenceQuery();
}catch (Exception e) {
throw new RuntimeException(e);
}
}
protected static synchronized AccountingPersistenceQuery getInstance() {
return accountingPersistenceQuery;
}
protected Class<? extends AggregatedRecord<?, ?>> clz;
protected TemporalConstraint temporalConstraint;
protected Set<String> contexts;
protected Set<Filter> filters;
protected ObjectMapper objectMapper;
protected AccountingPersistenceBackendQuery accountingPersistenceBackendQuery;
private AccountingPersistenceQuery() throws Exception {
accountingPersistenceBackendQuery = AccountingPersistenceBackendQueryFactory.getInstance();
objectMapper = new ObjectMapper();
}
@Override
public void setRequestedRecords(Class<? extends AggregatedRecord<?, ?>> clz) {
this.clz = clz;
accountingPersistenceBackendQuery.setRequestedRecords(clz);
}
@Override
public void setTemporalConstraint(TemporalConstraint temporalConstraint) {
this.temporalConstraint = temporalConstraint;
accountingPersistenceBackendQuery.setTemporalConstraint(temporalConstraint);
}
@Override
public void setContexts(Set<String> contexts) {
this.contexts = contexts;
accountingPersistenceBackendQuery.setContexts(this.contexts);
}
@Override
public void setFilters(Set<Filter> filters) {
this.filters = filters;
accountingPersistenceBackendQuery.setFilters(filters);
}
public static SortedSet<String> getQuerableKeys(Class<? extends AggregatedRecord<?, ?>> clz) throws Exception {
AggregatedRecord<?, ?> instance = clz.newInstance();
// limit filter key for accounting storage status (used from portlet
// accounting for tad space)
if (clz.equals(AggregatedStorageStatusRecord.class)) {
SortedSet<String> storageStatus = new TreeSet<>();
storageStatus.add(AggregatedStorageStatusRecord.CONSUMER_ID);
// storageStatus.add(AggregatedStorageStatusRecord.DATA_SERVICEID);
return storageStatus;
} else {
return instance.getQuerableKeys();
}
}
public static String getDefaultOrderingProperties(Class<? extends AggregatedRecord<?, ?>> clz) {
if (clz.isAssignableFrom(AggregatedStorageUsageRecord.class)) {
return AggregatedStorageUsageRecord.DATA_VOLUME;
}
return AggregatedRecord.OPERATION_COUNT;
}
protected JsonNode getPaddingJsonNode(Map<Calendar, Info> unpaddedResults) throws Exception {
ObjectNode objectNode = objectMapper.createObjectNode();
// verify data consistency
if (unpaddedResults.size() != 0) {
Info auxInfo = new ArrayList<Info>(unpaddedResults.values()).get(0);
JsonNode auxJsonNode = auxInfo.getValue();
Iterator<String> keys = auxJsonNode.fieldNames();
while (keys.hasNext()) {
String key = keys.next();
objectNode.put(key, 0);
}
}
return objectNode;
}
/**
* Pad the data
*
* @param unpaddedData
* the data to be pad
* @param temporalConstraint
* temporalConstraint the temporal interval and the granularity
* of the data to pad
* @return the data padded taking in account the TemporalConstraint
* @throws Exception
* if fails
*/
protected SortedMap<Calendar, Info> padMap(SortedMap<Calendar, Info> unpaddedData) throws Exception {
JsonNode jsonNode = getPaddingJsonNode(unpaddedData);
SortedSet<Calendar> sequence = temporalConstraint.getCalendarSequence();
for (Calendar progressTime : sequence) {
Info info = unpaddedData.get(progressTime);
if (info == null) {
info = new Info(progressTime, jsonNode);
unpaddedData.put(progressTime, info);
}
}
return unpaddedData;
}
@Override
public SortedMap<Calendar, Info> getTimeSeries()
throws DuplicatedKeyFilterException, KeyException, ValueException, Exception {
return this.getTimeSeries(true);
}
public SortedMap<Calendar, Info> getTimeSeries(boolean pad)
throws DuplicatedKeyFilterException, KeyException, ValueException, Exception {
SortedMap<Calendar, Info> ret = accountingPersistenceBackendQuery.getTimeSeries();
if (ret == null) {
ret = new TreeMap<>();
}
if (pad) {
ret = padMap(ret);
}
return ret;
}
public SortedMap<NumberedFilter, SortedMap<Calendar, Info>> getTopValues(String topKey, String orderingProperty, boolean pad, int limit)
throws DuplicatedKeyFilterException, KeyException, ValueException, Exception {
SortedMap<NumberedFilter, SortedMap<Calendar, Info>> got;
if (orderingProperty == null) {
orderingProperty = getDefaultOrderingProperties(clz);
}
got = accountingPersistenceBackendQuery.getTopValues(topKey, orderingProperty);
int count = got.size() > limit ? limit : got.size();
NumberedFilter firstRemovalKey = null;
for (NumberedFilter nf : got.keySet()) {
if (--count >= 0 || limit <= 0) {
if (pad) {
padMap(got.get(nf));
}
} else {
if (firstRemovalKey == null) {
firstRemovalKey = nf;
} else {
break;
}
}
}
if (firstRemovalKey != null) {
return got.subMap(got.firstKey(), firstRemovalKey);
}
return got;
}
public SortedMap<NumberedFilter, SortedMap<Calendar, Info>> getTopValues(String topKey) throws DuplicatedKeyFilterException, KeyException, ValueException, Exception {
String orderingProperty = AccountingPersistenceQuery.getDefaultOrderingProperties(clz);
return this.getTopValues(topKey, orderingProperty, false, 0);
}
@Override
public SortedMap<NumberedFilter, SortedMap<Calendar, Info>> getTopValues(String topKey, String orderingProperty)
throws Exception {
return this.getTopValues(topKey, orderingProperty, false, 0);
}
@Override
public void close() throws Exception {
AccountingPersistenceBackendQueryFactory.getInstance().close();
}
@Override
public void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception {
throw new InvalidActivityException();
}
@Override
public SortedSet<NumberedFilter> getFilterValues(String key) throws Exception {
return getFilterValues(key, null);
}
@Override
public SortedSet<NumberedFilter> getFilterValues(String key, Integer limit) throws Exception {
return accountingPersistenceBackendQuery.getFilterValues(key, limit);
}
@Override
public List<UsageValue> getUsageValueQuotaTotal(List<UsageValue> listUsage) throws Exception {
return accountingPersistenceBackendQuery.getUsageValueQuotaTotal(listUsage);
}
@Override
public SortedMap<Filter, SortedMap<Calendar, Info>> getContextTimeSeries() throws Exception {
return getContextTimeSeries(true);
}
public SortedMap<Filter, SortedMap<Calendar, Info>> getContextTimeSeries(boolean pad) throws DuplicatedKeyFilterException, Exception {
SortedMap<Filter, SortedMap<Calendar, Info>> got;
got = accountingPersistenceBackendQuery.getContextTimeSeries();
int count = got.size();
Filter firstRemovalKey = null;
for (Filter nf : got.keySet()) {
if (--count >= 0) {
if (pad) {
padMap(got.get(nf));
}
} else {
if (firstRemovalKey == null) {
firstRemovalKey = nf;
} else {
break;
}
}
}
if (firstRemovalKey != null) {
return got.subMap(got.firstKey(), firstRemovalKey);
}
return got;
}
@Override
public Record getRecord(String recordId, String type) throws Exception {
Record record = AccountingPersistenceBackendQueryFactory.getInstance().getRecord(recordId, type);
return record;
}
public SortedSet<String> getDataType() throws Exception {
SortedSet<String> dataTypes = new TreeSet<String>();
for(DataType dataType : DataType.values()) {
dataTypes.add(dataType.name());
}
return dataTypes;
}
@Override
public SortedMap<Filter, SortedMap<Calendar, Info>> getSpaceTimeSeries(Set<String> dataTypes) throws Exception {
SortedMap<Filter, SortedMap<Calendar, Info>> spaceTimeSeries = accountingPersistenceBackendQuery.getSpaceTimeSeries(dataTypes);
int count = spaceTimeSeries.size();
Filter firstRemovalKey = null;
for (Filter nf : spaceTimeSeries.keySet()) {
if (--count >= 0) {
padMapStorage(spaceTimeSeries.get(nf));
} else {
if (firstRemovalKey == null) {
firstRemovalKey = nf;
} else {
break;
}
}
}
if (firstRemovalKey != null) {
return spaceTimeSeries.subMap(spaceTimeSeries.firstKey(), firstRemovalKey);
}
return spaceTimeSeries;
}
protected SortedMap<Calendar, Info> padMapStorage(SortedMap<Calendar, Info> unpaddedData) throws Exception {
SortedSet<Calendar> sequence = temporalConstraint.getCalendarSequence();
Info previuosInfo = null;
for (Calendar progressTime : sequence) {
Info info = unpaddedData.get(progressTime);
if (info == null) {
unpaddedData.put(progressTime, previuosInfo);
} else {
previuosInfo = info;
}
}
return unpaddedData;
}
@Override
public boolean isConnectionActive() throws Exception {
return AccountingPersistenceBackendQueryFactory.getInstance().isConnectionActive();
}
}