diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index 769950b..af7fd6f 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -59,9 +59,13 @@ public class Aggregator { if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) { startTime = Utility.getUTCCalendarInstance(); + + // TODO query ResultSet resultSet = null; + + retrieveAndAggregate(resultSet); } } diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java index 84117f2..f15117c 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java @@ -28,7 +28,7 @@ public class RecoveryManager { } public void recovery() throws Exception { - PostgreSQLConnector postgreSQLConnector = new PostgreSQLConnector(); + PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); // TODO List aggregationStatusList = postgreSQLConnector.getUnterminated(aggregationStartDate, aggregationEndDate); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index f1e9af4..4b4878b 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -3,6 +3,7 @@ package org.gcube.accounting.aggregator.persistence; import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.text.SimpleDateFormat; @@ -11,11 +12,15 @@ import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.TimeZone; +import java.util.UUID; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationType; +import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStateEvent; import org.gcube.accounting.aggregator.status.AggregationStatus; +import org.gcube.accounting.aggregator.utility.Constant; +import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; @@ -29,19 +34,32 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { private static final String UTC_TIME_ZONE = "UTC"; public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE); - public PostgreSQLConnector() throws Exception { + protected Connection connection; + + private static PostgreSQLConnector postgreSQLConnector; + + public static PostgreSQLConnector getPostgreSQLConnector() throws Exception { + if(postgreSQLConnector == null) { + postgreSQLConnector = new PostgreSQLConnector(); + } + return postgreSQLConnector; + } + + private PostgreSQLConnector() throws Exception { this.configuration = new AccountingPersistenceBackendQueryConfiguration(AccountingPersistenceQueryPostgreSQL.class); } protected Connection getConnection() throws Exception { - Class.forName("org.postgresql.Driver"); - String url = configuration.getProperty(AccountingPersistenceQueryPostgreSQL.URL_PROPERTY_KEY); - String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY); - String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY); - - Connection connection = DriverManager.getConnection(url, username, password); - logger.trace("Database {} opened successfully", url); - connection.setAutoCommit(false); + if(connection==null) { + Class.forName("org.postgresql.Driver"); + String url = configuration.getProperty(AccountingPersistenceQueryPostgreSQL.URL_PROPERTY_KEY); + String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY); + String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY); + + connection = DriverManager.getConnection(url, username, password); + logger.trace("Database {} opened successfully", url); + connection.setAutoCommit(false); + } return connection; } @@ -168,31 +186,113 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception { - Connection connection = getConnection(); - Statement statement = connection.createStatement(); + Statement statement = getConnection().createStatement(); String sqlCommand = getInsertAggregationStatusQuery(aggregationStatus, true); statement.executeUpdate(sqlCommand); sqlCommand = getInsertAggregationStateQuery(aggregationStatus); statement.executeUpdate(sqlCommand); statement.close(); connection.commit(); - connection.close(); + } + +// private Calendar getCalendar(ResultSet resultSet, String columnLabel) throws SQLException { +// Date date = resultSet.getDate(columnLabel); +// Calendar calendar = Calendar.getInstance(); +// calendar.setTime(date); +// return calendar; +// } + + private AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception { + String recordType = resultSet.getString("record_type"); + String aggregationTypeString = resultSet.getString("aggregation_type"); + AggregationType aggregationType = AggregationType.valueOf(aggregationTypeString); + + + Date aggregationStartDate = resultSet.getDate("aggregation_start_date"); + Date aggregationEndDate = resultSet.getDate("aggregation_end_date"); + AggregationInfo aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate, aggregationEndDate); + + AggregationStatus aggregationStatus = new AggregationStatus(aggregationInfo); + + UUID uuid = UUID.fromString(resultSet.getString("id")); + aggregationStatus.setUUID(uuid); + + int originalRecordsNumber = resultSet.getInt("original_records_number"); + int aggregatedRecordsNumber = resultSet.getInt("aggregated_records_number"); + int malformedRecordNumber = resultSet.getInt("malformed_records_number"); + aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecordsNumber, malformedRecordNumber); + + String context = resultSet.getString("context"); + aggregationStatus.setContext(context); + + String current_aggregation_state = resultSet.getString("current_aggregation_state"); + AggregationState aggregationState = AggregationState.valueOf(current_aggregation_state); + aggregationStatus.setAggregationState(aggregationState); + + Date last_update_time = resultSet.getDate("last_update_time"); + Calendar lastUpdateTime = Calendar.getInstance(); + lastUpdateTime.setTime(last_update_time); + aggregationStatus.setLastUpdateTime(lastUpdateTime); + + return aggregationStatus; } public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ /* - * SELECT * - * FROM aggregation_status + * SELECT * FROM aggregation_status + * * WHERE - * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND - * `aggregationInfo`.`aggregationType` = "DAILY" AND - * `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000" - * `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000" - * ORDER BY `aggregationInfo`.`aggregationStartDate` DESC LIMIT 1 + * record_type = 'ServiceUsageRecord' AND + * aggregation_type = 'DAILY' AND + * aggregation_start_date >= '2017-05-01 00:00:00.000 +0000' AND + * aggregation_start_date <= '2017-05-31 00:00:00.000 +0000' + * + * ORDER BY last_update_time DESC LIMIT 1 + * */ - return null; + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT * "); + stringBuffer.append("FROM aggregation_status"); + + stringBuffer.append(" WHERE "); + stringBuffer.append("record_type = "); + stringBuffer.append(getValue(recordType)); + + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_type = "); + stringBuffer.append(getValue(aggregationType.name())); + + if(aggregationStartDate!=null && aggregationEndDate!=null) { + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_start_date >= "); + stringBuffer.append(getValue(aggregationStartDate)); + + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_start_date <= "); + stringBuffer.append(getValue(aggregationEndDate)); + } + + stringBuffer.append(" ORDER BY "); + stringBuffer.append("last_update_time DESC LIMIT 1"); + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + String sqlCommand = stringBuffer.toString(); + + logger.trace("Going to request the following query: {}", sqlCommand); + ResultSet resultSet = statement.executeQuery(sqlCommand); + + AggregationStatus aggregationStatus = null; + + while(resultSet.next()) { + aggregationStatus = getAggregationStatusFromResultSet(resultSet); + break; + } + + return aggregationStatus; } @@ -204,20 +304,65 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { /* * SELECT * - * FROM AccountingManager + * FROM aggregation_status * WHERE - * `aggregationState` != "COMPLETED" AND - * `lastUpdateTime` < "2017-07-31 09:31:10.984 +0000" AND - * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND - * `aggregationInfo`.`aggregationType` = "DAILY" AND - * `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000" - * `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000" + * aggregation_state != "COMPLETED" AND + * last_update_time < "2017-07-31 09:31:10.984 +0000" AND + * record_type = "ServiceUsageRecord" AND + * aggregation_type` = "DAILY" AND + * aggregation_start_date >= "2017-05-01 00:00:00.000 +0000" + * aggregation_start_date <= "2017-05-31 00:00:00.000 +0000" * - * ORDER BY `aggregationInfo`.`aggregationStartDate` ASC + * ORDER BY aggregation_start_date ASC */ + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT * "); + stringBuffer.append("FROM aggregation_status"); + + stringBuffer.append(" WHERE "); + stringBuffer.append("current_aggregation_state != "); + stringBuffer.append(getValue(AggregationState.COMPLETED)); + + Calendar now = Utility.getUTCCalendarInstance(); + now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED); + stringBuffer.append(" AND "); + stringBuffer.append("last_update_time < "); + stringBuffer.append(getValue(now)); + + stringBuffer.append(" AND "); + stringBuffer.append("record_type = "); + stringBuffer.append(getValue(recordType)); + + if(aggregationStartDate!=null && aggregationEndDate!=null) { + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_start_date >= "); + stringBuffer.append(getValue(aggregationStartDate)); + + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_start_date <= "); + stringBuffer.append(getValue(aggregationEndDate)); + } + + stringBuffer.append(" ORDER BY "); + stringBuffer.append("aggregation_start_date ASC"); + + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + String sqlCommand = stringBuffer.toString(); + + logger.trace("Going to request the following query: {}", sqlCommand); + ResultSet resultSet = statement.executeQuery(sqlCommand); + List aggregationStatuses = new ArrayList<>(); + while(resultSet.next()) { + AggregationStatus aggregationStatus = getAggregationStatusFromResultSet(resultSet); + aggregationStatuses.add(aggregationStatus); + } + return aggregationStatuses; } @@ -226,15 +371,33 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { /* * SELECT * - * FROM AccountingManager - * ORDER BY `aggregationInfo`.`aggregationStartDate` ASC + * FROM aggregation_status + * ORDER BY aggregation_start_date ASC */ + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT * "); + stringBuffer.append("FROM aggregation_status"); + stringBuffer.append("ORDER BY aggregation_start_date ASC"); + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + String sqlCommand = stringBuffer.toString(); + + logger.trace("Going to request the following query: {}", sqlCommand); + ResultSet resultSet = statement.executeQuery(sqlCommand); + List aggregationStatuses = new ArrayList<>(); + while(resultSet.next()) { + AggregationStatus aggregationStatus = getAggregationStatusFromResultSet(resultSet); + aggregationStatuses.add(aggregationStatus); + } return aggregationStatuses; + } @@ -242,14 +405,50 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { /* * SELECT * - * FROM AccountingManager + * FROM aggregation_status * WHERE - * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND - * `aggregationInfo`.`aggregationType` = "DAILY" AND - * `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000" + * record_type = "ServiceUsageRecord" AND + * aggregation_type = "DAILY" AND + * aggregation_start_date` = "2017-06-24 00:00:00.000 +0000" + * ORDER BY aggregation_start_date DESC LIMIT 1 */ - return null; + + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT * "); + stringBuffer.append("FROM aggregation_status"); + + stringBuffer.append(" WHERE "); + stringBuffer.append("record_type = "); + stringBuffer.append(getValue(recordType)); + + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_type = "); + stringBuffer.append(getValue(aggregationType.name())); + + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_start_date = "); + stringBuffer.append(getValue(aggregationStartDate)); + + stringBuffer.append(" ORDER BY "); + stringBuffer.append("last_update_time DESC LIMIT 1"); + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + String sqlCommand = stringBuffer.toString(); + + logger.trace("Going to request the following query: {}", sqlCommand); + ResultSet resultSet = statement.executeQuery(sqlCommand); + + + AggregationStatus aggregationStatus = null; + while(resultSet.next()) { + aggregationStatus = getAggregationStatusFromResultSet(resultSet); + break; + } + + return aggregationStatus; } } diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java index 6a7c731..98cf000 100644 --- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java @@ -50,7 +50,7 @@ public class AggregationStatus { protected AggregationStatus previous; // Last observed status - @JsonFormat(shape= JsonFormat.Shape.STRING, pattern = Constant.DATETIME_PATTERN) + @JsonFormat(shape= JsonFormat.Shape.STRING) @JsonProperty protected AggregationState aggregationState; @@ -62,26 +62,19 @@ public class AggregationStatus { @JsonProperty protected List aggregationStateEvents; - private static PostgreSQLConnector postgreSQLConnector; - - static { - // TODO - postgreSQLConnector = null; - } - // Needed for Jackon Unmarshalling protected AggregationStatus(){} public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ - return postgreSQLConnector.getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate); + return PostgreSQLConnector.getPostgreSQLConnector().getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate); } public static List getUnterminated(String recordType, AggregationType aggregationType) throws Exception{ - return postgreSQLConnector.getUnterminated(recordType, aggregationType, null, null); + return PostgreSQLConnector.getPostgreSQLConnector().getUnterminated(recordType, aggregationType, null, null); } public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ - return postgreSQLConnector.getAggregationStatus(recordType, aggregationType, aggregationStartDate); + return PostgreSQLConnector.getPostgreSQLConnector().getAggregationStatus(recordType, aggregationType, aggregationStartDate); } public AggregationStatus(AggregationInfo aggregationInfo) throws Exception { @@ -103,7 +96,7 @@ public class AggregationStatus { public AggregationInfo getAggregationInfo() { return aggregationInfo; } - + public synchronized void setAggregationState(AggregationState aggregationState, Calendar startTime, boolean sync) throws Exception { Calendar endTime = Utility.getUTCCalendarInstance(); @@ -121,7 +114,7 @@ public class AggregationStatus { aggregationStateEvents.add(aggregationStatusEvent); if(sync){ - postgreSQLConnector.upsertAggregationStatus(this); + PostgreSQLConnector.getPostgreSQLConnector().upsertAggregationStatus(this); } } @@ -139,6 +132,11 @@ public class AggregationStatus { return uuid; } + + public void setUUID(UUID uuid) { + this.uuid = uuid; + } + public void setAggregation(AggregationInfo aggregation) { this.aggregationInfo = aggregation; } @@ -162,6 +160,10 @@ public class AggregationStatus { public AggregationState getAggregationState() { return aggregationState; } + + public void setAggregationState(AggregationState aggregationState) { + this.aggregationState = aggregationState; + } public List getAggregationStateEvents() { return aggregationStateEvents; @@ -190,11 +192,15 @@ public class AggregationStatus { public Calendar getLastUpdateTime() { return lastUpdateTime; } + + public void setLastUpdateTime(Calendar lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } public void updateLastUpdateTime(boolean sync) throws Exception { this.lastUpdateTime = Utility.getUTCCalendarInstance(); if(sync){ - postgreSQLConnector.upsertAggregationStatus(this); + PostgreSQLConnector.getPostgreSQLConnector().upsertAggregationStatus(this); } } diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/ContextTest.java b/src/test/java/org/gcube/accounting/aggregator/ContextTest.java similarity index 98% rename from src/test/java/org/gcube/accounting/aggregator/plugin/ContextTest.java rename to src/test/java/org/gcube/accounting/aggregator/ContextTest.java index 41c801a..f8d73ba 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/ContextTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/ContextTest.java @@ -1,7 +1,7 @@ /** * */ -package org.gcube.accounting.aggregator.plugin; +package org.gcube.accounting.aggregator; import java.io.IOException; import java.io.InputStream; diff --git a/src/test/java/org/gcube/accounting/aggregator/file/WorkSpaceDirectoryStructureTest.java b/src/test/java/org/gcube/accounting/aggregator/file/WorkSpaceDirectoryStructureTest.java index 6257563..75cc400 100644 --- a/src/test/java/org/gcube/accounting/aggregator/file/WorkSpaceDirectoryStructureTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/file/WorkSpaceDirectoryStructureTest.java @@ -6,9 +6,9 @@ import java.util.Calendar; import java.util.Date; import java.util.List; +import org.gcube.accounting.aggregator.ContextTest; import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.directory.WorkSpaceDirectoryStructure; -import org.gcube.accounting.aggregator.plugin.ContextTest; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.aggregator.workspace.WorkSpaceManagement; diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java index 8a7bda1..403da08 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.gcube.accounting.aggregator.ContextTest; import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin.ElaborationType; import org.gcube.accounting.aggregator.utility.Utility; diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java index 2293382..43a699f 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.gcube.accounting.aggregator.ContextTest; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.elaboration.Elaborator; diff --git a/src/test/java/org/gcube/accounting/aggregator/workspace/WorkSpaceManagementTest.java b/src/test/java/org/gcube/accounting/aggregator/workspace/WorkSpaceManagementTest.java index 23a2572..1fa413e 100644 --- a/src/test/java/org/gcube/accounting/aggregator/workspace/WorkSpaceManagementTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/workspace/WorkSpaceManagementTest.java @@ -1,7 +1,7 @@ package org.gcube.accounting.aggregator.workspace; +import org.gcube.accounting.aggregator.ContextTest; import org.gcube.accounting.aggregator.directory.WorkSpaceDirectoryStructure; -import org.gcube.accounting.aggregator.plugin.ContextTest; import org.gcube.common.storagehub.client.dsl.FolderContainer; import org.junit.Test;