package org.gcube.accounting.aggregator.persistence;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;

import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStateEvent;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.accounting.utility.postgresql.RecordToDBFields;
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.postgresql.core.Utils;

/**
 * PostgreSQL access layer used by the aggregator: it stores and retrieves
 * {@link AggregationStatus} rows (tables {@code aggregation_status} and
 * {@code aggregation_status_event}) and selects/deletes the usage records
 * that have to be aggregated.
 *
 * A single JDBC connection is lazily opened with auto-commit disabled and
 * then reused by every method; write methods commit explicitly and roll back
 * when they fail so that the shared connection stays usable.
 *
 * NOTE(review): neither the lazy singleton nor the cached connection is
 * synchronized — confirm that the class is only used from a single thread.
 *
 * @author Luca Frosini (ISTI-CNR)
 */
public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {

	private static final String UTC_TIME_ZONE = "UTC";

	/** Time zone used to render every date written into an SQL statement. */
	public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE);

	/** Lazily opened connection shared by all the methods of this instance. */
	protected Connection connection;

	private static PostgreSQLConnector postgreSQLConnector;

	/**
	 * Returns the shared connector, creating it on first use.
	 *
	 * @return the unique {@link PostgreSQLConnector} instance
	 * @throws Exception if the connector cannot be created
	 */
	public static PostgreSQLConnector getPostgreSQLConnector() throws Exception {
		if(postgreSQLConnector == null) {
			postgreSQLConnector = new PostgreSQLConnector();
		}
		return postgreSQLConnector;
	}

	/**
	 * Loads the persistence configuration and registers every aggregated
	 * record class found by {@link RecordUtility} in {@link RecordToDBMapping}.
	 *
	 * @throws Exception if the configuration cannot be loaded
	 */
	private PostgreSQLConnector() throws Exception {
		this.configuration = new AccountingPersistenceBackendQueryConfiguration(AccountingPersistenceQueryPostgreSQL.class);
		Map<String, Class<? extends AggregatedRecord<?,?>>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound();
		for(Map.Entry<String, Class<? extends AggregatedRecord<?,?>>> entry : aggregatedRecords.entrySet()) {
			try {
				RecordToDBMapping.addRecordToDB(entry.getValue(), configuration);
			} catch (Exception e) {
				// The original code built a RuntimeException here without throwing it,
				// silently hiding the failure. Registration stays best-effort, but the
				// problem is now reported.
				// NOTE(review): consider failing fast if a missing mapping must be fatal.
				logger.error("Unable to register the DB mapping for record type {}", entry.getKey(), e);
			}
		}
	}

	/**
	 * Returns the shared connection, opening it when it does not exist yet or
	 * has been closed. The connection is created with auto-commit disabled.
	 *
	 * @return the open connection
	 * @throws Exception if the driver cannot be loaded or the connection fails
	 */
	protected Connection getConnection() throws Exception {
		if(connection == null || connection.isClosed()) {
			Class.forName("org.postgresql.Driver");

			String url = configuration.getProperty(AccountingPersistenceQueryPostgreSQL.URL_PROPERTY_KEY);
			String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY);
			String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY);

			Connection opened = DriverManager.getConnection(url, username, password);
			logger.trace("Database {} opened successfully", url);
			opened.setAutoCommit(false);
			// Cache the connection only once it is fully configured.
			connection = opened;
		}
		return connection;
	}

	/**
	 * Wraps the given text in single quotes, escaping it as an SQL literal.
	 *
	 * @param string the text to quote
	 * @return the quoted and escaped literal
	 * @throws SQLException if the text cannot be escaped
	 */
	protected String getQuotedString(String string) throws SQLException {
		StringBuilder builder = new StringBuilder();
		builder.append("'");
		Utils.escapeLiteral(builder, string, false);
		builder.append("'");
		return builder.toString();
	}

	/**
	 * Formats a date with DATETIME_PATTERN (not defined in this file) in
	 * {@link #DEFAULT_TIME_ZONE}.
	 */
	private String formatAsUtcDateTime(Date date) {
		// SimpleDateFormat is not thread-safe: a new instance is created per call.
		SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATETIME_PATTERN);
		simpleDateFormat.setTimeZone(DEFAULT_TIME_ZONE);
		return simpleDateFormat.format(date);
	}

	/**
	 * Converts a value into the text to embed in an SQL statement.
	 * {@code null} becomes {@code null}, numbers are rendered unquoted,
	 * {@link Calendar} and {@link Date} are rendered as quoted UTC date-times,
	 * enums as their quoted {@code name()} and everything else as its quoted
	 * {@code toString()}.
	 *
	 * @param serializable the value to convert, may be {@code null}
	 * @return the SQL representation of the value
	 * @throws SQLException if the value cannot be escaped
	 */
	protected String getValue(Serializable serializable) throws SQLException {
		if(serializable == null) {
			return "null";
		}

		if(serializable instanceof Number) {
			return serializable.toString();
		}

		if(serializable instanceof Calendar) {
			return getQuotedString(formatAsUtcDateTime(((Calendar) serializable).getTime()));
		}

		if(serializable instanceof Date) {
			return getQuotedString(formatAsUtcDateTime((Date) serializable));
		}

		if(serializable instanceof Enum) {
			return getQuotedString(((Enum<?>) serializable).name());
		}

		// String, URI etc
		return getQuotedString(serializable.toString());
	}

	/**
	 * Adds the rollback failure (if any) to the original exception instead of
	 * letting it replace the original one.
	 */
	private void rollbackQuietly(Connection connection, Exception cause) {
		try {
			connection.rollback();
		} catch (SQLException rollbackException) {
			cause.addSuppressed(rollbackException);
		}
	}

	/**
	 * Builds the INSERT statement storing the given status in
	 * {@code aggregation_status}.
	 *
	 * @param aggregationStatus the status to store
	 * @param upsert when {@code true} an {@code ON CONFLICT (id) DO UPDATE}
	 *        clause refreshing counters, percentage, state, last update time
	 *        and previous is appended
	 * @return the SQL command, terminated by {@code ;}
	 * @throws SQLException if a value cannot be escaped
	 */
	public String getInsertAggregationStatusQuery(AggregationStatus aggregationStatus, boolean upsert) throws SQLException {
		AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
		AggregationStatus previous = aggregationStatus.getPrevious();
		String previousId = previous != null ? previous.getUUID().toString() : null;

		StringBuilder sql = new StringBuilder();
		sql.append("INSERT INTO ");
		sql.append("aggregation_status");
		sql.append(" (id, ");
		sql.append("record_type, aggregation_type, aggregation_start_date, aggregation_end_date, ");
		sql.append("original_records_number, aggregated_records_number, recovered_records_number, malformed_records_number, percentage, ");
		sql.append("context, current_aggregation_state, last_update_time, previous)");
		sql.append(" VALUES (");
		sql.append(getValue(aggregationStatus.getUUID().toString()));
		sql.append(", ").append(getValue(aggregationInfo.getRecordType()));
		sql.append(", ").append(getValue(aggregationInfo.getAggregationType()));
		sql.append(", ").append(getValue(aggregationInfo.getAggregationStartDate()));
		sql.append(", ").append(getValue(aggregationInfo.getAggregationEndDate()));
		sql.append(", ").append(getValue(aggregationStatus.getOriginalRecordsNumber()));
		sql.append(", ").append(getValue(aggregationStatus.getAggregatedRecordsNumber()));
		sql.append(", ").append(getValue(aggregationStatus.getRecoveredRecordNumber()));
		sql.append(", ").append(getValue(aggregationStatus.getMalformedRecordNumber()));
		sql.append(", ").append(getValue(aggregationStatus.getPercentage()));
		sql.append(", ").append(getValue(aggregationStatus.getContext()));
		sql.append(", ").append(getValue(aggregationStatus.getAggregationState()));
		sql.append(", ").append(getValue(aggregationStatus.getLastUpdateTime()));
		sql.append(", ").append(getValue(previousId));

		if(upsert) {
			sql.append(") ON CONFLICT (id) DO UPDATE SET ");
			sql.append("original_records_number=EXCLUDED.original_records_number, aggregated_records_number=EXCLUDED.aggregated_records_number, recovered_records_number=EXCLUDED.recovered_records_number, malformed_records_number=EXCLUDED.malformed_records_number, percentage=EXCLUDED.percentage, ");
			sql.append("current_aggregation_state=EXCLUDED.current_aggregation_state, last_update_time=EXCLUDED.last_update_time, previous=EXCLUDED.previous;");
		} else {
			sql.append(");");
		}

		return sql.toString();
	}

	/**
	 * Builds one INSERT into {@code aggregation_status_event} for each state
	 * event of the given status; every INSERT ends with
	 * {@code ON CONFLICT DO NOTHING;}.
	 *
	 * @param aggregationStatus the status whose events have to be stored
	 * @return the concatenated commands, empty when there are no events
	 * @throws SQLException if a value cannot be escaped
	 */
	public String getInsertAggregationStateQuery(AggregationStatus aggregationStatus) throws SQLException {
		StringBuilder sql = new StringBuilder();
		for(AggregationStateEvent aggregationStateEvent : aggregationStatus.getAggregationStateEvents()) {
			sql.append("INSERT INTO ");
			sql.append("aggregation_status_event");
			sql.append(" ( aggregation_state, start_time, end_time, aggregation_status)");
			sql.append(" VALUES (");
			sql.append(getValue(aggregationStateEvent.getAggregationState()));
			sql.append(", ").append(getValue(aggregationStateEvent.getStartTime()));
			sql.append(", ").append(getValue(aggregationStateEvent.getEndTime()));
			sql.append(", ").append(getValue(aggregationStatus.getUUID().toString()));
			sql.append(") ON CONFLICT DO NOTHING;");
		}
		return sql.toString();
	}

	/**
	 * Inserts or updates the given status together with its state events and
	 * commits. On failure the transaction is rolled back and the exception is
	 * rethrown.
	 *
	 * @param aggregationStatus the status to persist
	 * @throws Exception if the connection or one of the commands fails
	 */
	public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception {
		Connection connection = getConnection();
		try (Statement statement = connection.createStatement()) {
			statement.executeUpdate(getInsertAggregationStatusQuery(aggregationStatus, true));

			String eventsCommand = getInsertAggregationStateQuery(aggregationStatus);
			if(!eventsCommand.isEmpty()) {
				// No events means nothing to execute.
				statement.executeUpdate(eventsCommand);
			}

			connection.commit();
		} catch (Exception e) {
			// Auto-commit is off: without a rollback the failed transaction would
			// make every following command on the shared connection fail.
			rollbackQuietly(connection, e);
			throw e;
		}
	}

