/**
 * 
 */
package org.gcube.accounting.analytics.persistence.postgresql;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.OffsetDateTime;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceQuery;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.accounting.utility.postgresql.RecordToDBConnection;
import org.gcube.accounting.utility.postgresql.RecordToDBFields;
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
import org.gcube.com.fasterxml.jackson.databind.ObjectMapper;
import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * PostgreSQL implementation of {@link AccountingPersistenceBackendQuery}.
 * 
 * The instance is configured through the setters (requested record class,
 * temporal constraint, contexts and filters) and then queried. Every query
 * obtains a connection through {@link #getConnection(Class)} and closes it
 * (together with the statement and the result set) before returning.
 * 
 * @author Luca Frosini (ISTI - CNR)
 */
public class AccountingPersistenceQueryPostgreSQL implements AccountingPersistenceBackendQuery {

	protected static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceQueryPostgreSQL.class);

	/** Alias of {@link AccountingPersistenceConfiguration#URL_PROPERTY_KEY}. */
	public static final String URL_PROPERTY_KEY = AccountingPersistenceConfiguration.URL_PROPERTY_KEY;

	/**
	 * Upper bound (and default) for the number of values requested by
	 * {@link #getNumberedValues(String, String, Integer)}.
	 */
	public static final int MAX_TOP_LIMIT = 50;

	protected AccountingPersistenceBackendQueryConfiguration configuration;

	protected ObjectMapper objectMapper;

	protected Class<? extends AggregatedRecord<?, ?>> clz;
	protected TemporalConstraint temporalConstraint;
	protected Set<String> contexts;
	protected Collection<Filter> filters;

	static {
		// One Record per package is enough
		RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage());
		RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage());
	}

	public AccountingPersistenceQueryPostgreSQL() {
		objectMapper = new ObjectMapper();
	}

	/** Sets the aggregated record class the following queries operate on. */
	@Override
	public void setRequestedRecords(Class<? extends AggregatedRecord<?, ?>> clz) {
		this.clz = clz;
	}

	/** Sets the temporal constraint passed to every generated query. */
	@Override
	public void setTemporalConstraint(TemporalConstraint temporalConstraint) {
		this.temporalConstraint = temporalConstraint;
	}

	/** Sets the contexts passed to every generated query. */
	@Override
	public void setContexts(Set<String> contexts) {
		this.contexts = contexts;
	}

	/**
	 * Sets the filters passed to every generated query.
	 * 
	 * The collection is stored by reference (not copied);
	 * {@link #getTopValues(String, String, Integer)} temporarily adds to and
	 * removes from it.
	 */
	@SuppressWarnings("unchecked")
	@Override
	public void setFilters(Collection<? extends Filter> filters) {
		this.filters = (Collection<Filter>) filters;
	}

	/**
	 * Returns a connection for the given record class, registering the class
	 * in {@link RecordToDBMapping} first when no mapping is known yet.
	 * 
	 * @param clz the aggregated record class to get the connection for
	 * @return the connection provided by the record mapping
	 * @throws Exception if the mapping cannot be created or the connection
	 *                   cannot be obtained
	 */
	protected Connection getConnection(Class<? extends AggregatedRecord<?, ?>> clz) throws Exception {
		RecordToDBConnection recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz);
		if (recordDBInfo == null) {
			RecordToDBMapping.addRecordToDB(clz, configuration);
			recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz);
		}
		return recordDBInfo.getConnection();
	}

	/**
	 * Stores the configuration and tries to register every aggregated record
	 * class known to {@link RecordUtility} in {@link RecordToDBMapping}.
	 * 
	 * Registration is best effort: a failure for one type is logged and does
	 * not prevent the registration of the others, because
	 * {@link #getConnection(Class)} registers a missing mapping on demand.
	 */
	@Override
	public void prepareConnection(AccountingPersistenceBackendQueryConfiguration configuration) throws Exception {
		this.configuration = configuration;
		Map<String, Class<? extends AggregatedRecord<?, ?>>> aggregatedRecords = RecordUtility
				.getAggregatedRecordClassesFound();
		for (Map.Entry<String, Class<? extends AggregatedRecord<?, ?>>> entry : aggregatedRecords.entrySet()) {
			try {
				RecordToDBMapping.addRecordToDB(entry.getValue(), configuration);
			} catch (Exception e) {
				// The original code created a RuntimeException here without throwing it,
				// i.e. the failure was silently discarded. Keep going, but leave a trace.
				logger.warn("Unable to register the DB mapping for record type {}", entry.getKey(), e);
			}
		}
	}

	/**
	 * Adds a property to the given JSON object.
	 * 
	 * {@link Integer} values are stored as int, any other {@link Number} is
	 * stored as long (parsed from its string form), {@code null} is stored as
	 * JSON null and everything else is stored through {@code toString()}.
	 * 
	 * @param objectNode the node to add the property to
	 * @param key        the property name
	 * @param value      the property value, possibly {@code null}
	 * @throws NumberFormatException if a non-Integer number has a string form
	 *                               that is not a valid long
	 */
	protected void addProperty(ObjectNode objectNode, String key, Object value) {
		if (value == null) {
			// e.g. SQL NULL column: value.toString() would throw a NullPointerException
			objectNode.putNull(key);
			return;
		}
		if (value instanceof Integer) {
			objectNode.put(key, ((Integer) value).intValue());
			return;
		}
		if (value instanceof Number) {
			objectNode.put(key, Long.parseLong(value.toString()));
			return;
		}
		objectNode.put(key, value.toString());
	}

	/**
	 * Runs the time series query for the given contexts using the configured
	 * record class, temporal constraint and filters.
	 * 
	 * @param contexts the contexts to restrict the query to
	 * @return the resulting {@link Info} keyed (and sorted) by date
	 * @throws Exception if the query cannot be built or executed
	 */
	protected SortedMap<Calendar, Info> getTimeSeries(Set<String> contexts) throws Exception {
		Query query = new Query(clz);
		query.setTemporalConstraint(temporalConstraint);
		query.setFilters(filters);
		query.setContexts(contexts);

		String sql = query.getTimeSeriesQuery();
		Set<String> requestedTableField = query.getRequestedTableField();
		RecordToDBFields recordToDBMapper = query.getRecordToDBMapper();
		// The date column is the same for every row
		String dateTableField = recordToDBMapper.getTableField(Query.DATE_OF_TIMESERIES_AS_FIELD);

		SortedMap<Calendar, Info> result = new TreeMap<>();

		logger.trace("Going to request the following query: {}", sql);
		try (Connection connection = getConnection(clz);
				Statement statement = connection.createStatement();
				ResultSet resultSet = statement.executeQuery(sql)) {
			while (resultSet.next()) {
				OffsetDateTime offsetDateTime = resultSet.getObject(dateTableField, OffsetDateTime.class);
				Calendar calendar = getCalendar(offsetDateTime);

				ObjectNode objectNode = objectMapper.createObjectNode();
				for (String tableField : requestedTableField) {
					String usageRecordField = recordToDBMapper.getRecordField(tableField);
					addProperty(objectNode, usageRecordField, resultSet.getObject(tableField));
				}

				result.put(calendar, new Info(calendar, objectNode));
			}
		}
		return result;
	}

	/** Returns the time series for the configured contexts. */
	@Override
	public SortedMap<Calendar, Info> getTimeSeries() throws Exception {
		return getTimeSeries(contexts);
	}

	/**
	 * Converts the given date-time to a {@link Calendar} (created with
	 * {@link Calendar#getInstance()}) representing the same instant.
	 */
	protected Calendar getCalendar(OffsetDateTime offsetDateTime) {
		Calendar calendar = Calendar.getInstance();
		calendar.setTimeInMillis(offsetDateTime.toInstant().toEpochMilli());
		return calendar;
	}

	/**
	 * Computes one time series per configured context.
	 * 
	 * @return the non-empty time series keyed by a {@code scope} filter whose
	 *         value is the context
	 * @throws Exception if any of the queries fails
	 */
	@Override
	public SortedMap<Filter, SortedMap<Calendar, Info>> getContextTimeSeries() throws Exception {
		SortedMap<Filter, SortedMap<Calendar, Info>> ret = new TreeMap<>();
		for (String context : contexts) {
			Filter contextFilter = new Filter("scope", context);

			Set<String> ctxs = new HashSet<>();
			ctxs.add(context);

			SortedMap<Calendar, Info> timeSeries = getTimeSeries(ctxs);
			if (!timeSeries.isEmpty()) {
				ret.put(contextFilter, timeSeries);
			}
		}
		return ret;
	}

	/**
	 * Queries the values of {@code key} together with the number used to
	 * order them.
	 * 
	 * @param key              the record field whose values are requested
	 * @param orderingProperty the record field to order by; when {@code null}
	 *                         the default ordering property of the requested
	 *                         record class is used
	 * @param limit            the maximum number of values; {@code null},
	 *                         values lower than 1 and values greater than
	 *                         {@link #MAX_TOP_LIMIT} are replaced by
	 *                         {@link #MAX_TOP_LIMIT}
	 * @return the values found, as a sorted set of {@link NumberedFilter}
	 * @throws Exception if the query cannot be built or executed
	 */
	protected SortedSet<NumberedFilter> getNumberedValues(String key, String orderingProperty, Integer limit)
			throws Exception {
		if (orderingProperty == null) {
			orderingProperty = AccountingPersistenceQuery.getDefaultOrderingProperties(clz);
		}

		// getFilterValues(String) passes a null limit: check it before unboxing
		Integer effectiveLimit = limit;
		if (effectiveLimit == null || effectiveLimit < 1 || effectiveLimit > MAX_TOP_LIMIT) {
			effectiveLimit = MAX_TOP_LIMIT;
		}

		Query query = new Query(clz);
		query.setTemporalConstraint(temporalConstraint);
		query.setFilters(filters);
		query.setContexts(contexts);
		query.setTableFieldToRequest(key);
		query.setOrderByField(orderingProperty);
		query.setLimit(effectiveLimit);

		String sql = query.getNextPossibleValueQuery();
		RecordToDBFields recordToDBMapper = query.getRecordToDBMapper();
		// Column names do not change from row to row
		String tableFieldKey = recordToDBMapper.getTableField(key);
		String tableFieldOrderingProperty = recordToDBMapper.getTableField(orderingProperty);

		SortedSet<NumberedFilter> result = new TreeSet<>();

		logger.trace("Going to request the following query: {}", sql);
		try (Connection connection = getConnection(clz);
				Statement statement = connection.createStatement();
				ResultSet resultSet = statement.executeQuery(sql)) {
			while (resultSet.next()) {
				Object value = resultSet.getObject(tableFieldKey);
				Object numberObject = resultSet.getObject(tableFieldOrderingProperty);
				result.add(new NumberedFilter(key, value.toString(), (Number) numberObject, orderingProperty));
			}
		}
		return result;
	}

	/** Same as {@link #getFilterValues(String, Integer)} with the default limit. */
	@Override
	public SortedSet<NumberedFilter> getFilterValues(String key) throws Exception {
		return getNumberedValues(key, null, null);
	}

	/**
	 * Returns the values of {@code key} ordered by the default ordering
	 * property of the requested record class.
	 */
	@Override
	public SortedSet<NumberedFilter> getFilterValues(String key, Integer limit) throws Exception {
		return getNumberedValues(key, null, limit);
	}

	/**
	 * Finds the top values of {@code topKey} and computes a time series for
	 * each of them.
	 * 
	 * Each top value is temporarily added to the configured filters while its
	 * time series is computed and is removed afterwards, also when the query
	 * fails.
	 * 
	 * @param topKey           the record field whose top values are requested
	 * @param orderingProperty the record field to order by, or {@code null}
	 *                         for the default one
	 * @param limit            the maximum number of top values, see
	 *                         {@link #getNumberedValues(String, String, Integer)}
	 * @return the time series keyed by top value
	 * @throws Exception if any of the queries fails
	 */
	@Override
	public SortedMap<NumberedFilter, SortedMap<Calendar, Info>> getTopValues(String topKey,
			String orderingProperty, Integer limit) throws Exception {
		SortedMap<NumberedFilter, SortedMap<Calendar, Info>> ret = new TreeMap<>();

		SortedSet<NumberedFilter> top = getNumberedValues(topKey, orderingProperty, limit);

		if (filters == null) {
			// No filter has been set: filters.add() would throw a NullPointerException
			filters = new HashSet<>();
		}

		for (NumberedFilter numberedFilter : top) {
			filters.add(numberedFilter);
			try {
				ret.put(numberedFilter, getTimeSeries());
			} finally {
				// Never leave the temporary filter behind in the shared collection
				filters.remove(numberedFilter);
			}
		}

		return ret;
	}

	/**
	 * Loads the record with the given id and builds an instance of the
	 * aggregated record class registered for {@code type}.
	 * 
	 * Only the required fields of the record are read: start, end and
	 * creation time are set as epoch milliseconds, every other field is set
	 * as the string form of the column value.
	 * 
	 * @param recordId the id of the record to load
	 * @param type     the record type used to look up the aggregated record class
	 * @return the populated record
	 * @throws SQLException if no record with the given id is found
	 * @throws Exception    if the query fails or the record cannot be instantiated
	 */
	@Override
	public Record getRecord(String recordId, String type) throws Exception {
		Class<? extends AggregatedRecord<?, ?>> aggregatedRecordClass = RecordUtility.getAggregatedRecordClass(type);

		Query query = new Query(aggregatedRecordClass);
		query.setRecordId(recordId);
		String sql = query.getRecordQuery();
		RecordToDBFields recordToDBMapper = query.getRecordToDBMapper();

		try (Connection connection = getConnection(aggregatedRecordClass);
				Statement statement = connection.createStatement();
				ResultSet resultSet = statement.executeQuery(sql)) {

			if (!resultSet.next()) {
				// Fail with an explicit message instead of a driver error on the first column read
				throw new SQLException("No record of type " + type + " found with id " + recordId);
			}

			AggregatedRecord<?, ?> instance = aggregatedRecordClass.getDeclaredConstructor().newInstance();
			Set<String> requiredFields = instance.getRequiredFields();

			for (String recordField : requiredFields) {
				String tableField = recordToDBMapper.getTableField(recordField);
				Serializable serializable;
				switch (recordField) {
					case AggregatedRecord.START_TIME:
					case AggregatedRecord.END_TIME:
					case AggregatedRecord.CREATION_TIME:
						OffsetDateTime offsetDateTime = resultSet.getObject(tableField, OffsetDateTime.class);
						Calendar calendar = getCalendar(offsetDateTime);
						serializable = calendar.getTimeInMillis();
						break;

					default:
						serializable = resultSet.getObject(tableField).toString();
						break;
				}
				instance.setResourceProperty(recordField, serializable);
			}

			return instance;
		}
	}

	/**
	 * Not implemented by this backend: always returns {@code null}.
	 * 
	 * The previous implementation is kept below for reference.
	 */
	@Override
	public SortedMap<Filter, SortedMap<Calendar, Info>> getSpaceTimeSeries(Set<String> dataTypes) throws Exception {
		/*
		SortedMap<Filter, SortedMap<Calendar, Info>> sortedMap = new TreeMap<>();
		
		setRequestedRecords(AggregatedStorageStatusRecord.class);
		
		for(String dataType : dataTypes) {
			Filter filter = new Filter(StorageStatusRecord.DATA_TYPE, dataType);
			if(filters == null) {
				filters = new HashSet<>();
			}
			filters.add(filter);
			SortedMap<Calendar, Info> timeSeries = getTimeSeries();
			sortedMap.put(filter, timeSeries);
			filters.remove(filter);
		}
		
		return sortedMap;
		*/
		return null;
	}

	/** Not implemented by this backend: always returns {@code null}. */
	@Override
	public List<UsageValue> getUsageValueQuotaTotal(List<UsageValue> listUsage) throws Exception {
		return null;
	}

	/** Nothing to release: every query closes its own connection. */
	@Override
	public void close() throws Exception {
		// OK
	}

	/** Always returns {@code true}. */
	@Override
	public boolean isConnectionActive() throws Exception {
		return true;
	}

}