diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index 42c4f5c..c967404 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -46,8 +46,13 @@ public class Aggregator { protected final File originalRecordsbackupFile; protected final File aggregateRecordsBackupFile; protected final File malformedRecordsFile; + + protected int estimatedRecordsNumber; + protected int originalRecordsNumber; protected int malformedRecordNumber; + + protected ObjectMapper objectMapper; protected Calendar startTime; @@ -73,6 +78,9 @@ public class Aggregator { startTime = Utility.getUTCCalendarInstance(); AggregatorPersistenceSrc aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc(); + + estimatedRecordsNumber = aggregatorPersistenceSrc.getEstimatedRecordRecordToBeAggregated(aggregationStatus); + logger.info("Estimated records to be aggregated are {}", estimatedRecordsNumber); ResultSet resultSet = aggregatorPersistenceSrc.getResultSetOfRecordToBeAggregated(aggregationStatus); retrieveAndAggregate(resultSet); @@ -84,7 +92,7 @@ public class Aggregator { private static final String SIMPLE = "Simple"; - protected int elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception { + protected void elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer) throws Exception { try { if(content.has(USAGE_RECORD_TYPE)){ @@ -168,23 +176,22 @@ public class Aggregator { aggregateRow(aggregatorBuffer, record); } - ++originalRecordsCounter; - if(originalRecordsCounter%1000==0){ + ++originalRecordsNumber; + if(originalRecordsNumber%1000==0){ int aggregatedRecordsNumber = 0; if(!skipAggregation) { aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size(); }else { - aggregatedRecordsNumber = originalRecordsCounter; + aggregatedRecordsNumber = originalRecordsNumber; } - int diff = originalRecordsCounter - aggregatedRecordsNumber; - float percentage = (100 * diff) / originalRecordsCounter; - logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents", - aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage); + int diff = originalRecordsNumber - aggregatedRecordsNumber; + float percentage = (100 * diff) / originalRecordsNumber; + logger.info("{} At the moment, the elaborated original records are {} (Total Estimated Number is {}). The Aggregated records are {}. Difference {}. We are recovering {}% of Records", + aggregationStatus.getAggregationInfo(), originalRecordsNumber, estimatedRecordsNumber, aggregatedRecordsNumber, diff, percentage); } Utility.printLine(originalRecordsbackupFile, record); - - return originalRecordsCounter; + }catch (Exception e) { throw e; } @@ -236,8 +243,8 @@ public class Aggregator { Set requiredFields = clz.newInstance().getRequiredFields(); + originalRecordsNumber = 0; malformedRecordNumber = 0; - int originalRecordsCounter = 0; while (resultSet.next()) { for(int i=1; i<=MAX_RETRY; i++){ try { @@ -267,7 +274,7 @@ public class Aggregator { addProperty(objectNode, recordField, obj); } - originalRecordsCounter = elaborateRow(objectNode, aggregatorBuffer, originalRecordsCounter); + elaborateRow(objectNode, aggregatorBuffer); TimeUnit.MILLISECONDS.sleep(3); break; }catch (RuntimeException e) { @@ -304,10 +311,10 @@ public class Aggregator { aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); - aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size(), malformedRecordNumber); + aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecords.size(), malformedRecordNumber); }else { Files.copy(originalRecordsbackupFile.toPath(), aggregateRecordsBackupFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - aggregationStatus.setRecordNumbers(originalRecordsCounter, originalRecordsCounter, malformedRecordNumber); + aggregationStatus.setRecordNumbers(originalRecordsNumber, originalRecordsNumber, malformedRecordNumber); } aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true); diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java index c2549af..d5f4807 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java @@ -30,7 +30,7 @@ public class DeleteDocument extends DocumentElaboration { if(aggregatorPersistenceSrc.isBulkDeleteAllowed()) { arrayNode.add(jsonNode); aggregatorPersistenceSrc.deleteRecord(jsonNode); - if(arrayNode.size()>=MAX_ROWS_PER_STEP) { + if(arrayNode.size()>=effectiveMaxRowPerStep) { aggregatorPersistenceSrc.deleteRecords(arrayNode); arrayNode = DSMapper.getObjectMapper().createArrayNode(); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java index f098a33..86e1943 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java @@ -37,6 +37,8 @@ public abstract class DocumentElaboration { protected Calendar startTime; + protected int effectiveMaxRowPerStep; + protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, int rowToBeElaborated) { this.aggregationStatus = statusManager; this.finalAggregationState = finalAggregationState; @@ -58,9 +60,9 @@ public abstract class DocumentElaboration { logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated); - int numberOfRowsForEachRecoveryPoint = (rowToBeElaborated / 10) + 1; - if(numberOfRowsForEachRecoveryPoint>MAX_ROWS_PER_STEP) { - numberOfRowsForEachRecoveryPoint = MAX_ROWS_PER_STEP; + effectiveMaxRowPerStep = (rowToBeElaborated / 10) + 1; + if(effectiveMaxRowPerStep>MAX_ROWS_PER_STEP) { + effectiveMaxRowPerStep = MAX_ROWS_PER_STEP; } currentlyElaborated = 0; @@ -104,7 +106,7 @@ public abstract class DocumentElaboration { } ++currentlyElaborated; - if(currentlyElaborated % numberOfRowsForEachRecoveryPoint == 0) { + if(currentlyElaborated % effectiveMaxRowPerStep == 0) { if(currentlyElaborated>=restartFrom) { aggregationStatus.setRestartFrom(currentlyElaborated, true); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java index dc78a91..e6077f0 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java @@ -22,5 +22,7 @@ public interface AggregatorPersistenceSrc extends AggregatorPersistence { * return true. It must raise UnsupportedOperationException if bulk delete is not allowed */ public void deleteRecords(ArrayNode array) throws UnsupportedOperationException, Exception; + + public int getEstimatedRecordRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception; } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index 417bfc0..00e27eb 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -561,6 +561,39 @@ public class PostgreSQLConnector extends PersistencePostgreSQL implements Aggreg return resultSet; } + + @Override + public int getEstimatedRecordRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception { + AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo(); + + String tableName = RecordToDBFields.getKey(aggregationInfo.getRecordType()); + + String startTimeColumnName = RecordToDBFields.getKey(AggregatedUsageRecord.START_TIME); + + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT COUNT(*)"); + stringBuffer.append(" FROM "); + stringBuffer.append(tableName); + stringBuffer.append(" WHERE "); + stringBuffer.append(startTimeColumnName); + stringBuffer.append(" >= "); + stringBuffer.append(getValue(aggregationInfo.getAggregationStartDate())); + stringBuffer.append(" AND "); + stringBuffer.append(startTimeColumnName); + stringBuffer.append(" < "); + stringBuffer.append(getValue(aggregationInfo.getAggregationEndDate())); + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + String sqlQuery = stringBuffer.toString(); + + logger.trace("Going to request the following query: {}", sqlQuery); + ResultSet resultSet = statement.executeQuery(sqlQuery); + + resultSet.next(); + return resultSet.getInt(1); + } @Override public boolean isBulkDeleteAllowed() { @@ -606,5 +639,4 @@ public class PostgreSQLConnector extends PersistencePostgreSQL implements Aggreg connection.commit(); } - } diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java index 1802e02..f1d7a6c 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java @@ -77,24 +77,24 @@ public class AccountingAggregatorPluginTest extends ContextTest { } - @Ignore +// @Ignore @Test public void aggregateAllServiceUsageRecord() throws Exception { String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); aggregateOneShot(recordType); // aggregateEverything(recordType); } - + @Test - public void aggregateMonthlyFirstHalf2022ServiceUsageRecord() throws Exception { + public void aggregateServiceUsageRecordDaily() throws Exception { String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); boolean forceRestart = true; boolean forceEarlyAggregation = true; - AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JUNE, 1); - Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.JULY, 1); + AggregationType aggregationType = AggregationType.DAILY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 26); + Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); @@ -111,7 +111,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.NOVEMBER, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.DECEMBER, 1); Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); @@ -129,8 +129,8 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.MAY, 1); - Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JULY, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.SEPTEMBER, 1); + Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.OCTOBER, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); @@ -138,16 +138,17 @@ public class AccountingAggregatorPluginTest extends ContextTest { aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } } - + @Test public void aggregateFirstQuarter2022StorageStatusRecord() throws Exception { String recordType = StorageStatusRecord.class.newInstance().getRecordType(); + boolean forceRestart = true; boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.FEBRUARY, 1); - Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.APRIL, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1); + Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.FEBRUARY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); @@ -164,7 +165,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.APRIL, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.MAY, 1); Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.JULY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); @@ -173,7 +174,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } } - + @Ignore @Test public void aggregateAllStorageStatusRecord() throws Exception { @@ -203,7 +204,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceRestart = true; boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.DAILY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 25); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27); Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); }