//	private Calendar getCalendar(ResultSet resultSet, String columnLabel) throws SQLException {
//		Date date = resultSet.getDate(columnLabel);
//		Calendar calendar = Calendar.getInstance();
//		calendar.setTime(date);
//		return calendar;
//	}

	/**
	 * Reads a column as text and parses it with the pattern
	 * {@code yyyy-MM-dd HH:mm:ssX}.
	 *
	 * @param resultSet the result set positioned on the row to read
	 * @param columnLabel the column to read
	 * @return the parsed date
	 * @throws Exception if the column cannot be read or parsed
	 */
	private Date getDateFromResultSet(ResultSet resultSet, String columnLabel) throws Exception {
		String dateString = resultSet.getString(columnLabel);
		DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssX");
		return dateFormat.parse(dateString);
	}

	/**
	 * Builds an {@link AggregationStatus} from the current row of a query on
	 * {@code aggregation_status}.
	 *
	 * NOTE(review): {@code recovered_records_number}, {@code percentage} and
	 * {@code previous} are not read back from the row — confirm it is intended.
	 *
	 * @param resultSet the result set positioned on the row to convert
	 * @return the status described by the row
	 * @throws Exception if a column cannot be read or converted
	 */
	public AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception {
		String recordType = resultSet.getString("record_type");
		AggregationType aggregationType = AggregationType.valueOf(resultSet.getString("aggregation_type"));
		Date aggregationStartDate = getDateFromResultSet(resultSet, "aggregation_start_date");
		Date aggregationEndDate = getDateFromResultSet(resultSet, "aggregation_end_date");

		AggregationInfo aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
		AggregationStatus aggregationStatus = new AggregationStatus(aggregationInfo);

		aggregationStatus.setUUID(UUID.fromString(resultSet.getString("id")));

		int originalRecordsNumber = resultSet.getInt("original_records_number");
		int aggregatedRecordsNumber = resultSet.getInt("aggregated_records_number");
		int malformedRecordNumber = resultSet.getInt("malformed_records_number");
		aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecordsNumber, malformedRecordNumber);

		aggregationStatus.setContext(resultSet.getString("context"));

		AggregationState aggregationState = AggregationState.valueOf(resultSet.getString("current_aggregation_state"));
		aggregationStatus.setAggregationState(aggregationState);

		// getTimestamp keeps the time of day; getDate (used before) returns a
		// java.sql.Date, which drops it.
		Date lastUpdate = resultSet.getTimestamp("last_update_time");
		Calendar lastUpdateTime = Calendar.getInstance();
		lastUpdateTime.setTime(lastUpdate);
		aggregationStatus.setLastUpdateTime(lastUpdateTime);

		return aggregationStatus;
	}

	/**
	 * Runs a SELECT on {@code aggregation_status} and converts every returned
	 * row. Statement and result set are always closed; on failure the
	 * transaction is rolled back so that the shared connection stays usable.
	 */
	private List<AggregationStatus> queryAggregationStatuses(String sqlQuery) throws Exception {
		Connection connection = getConnection();
		logger.trace("Going to request the following query: {}", sqlQuery);

		List<AggregationStatus> aggregationStatuses = new ArrayList<>();
		try (Statement statement = connection.createStatement();
				ResultSet resultSet = statement.executeQuery(sqlQuery)) {
			while(resultSet.next()) {
				aggregationStatuses.add(getAggregationStatusFromResultSet(resultSet));
			}
		} catch (Exception e) {
			rollbackQuietly(connection, e);
			throw e;
		}
		return aggregationStatuses;
	}

	/**
	 * Returns the first status produced by the query, or {@code null} when the
	 * query returns no rows.
	 */
	private AggregationStatus queryFirstAggregationStatus(String sqlQuery) throws Exception {
		List<AggregationStatus> aggregationStatuses = queryAggregationStatuses(sqlQuery);
		return aggregationStatuses.isEmpty() ? null : aggregationStatuses.get(0);
	}

	/**
	 * Returns the status with the greatest aggregation start date for the
	 * given record and aggregation type, optionally restricted to a range of
	 * start dates.
	 *
	 * @param recordType the record type to look for
	 * @param aggregationType the aggregation type to look for
	 * @param aggregationStartDate lower bound (inclusive) of the start date;
	 *        the range is applied only when both bounds are not {@code null}
	 * @param aggregationEndDate upper bound (inclusive) of the start date
	 * @return the matching status or {@code null} if none exists
	 * @throws Exception if the query fails
	 */
	public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{

		/*
		 * SELECT * FROM aggregation_status
		 *
		 * WHERE
		 * 	record_type = 'ServiceUsageRecord' AND
		 * 	aggregation_type = 'DAILY' AND
		 * 	aggregation_start_date >= '2017-05-01 00:00:00.000 +0000' AND
		 * 	aggregation_start_date <= '2017-05-31 00:00:00.000 +0000'
		 *
		 * ORDER BY aggregation_start_date DESC LIMIT 1
		 *
		 * NOTE(review): this comment previously stated ORDER BY last_update_time,
		 * while the code has always ordered by aggregation_start_date — confirm
		 * which one is wanted.
		 */

		StringBuilder sql = new StringBuilder();
		sql.append("SELECT * ");
		sql.append("FROM aggregation_status");
		sql.append(" WHERE ");
		sql.append("record_type = ");
		sql.append(getValue(recordType));
		sql.append(" AND ");
		sql.append("aggregation_type = ");
		sql.append(getValue(aggregationType.name()));

		if(aggregationStartDate!=null && aggregationEndDate!=null) {
			sql.append(" AND ");
			sql.append("aggregation_start_date >= ");
			sql.append(getValue(aggregationStartDate));
			sql.append(" AND ");
			sql.append("aggregation_start_date <= ");
			sql.append(getValue(aggregationEndDate));
		}

		sql.append(" ORDER BY ");
		sql.append("aggregation_start_date DESC LIMIT 1");

		return queryFirstAggregationStatus(sql.toString());
	}

