accounting-analytics-persis.../src/main/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryP...

356 lines
13 KiB
Java
Raw Normal View History

2021-03-15 16:58:07 +01:00
/**
*
*/
package org.gcube.accounting.analytics.persistence.postgresql;
2021-03-22 18:08:35 +01:00
import java.io.Serializable;
2021-03-16 15:33:34 +01:00
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.OffsetDateTime;
2021-03-15 16:58:07 +01:00
import java.util.Calendar;
2021-03-17 17:55:56 +01:00
import java.util.HashSet;
2021-03-15 16:58:07 +01:00
import java.util.List;
import java.util.Map;
2021-03-17 17:55:56 +01:00
import java.util.Set;
2021-03-15 16:58:07 +01:00
import java.util.SortedMap;
import java.util.SortedSet;
2021-03-16 15:33:34 +01:00
import java.util.TreeMap;
2021-03-19 15:55:06 +01:00
import java.util.TreeSet;
2021-03-15 16:58:07 +01:00
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException;
import org.gcube.accounting.analytics.exception.KeyException;
import org.gcube.accounting.analytics.exception.ValueException;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
2021-03-19 15:55:06 +01:00
import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
2021-03-15 16:58:07 +01:00
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
2021-03-22 19:08:31 +01:00
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord;
2021-03-15 16:58:07 +01:00
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
2021-03-18 10:55:07 +01:00
import org.gcube.accounting.utility.postgresql.RecordToDBConnection;
2021-03-19 15:55:06 +01:00
import org.gcube.accounting.utility.postgresql.RecordToDBFields;
2021-03-18 10:55:07 +01:00
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
2021-03-15 16:58:07 +01:00
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
2021-03-15 16:58:07 +01:00
import org.gcube.documentstore.records.RecordUtility;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class AccountingPersistenceQueryPostgreSQL implements AccountingPersistenceBackendQuery {
2021-03-18 10:55:07 +01:00
protected static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryPostgreSQL.class);
2021-03-15 16:58:07 +01:00
2021-03-16 10:37:52 +01:00
public static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
2021-03-19 15:55:06 +01:00
2021-03-15 16:58:07 +01:00
public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY;
2021-03-17 15:44:50 +01:00
// private String baseURL;
2021-03-15 16:58:07 +01:00
protected AccountingPersistenceBackendQueryConfiguration configuration;
static {
// One Record per package is enough
RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage());
}
2021-03-16 18:06:38 +01:00
protected Connection getConnection(Class<? extends AggregatedRecord<?, ?>> clz) throws Exception {
2021-03-18 10:55:07 +01:00
RecordToDBConnection recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz);
if(recordDBInfo == null) {
RecordToDBMapping.addRecordToDB(clz, configuration);
2021-03-18 13:34:02 +01:00
recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz);
2021-03-17 15:44:50 +01:00
}
2021-03-18 10:55:07 +01:00
return recordDBInfo.getConnection();
2021-03-16 18:06:38 +01:00
}
2021-03-15 16:58:07 +01:00
@Override
public void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception {
this.configuration = configuration;
2021-03-18 10:55:07 +01:00
Map<String, Class<? extends AggregatedRecord<?,?>>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound();
for(String typeName : aggregatedRecords.keySet()) {
try {
Class<? extends AggregatedRecord<?,?>> clz = aggregatedRecords.get(typeName);
2021-03-18 17:05:33 +01:00
RecordToDBMapping.addRecordToDB(clz, configuration);
2021-03-18 10:55:07 +01:00
} catch (Exception e) {
new RuntimeException(e);
}
}
2021-03-15 16:58:07 +01:00
}
2021-03-16 10:37:52 +01:00
2021-03-19 15:55:06 +01:00
protected SortedMap<Calendar, Info> getTimeSeries(Class<? extends AggregatedRecord<?, ?>> clz,
2021-03-17 17:55:56 +01:00
TemporalConstraint temporalConstraint, List<Filter> filters, Set<String> contexts)
throws Exception {
Connection connection = getConnection(clz);
try {
Statement statement = connection.createStatement();
SortedMap<Calendar, Info> result = new TreeMap<>();
Query query = new Query(clz);
query.setTemporalConstraint(temporalConstraint);
query.setFilters(filters);
query.setContexts(contexts);
String sql = query.getTimeSeriesQuery();
List<String> requestedTableField = query.getRequestedTableField();
2021-03-18 10:55:07 +01:00
RecordToDBFields recordToDBMapper = query.getRecordToDBMapper();
2021-03-17 17:55:56 +01:00
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
2021-03-30 22:13:28 +02:00
String tableFieldName = recordToDBMapper.getTableField(Query.DATE_OF_TIMESERIES_AS_FIELD);
OffsetDateTime offsetDateTime = resultSet.getObject(tableFieldName, OffsetDateTime.class);
2021-03-17 17:55:56 +01:00
Calendar calendar = getCalendar(offsetDateTime);
JSONObject jsonObject = new JSONObject();
for(String tableField : requestedTableField) {
2021-03-18 10:55:07 +01:00
String usageRecordField = recordToDBMapper.getRecordField(tableField);
2021-03-17 17:55:56 +01:00
Object object = resultSet.getObject(tableField);
jsonObject.put(usageRecordField, object);
}
Info info = new Info(calendar, jsonObject);
result.put(calendar, info);
}
return result;
}finally {
connection.close();
}
}
2021-03-15 16:58:07 +01:00
@Override
public SortedMap<Calendar, Info> getTimeSeries(Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters)
throws DuplicatedKeyFilterException, KeyException, ValueException, Exception {
2021-03-22 11:53:12 +01:00
String context = AccountingPersistenceBackendQuery.getScopeToQuery();
2021-03-17 17:55:56 +01:00
Set<String> contexts = new HashSet<>();
2021-03-22 11:53:12 +01:00
contexts.add(context);
2021-03-17 17:55:56 +01:00
return getTimeSeries(clz, temporalConstraint, filters, contexts);
2021-03-15 16:58:07 +01:00
}
2021-03-16 10:37:52 +01:00
2021-03-16 15:33:34 +01:00
protected Calendar getCalendar(OffsetDateTime offsetDateTime) {
Calendar calendar = Calendar.getInstance();
long epochMillis = offsetDateTime.toInstant().toEpochMilli();
calendar.setTimeInMillis(epochMillis);
return calendar;
2021-03-16 10:37:52 +01:00
}
2021-03-16 15:33:34 +01:00
@Override
public SortedMap<Calendar, Info> getNoContextTimeSeries(Class<? extends AggregatedRecord<?, ?>> clz,
2021-03-16 10:37:52 +01:00
TemporalConstraint temporalConstraint, List<Filter> filters)
throws DuplicatedKeyFilterException, KeyException, ValueException, Exception {
2021-03-17 17:55:56 +01:00
return getTimeSeries(clz, temporalConstraint, filters, null);
2021-03-15 16:58:07 +01:00
}
2021-03-22 11:53:12 +01:00
@Override
public SortedMap<Filter, SortedMap<Calendar, Info>> getContextTimeSeries(
Class<? extends AggregatedRecord<?, ?>> clz, TemporalConstraint temporalConstraint, List<Filter> filters,
List<String> contexts) throws Exception {
SortedMap<Filter,SortedMap<Calendar,Info>> ret = new TreeMap<>();
for(String context : contexts) {
Filter contextFilter = new Filter("context", context);
Set<String> timeSeriesContexts = new HashSet<>();
timeSeriesContexts.add(context);
SortedMap<Calendar, Info> timeSeries = getTimeSeries(clz, temporalConstraint, filters, timeSeriesContexts);
if(!timeSeries.isEmpty()) {
ret.put(contextFilter, timeSeries);
}
}
return ret;
}
2021-03-19 15:55:06 +01:00
protected SortedSet<NumberedFilter> getNumberedValues(Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters, String key,
String orderingProperty, Integer limit) throws Exception {
Connection connection = getConnection(clz);
try {
Statement statement = connection.createStatement();
if(orderingProperty == null) {
orderingProperty = AccountingPersistenceQuery.getDefaultOrderingProperties(clz);
}
if(limit == null) {
limit = 50;
}
SortedSet<NumberedFilter> result = new TreeSet<>();
2021-03-22 11:53:12 +01:00
String context = AccountingPersistenceBackendQuery.getScopeToQuery();
2021-03-19 15:55:06 +01:00
Set<String> contexts = new HashSet<>();
2021-03-22 11:53:12 +01:00
contexts.add(context);
2021-03-19 15:55:06 +01:00
Query query = new Query(clz);
query.setTemporalConstraint(temporalConstraint);
query.setFilters(filters);
query.setContexts(contexts);
2021-03-22 19:08:31 +01:00
query.setTableFieldToRequest(key);
2021-03-19 15:55:06 +01:00
query.setOrderByField(orderingProperty);
query.setLimit(limit);
String sql = query.getNextPossibleValueQuery();
// List<String> requestedTableField = query.getRequestedTableField();
RecordToDBFields recordToDBMapper = query.getRecordToDBMapper();
ResultSet resultSet = statement.executeQuery(sql);
2021-03-30 22:13:28 +02:00
String tableFieldORderingProperty = recordToDBMapper.getTableField(orderingProperty);
2021-03-19 15:55:06 +01:00
while (resultSet.next()) {
String tableFieldKey = recordToDBMapper.getTableField(key);
Object value = resultSet.getObject(tableFieldKey);
2021-03-30 22:13:28 +02:00
Object numberObject = resultSet.getObject(tableFieldORderingProperty);
2021-03-19 15:55:06 +01:00
NumberedFilter numberedFilter = new NumberedFilter(key, value.toString(), (Number) numberObject, orderingProperty);
result.add(numberedFilter);
}
return result;
}finally {
connection.close();
}
2021-03-15 16:58:07 +01:00
}
2021-03-17 17:55:56 +01:00
@Override
public SortedSet<NumberedFilter> getFilterValues(Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters, String key) throws Exception {
2021-03-19 15:55:06 +01:00
return getNumberedValues(clz, temporalConstraint, filters, key, null, null);
2021-03-17 17:55:56 +01:00
}
2021-03-15 16:58:07 +01:00
@Override
2021-03-19 15:55:06 +01:00
public SortedSet<NumberedFilter> getFilterValues(Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters, String key, Integer limit) throws Exception {
return getNumberedValues(clz, temporalConstraint, filters, key, null, limit);
}
2021-03-15 16:58:07 +01:00
@Override
2021-03-19 15:55:06 +01:00
public SortedMap<NumberedFilter, SortedMap<Calendar, Info>> getTopValues(
Class<? extends AggregatedRecord<?, ?>> clz, TemporalConstraint temporalConstraint, List<Filter> filters,
String topKey, String orderingProperty)
throws DuplicatedKeyFilterException, KeyException, ValueException, Exception {
2021-03-22 11:53:12 +01:00
String context = AccountingPersistenceBackendQuery.getScopeToQuery();
Set<String> contexts = new HashSet<>();
contexts.add(context);
SortedMap<NumberedFilter,SortedMap<Calendar,Info>> ret = new TreeMap<>();
2021-03-22 16:06:50 +01:00
SortedSet<NumberedFilter> top = getNumberedValues(clz, temporalConstraint, filters, topKey, orderingProperty, 10);
2021-03-22 11:53:12 +01:00
for(NumberedFilter numberedFilter : top) {
filters.add(numberedFilter);
SortedMap<Calendar,Info> map = getTimeSeries(clz, temporalConstraint, filters, contexts);
ret.put(numberedFilter, map);
filters.remove(numberedFilter);
}
return ret;
2021-03-15 16:58:07 +01:00
}
@Override
public Record getRecord(String recordId, String type) throws Exception {
2021-03-22 18:08:35 +01:00
Class<? extends AggregatedRecord<?, ?>> aggregatedRecordClass = RecordUtility.getAggregatedRecordClass(type);
Connection connection = getConnection(aggregatedRecordClass);
try {
Statement statement = connection.createStatement();
Query query = new Query(aggregatedRecordClass);
2021-03-30 22:13:28 +02:00
query.setRecordId(recordId);
2021-03-22 18:08:35 +01:00
String sql = query.getRecordQuery();
RecordToDBFields recordToDBMapper = query.getRecordToDBMapper();
ResultSet resultSet = statement.executeQuery(sql);
resultSet.next();
AggregatedRecord<?, ?> instance = aggregatedRecordClass.newInstance();
Set<String> requiredFields = instance.getRequiredFields();
for(String recordField : requiredFields) {
String tableField = recordToDBMapper.getTableField(recordField);
Serializable serializable;
switch (recordField) {
case AggregatedRecord.START_TIME: case AggregatedRecord.END_TIME: case AggregatedRecord.CREATION_TIME:
OffsetDateTime offsetDateTime = resultSet.getObject(tableField, OffsetDateTime.class);
Calendar calendar = getCalendar(offsetDateTime);
serializable = calendar.getTimeInMillis();
break;
default:
serializable = resultSet.getObject(tableField).toString();
break;
}
instance.setResourceProperty(recordField, serializable);
}
return instance;
}finally {
connection.close();
}
2021-03-15 16:58:07 +01:00
}
2021-03-22 19:08:31 +01:00
@Override
public SortedSet<String> getSpaceProvidersIds() throws Exception {
Class<? extends AggregatedRecord<?, ?>> aggregatedRecordClass = AggregatedStorageStatusRecord.class;
Connection connection = getConnection(aggregatedRecordClass);
try {
Statement statement = connection.createStatement();
Query query = new Query(aggregatedRecordClass);
query.setTableFieldToRequest(AggregatedStorageStatusRecord.PROVIDER_ID);
String sql = query.getDinstinctValuesQuery();
SortedSet<String> providersIds = new TreeSet<>();
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
String id = resultSet.getString(1);
providersIds.add(id);
}
return providersIds;
}finally {
connection.close();
}
}
2021-03-22 18:29:39 +01:00
@Override
public List<UsageValue> getUsageValueQuotaTotal(List<UsageValue> listUsage) throws Exception {
return null;
}
2021-03-22 16:06:50 +01:00
@Override
public SortedMap<Filter, SortedMap<Calendar, Long>> getSpaceTimeSeries(Class<? extends AggregatedRecord<?, ?>> clz,
TemporalConstraint temporalConstraint, List<Filter> filters, List<String> providersId) throws Exception {
return null;
}
2021-03-19 15:55:06 +01:00
@Override
public void close() throws Exception {
// OK
}
2021-03-15 16:58:07 +01:00
@Override
public boolean isConnectionActive() throws Exception {
2021-03-17 15:44:50 +01:00
return true;
2021-03-15 16:58:07 +01:00
}
}