Improved aggregator

This commit is contained in:
Luca Frosini 2021-11-09 11:53:29 +01:00
parent d5352602ef
commit 532c74d7e4
7 changed files with 90 additions and 64 deletions

View File

@ -10,7 +10,6 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.aggregation.Aggregator;
import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure;
import org.gcube.accounting.aggregator.persist.Persist;
import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
@ -163,9 +162,6 @@ public class Elaborator {
FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure();
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
// Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src);
// Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst);
File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType);
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
@ -173,19 +169,18 @@ public class Elaborator {
aggregator.aggregate();
Calendar now = Utility.getUTCCalendarInstance();
/*
* now is passed as argument to isTimeElapsed function to avoid situation
* (even rare) where both check are valid because the first invocation happen
* before midnight and the second after midnight (so in the next day).
*/
if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
// Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
// Calendar now = Utility.getUTCCalendarInstance();
// /*
// * now is passed as argument to isTimeElapsed function to avoid situation
// * (even rare) where both check are valid because the first invocation happen
// * before midnight and the second after midnight (so in the next day).
// */
// if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
persist.recover();
}else{
logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime));
}
// }else{
// logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime));
// }
}

View File

@ -4,6 +4,7 @@ import java.util.Date;
import java.util.List;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.documentstore.records.DSMapper;
@ -19,19 +20,22 @@ public class RecoveryManager {
protected final Date aggregationStartDate;
protected final Date aggregationEndDate;
protected String recordType;
protected AggregationType aggregationType;
protected boolean forceRestart;
public RecoveryManager(Date persistStartTime, Date persistEndTime, Date aggregationStartDate, Date aggregationEndDate){
super();
this.persistStartTime = persistStartTime;
this.persistEndTime = persistEndTime;
this.aggregationStartDate = aggregationStartDate;
this.aggregationEndDate = aggregationEndDate;
this.forceRestart = false;
}
public void recovery() throws Exception {
PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector();
// TODO
List<AggregationStatus> aggregationStatusList = postgreSQLConnector.getUnterminated(aggregationStartDate, aggregationEndDate);
List<AggregationStatus> aggregationStatusList = postgreSQLConnector.getUnterminated(recordType, aggregationType, aggregationStartDate, aggregationEndDate, forceRestart);
if(aggregationStatusList.size()==0){
logger.info("Nothing to recover :)");
}
@ -43,9 +47,21 @@ public class RecoveryManager {
logger.info("Going to Recover unterminated elaboration {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
elaborator.elaborate(false, false, false);
elaborator.elaborate(true, true, forceRestart);
}
}
public void setForceRestart(boolean forceRestart) {
this.forceRestart = forceRestart;
}
public void setRecordType(String recordType) {
this.recordType = recordType;
}
public void setAggregationType(AggregationType aggregationType) {
this.aggregationType = aggregationType;
}
}

View File

@ -69,22 +69,23 @@ public class Persist {
setAggregationStateToCompleted(now);
return;
}
// Giving the rewrite rules the number of records could be the same but the calledMAthods has been replaced
//
// if(aggregationStatus.getOriginalRecordsNumber()==aggregationStatus.getAggregatedRecordsNumber()){
//
// logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. Setting {} to {}",
// aggregationStatus.getAggregationInfo(),
// aggregationStatus.getOriginalRecordsNumber(),
// aggregationStatus.getAggregatedRecordsNumber(),
// aggregationStatus.getOriginalRecordsNumber(),
// aggregationStatus.getAggregatedRecordsNumber(),
// AggregationState.class.getSimpleName(), AggregationState.COMPLETED);
// setAggregationStateToCompleted(now);
// return;
// }
//
/*
if(originalRecordBucket.name().compareTo(aggregatedRecordBucket.name())==0 || aggregationStatus.getAggregatedRecordsNumber()==0){
logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. The aggregation didn't had any effects and the Source and Destination Bucket are the same ({}) or the record number is 0. Setting {} to {}",
aggregationStatus.getAggregationInfo(),
aggregationStatus.getOriginalRecordsNumber(),
aggregationStatus.getAggregatedRecordsNumber(),
aggregationStatus.getOriginalRecordsNumber(),
aggregationStatus.getAggregatedRecordsNumber(),
originalRecordBucket.name(),
AggregationState.class.getSimpleName(), AggregationState.COMPLETED);
setAggregationStateToCompleted(now);
return;
}
*/
}
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.AGGREGATED)){
@ -94,7 +95,7 @@ public class Persist {
DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile);
deleteDocument.elaborate();
}
// InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile);
boolean serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.newInstance().getRecordType())==0 ? true : false;
insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration);

View File

