/** * */ package org.gcube.accounting.analytics.persistence.postgresql; import java.io.Serializable; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; import java.time.OffsetDateTime; import java.util.Calendar; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import org.gcube.accounting.analytics.Filter; import org.gcube.accounting.analytics.Info; import org.gcube.accounting.analytics.NumberedFilter; import org.gcube.accounting.analytics.TemporalConstraint; import org.gcube.accounting.analytics.UsageValue; import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException; import org.gcube.accounting.analytics.exception.KeyException; import org.gcube.accounting.analytics.exception.ValueException; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedStorageStatusRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.utility.postgresql.RecordToDBConnection; import org.gcube.accounting.utility.postgresql.RecordToDBFields; import org.gcube.accounting.utility.postgresql.RecordToDBMapping; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Luca Frosini (ISTI - CNR) */ public class AccountingPersistenceQueryPostgreSQL implements AccountingPersistenceBackendQuery { protected static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryPostgreSQL.class); public static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z"; public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY; // private String baseURL; protected AccountingPersistenceBackendQueryConfiguration configuration; static { // One Record per package is enough RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage()); } protected Connection getConnection(Class> clz) throws Exception { RecordToDBConnection recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz); if(recordDBInfo == null) { RecordToDBMapping.addRecordToDB(clz, configuration); recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz); } return recordDBInfo.getConnection(); } @Override public void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception { this.configuration = configuration; Map>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound(); for(String typeName : aggregatedRecords.keySet()) { try { Class> clz = aggregatedRecords.get(typeName); RecordToDBMapping.addRecordToDB(clz, configuration); } catch (Exception e) { new RuntimeException(e); } } } protected SortedMap getTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters, Set contexts) throws Exception { Connection connection = getConnection(clz); try { Statement statement = connection.createStatement(); SortedMap result = new TreeMap<>(); Query query = new Query(clz); query.setTemporalConstraint(temporalConstraint); query.setFilters(filters); query.setContexts(contexts); String sql = query.getTimeSeriesQuery(); List requestedTableField = query.getRequestedTableField(); RecordToDBFields recordToDBMapper = query.getRecordToDBMapper(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { OffsetDateTime offsetDateTime = resultSet.getObject(Query.DATE_OF_TIMESERIES_AS_FIELD, OffsetDateTime.class); Calendar calendar = getCalendar(offsetDateTime); JSONObject jsonObject = new JSONObject(); for(String tableField : requestedTableField) { String usageRecordField = recordToDBMapper.getRecordField(tableField); Object object = resultSet.getObject(tableField); jsonObject.put(usageRecordField, object); } Info info = new Info(calendar, jsonObject); result.put(calendar, info); } return result; }finally { connection.close(); } } @Override public SortedMap getTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters) throws DuplicatedKeyFilterException, KeyException, ValueException, Exception { String context = AccountingPersistenceBackendQuery.getScopeToQuery(); Set contexts = new HashSet<>(); contexts.add(context); return getTimeSeries(clz, temporalConstraint, filters, contexts); } protected Calendar getCalendar(OffsetDateTime offsetDateTime) { Calendar calendar = Calendar.getInstance(); long epochMillis = offsetDateTime.toInstant().toEpochMilli(); calendar.setTimeInMillis(epochMillis); return calendar; } @Override public SortedMap getNoContextTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters) throws DuplicatedKeyFilterException, KeyException, ValueException, Exception { return getTimeSeries(clz, temporalConstraint, filters, null); } @Override public SortedMap> getContextTimeSeries( Class> clz, TemporalConstraint temporalConstraint, List filters, List contexts) throws Exception { SortedMap> ret = new TreeMap<>(); for(String context : contexts) { Filter contextFilter = new Filter("context", context); Set timeSeriesContexts = new HashSet<>(); timeSeriesContexts.add(context); SortedMap timeSeries = getTimeSeries(clz, temporalConstraint, filters, timeSeriesContexts); if(!timeSeries.isEmpty()) { ret.put(contextFilter, timeSeries); } } return ret; } protected SortedSet getNumberedValues(Class> clz, TemporalConstraint temporalConstraint, List filters, String key, String orderingProperty, Integer limit) throws Exception { Connection connection = getConnection(clz); try { Statement statement = connection.createStatement(); if(orderingProperty == null) { orderingProperty = AccountingPersistenceQuery.getDefaultOrderingProperties(clz); } if(limit == null) { limit = 50; } SortedSet result = new TreeSet<>(); String context = AccountingPersistenceBackendQuery.getScopeToQuery(); Set contexts = new HashSet<>(); contexts.add(context); Query query = new Query(clz); query.setTemporalConstraint(temporalConstraint); query.setFilters(filters); query.setContexts(contexts); query.setTableFieldToRequest(key); query.setOrderByField(orderingProperty); query.setLimit(limit); String sql = query.getNextPossibleValueQuery(); // List requestedTableField = query.getRequestedTableField(); RecordToDBFields recordToDBMapper = query.getRecordToDBMapper(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { String tableFieldKey = recordToDBMapper.getTableField(key); Object value = resultSet.getObject(tableFieldKey); Object numberObject = resultSet.getObject(orderingProperty); NumberedFilter numberedFilter = new NumberedFilter(key, value.toString(), (Number) numberObject, orderingProperty); result.add(numberedFilter); } return result; }finally { connection.close(); } } @Override public SortedSet getFilterValues(Class> clz, TemporalConstraint temporalConstraint, List filters, String key) throws Exception { return getNumberedValues(clz, temporalConstraint, filters, key, null, null); } @Override public SortedSet getFilterValues(Class> clz, TemporalConstraint temporalConstraint, List filters, String key, Integer limit) throws Exception { return getNumberedValues(clz, temporalConstraint, filters, key, null, limit); } @Override public SortedMap> getTopValues( Class> clz, TemporalConstraint temporalConstraint, List filters, String topKey, String orderingProperty) throws DuplicatedKeyFilterException, KeyException, ValueException, Exception { String context = AccountingPersistenceBackendQuery.getScopeToQuery(); Set contexts = new HashSet<>(); contexts.add(context); SortedMap> ret = new TreeMap<>(); SortedSet top = getNumberedValues(clz, temporalConstraint, filters, topKey, orderingProperty, 10); for(NumberedFilter numberedFilter : top) { filters.add(numberedFilter); SortedMap map = getTimeSeries(clz, temporalConstraint, filters, contexts); ret.put(numberedFilter, map); filters.remove(numberedFilter); } return ret; } @Override public Record getRecord(String recordId, String type) throws Exception { Class> aggregatedRecordClass = RecordUtility.getAggregatedRecordClass(type); Connection connection = getConnection(aggregatedRecordClass); try { Statement statement = connection.createStatement(); Query query = new Query(aggregatedRecordClass); query.setRecordId("fa573711-ceb6-44ba-9c83-bd47e0915b80"); String sql = query.getRecordQuery(); RecordToDBFields recordToDBMapper = query.getRecordToDBMapper(); ResultSet resultSet = statement.executeQuery(sql); resultSet.next(); AggregatedRecord instance = aggregatedRecordClass.newInstance(); Set requiredFields = instance.getRequiredFields(); for(String recordField : requiredFields) { String tableField = recordToDBMapper.getTableField(recordField); Serializable serializable; switch (recordField) { case AggregatedRecord.START_TIME: case AggregatedRecord.END_TIME: case AggregatedRecord.CREATION_TIME: OffsetDateTime offsetDateTime = resultSet.getObject(tableField, OffsetDateTime.class); Calendar calendar = getCalendar(offsetDateTime); serializable = calendar.getTimeInMillis(); break; default: serializable = resultSet.getObject(tableField).toString(); break; } instance.setResourceProperty(recordField, serializable); } return instance; }finally { connection.close(); } } @Override public SortedSet getSpaceProvidersIds() throws Exception { Class> aggregatedRecordClass = AggregatedStorageStatusRecord.class; Connection connection = getConnection(aggregatedRecordClass); try { Statement statement = connection.createStatement(); Query query = new Query(aggregatedRecordClass); query.setTableFieldToRequest(AggregatedStorageStatusRecord.PROVIDER_ID); String sql = query.getDinstinctValuesQuery(); SortedSet providersIds = new TreeSet<>(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { String id = resultSet.getString(1); providersIds.add(id); } return providersIds; }finally { connection.close(); } } @Override public List getUsageValueQuotaTotal(List listUsage) throws Exception { return null; } @Override public SortedMap> getSpaceTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters, List providersId) throws Exception { return null; } @Override public void close() throws Exception { // OK } @Override public boolean isConnectionActive() throws Exception { return true; } }