From f8c8605523883ac1f940c35f65c84b85068e9064 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Tue, 16 Mar 2021 15:33:34 +0100 Subject: [PATCH] Implementing queries --- .../AccountingPersistenceQueryPostgreSQL.java | 163 ++++--------- .../persistence/postgresql/Query.java | 227 ++++++++++++++++++ .../postgresql/RecordToDBMapper.java | 94 ++++++++ .../postgresql/UsageRecordToDBMapping.java | 25 ++ ...ountingPersistenceQueryPostgreSQLTest.java | 14 +- .../persistence/postgresql/QueryTest.java | 57 +++++ 6 files changed, 456 insertions(+), 124 deletions(-) create mode 100644 src/main/java/org/gcube/accounting/analytics/persistence/postgresql/Query.java create mode 100644 src/main/java/org/gcube/accounting/analytics/persistence/postgresql/RecordToDBMapper.java create mode 100644 src/main/java/org/gcube/accounting/analytics/persistence/postgresql/UsageRecordToDBMapping.java create mode 100644 src/test/java/org/gcube/accounting/analytics/persistence/postgresql/QueryTest.java diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQL.java b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQL.java index 79697f9..f28770b 100644 --- a/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQL.java +++ b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQL.java @@ -3,15 +3,17 @@ */ package org.gcube.accounting.analytics.persistence.postgresql; -import java.io.Serializable; -import java.text.SimpleDateFormat; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.time.OffsetDateTime; import java.util.Calendar; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import org.gcube.accounting.analytics.Filter; import org.gcube.accounting.analytics.Info; @@ -27,7 +29,6 @@ import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.documentstore.records.AggregatedRecord; -import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; import org.json.JSONObject; import org.slf4j.Logger; @@ -49,13 +50,19 @@ public class AccountingPersistenceQueryPostgreSQL implements AccountingPersisten protected Map connectionMap; - protected static Map>, String> recordTypes; - static { // One Record per package is enough RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage()); - recordTypes = new HashMap<>(); + Map>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound(); + for(String typeName : aggregatedRecords.keySet()) { + try { + Class> clz = aggregatedRecords.get(typeName); + UsageRecordToDBMapping.getRecordToDB(clz); + } catch (Exception e) { + new RuntimeException(e); + } + } } @Override @@ -66,26 +73,7 @@ public class AccountingPersistenceQueryPostgreSQL implements AccountingPersisten this.connectionMap = new HashMap<>(); } - protected static String getRecordTypeByClass(Class> clz) { - try { - Record r = clz.newInstance(); - return r.getRecordType(); - }catch (Exception e) { - String type = clz.getSimpleName(); - type = type.replace("Abstract", ""); - type = type.replace("Aggregated", ""); - return type; - } - } - protected static String getRecordType(Class> clz) { - String type = recordTypes.get(clz); - if(type == null) { - type = getRecordTypeByClass(clz); - recordTypes.put(clz, type); - } - return type; - } @Override public SortedMap getTimeSeries(Class> clz, @@ -95,111 +83,42 @@ public class AccountingPersistenceQueryPostgreSQL implements AccountingPersisten return null; } - protected void appendString(StringBuffer query, String string) { - query.append("'"); - query.append(string); - query.append("'"); - } - protected void appendValue(StringBuffer values, Serializable serializable) { - if(serializable instanceof Number) { - values.append(serializable.toString()); - return; - } - - if(serializable instanceof Calendar) { - Calendar calendar = (Calendar) serializable; - SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATETIME_PATTERN); - String date = simpleDateFormat.format(calendar.getTime()); - appendString(values, date); - return; - } - - if(serializable instanceof Enum) { - Enum e = (Enum) serializable; - appendString(values, e.name()); - return; - } - - // String, URI etc - appendString(values, serializable.toString()); - } - - protected void appendKey(StringBuffer query, String fieldName) { - int lenght = fieldName.length(); - boolean lastLowerCase = true; - for (int i=0; i> clz, - TemporalConstraint temporalConstraint, List filters) - throws DuplicatedKeyFilterException, KeyException, ValueException, Exception { - Set aggregatedField = clz.newInstance().getAggregatedFields(); - - StringBuffer query = new StringBuffer(); - query.append("SELECT "); - - /* - * We want StartTime as first field for this reason has been - * excluded from cycle - */ - - appendKey(query, AggregatedRecord.START_TIME); - - for(String fieldName : aggregatedField) { - switch (fieldName) { - case AggregatedRecord.START_TIME: - case AggregatedRecord.END_TIME: - case AggregatedRecord.AGGREGATED: - continue; - - default: - query.append(", "); - appendKey(query, fieldName); - break; - } - - } - query.append(" FROM "); - query.append(getRecordType(clz)); - - query.append(" WHERE "); - appendKey(query, AggregatedRecord.START_TIME); - query.append(" > "); - appendValue(query, temporalConstraint.getAlignedStartTime()); - - query.append(" AND "); - appendKey(query, AggregatedRecord.START_TIME); - query.append(" < "); - appendValue(query, temporalConstraint.getAlignedEndTime()); - - - return query.toString(); + + protected Calendar getCalendar(OffsetDateTime offsetDateTime) { + Calendar calendar = Calendar.getInstance(); + long epochMillis = offsetDateTime.toInstant().toEpochMilli(); + calendar.setTimeInMillis(epochMillis); + return calendar; } @Override public SortedMap getNoContextTimeSeries(Class> clz, TemporalConstraint temporalConstraint, List filters) throws DuplicatedKeyFilterException, KeyException, ValueException, Exception { + Connection connection = null; + Statement statement = connection.createStatement(); + + SortedMap result = new TreeMap<>(); + + Query query = new Query(clz); + query.setTemporalConstraint(temporalConstraint); + query.setFilters(filters); + String sql = query.getNoContextTimeSeriesQuery(); + + List requestedTableField = query.getRequestedTableField(); + ResultSet resultSet = statement.executeQuery(sql); + + while (resultSet.next()) { + + OffsetDateTime offsetDateTime = resultSet.getObject(Query.DATE_OF_TIMESERIES_AS_FIELD, OffsetDateTime.class); + Calendar calendar = getCalendar(offsetDateTime); + + + + } diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/Query.java b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/Query.java new file mode 100644 index 0000000..ce8c5b8 --- /dev/null +++ b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/Query.java @@ -0,0 +1,227 @@ +package org.gcube.accounting.analytics.persistence.postgresql; + +import java.io.Serializable; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.Set; + +import org.gcube.accounting.analytics.Filter; +import org.gcube.accounting.analytics.TemporalConstraint; +import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode; +import org.gcube.accounting.analytics.TemporalConstraint.CalendarEnum; +import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; +import org.gcube.documentstore.records.AggregatedRecord; + +public class Query { + + public static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z"; + public static final String DATE_OF_TIMESERIES_AS_FIELD = "startTime"; + + private Class> clz; + private final RecordToDBMapper recordToDBMapper; + + private List requestedTableField; + private StringBuffer stringBuffer; + + protected TemporalConstraint temporalConstraint; + protected List filters; + protected Set contexts; + + public Query(Class> clz) throws Exception { + this.clz = clz; + this.recordToDBMapper = UsageRecordToDBMapping.getRecordToDB(clz); + } + + public List getRequestedTableField() { + return requestedTableField; + } + + public void setTemporalConstraint(TemporalConstraint temporalConstraint) { + this.temporalConstraint = temporalConstraint; + } + + public void setFilters(List filters) { + this.filters = filters; + } + public void setContexts(Set contexts) { + this.contexts = contexts; + } + + public RecordToDBMapper getRecordToDBMapper() { + return recordToDBMapper; + } + + private void appendString(String string) { + stringBuffer.append("'"); + stringBuffer.append(string); + stringBuffer.append("'"); + } + + protected String getTableField(String fieldName) { + return recordToDBMapper.getTableField(fieldName); + } + + protected void appendTableField(String fieldName) { + stringBuffer.append(getTableField(fieldName)); + } + + protected void appendValue(Serializable serializable) { + if(serializable instanceof Number) { + stringBuffer.append(serializable.toString()); + return; + } + + if(serializable instanceof Calendar) { + Calendar calendar = (Calendar) serializable; + SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATETIME_PATTERN); + String date = simpleDateFormat.format(calendar.getTime()); + appendString(date); + return; + } + + if(serializable instanceof Enum) { + Enum e = (Enum) serializable; + appendString(e.name()); + return; + } + + // String, URI etc + appendString(serializable.toString()); + } + + protected void addFilters() { + if(filters!=null && filters.size()>0) { + // The first filter if the time_bucket + stringBuffer.append(" AND "); + boolean first = true; + for(Filter filter : filters) { + if(first) { + first = false; + }else { + stringBuffer.append(" AND "); + } + appendTableField(filter.getKey()); + stringBuffer.append("="); + appendValue(filter.getValue()); + } + } + } + + protected void addEmittedFields() throws Exception { + Set aggregatedField = clz.newInstance().getAggregatedFields(); + for(String fieldName : aggregatedField) { + String dbField = getTableField(fieldName); + switch (fieldName) { + case AggregatedRecord.START_TIME: + case AggregatedRecord.END_TIME: + case AggregatedRecord.AGGREGATED: + continue; + + + case AggregatedRecord.OPERATION_COUNT: + stringBuffer.append(", SUM("); + stringBuffer.append(dbField); + stringBuffer.append(") AS "); + break; + + // WEIGHTED AVERAGE + case AggregatedServiceUsageRecord.DURATION: + stringBuffer.append(", SUM("); + stringBuffer.append(dbField); + stringBuffer.append("*"); + appendTableField(AggregatedRecord.OPERATION_COUNT); + stringBuffer.append(")/SUM("); + appendTableField(AggregatedRecord.OPERATION_COUNT); + stringBuffer.append(") AS "); + break; + + case AggregatedServiceUsageRecord.MAX_INVOCATION_TIME: + stringBuffer.append(", MAX("); + stringBuffer.append(dbField); + stringBuffer.append(") AS "); + break; + + case AggregatedServiceUsageRecord.MIN_INVOCATION_TIME: + stringBuffer.append(", MIN("); + stringBuffer.append(dbField); + stringBuffer.append(") AS "); + break; + + default: + stringBuffer.append(", "); + break; + } + + stringBuffer.append(dbField); + requestedTableField.add(dbField); + } + } + + protected String getTimeBucketCalendarInterval(AggregationMode aggregationMode) { + CalendarEnum calendarEnum = CalendarEnum.values()[aggregationMode.ordinal()]; + return calendarEnum.name().toLowerCase(); + } + + protected void addTimeBucket() { + stringBuffer.append("time_bucket('1 "); + String calendarInterval = getTimeBucketCalendarInterval(temporalConstraint.getAggregationMode()); + stringBuffer.append(calendarInterval); + stringBuffer.append("',"); + appendTableField(AggregatedRecord.START_TIME); + stringBuffer.append(") AS "); + stringBuffer.append(DATE_OF_TIMESERIES_AS_FIELD); + requestedTableField.add(DATE_OF_TIMESERIES_AS_FIELD); + } + + private void newQuery() { + stringBuffer = new StringBuffer(); + requestedTableField = new ArrayList<>(); + stringBuffer.append("SELECT "); + } + + protected void addTemporalConstraintToQuery() { + stringBuffer.append(" WHERE "); + String tableField = getTableField(AggregatedRecord.START_TIME); + stringBuffer.append(tableField); + stringBuffer.append(" > "); + appendValue(temporalConstraint.getAlignedStartTime()); + + stringBuffer.append(" AND "); + stringBuffer.append(tableField); + stringBuffer.append(" < "); + appendValue(temporalConstraint.getAlignedEndTime()); + } + + protected void addGropuBy() { + stringBuffer.append(" GROUP BY "); + stringBuffer.append(DATE_OF_TIMESERIES_AS_FIELD); + } + + protected void addOrderBy() { + stringBuffer.append(" ORDER BY "); + stringBuffer.append(DATE_OF_TIMESERIES_AS_FIELD); + stringBuffer.append(" ASC"); + } + + public String getNoContextTimeSeriesQuery() throws Exception { + newQuery(); + + addTimeBucket(); + + addEmittedFields(); + + stringBuffer.append(" FROM "); + stringBuffer.append(recordToDBMapper.getTableName()); + + addTemporalConstraintToQuery(); + + addFilters(); + + addGropuBy(); + addOrderBy(); + + return stringBuffer.toString(); + } +} diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/RecordToDBMapper.java b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/RecordToDBMapper.java new file mode 100644 index 0000000..148da90 --- /dev/null +++ b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/RecordToDBMapper.java @@ -0,0 +1,94 @@ +package org.gcube.accounting.analytics.persistence.postgresql; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.gcube.documentstore.records.AggregatedRecord; +import org.gcube.documentstore.records.Record; + +public class RecordToDBMapper { + + protected static String getRecordTypeByClass(Class> clz) { + try { + Record r = clz.newInstance(); + return r.getRecordType(); + }catch (Exception e) { + String type = clz.getSimpleName(); + type = type.replace("Abstract", ""); + type = type.replace("Aggregated", ""); + return type; + } + } + + protected static String getKey(String fieldName) { + StringBuffer stringBuffer = new StringBuffer(); + int lenght = fieldName.length(); + boolean lastLowerCase = true; + for (int i=0; i> clz; + + protected final String typeName; + protected final String tableName; + + protected final Map tableFieldToUsageRecordField; + protected final Map usageRecordFieldToTableField; + + public RecordToDBMapper(Class> clz) throws Exception { + this.clz = clz; + this.typeName = getRecordTypeByClass(clz); + this.tableName = typeName.toLowerCase(); + this.tableFieldToUsageRecordField = new HashMap<>(); + this.usageRecordFieldToTableField = new HashMap<>(); + mapFields(); + } + + public RecordToDBMapper(String typeName, Class> clz) throws Exception { + this.clz = clz; + this.typeName = typeName; + this.tableName = typeName.toLowerCase(); + this.tableFieldToUsageRecordField = new HashMap<>(); + this.usageRecordFieldToTableField = new HashMap<>(); + mapFields(); + } + + protected void mapFields() throws Exception { + Set requiredFields = clz.newInstance().getRequiredFields(); + for(String usageRecordField : requiredFields) { + String dbField = getKey(usageRecordField); + tableFieldToUsageRecordField.put(dbField, usageRecordField); + usageRecordFieldToTableField.put(usageRecordField, dbField); + } + } + + public String getTableField(String usageRecordField) { + return usageRecordFieldToTableField.get(usageRecordField); + } + + public String getUsageRecordField(String tableField) { + return tableFieldToUsageRecordField.get(tableField); + } + + public String getTypeName() { + return typeName; + } + + public String getTableName() { + return tableName; + } + +} diff --git a/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/UsageRecordToDBMapping.java b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/UsageRecordToDBMapping.java new file mode 100644 index 0000000..bfa3b28 --- /dev/null +++ b/src/main/java/org/gcube/accounting/analytics/persistence/postgresql/UsageRecordToDBMapping.java @@ -0,0 +1,25 @@ +package org.gcube.accounting.analytics.persistence.postgresql; + +import java.util.HashMap; +import java.util.Map; + +import org.gcube.documentstore.records.AggregatedRecord; + +public class UsageRecordToDBMapping { + + protected final static Map>, RecordToDBMapper> classToRecordToDBMapper; + + static { + classToRecordToDBMapper = new HashMap<>(); + } + + public static synchronized RecordToDBMapper getRecordToDB(Class> clz) throws Exception { + RecordToDBMapper recordToDBMapper = classToRecordToDBMapper.get(clz); + if(recordToDBMapper==null) { + recordToDBMapper = new RecordToDBMapper(clz); + classToRecordToDBMapper.put(clz, recordToDBMapper); + } + return recordToDBMapper; + } + +} diff --git a/src/test/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQLTest.java b/src/test/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQLTest.java index 90d3c66..3c167d9 100644 --- a/src/test/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQLTest.java +++ b/src/test/java/org/gcube/accounting/analytics/persistence/postgresql/AccountingPersistenceQueryPostgreSQLTest.java @@ -3,13 +3,17 @@ */ package org.gcube.accounting.analytics.persistence.postgresql; +import java.util.ArrayList; import java.util.Calendar; +import java.util.List; +import org.gcube.accounting.analytics.Filter; import org.gcube.accounting.analytics.TemporalConstraint; import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode; import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException; import org.gcube.accounting.analytics.exception.KeyException; import org.gcube.accounting.analytics.exception.ValueException; +import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.junit.Test; import org.slf4j.Logger; @@ -37,9 +41,15 @@ public class AccountingPersistenceQueryPostgreSQLTest extends ContextTest { entTimeCalendar.set(Calendar.HOUR_OF_DAY, 16); entTimeCalendar.set(Calendar.MINUTE, 17); + List filters = new ArrayList<>(); + Filter filter = new Filter(UsageRecord.CONSUMER_ID, "name.surname"); + filters.add(filter); + TemporalConstraint temporalConstraint = new TemporalConstraint(startTimeCalendar.getTimeInMillis(), entTimeCalendar.getTimeInMillis(), AggregationMode.MINUTELY); - AccountingPersistenceQueryPostgreSQL accountingPersistenceQueryPostgreSQL = new AccountingPersistenceQueryPostgreSQL(); - String ret = accountingPersistenceQueryPostgreSQL.getNoContextTimeSeriesQuery(AggregatedServiceUsageRecord.class, temporalConstraint, null); + Query query = new Query(AggregatedServiceUsageRecord.class); + query.setTemporalConstraint(temporalConstraint); + query.setFilters(filters); + String ret = query.getNoContextTimeSeriesQuery(); logger.debug(ret); } diff --git a/src/test/java/org/gcube/accounting/analytics/persistence/postgresql/QueryTest.java b/src/test/java/org/gcube/accounting/analytics/persistence/postgresql/QueryTest.java new file mode 100644 index 0000000..56909f2 --- /dev/null +++ b/src/test/java/org/gcube/accounting/analytics/persistence/postgresql/QueryTest.java @@ -0,0 +1,57 @@ +/** + * + */ +package org.gcube.accounting.analytics.persistence.postgresql; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +import org.gcube.accounting.analytics.Filter; +import org.gcube.accounting.analytics.TemporalConstraint; +import org.gcube.accounting.analytics.TemporalConstraint.AggregationMode; +import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException; +import org.gcube.accounting.analytics.exception.KeyException; +import org.gcube.accounting.analytics.exception.ValueException; +import org.gcube.accounting.datamodel.UsageRecord; +import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) + */ +public class QueryTest extends ContextTest { + + private static final Logger logger = LoggerFactory.getLogger(QueryTest.class); + + @Test + public void testGetNoContextTimeSeriesQuery() throws DuplicatedKeyFilterException, KeyException, ValueException, Exception { + logger.debug("test"); + Calendar startTimeCalendar = Calendar.getInstance(); + startTimeCalendar.set(Calendar.MONTH, Calendar.MARCH); + startTimeCalendar.set(Calendar.DAY_OF_MONTH, 15); + startTimeCalendar.set(Calendar.HOUR_OF_DAY, 16); + startTimeCalendar.set(Calendar.MINUTE, 15); + + Calendar entTimeCalendar = Calendar.getInstance(); + entTimeCalendar.set(Calendar.MONTH, Calendar.MARCH); + entTimeCalendar.set(Calendar.DAY_OF_MONTH, 15); + entTimeCalendar.set(Calendar.HOUR_OF_DAY, 16); + entTimeCalendar.set(Calendar.MINUTE, 17); + + List filters = new ArrayList<>(); + Filter filter = new Filter(UsageRecord.CONSUMER_ID, "name.surname"); + filters.add(filter); + + TemporalConstraint temporalConstraint = new TemporalConstraint(startTimeCalendar.getTimeInMillis(), entTimeCalendar.getTimeInMillis(), AggregationMode.MINUTELY); + Query query = new Query(AggregatedServiceUsageRecord.class); + query.setTemporalConstraint(temporalConstraint); + query.setFilters(filters); + String ret = query.getNoContextTimeSeriesQuery(); + logger.debug(ret); + + } + +}