@ -324,11 +324,11 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
}
public List<AggregationStatus> getUnterminated(Date aggregationStartDate, Date aggregationEndDate) throws Exception{
return getUnterminated(null, null, aggregationStartDate, aggregationEndDate);
}
// public List<AggregationStatus> getUnterminated(Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{
// return getUnterminated(null, null, aggregationStartDate, aggregationEndDate, forceRestart);
// }
public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{
/*
* SELECT *
@ -352,15 +352,25 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
stringBuffer.append("current_aggregation_state != ");
stringBuffer.append(getValue(AggregationState.COMPLETED));
Calendar now = Utility.getUTCCalendarInstance();
now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED);
stringBuffer.append(" AND ");
stringBuffer.append("last_update_time < ");
stringBuffer.append(getValue(now));
if(!forceRestart) {
Calendar now = Utility.getUTCCalendarInstance();
now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED);
stringBuffer.append(" AND ");
stringBuffer.append("last_update_time < ");
stringBuffer.append(getValue(now));
}
stringBuffer.append(" AND ");
stringBuffer.append("record_type = ");
stringBuffer.append(getValue(recordType));
if(recordType!=null) {
stringBuffer.append(" AND ");
stringBuffer.append("record_type = ");
stringBuffer.append(getValue(recordType));
}
if(aggregationType!=null) {
stringBuffer.append(" AND ");
stringBuffer.append("aggregation_type = ");
stringBuffer.append(getValue(aggregationType));
}
if(aggregationStartDate!=null && aggregationEndDate!=null) {
stringBuffer.append(" AND ");
@ -368,7 +378,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
stringBuffer.append(getValue(aggregationStartDate));
stringBuffer.append(" AND ");
stringBuffer.append("aggregation_start_date <= ");
stringBuffer.append("aggregation_end_date <= ");
stringBuffer.append(getValue(aggregationEndDate));
}

View File

@ -161,14 +161,25 @@ public class AccountingAggregatorPlugin extends Plugin {
aggregationEndDate = AGGREGATION_START_END_DATE_UTC_DATE_FORMAT.parse(aggregationEndDateString);
}
if(inputs.containsKey(FORCE_RESTART)) {
forceRestart = (boolean) inputs.get(FORCE_RESTART);
}
if(inputs.containsKey(AGGREGATION_TYPE_INPUT_PARAMETER)) {
aggregationType = AggregationType.valueOf((String) inputs.get(AGGREGATION_TYPE_INPUT_PARAMETER));
}
if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) {
recordType = (String) inputs.get(RECORD_TYPE_INPUT_PARAMETER);
usageRecordClass = (Class<? extends UsageRecord>) RecordUtility.getRecordClass(recordType);
logger.debug("Requested record type is {} which is implemented by {}", recordType, usageRecordClass);
}
switch (elaborationType) {
case AGGREGATE:
if (!inputs.containsKey(AGGREGATION_TYPE_INPUT_PARAMETER)) {
throw new IllegalArgumentException("Please set required parameter '" + AGGREGATION_TYPE_INPUT_PARAMETER +"'");
}
aggregationType = AggregationType.valueOf((String) inputs.get(AGGREGATION_TYPE_INPUT_PARAMETER));
if(inputs.containsKey(RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER)){
restartFromLastAggregationDate = (boolean) inputs.get(RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER);
@ -182,20 +193,10 @@ public class AccountingAggregatorPlugin extends Plugin {
forceRerun = (boolean) inputs.get(FORCE_RERUN);
}
if(inputs.containsKey(FORCE_RESTART)) {
forceRestart = (boolean) inputs.get(FORCE_RESTART);
}
if(restartFromLastAggregationDate==false && aggregationStartDate==null){
throw new IllegalArgumentException("Aggregation Start Date cannot be found. Please provide it as parameter or set '" + RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER + "' input parameter to 'true'.");
}
if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) {
recordType = (String) inputs.get(RECORD_TYPE_INPUT_PARAMETER);
usageRecordClass = (Class<? extends UsageRecord>) RecordUtility.getRecordClass(recordType);
logger.debug("Requested record type is {} which is implemented by {}", recordType, usageRecordClass);
}
AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate);
aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation);
aggregatorManager.setForceRerun(forceRerun);
@ -206,6 +207,9 @@ public class AccountingAggregatorPlugin extends Plugin {
case RECOVERY:
RecoveryManager recoveryManager = new RecoveryManager(persistStartTime, persistEndTime, aggregationStartDate, aggregationEndDate);
recoveryManager.setForceRestart(forceRestart);
recoveryManager.setRecordType(recordType);
recoveryManager.setAggregationType(aggregationType);
recoveryManager.recovery();
break;

View File

@ -69,8 +69,8 @@ public class AggregationStatus {
return PostgreSQLConnector.getPostgreSQLConnector().getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
}
public static List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType) throws Exception{
return PostgreSQLConnector.getPostgreSQLConnector().getUnterminated(recordType, aggregationType, null, null);
public static List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, boolean forceEarlyAggregation) throws Exception{
return PostgreSQLConnector.getPostgreSQLConnector().getUnterminated(recordType, aggregationType, null, null, forceEarlyAggregation);
}
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{

View File

@ -35,7 +35,7 @@ public class PostgreSQLConnectorTest extends ContextTest {
@Test
public void getUnterminatedTest() throws Exception{
List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null, true);
for(AggregationStatus aggregationStatus : aggregationStatuses){
logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}
@ -55,7 +55,7 @@ public class PostgreSQLConnectorTest extends ContextTest {
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1);
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30);
List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime(), true);
for(AggregationStatus aggregationStatus : aggregationStatuses){
logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}