From 28b6498967b44fbb75ad73d6afe13f8bb81eb791 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Tue, 6 Feb 2024 12:31:38 +0100 Subject: [PATCH] REfactoring code to enable different src and destination --- .../aggregator/aggregation/Aggregator.java | 7 +- .../elaboration/RecoveryManager.java | 7 +- .../aggregator/persist/DeleteDocument.java | 7 +- .../aggregator/persist/InsertDocument.java | 18 +-- .../persistence/AggregatorPersistence.java | 2 +- .../persistence/AggregatorPersistenceDst.java | 6 + .../AggregatorPersistenceFactory.java | 34 +++++ .../persistence/AggregatorPersistenceSrc.java | 9 ++ .../AggregatorPersistenceStatus.java | 27 ++++ .../persistence/PostgreSQLConnector.java | 17 ++- .../persistence/PostgreSQLConnectorDst.java | 37 +++++- .../persistence/PostgreSQLConnectorSrc.java | 2 +- .../PostgreSQLConnectorStatus.java | 12 ++ .../aggregator/status/AggregationStatus.java | 118 ++++++++++-------- .../persistence/PostgreSQLConnectorTest.java | 14 +-- 15 files changed, 228 insertions(+), 89 deletions(-) create mode 100644 src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java create mode 100644 src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java create mode 100644 src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index d2544e8..66e7671 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -9,7 +9,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceStatus; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; @@ -63,8 +64,8 @@ public class Aggregator { if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) { startTime = Utility.getUTCCalendarInstance(); - PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); - ResultSet resultSet = postgreSQLConnector.getResultSetOfRecordToBeAggregated(aggregationStatus); + AggregatorPersistenceStatus aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus(); + ResultSet resultSet = aggregatorPersistenceStatus.getResultSetOfRecordToBeAggregated(aggregationStatus); retrieveAndAggregate(resultSet); } diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java index 62493bf..39cc8bf 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java @@ -5,7 +5,8 @@ import java.util.List; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationType; -import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceStatus; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.documentstore.records.DSMapper; import org.slf4j.Logger; @@ -35,8 +36,8 @@ public class RecoveryManager { } public void recovery() throws Exception { - PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); - List aggregationStatusList = postgreSQLConnector.getUnterminated(recordType, aggregationType, aggregationStartDate, aggregationEndDate, forceRestart); + AggregatorPersistenceStatus aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus(); + List aggregationStatusList = aggregatorPersistenceStatus.getUnterminated(recordType, aggregationType, aggregationStartDate, aggregationEndDate, forceRestart); if(aggregationStatusList.size()==0){ logger.info("Nothing to recover :)"); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java index 4f6c035..17377e4 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java @@ -2,7 +2,8 @@ package org.gcube.accounting.aggregator.persist; import java.io.File; -import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceSrc; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.com.fasterxml.jackson.databind.JsonNode; @@ -22,8 +23,8 @@ public class DeleteDocument extends DocumentElaboration { JsonNode jsonNode = DSMapper.asJsonNode(line); String id = jsonNode.get(ID).asText(); logger.trace("Going to delete record with id {}", id); - PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); - postgreSQLConnector.deleteRecord(jsonNode); + AggregatorPersistenceSrc aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc(); + aggregatorPersistenceSrc.deleteRecord(jsonNode); } @Override diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java index 8660ed3..c288213 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java @@ -8,6 +8,9 @@ import java.util.Map; import java.util.TreeMap; import org.gcube.accounting.aggregator.elaboration.Elaborator; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceDst; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory; +import org.gcube.accounting.aggregator.persistence.PostgreSQLConnectorDst; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Utility; @@ -42,7 +45,8 @@ public class InsertDocument extends DocumentElaboration { protected boolean serviceUsageRecordElaboration; protected File calledMethodCSVFile; - protected PersistencePostgreSQL persistencePostgreSQL; + + protected AggregatorPersistenceDst aggregatorPersistenceDst; protected int count; @@ -56,10 +60,8 @@ public class InsertDocument extends DocumentElaboration { File destinationFolder = file.getParentFile(); calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX)); - AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class); - persistencePostgreSQL = new PersistencePostgreSQL(); - persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration); - + aggregatorPersistenceDst = AggregatorPersistenceFactory.getAggregatorPersistenceDst(); + count = 0; } @@ -111,11 +113,11 @@ public class InsertDocument extends DocumentElaboration { JsonNode jsonNode = analyseLine(line); Record record = RecordUtility.getRecord(jsonNode.toString()); - persistencePostgreSQL.insert(record); + aggregatorPersistenceDst.insert(record); ++count; if(count==100) { - persistencePostgreSQL.commitAndClose(); + aggregatorPersistenceDst.commitAndClose(); count = 0; } @@ -123,7 +125,7 @@ public class InsertDocument extends DocumentElaboration { @Override protected void afterElaboration() throws Exception { - persistencePostgreSQL.commitAndClose(); + aggregatorPersistenceDst.commitAndClose(); count = 0; if(serviceUsageRecordElaboration) { diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistence.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistence.java index a0bd8bf..b9fd312 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistence.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistence.java @@ -6,5 +6,5 @@ import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQu * @author Luca Frosini (ISTI - CNR) */ public interface AggregatorPersistence extends AccountingPersistenceBackendQuery { - + } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceDst.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceDst.java index 843bf35..02f28e9 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceDst.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceDst.java @@ -1,8 +1,14 @@ package org.gcube.accounting.aggregator.persistence; +import org.gcube.documentstore.records.Record; + /** * @author Luca Frosini (ISTI - CNR) */ public interface AggregatorPersistenceDst extends AggregatorPersistence { + void insert(Record record); + + void commitAndClose(); + } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java new file mode 100644 index 0000000..e64edba --- /dev/null +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java @@ -0,0 +1,34 @@ +package org.gcube.accounting.aggregator.persistence; + +/** + * @author Luca Frosini (ISTI-CNR) + */ +public class AggregatorPersistenceFactory { + + private static AggregatorPersistenceStatus aggregatorPersistenceStatus; + + private static AggregatorPersistenceSrc aggregatorPersistenceSrc; + private static AggregatorPersistenceDst aggregatorPersistenceDst; + + public static AggregatorPersistenceStatus getAggregatorPersistenceStatus() throws Exception { + if(aggregatorPersistenceStatus == null) { + aggregatorPersistenceStatus = new PostgreSQLConnectorSrc(); + } + return aggregatorPersistenceStatus; + } + + public static AggregatorPersistenceSrc getAggregatorPersistenceSrc() throws Exception { + if(aggregatorPersistenceSrc == null) { + aggregatorPersistenceSrc = new PostgreSQLConnectorSrc(); + } + return aggregatorPersistenceSrc; + } + + public static AggregatorPersistenceDst getAggregatorPersistenceDst() throws Exception { + if(aggregatorPersistenceDst == null) { + aggregatorPersistenceDst = new PostgreSQLConnectorDst(); + } + return aggregatorPersistenceDst; + } + +} diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java index 70a8d04..84802cb 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java @@ -1,8 +1,17 @@ package org.gcube.accounting.aggregator.persistence; +import java.sql.ResultSet; + +import org.gcube.accounting.aggregator.status.AggregationStatus; +import org.gcube.com.fasterxml.jackson.databind.JsonNode; + /** * @author Luca Frosini (ISTI - CNR) */ public interface AggregatorPersistenceSrc extends AggregatorPersistence { + public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception; + + public void deleteRecord(JsonNode jsonNode) throws Exception; + } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java new file mode 100644 index 0000000..cc447ae --- /dev/null +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java @@ -0,0 +1,27 @@ +package org.gcube.accounting.aggregator.persistence; + +import java.util.Date; +import java.util.List; + +import org.gcube.accounting.aggregator.aggregation.AggregationType; +import org.gcube.accounting.aggregator.status.AggregationStatus; + +/** + * @author Luca Frosini (ISTI - CNR) + */ +public interface AggregatorPersistenceStatus extends AggregatorPersistence { + + public List getUnterminated(String recordType, AggregationType aggregationType, + Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception; + + public List getAll() throws Exception; + + public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, + Date aggregationStartDate) throws Exception; + + public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, + Date aggregationEndDate) throws Exception; + + public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception; + +} diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index 376322f..1f7c25c 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -39,7 +39,7 @@ import org.postgresql.core.Utils; /** * @author Luca Frosini (ISTI-CNR) */ -public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { +public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL implements AggregatorPersistenceSrc, AggregatorPersistenceStatus { private static final String UTC_TIME_ZONE = "UTC"; public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE); @@ -117,7 +117,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { return getQuotedString(serializable.toString()); } - public String getInsertAggregationStatusQuery(AggregationStatus aggregationStatus, boolean upsert) throws SQLException { + protected String getInsertAggregationStatusQuery(AggregationStatus aggregationStatus, boolean upsert) throws SQLException { StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("INSERT INTO "); @@ -176,7 +176,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { return stringBuffer.toString(); } - public String getInsertAggregationStateQuery(AggregationStatus aggregationStatus) throws SQLException { + protected String getInsertAggregationStateQuery(AggregationStatus aggregationStatus) throws SQLException { StringBuffer stringBuffer = new StringBuffer(); for(AggregationStateEvent aggregationStateEvent : aggregationStatus.getAggregationStateEvents()) { stringBuffer.append("INSERT INTO "); @@ -195,7 +195,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { return stringBuffer.toString(); } - + @Override public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception { Connection connection = getConnection(); Statement statement = connection.createStatement(); @@ -221,7 +221,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { return date; } - public AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception { + protected AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception { String recordType = resultSet.getString("record_type"); String aggregationTypeString = resultSet.getString("aggregation_type"); AggregationType aggregationType = AggregationType.valueOf(aggregationTypeString); @@ -256,6 +256,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { return aggregationStatus; } + @Override public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ /* @@ -319,6 +320,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { // return getUnterminated(null, null, aggregationStartDate, aggregationEndDate, forceRestart); // } + @Override public List getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{ /* @@ -396,6 +398,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { } + @Override public List getAll() throws Exception{ /* @@ -429,7 +432,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { } - + @Override public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ /* @@ -480,6 +483,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { return aggregationStatus; } + @Override public void deleteRecord(JsonNode jsonNode) throws Exception { Record record = DSMapper.unmarshal(Record.class, jsonNode.toString()); Class clz = record.getClass(); @@ -508,6 +512,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { } + @Override public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception { AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo(); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java index 93ee61c..fb7a4a3 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java @@ -1,12 +1,45 @@ package org.gcube.accounting.aggregator.persistence; +import java.util.Calendar; +import java.util.List; +import java.util.SortedMap; +import java.util.SortedSet; + +import org.gcube.accounting.analytics.Filter; +import org.gcube.accounting.analytics.Info; +import org.gcube.accounting.analytics.NumberedFilter; +import org.gcube.accounting.analytics.TemporalConstraint; +import org.gcube.accounting.analytics.UsageValue; +import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException; +import org.gcube.accounting.analytics.exception.KeyException; +import org.gcube.accounting.analytics.exception.ValueException; +import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; +import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; +import org.gcube.documentstore.persistence.PersistencePostgreSQL; +import org.gcube.documentstore.records.AggregatedRecord; +import org.gcube.documentstore.records.Record; + /** * @author Luca Frosini (ISTI-CNR) */ -public class PostgreSQLConnectorDst extends PostgreSQLConnector implements AggregatorPersistenceDst { +public class PostgreSQLConnectorDst implements AggregatorPersistenceDst { + protected PersistencePostgreSQL persistencePostgreSQL; + protected PostgreSQLConnectorDst() throws Exception { - super(AggregatorPersistenceDst.class); + AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class); + persistencePostgreSQL = new PersistencePostgreSQL(); + persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration); + } + + @Override + public void insert(Record record) { + persistencePostgreSQL.insert(record); + } + + @Override + public void commitAndClose() { + persistencePostgreSQL.commitAndClose(); } } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java index 1acfcba..bb4c902 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java @@ -5,7 +5,7 @@ package org.gcube.accounting.aggregator.persistence; */ public class PostgreSQLConnectorSrc extends PostgreSQLConnector implements AggregatorPersistenceSrc { - protected PostgreSQLConnectorSrc(Class clazz) throws Exception { + protected PostgreSQLConnectorSrc() throws Exception { super(AggregatorPersistenceSrc.class); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java new file mode 100644 index 0000000..04a2892 --- /dev/null +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java @@ -0,0 +1,12 @@ +package org.gcube.accounting.aggregator.persistence; + +/** + * @author Luca Frosini (ISTI-CNR) + */ +public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceSrc { + + protected PostgreSQLConnectorStatus() throws Exception { + super(AggregatorPersistenceStatus.class); + } + +} diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java index 271bc47..5924e2a 100644 --- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java @@ -8,7 +8,7 @@ import java.util.UUID; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationType; -import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; +import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.com.fasterxml.jackson.annotation.JsonFormat; @@ -20,63 +20,70 @@ import org.slf4j.LoggerFactory; * @author Luca Frosini (ISTI - CNR) */ public class AggregationStatus { - + private static Logger logger = LoggerFactory.getLogger(AggregationStatus.class); - + protected AggregationInfo aggregationInfo; - + @JsonProperty protected UUID uuid; - + @JsonProperty protected int originalRecordsNumber; - + @JsonProperty protected int aggregatedRecordsNumber; - + @JsonProperty protected int recoveredRecordNumber; - + @JsonProperty protected int malformedRecordNumber; - + @JsonProperty protected float percentage; - - @JsonProperty(required=false) + + @JsonProperty(required = false) protected String context; - - @JsonProperty(required=false) + + @JsonProperty(required = false) protected AggregationStatus previous; - + // Last observed status - @JsonFormat(shape= JsonFormat.Shape.STRING) + @JsonFormat(shape = JsonFormat.Shape.STRING) @JsonProperty protected AggregationState aggregationState; - + @JsonProperty - @JsonFormat(shape= JsonFormat.Shape.STRING, pattern = Constant.DATETIME_PATTERN) + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Constant.DATETIME_PATTERN) protected Calendar lastUpdateTime; - + // List of Status Event Changes @JsonProperty protected List aggregationStateEvents; - + // Needed for Jackon Unmarshalling - protected AggregationStatus(){} - - public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ - return PostgreSQLConnector.getPostgreSQLConnector().getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate); + protected AggregationStatus() { } - - public static List getUnterminated(String recordType, AggregationType aggregationType, boolean forceEarlyAggregation) throws Exception{ - return PostgreSQLConnector.getPostgreSQLConnector().getUnterminated(recordType, aggregationType, null, null, forceEarlyAggregation); + + public static AggregationStatus getLast(String recordType, AggregationType aggregationType, + Date aggregationStartDate, Date aggregationEndDate) throws Exception { + return AggregatorPersistenceFactory.getAggregatorPersistenceStatus().getLast(recordType, aggregationType, + aggregationStartDate, aggregationEndDate); } - - public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ - return PostgreSQLConnector.getPostgreSQLConnector().getAggregationStatus(recordType, aggregationType, aggregationStartDate); + + public static List getUnterminated(String recordType, AggregationType aggregationType, + boolean forceEarlyAggregation) throws Exception { + return AggregatorPersistenceFactory.getAggregatorPersistenceStatus().getUnterminated(recordType, + aggregationType, null, null, forceEarlyAggregation); } - + + public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, + Date aggregationStartDate) throws Exception { + return AggregatorPersistenceFactory.getAggregatorPersistenceStatus().getAggregationStatus(recordType, + aggregationType, aggregationStartDate); + } + public AggregationStatus(AggregationInfo aggregationInfo) throws Exception { this.aggregationInfo = aggregationInfo; this.aggregationStateEvents = new ArrayList<>(); @@ -84,7 +91,7 @@ public class AggregationStatus { this.malformedRecordNumber = 0; this.previous = null; } - + public AggregationStatus(AggregationStatus aggregationStatus) throws Exception { this.aggregationInfo = new AggregationInfo(aggregationStatus.getAggregationInfo()); this.aggregationStateEvents = new ArrayList<>(); @@ -92,47 +99,48 @@ public class AggregationStatus { this.malformedRecordNumber = 0; this.previous = aggregationStatus; } - + public AggregationInfo getAggregationInfo() { return aggregationInfo; } - public synchronized void setAggregationState(AggregationState aggregationState, Calendar startTime, boolean sync) throws Exception { + public synchronized void setAggregationState(AggregationState aggregationState, Calendar startTime, boolean sync) + throws Exception { Calendar endTime = Utility.getUTCCalendarInstance(); - + logger.info("Going to Set {} for {} to {}. StartTime {}, EndTime {} [Duration : {}]", - AggregationState.class.getSimpleName(), - aggregationInfo, aggregationState.name(), + AggregationState.class.getSimpleName(), aggregationInfo, aggregationState.name(), Constant.DEFAULT_DATE_FORMAT.format(startTime.getTime()), - Constant.DEFAULT_DATE_FORMAT.format(endTime.getTime()), + Constant.DEFAULT_DATE_FORMAT.format(endTime.getTime()), Utility.getHumanReadableDuration(endTime.getTimeInMillis() - startTime.getTimeInMillis())); - + this.aggregationState = aggregationState; this.lastUpdateTime = endTime; - + AggregationStateEvent aggregationStatusEvent = new AggregationStateEvent(aggregationState, startTime, endTime); aggregationStateEvents.add(aggregationStatusEvent); - - if(sync){ - PostgreSQLConnector.getPostgreSQLConnector().upsertAggregationStatus(this); + + if (sync) { + AggregatorPersistenceFactory.getAggregatorPersistenceStatus().upsertAggregationStatus(this); } } public void setRecordNumbers(int originalRecordsNumber, int aggregatedRecordsNumber, int malformedRecordNumber) { this.recoveredRecordNumber = originalRecordsNumber - aggregatedRecordsNumber; - this.percentage = originalRecordsNumber!=0 ? (100 * recoveredRecordNumber) / originalRecordsNumber : 0; - logger.info("Original records are {} ({} were malformed). Aggregated records are {}. Difference {}. We recover {}% of Documents", - originalRecordsNumber, malformedRecordNumber, aggregatedRecordsNumber, recoveredRecordNumber, percentage); + this.percentage = originalRecordsNumber != 0 ? (100 * recoveredRecordNumber) / originalRecordsNumber : 0; + logger.info( + "Original records are {} ({} were malformed). Aggregated records are {}. Difference {}. We recover {}% of Documents", + originalRecordsNumber, malformedRecordNumber, aggregatedRecordsNumber, recoveredRecordNumber, + percentage); this.malformedRecordNumber = malformedRecordNumber; this.originalRecordsNumber = originalRecordsNumber; this.aggregatedRecordsNumber = aggregatedRecordsNumber; } - + public UUID getUUID() { return uuid; } - public void setUUID(UUID uuid) { this.uuid = uuid; } @@ -140,7 +148,7 @@ public class AggregationStatus { public void setAggregation(AggregationInfo aggregation) { this.aggregationInfo = aggregation; } - + public int getOriginalRecordsNumber() { return originalRecordsNumber; } @@ -148,11 +156,11 @@ public class AggregationStatus { public int getAggregatedRecordsNumber() { return aggregatedRecordsNumber; } - + public int getRecoveredRecordNumber() { return recoveredRecordNumber; } - + public float getPercentage() { return percentage; } @@ -160,7 +168,7 @@ public class AggregationStatus { public AggregationState getAggregationState() { return aggregationState; } - + public void setAggregationState(AggregationState aggregationState) { this.aggregationState = aggregationState; } @@ -176,7 +184,7 @@ public class AggregationStatus { public void setContext(String context) { this.context = context; } - + public AggregationStatus getPrevious() { return previous; } @@ -192,15 +200,15 @@ public class AggregationStatus { public Calendar getLastUpdateTime() { return lastUpdateTime; } - + public void setLastUpdateTime(Calendar lastUpdateTime) { this.lastUpdateTime = lastUpdateTime; } public void updateLastUpdateTime(boolean sync) throws Exception { this.lastUpdateTime = Utility.getUTCCalendarInstance(); - if(sync){ - PostgreSQLConnector.getPostgreSQLConnector().upsertAggregationStatus(this); + if (sync) { + AggregatorPersistenceFactory.getAggregatorPersistenceStatus().upsertAggregationStatus(this); } } diff --git a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java index 9aff792..120bd04 100644 --- a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java @@ -21,21 +21,21 @@ public class PostgreSQLConnectorTest extends ContextTest { private static Logger logger = LoggerFactory.getLogger(PostgreSQLConnectorTest.class); - protected PostgreSQLConnector postgreSQLConnector; + protected AggregatorPersistenceStatus aggregatorPersistenceStatus; public PostgreSQLConnectorTest() throws Exception { - postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); + aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus(); } @Test public void getLastTest() throws Exception { - AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); + AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); logger.debug("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @Test public void getUnterminatedTest() throws Exception{ - List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null, true); + List aggregationStatuses = aggregatorPersistenceStatus.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null, true); for(AggregationStatus aggregationStatus : aggregationStatuses){ logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @@ -46,7 +46,7 @@ public class PostgreSQLConnectorTest extends ContextTest { Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1); Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 31); - AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); + AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @@ -55,7 +55,7 @@ public class PostgreSQLConnectorTest extends ContextTest { Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1); Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30); - List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime(), true); + List aggregationStatuses = aggregatorPersistenceStatus.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime(), true); for(AggregationStatus aggregationStatus : aggregationStatuses){ logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @@ -64,7 +64,7 @@ public class PostgreSQLConnectorTest extends ContextTest { @Test public void getAggregationStatusTest() throws Exception{ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15); - AggregationStatus aggregationStatus = postgreSQLConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime()); + AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime()); logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); }