From 0c236e845ddabfb3751886eed023739bdd238d59 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Thu, 22 Feb 2024 16:14:51 +0100 Subject: [PATCH] Improving aggregator --- .../aggregator/aggregation/Aggregator.java | 58 ++++++--- .../elaboration/AggregatorManager.java | 8 +- .../aggregator/elaboration/Elaborator.java | 3 +- .../elaboration/RecoveryManager.java | 2 +- .../persistence/PostgreSQLConnector.java | 23 ++-- .../plugin/AccountingAggregatorPlugin.java | 22 ++++ .../AccountingAggregatorPluginTest.java | 111 +++++++++++------- 7 files changed, 154 insertions(+), 73 deletions(-) diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index bec7c44..42c4f5c 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -1,6 +1,8 @@ package org.gcube.accounting.aggregator.aggregation; import java.io.File; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.sql.ResultSet; import java.time.OffsetDateTime; import java.util.Calendar; @@ -49,6 +51,8 @@ public class Aggregator { protected ObjectMapper objectMapper; protected Calendar startTime; + + protected boolean skipAggregation; public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) { this.aggregationStatus = aggregationStatus; @@ -60,6 +64,10 @@ public class Aggregator { } + public void setSkipAggregation(boolean skipAggregation) { + this.skipAggregation = skipAggregation; + } + public void aggregate() throws Exception { if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) { startTime = Utility.getUTCCalendarInstance(); @@ -155,18 +163,25 @@ public class Aggregator { String record = content.toString(); - // Aggregate the Record - aggregateRow(aggregatorBuffer, record); + if(!skipAggregation) { + // Aggregate the Record + aggregateRow(aggregatorBuffer, record); + } ++originalRecordsCounter; if(originalRecordsCounter%1000==0){ - int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size(); + int aggregatedRecordsNumber = 0; + if(!skipAggregation) { + aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size(); + }else { + aggregatedRecordsNumber = originalRecordsCounter; + } int diff = originalRecordsCounter - aggregatedRecordsNumber; float percentage = (100 * diff) / originalRecordsCounter; logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents", aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage); } - + Utility.printLine(originalRecordsbackupFile, record); return originalRecordsCounter; @@ -269,25 +284,32 @@ public class Aggregator { logger.debug("{} Elaboration of Records terminated at {}. Duration {}", aggregationStatus.getAggregationInfo(), Constant.DEFAULT_DATE_FORMAT.format(end.getTime()), durationForHuman); - File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(), - aggregateRecordsBackupFile.getName() + TMP_SUFFIX); - aggregateRecordsBackupFileTmp.delete(); - // Saving Aggregated Record on local file - logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(), - aggregateRecordsBackupFile); - - List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); - for (AggregatedRecord aggregatedRecord : aggregatedRecords) { - String marshalled = DSMapper.marshal(aggregatedRecord); - Utility.printLine(aggregateRecordsBackupFileTmp, marshalled); + if(!skipAggregation) { + File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(), + aggregateRecordsBackupFile.getName() + TMP_SUFFIX); + aggregateRecordsBackupFileTmp.delete(); + + // Saving Aggregated Record on local file + logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(), + aggregateRecordsBackupFile); + + List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); + for (AggregatedRecord aggregatedRecord : aggregatedRecords) { + String marshalled = DSMapper.marshal(aggregatedRecord); + Utility.printLine(aggregateRecordsBackupFileTmp, marshalled); + } + + aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); + + aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size(), malformedRecordNumber); + }else { + Files.copy(originalRecordsbackupFile.toPath(), aggregateRecordsBackupFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + aggregationStatus.setRecordNumbers(originalRecordsCounter, originalRecordsCounter, malformedRecordNumber); } - aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); - - aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size(), malformedRecordNumber); aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true); } diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java index 99580f7..84fe927 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java @@ -25,6 +25,8 @@ public class AggregatorManager { protected boolean forceEarlyAggregation; protected boolean forceRerun; protected boolean forceRestart; + + protected boolean skipAggregation; public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate, Date aggregationStartDate, Date aggregationEndDate) throws Exception { @@ -54,6 +56,10 @@ public class AggregatorManager { this.forceRestart = forceRestart; } + public void setSkipAggregation(boolean skipAggregation) { + this.skipAggregation = skipAggregation; + } + protected Date getEndDateFromStartDate() { return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1); } @@ -99,7 +105,7 @@ public class AggregatorManager { } Elaborator elaborator = new Elaborator(aggregationStatus); - elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart); + elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart, skipAggregation); } diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java index e2bd7d1..531d737 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java @@ -79,7 +79,7 @@ public class Elaborator { return allowed; } - public void elaborate(boolean forceEarlyAggregation, boolean forceRerun, boolean forceRestart) throws Exception { + public void elaborate(boolean forceEarlyAggregation, boolean forceRerun, boolean forceRestart, boolean skipAggregation) throws Exception { Calendar startTime = Utility.getUTCCalendarInstance(); final AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo(); @@ -164,6 +164,7 @@ public class Elaborator { File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile); Aggregator aggregator = new Aggregator(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile); + aggregator.setSkipAggregation(skipAggregation); aggregator.aggregate(); Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java index 39cc8bf..f8d6fbc 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java @@ -49,7 +49,7 @@ public class RecoveryManager { logger.info("Going to Recover unterminated elaboration {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); Elaborator elaborator = new Elaborator(aggregationStatus); - elaborator.elaborate(true, true, forceRestart); + elaborator.elaborate(true, true, forceRestart, false); } } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index c91a7d9..acc8f55 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -204,14 +204,21 @@ public class PostgreSQLConnector extends PersistencePostgreSQL implements Aggreg @Override public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception { - Connection connection = getConnection(); - Statement statement = connection.createStatement(); - String sqlCommand = getInsertAggregationStatusQuery(aggregationStatus, true); - statement.executeUpdate(sqlCommand); - sqlCommand = getInsertAggregationStateQuery(aggregationStatus); - statement.executeUpdate(sqlCommand); - statement.close(); - connection.commit(); + for(int i=1; i<=3; i++){ + try { + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + String sqlCommand = getInsertAggregationStatusQuery(aggregationStatus, true); + statement.executeUpdate(sqlCommand); + sqlCommand = getInsertAggregationStateQuery(aggregationStatus); + statement.executeUpdate(sqlCommand); + statement.close(); + connection.commit(); + break; + }catch (Throwable e) { + connection.close(); + } + } } // private Calendar getCalendar(ResultSet resultSet, String columnLabel) throws SQLException { diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java index 4e34f9a..848cb29 100644 --- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java +++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java @@ -61,6 +61,20 @@ public class AccountingAggregatorPlugin extends Plugin { public static final String FORCE_RERUN = "forceRerun"; public static final String FORCE_RESTART = "forceRestart"; + /** + * This parameter is used to skip aggregation. Default is false. + * It is used mainly to move the data from a db/table to another + * when the data has been already aggregated and any attempt to aggregate again + * does not have any effect. + * Please note that aggregation could take a lot of time when the amount of + * not aggregable records is more than 50.000/60.000. + * The reason is simple. Any records must be compared with the previous 50.000 + * before deciding that is not aggregable and to be inserted in the list. + * Furthermore it requires a lot of memory. + * + */ + public static final String SKIP_AGGREGATION = "skipAggregation"; + public enum ElaborationType { AGGREGATE, // Aggregate RECOVERY // Recover unterminated executions @@ -128,6 +142,8 @@ public class AccountingAggregatorPlugin extends Plugin { boolean forceRerun = false; boolean forceRestart = false; + boolean skipAggregation = false; + if (inputs == null || inputs.isEmpty()) { throw new IllegalArgumentException("The can only be launched providing valid input parameters"); } @@ -187,10 +203,16 @@ public class AccountingAggregatorPlugin extends Plugin { throw new IllegalArgumentException("Aggregation Start Date cannot be found. Please provide it as parameter or set '" + RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER + "' input parameter to 'true'."); } + if(inputs.containsKey(SKIP_AGGREGATION)) { + skipAggregation = (boolean) inputs.get(SKIP_AGGREGATION); + } + AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate); aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation); aggregatorManager.setForceRerun(forceRerun); aggregatorManager.setForceRestart(forceRestart); + aggregatorManager.setSkipAggregation(skipAggregation); + // aggregatorManager.elaborate(persistStartTime, persistEndTime, recordType); aggregatorManager.elaborate(recordType); diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java index 8edc33a..e02aeb6 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java @@ -25,7 +25,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { public static final String ROOT_PROD = "/d4science.research-infrastructures.eu"; private void aggregate(String recordType, AggregationType aggregationType, Calendar aggregationStartCalendar, - Calendar aggregationEndCalendar, boolean forceRerun, boolean forceEarlyAggregation) throws Exception { + Calendar aggregationEndCalendar, boolean forceRerun, boolean forceEarlyAggregation, boolean skipAggregation) throws Exception { Map inputs = new HashMap(); inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name()); @@ -39,6 +39,8 @@ public class AccountingAggregatorPluginTest extends ContextTest { inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, forceEarlyAggregation); inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, forceRerun); inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, false); + + inputs.put(AccountingAggregatorPlugin.SKIP_AGGREGATION, skipAggregation); if (aggregationStartCalendar != null) { String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT @@ -82,40 +84,22 @@ public class AccountingAggregatorPluginTest extends ContextTest { // aggregateOneShot(recordType); aggregateEverything(recordType); } - -// @JsonIgnore - @Test - public void aggregateDailyServiceUsageRecord() throws Exception { - String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); - boolean forceRestart = true; - boolean forceEarlyAggregation = true; - - AggregationType aggregationType = AggregationType.DAILY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 6); - Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14); - while (aggregationStartCalendar.before(end)) { - Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); - aggregationStartCalendar = Calendar.getInstance(); - aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); - } - } // @JsonIgnore @Test - public void aggregateMonthlyFirstHalf2021ServiceUsageRecord() throws Exception { + public void aggregateMonthlyFirstHalf2022ServiceUsageRecord() throws Exception { String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); boolean forceRestart = true; boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.FEBRUARY, 1); - Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.FEBRUARY, 1); + Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.JULY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); aggregationStartCalendar = Calendar.getInstance(); aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } @@ -123,32 +107,62 @@ public class AccountingAggregatorPluginTest extends ContextTest { // @JsonIgnore @Test - public void aggregateMonthlySecondHalf2021ServiceUsageRecord() throws Exception { + public void aggregateMonthlySecondHalf2022ServiceUsageRecord() throws Exception { String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); boolean forceRestart = true; boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.AUGUST, 1); - Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JULY, 1); + Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); aggregationStartCalendar = Calendar.getInstance(); aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } } - @JsonIgnore @Test - public void aggregateAllStorageStatusRecord() throws Exception { - String recordType = StorageStatusRecord.class.newInstance().getRecordType(); -// aggregateOneShot(recordType); -// aggregateEverything(recordType); + public void aggregateMonthlyFirstHalf2023ServiceUsageRecord() throws Exception { + String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); + + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + AggregationType aggregationType = AggregationType.MONTHLY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1); + Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JULY, 1); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } } + @JsonIgnore + @Test + public void aggregateMonthlySecondHalf2023ServiceUsageRecord() throws Exception { + String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); + + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + AggregationType aggregationType = AggregationType.MONTHLY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.JULY, 1); + Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } + } + + // @JsonIgnore @Test public void aggregateMonthlyStorageStatusRecord() throws Exception { @@ -157,11 +171,11 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1); - Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 1); + Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.JUNE, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); aggregationStartCalendar = Calendar.getInstance(); aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } @@ -176,17 +190,25 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JUNE, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.SEPTEMBER, 1); Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); aggregationStartCalendar = Calendar.getInstance(); aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } } @JsonIgnore + @Test + public void aggregateAllStorageStatusRecord() throws Exception { + String recordType = StorageStatusRecord.class.newInstance().getRecordType(); +// aggregateOneShot(recordType); +// aggregateEverything(recordType); + } + +// @JsonIgnore @Test public void aggregateAllJobUsageRecord() throws Exception { String recordType = JobUsageRecord.class.newInstance().getRecordType(); @@ -194,21 +216,22 @@ public class AccountingAggregatorPluginTest extends ContextTest { // aggregateEverything(recordType); } - @JsonIgnore +// @JsonIgnore @Test public void aggregateAllStorageUsageRecord() throws Exception { String recordType = StorageUsageRecord.class.newInstance().getRecordType(); aggregateOneShot(recordType); // aggregateEverything(recordType); } + - public void aggregateOneShot(String recordType) throws Exception { + public void aggregateOneShot(String recordType) throws Exception { boolean forceRestart = true; boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.DAILY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 20); Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); } public void aggregateEverything(String recordType) throws Exception { @@ -221,7 +244,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JANUARY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); aggregationStartCalendar = Calendar.getInstance(); aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } @@ -232,7 +255,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); aggregationStartCalendar = Calendar.getInstance(); aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); } @@ -242,7 +265,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true, false); aggregationStartCalendar = Calendar.getInstance(); aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); }