From 532c74d7e4f8ce2e5a6483b3bc5c5413cfff3f4f Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Tue, 9 Nov 2021 11:53:29 +0100 Subject: [PATCH] Improved aggregator --- .../aggregator/elaboration/Elaborator.java | 25 ++++++------- .../elaboration/RecoveryManager.java | 24 ++++++++++--- .../aggregator/persist/Persist.java | 33 ++++++++--------- .../persistence/PostgreSQLConnector.java | 36 ++++++++++++------- .../plugin/AccountingAggregatorPlugin.java | 28 ++++++++------- .../aggregator/status/AggregationStatus.java | 4 +-- .../persistence/PostgreSQLConnectorTest.java | 4 +-- 7 files changed, 90 insertions(+), 64 deletions(-) diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java index 36e40df..ffc6c6c 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java @@ -10,7 +10,6 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.aggregation.Aggregator; import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure; import org.gcube.accounting.aggregator.persist.Persist; -import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; @@ -163,9 +162,6 @@ public class Elaborator { FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure(); File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate); - // Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src); - // Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst); - File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType); File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile); @@ -173,19 +169,18 @@ public class Elaborator { aggregator.aggregate(); - Calendar now = Utility.getUTCCalendarInstance(); - /* - * now is passed as argument to isTimeElapsed function to avoid situation - * (even rare) where both check are valid because the first invocation happen - * before midnight and the second after midnight (so in the next day). - */ - if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) { - // Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); +// Calendar now = Utility.getUTCCalendarInstance(); +// /* +// * now is passed as argument to isTimeElapsed function to avoid situation +// * (even rare) where both check are valid because the first invocation happen +// * before midnight and the second after midnight (so in the next day). +// */ +// if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) { Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); persist.recover(); - }else{ - logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime)); - } +// }else{ +// logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime)); +// } } diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java index f15117c..0a47d7c 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java @@ -4,6 +4,7 @@ import java.util.Date; import java.util.List; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; +import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.documentstore.records.DSMapper; @@ -19,19 +20,22 @@ public class RecoveryManager { protected final Date aggregationStartDate; protected final Date aggregationEndDate; + protected String recordType; + protected AggregationType aggregationType; + protected boolean forceRestart; + public RecoveryManager(Date persistStartTime, Date persistEndTime, Date aggregationStartDate, Date aggregationEndDate){ super(); this.persistStartTime = persistStartTime; this.persistEndTime = persistEndTime; this.aggregationStartDate = aggregationStartDate; this.aggregationEndDate = aggregationEndDate; + this.forceRestart = false; } public void recovery() throws Exception { PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); - // TODO - - List aggregationStatusList = postgreSQLConnector.getUnterminated(aggregationStartDate, aggregationEndDate); + List aggregationStatusList = postgreSQLConnector.getUnterminated(recordType, aggregationType, aggregationStartDate, aggregationEndDate, forceRestart); if(aggregationStatusList.size()==0){ logger.info("Nothing to recover :)"); } @@ -43,9 +47,21 @@ public class RecoveryManager { logger.info("Going to Recover unterminated elaboration {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime); - elaborator.elaborate(false, false, false); + elaborator.elaborate(true, true, forceRestart); } } + + public void setForceRestart(boolean forceRestart) { + this.forceRestart = forceRestart; + } + + public void setRecordType(String recordType) { + this.recordType = recordType; + } + + public void setAggregationType(AggregationType aggregationType) { + this.aggregationType = aggregationType; + } } diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java index 67cbbd2..cacca77 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java @@ -69,22 +69,23 @@ public class Persist { setAggregationStateToCompleted(now); return; } + +// Giving the rewrite rules the number of records could be the same but the calledMAthods has been replaced +// +// if(aggregationStatus.getOriginalRecordsNumber()==aggregationStatus.getAggregatedRecordsNumber()){ +// +// logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. Setting {} to {}", +// aggregationStatus.getAggregationInfo(), +// aggregationStatus.getOriginalRecordsNumber(), +// aggregationStatus.getAggregatedRecordsNumber(), +// aggregationStatus.getOriginalRecordsNumber(), +// aggregationStatus.getAggregatedRecordsNumber(), +// AggregationState.class.getSimpleName(), AggregationState.COMPLETED); +// setAggregationStateToCompleted(now); +// return; +// } +// - /* - if(originalRecordBucket.name().compareTo(aggregatedRecordBucket.name())==0 || aggregationStatus.getAggregatedRecordsNumber()==0){ - - logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. The aggregation didn't had any effects and the Source and Destination Bucket are the same ({}) or the record number is 0. Setting {} to {}", - aggregationStatus.getAggregationInfo(), - aggregationStatus.getOriginalRecordsNumber(), - aggregationStatus.getAggregatedRecordsNumber(), - aggregationStatus.getOriginalRecordsNumber(), - aggregationStatus.getAggregatedRecordsNumber(), - originalRecordBucket.name(), - AggregationState.class.getSimpleName(), AggregationState.COMPLETED); - setAggregationStateToCompleted(now); - return; - } - */ } if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.AGGREGATED)){ @@ -94,7 +95,7 @@ public class Persist { DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile); deleteDocument.elaborate(); } - // InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket); + InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile); boolean serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.newInstance().getRecordType())==0 ? true : false; insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index 9d08140..28134a5 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -324,11 +324,11 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { } - public List getUnterminated(Date aggregationStartDate, Date aggregationEndDate) throws Exception{ - return getUnterminated(null, null, aggregationStartDate, aggregationEndDate); - } +// public List getUnterminated(Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{ +// return getUnterminated(null, null, aggregationStartDate, aggregationEndDate, forceRestart); +// } - public List getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ + public List getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{ /* * SELECT * @@ -352,15 +352,25 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { stringBuffer.append("current_aggregation_state != "); stringBuffer.append(getValue(AggregationState.COMPLETED)); - Calendar now = Utility.getUTCCalendarInstance(); - now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED); - stringBuffer.append(" AND "); - stringBuffer.append("last_update_time < "); - stringBuffer.append(getValue(now)); + if(!forceRestart) { + Calendar now = Utility.getUTCCalendarInstance(); + now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED); + stringBuffer.append(" AND "); + stringBuffer.append("last_update_time < "); + stringBuffer.append(getValue(now)); + } - stringBuffer.append(" AND "); - stringBuffer.append("record_type = "); - stringBuffer.append(getValue(recordType)); + if(recordType!=null) { + stringBuffer.append(" AND "); + stringBuffer.append("record_type = "); + stringBuffer.append(getValue(recordType)); + } + + if(aggregationType!=null) { + stringBuffer.append(" AND "); + stringBuffer.append("aggregation_type = "); + stringBuffer.append(getValue(aggregationType)); + } if(aggregationStartDate!=null && aggregationEndDate!=null) { stringBuffer.append(" AND "); @@ -368,7 +378,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { stringBuffer.append(getValue(aggregationStartDate)); stringBuffer.append(" AND "); - stringBuffer.append("aggregation_start_date <= "); + stringBuffer.append("aggregation_end_date <= "); stringBuffer.append(getValue(aggregationEndDate)); } diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java index db5baab..2548a9c 100644 --- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java +++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java @@ -161,14 +161,25 @@ public class AccountingAggregatorPlugin extends Plugin { aggregationEndDate = AGGREGATION_START_END_DATE_UTC_DATE_FORMAT.parse(aggregationEndDateString); } + if(inputs.containsKey(FORCE_RESTART)) { + forceRestart = (boolean) inputs.get(FORCE_RESTART); + } + + if(inputs.containsKey(AGGREGATION_TYPE_INPUT_PARAMETER)) { + aggregationType = AggregationType.valueOf((String) inputs.get(AGGREGATION_TYPE_INPUT_PARAMETER)); + } + + if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) { + recordType = (String) inputs.get(RECORD_TYPE_INPUT_PARAMETER); + usageRecordClass = (Class) RecordUtility.getRecordClass(recordType); + logger.debug("Requested record type is {} which is implemented by {}", recordType, usageRecordClass); + } switch (elaborationType) { case AGGREGATE: if (!inputs.containsKey(AGGREGATION_TYPE_INPUT_PARAMETER)) { throw new IllegalArgumentException("Please set required parameter '" + AGGREGATION_TYPE_INPUT_PARAMETER +"'"); } - aggregationType = AggregationType.valueOf((String) inputs.get(AGGREGATION_TYPE_INPUT_PARAMETER)); - if(inputs.containsKey(RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER)){ restartFromLastAggregationDate = (boolean) inputs.get(RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER); @@ -182,20 +193,10 @@ public class AccountingAggregatorPlugin extends Plugin { forceRerun = (boolean) inputs.get(FORCE_RERUN); } - if(inputs.containsKey(FORCE_RESTART)) { - forceRestart = (boolean) inputs.get(FORCE_RESTART); - } - if(restartFromLastAggregationDate==false && aggregationStartDate==null){ throw new IllegalArgumentException("Aggregation Start Date cannot be found. Please provide it as parameter or set '" + RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER + "' input parameter to 'true'."); } - if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) { - recordType = (String) inputs.get(RECORD_TYPE_INPUT_PARAMETER); - usageRecordClass = (Class) RecordUtility.getRecordClass(recordType); - logger.debug("Requested record type is {} which is implemented by {}", recordType, usageRecordClass); - } - AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate); aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation); aggregatorManager.setForceRerun(forceRerun); @@ -206,6 +207,9 @@ public class AccountingAggregatorPlugin extends Plugin { case RECOVERY: RecoveryManager recoveryManager = new RecoveryManager(persistStartTime, persistEndTime, aggregationStartDate, aggregationEndDate); + recoveryManager.setForceRestart(forceRestart); + recoveryManager.setRecordType(recordType); + recoveryManager.setAggregationType(aggregationType); recoveryManager.recovery(); break; diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java index 98cf000..271bc47 100644 --- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java @@ -69,8 +69,8 @@ public class AggregationStatus { return PostgreSQLConnector.getPostgreSQLConnector().getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate); } - public static List getUnterminated(String recordType, AggregationType aggregationType) throws Exception{ - return PostgreSQLConnector.getPostgreSQLConnector().getUnterminated(recordType, aggregationType, null, null); + public static List getUnterminated(String recordType, AggregationType aggregationType, boolean forceEarlyAggregation) throws Exception{ + return PostgreSQLConnector.getPostgreSQLConnector().getUnterminated(recordType, aggregationType, null, null, forceEarlyAggregation); } public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ diff --git a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java index d15eb8d..9aff792 100644 --- a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java @@ -35,7 +35,7 @@ public class PostgreSQLConnectorTest extends ContextTest { @Test public void getUnterminatedTest() throws Exception{ - List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); + List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null, true); for(AggregationStatus aggregationStatus : aggregationStatuses){ logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @@ -55,7 +55,7 @@ public class PostgreSQLConnectorTest extends ContextTest { Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1); Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30); - List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); + List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime(), true); for(AggregationStatus aggregationStatus : aggregationStatuses){ logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); }