//	public List<AggregationStatus> getUnterminated(Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{
//		return getUnterminated(null, null, aggregationStartDate, aggregationEndDate, forceRestart);
//	}

	/**
	 * Returns the statuses whose current state is not
	 * {@link AggregationState#COMPLETED}, ordered by ascending aggregation
	 * start date.
	 *
	 * @param recordType when not {@code null} only this record type is returned
	 * @param aggregationType when not {@code null} only this aggregation type
	 *        is returned
	 * @param aggregationStartDate lower bound (inclusive) of the start date;
	 *        the range is applied only when both bounds are not {@code null}
	 * @param aggregationEndDate upper bound (inclusive) of the end date
	 * @param forceRestart when {@code false} only the statuses whose last
	 *        update is older than the threshold defined in {@link Constant}
	 *        are returned
	 * @return the matching statuses, possibly empty
	 * @throws Exception if the query fails
	 */
	public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{

		/*
		 * SELECT *
		 * FROM aggregation_status
		 * WHERE
		 * 	current_aggregation_state != 'COMPLETED' AND
		 * 	last_update_time < '2017-07-31 09:31:10.984 +0000' AND
		 * 	record_type = 'ServiceUsageRecord' AND
		 * 	aggregation_type = 'DAILY' AND
		 * 	aggregation_start_date >= '2017-05-01 00:00:00.000 +0000' AND
		 * 	aggregation_end_date <= '2017-05-31 00:00:00.000 +0000'
		 *
		 * ORDER BY aggregation_start_date ASC
		 *
		 * NOTE(review): this comment previously bounded aggregation_start_date on
		 * both sides, while the code has always used aggregation_end_date for the
		 * upper bound — confirm which one is wanted.
		 */

		StringBuilder sql = new StringBuilder();
		sql.append("SELECT * ");
		sql.append("FROM aggregation_status");
		sql.append(" WHERE ");
		sql.append("current_aggregation_state != ");
		sql.append(getValue(AggregationState.COMPLETED));

		if(!forceRestart) {
			// Only statuses not updated since the threshold are considered unterminated.
			Calendar threshold = Utility.getUTCCalendarInstance();
			threshold.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED);

			sql.append(" AND ");
			sql.append("last_update_time < ");
			sql.append(getValue(threshold));
		}

		if(recordType!=null) {
			sql.append(" AND ");
			sql.append("record_type = ");
			sql.append(getValue(recordType));
		}

		if(aggregationType!=null) {
			sql.append(" AND ");
			sql.append("aggregation_type = ");
			sql.append(getValue(aggregationType));
		}

		if(aggregationStartDate!=null && aggregationEndDate!=null) {
			sql.append(" AND ");
			sql.append("aggregation_start_date >= ");
			sql.append(getValue(aggregationStartDate));
			sql.append(" AND ");
			sql.append("aggregation_end_date <= ");
			sql.append(getValue(aggregationEndDate));
		}

		sql.append(" ORDER BY ");
		sql.append("aggregation_start_date ASC");

		return queryAggregationStatuses(sql.toString());
	}

	/**
	 * Returns every stored status ordered by ascending aggregation start date.
	 *
	 * @return all the statuses, possibly empty
	 * @throws Exception if the query fails
	 */
	public List<AggregationStatus> getAll() throws Exception{

		/*
		 * SELECT *
		 * FROM aggregation_status
		 * ORDER BY aggregation_start_date ASC
		 */

		StringBuilder sql = new StringBuilder();
		sql.append("SELECT * ");
		sql.append("FROM aggregation_status");
		// The leading space was missing, producing "aggregation_statusORDER BY".
		sql.append(" ORDER BY aggregation_start_date ASC");

		return queryAggregationStatuses(sql.toString());
	}

	/**
	 * Returns the most recently updated status for the given record type,
	 * aggregation type and exact aggregation start date.
	 *
	 * @param recordType the record type to look for
	 * @param aggregationType the aggregation type to look for
	 * @param aggregationStartDate the aggregation start date to look for
	 * @return the matching status or {@code null} if none exists
	 * @throws Exception if the query fails
	 */
	public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{

		/*
		 * SELECT *
		 * FROM aggregation_status
		 * WHERE
		 * 	record_type = 'ServiceUsageRecord' AND
		 * 	aggregation_type = 'DAILY' AND
		 * 	aggregation_start_date = '2017-06-24 00:00:00.000 +0000'
		 * ORDER BY last_update_time DESC LIMIT 1
		 */

		StringBuilder sql = new StringBuilder();
		sql.append("SELECT * ");
		sql.append("FROM aggregation_status");
		sql.append(" WHERE ");
		sql.append("record_type = ");
		sql.append(getValue(recordType));
		sql.append(" AND ");
		sql.append("aggregation_type = ");
		sql.append(getValue(aggregationType.name()));
		sql.append(" AND ");
		sql.append("aggregation_start_date = ");
		sql.append(getValue(aggregationStartDate));
		sql.append(" ORDER BY ");
		sql.append("last_update_time DESC LIMIT 1");

		return queryFirstAggregationStatus(sql.toString());
	}

	/**
	 * Deletes the record described by the given JSON from the table mapped to
	 * its record type and commits. On failure the transaction is rolled back
	 * and the exception is rethrown.
	 *
	 * @param jsonNode the JSON representation of the record; its
	 *        {@link Record#ID} field identifies the row to delete
	 * @throws Exception if the record cannot be unmarshalled or the delete fails
	 */
	public void deleteRecord(JsonNode jsonNode) throws Exception {
		Record record = DSMapper.unmarshal(Record.class, jsonNode.toString());
		Class<? extends Record> clz = record.getClass();
		String type = RecordToDBMapping.getRecordTypeByClass(clz);
		String tableName = RecordToDBFields.getKey(type);
		String id = jsonNode.get(Record.ID).asText();

		StringBuilder sql = new StringBuilder();
		sql.append("DELETE ");
		sql.append("FROM ");
		sql.append(tableName);
		sql.append(" WHERE ");
		sql.append("id = ");
		sql.append(getValue(id));
		String sqlCommand = sql.toString();

		Connection connection = getConnection();
		try (Statement statement = connection.createStatement()) {
			logger.trace("Going to execute {}", sqlCommand);
			statement.execute(sqlCommand);
			connection.commit();
		} catch (Exception e) {
			rollbackQuietly(connection, e);
			throw e;
		}
	}

	/**
	 * Selects the records of the status' record type whose start time is in
	 * the half-open interval [aggregation start date, aggregation end date).
	 *
	 * The returned result set is left open on purpose: the caller must close
	 * it, together with its statement ({@code resultSet.getStatement()}).
	 *
	 * @param aggregationStatus the status describing what has to be aggregated
	 * @return the open result set with the records to aggregate
	 * @throws Exception if the query fails
	 */
	public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception {
		AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();

		String tableName = RecordToDBFields.getKey(aggregationInfo.getRecordType());
		String startTimeColumnName = RecordToDBFields.getKey(AggregatedUsageRecord.START_TIME);

		StringBuilder sql = new StringBuilder();
		sql.append("SELECT *");
		sql.append(" FROM ");
		sql.append(tableName);
		sql.append(" WHERE ");
		sql.append(startTimeColumnName);
		sql.append(" >= ");
		sql.append(getValue(aggregationInfo.getAggregationStartDate()));
		sql.append(" AND ");
		sql.append(startTimeColumnName);
		sql.append(" < ");
		sql.append(getValue(aggregationInfo.getAggregationEndDate()));
		String sqlQuery = sql.toString();

		Connection connection = getConnection();
		Statement statement = connection.createStatement();
		logger.trace("Going to request the following query: {}", sqlQuery);
		try {
			return statement.executeQuery(sqlQuery);
		} catch (SQLException e) {
			// No result set will reach the caller: release the statement here.
			try {
				statement.close();
			} catch (SQLException closeException) {
				e.addSuppressed(closeException);
			}
			rollbackQuietly(connection, e);
			throw e;
		}
	}

}