Refs #11258: Add possibility to force aggregation (already made or after the allowed range)

Task-Url: https://support.d4science.org/issues/11258

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@164522 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2018-02-22 13:49:54 +00:00
parent a72fdccea7
commit a6a5302227
12 changed files with 158 additions and 38 deletions

View File

@ -271,7 +271,7 @@ public class Aggregator {
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size(), malformedRecordNumber);
aggregationStatus.setState(AggregationState.AGGREGATED, startTime, true);
aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true);
}

View File

@ -23,17 +23,38 @@ public class AggregatorManager {
protected Date aggregationStartDate;
protected Date aggregationEndDate;
protected final boolean restartFromLastAggregationDate;
protected boolean forceEarlyAggregation;
protected boolean forceRerun;
protected boolean forceRestart;
public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate,
Date aggregationStartDate, Date aggregationEndDate) throws Exception {
this.aggregationType = aggregationType;
this.aggregationStartDate = aggregationStartDate;
this.aggregationStartDate = Utility.sanitizeDate(aggregationType, aggregationStartDate);
this.aggregationEndDate = aggregationEndDate;
this.restartFromLastAggregationDate = restartFromLastAggregationDate;
this.forceEarlyAggregation = false;
this.forceRerun = false;
this.forceRestart = false;
}
public void setForceEarlyAggregation(boolean forceEarlyAggregation) {
this.forceEarlyAggregation = forceEarlyAggregation;
}
public void setForceRerun(boolean forceRerun) {
this.forceRerun = forceRerun;
}
public void setForceRestart(boolean forceRestart) {
this.forceRestart = forceRestart;
}
protected Date getEndDateFromStartDate() {
return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1);
}
@ -85,14 +106,14 @@ public class AggregatorManager {
}
if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) {
logger.info("StartDate {} is after last Aggregation End Date allowed {}. Nothing to do.",
logger.info("Start Date {} is after provided End Date {}. Please check input parameters.",
Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate),
Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate));
return;
}
Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
elaborator.elaborate();
elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart);
}

View File

@ -86,7 +86,7 @@ public class Elaborator {
return allowed;
}
public void elaborate() throws Exception {
public void elaborate(boolean forceEarlyAggregation, boolean forceRerun, boolean forceRestart) throws Exception {
Calendar startTime = Utility.getUTCCalendarInstance();
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
@ -94,32 +94,62 @@ public class Elaborator {
AggregationType aggregationType = aggregationInfo.getAggregationType();
if(!isAggregationAllowed()){
logger.info("Too early to start aggregation {}. {} Aggregation is not allowed for the last {} {}",
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
aggregationType,
aggregationType.getNotAggregableBefore(),
aggregationType.name().toLowerCase().replace("ly", "s").replaceAll("dais", "days"));
return;
if(!forceEarlyAggregation) {
logger.info("Too early to start aggregation {}. {} Aggregation is not allowed for the last {} {}",
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
aggregationType,
aggregationType.getNotAggregableBefore(),
aggregationType.name().toLowerCase().replace("ly", "s").replaceAll("dais", "days"));
return;
}else {
logger.info("The aggregation has been forced even is too early to start it {}. {} aggregation should not be made for the last {} {}",
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
aggregationType,
aggregationType.getNotAggregableBefore(),
aggregationType.name().toLowerCase().replace("ly", "s").replace("dais", "days"));
}
}
if(aggregationStatus.getAggregationState()==null){
aggregationStatus.setState(AggregationState.STARTED, startTime, true);
aggregationStatus.setAggregationState(AggregationState.STARTED, startTime, true);
}else{
if(aggregationStatus.getAggregationState()==AggregationState.COMPLETED){
logger.info("{} is {}. Nothing to do :-). \n Details {}",
AggregationStatus.class.getSimpleName(),
aggregationStatus.getAggregationState(),
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
return;
if(!forceRerun) {
logger.info("{} is {}. Nothing to do :-). \n Details {}",
AggregationStatus.class.getSimpleName(),
aggregationStatus.getAggregationState(),
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
return;
}else {
logger.info("Last {} is {} and the aggreation should not be needed but it has been forced. Details {}.",
AggregationStatus.class.getSimpleName(),
aggregationStatus.getAggregationState(),
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
aggregationStatus.setAggregationState(AggregationState.RESTARTED, startTime, false);
aggregationStatus.setAggregationState(AggregationState.STARTED, startTime, true);
/*
* Last update time has been just modified restart must be forced otherwise the the aggregation
* cannot continue
*/
forceRestart = true;
}
}
Calendar now = Utility.getUTCCalendarInstance();
now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED);
if(aggregationStatus.getLastUpdateTime().after(now)){
logger.info("Cannot elaborate {} because has been modified in the last {} ",
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED==Calendar.HOUR_OF_DAY? "hours" : "unit");
return;
if(!forceRestart) {
logger.info("Cannot elaborate {} because has been modified in the last {} ",
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED==Calendar.HOUR_OF_DAY? "hours" : "unit");
return;
}else {
logger.info("Normally {} cannot be elaborated because has been modified in the last {} but the restart has been forced",
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED==Calendar.HOUR_OF_DAY? "hours" : "unit");
}
}
aggregationStatus.updateLastUpdateTime(true);

View File

@ -40,7 +40,7 @@ public class RecoveryManager {
logger.info("Going to Recover unterminated elaboration {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
elaborator.elaborate();
elaborator.elaborate(false, false, false);
}
}

View File

@ -100,7 +100,7 @@ public abstract class DocumentElaboration {
public void elaborate() throws Exception{
startTime = Utility.getUTCCalendarInstance();
readFile();
aggregationStatus.setState(finalAggregationState, startTime, true);
aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
afterElaboration();
}

View File

@ -50,7 +50,7 @@ public class Persist {
if(malformedRecords.exists()){
malformedRecords.delete();
}
aggregationStatus.setState(AggregationState.COMPLETED, now, true);
aggregationStatus.setAggregationState(AggregationState.COMPLETED, now, true);
}
public void recover() throws Exception{

View File

@ -55,11 +55,15 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
private static final DateFormat AGGREGATION_START_END_DATE_UTC_DATE_FORMAT;
private static final String UTC = "+0000";
public static final String FORCE_EARLY_AGGREGATION = "forceEarlyAggregation";
public static final String FORCE_RERUN = "forceRerun";
public static final String FORCE_RESTART = "forceRestart";
public enum ElaborationType {
AGGREGATE, // Aggregate
RECOVERY // Recover unterminated executions
}
/**
* Indicate which types of elaboration the plugin must perform
*/
@ -123,6 +127,10 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
Class<? extends UsageRecord> usageRecordClass = null;
boolean forceEarlyAggregation = false;
boolean forceRerun = false;
boolean forceRestart = false;
if (inputs == null || inputs.isEmpty()) {
throw new IllegalArgumentException("The can only be launched providing valid input parameters");
}
@ -162,6 +170,18 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
restartFromLastAggregationDate = (boolean) inputs.get(RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER);
}
if(inputs.containsKey(FORCE_EARLY_AGGREGATION)) {
forceEarlyAggregation = (boolean) inputs.get(FORCE_EARLY_AGGREGATION);
}
if(inputs.containsKey(FORCE_RERUN)) {
forceRerun = (boolean) inputs.get(FORCE_RERUN);
}
if(inputs.containsKey(FORCE_RESTART)) {
forceRestart = (boolean) inputs.get(FORCE_RESTART);
}
if(restartFromLastAggregationDate==false && aggregationStartDate==null){
throw new IllegalArgumentException("Aggregation Start Date cannot be found. Please provide it as parameter or set '" + RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER + "' input parameter to 'true'.");
}
@ -171,8 +191,11 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
}
AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate);
aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation);
aggregatorManager.setForceRerun(forceRerun);
aggregatorManager.setForceRestart(forceRestart);
aggregatorManager.elaborate(persistStartTime, persistEndTime, usageRecordClass);
break;
case RECOVERY:

View File

@ -7,6 +7,10 @@ import org.slf4j.LoggerFactory;
* @author Luca Frosini (ISTI - CNR)
*/
public enum AggregationState {
/**
* The Aggregation has been started
*/
RESTARTED,
/**
* The Aggregation has been started
*/

View File

@ -87,7 +87,7 @@ public class AggregationStatus {
return aggregationInfo;
}
public synchronized void setState(AggregationState aggregationState, Calendar startTime, boolean sync) throws Exception {
public synchronized void setAggregationState(AggregationState aggregationState, Calendar startTime, boolean sync) throws Exception {
Calendar endTime = Utility.getUTCCalendarInstance();
logger.info("Going to Set {} for {} to {}. StartTime {}, EndTime {} [Duration : {}]",

View File

@ -133,6 +133,36 @@ public class Utility {
return aggregationStartCalendar;
}
public static Date sanitizeDate(AggregationType aggregationType, Date date) {
Calendar calendarToSanitize = getUTCCalendarInstance();
calendarToSanitize.setTime(date);
switch(aggregationType) {
case DAILY:
break;
case MONTHLY:
calendarToSanitize.set(Calendar.DAY_OF_MONTH, 1);
break;
case YEARLY:
calendarToSanitize.set(Calendar.DAY_OF_MONTH, 1);
calendarToSanitize.set(Calendar.MONTH, Calendar.JANUARY);
break;
default:
break;
}
calendarToSanitize.set(Calendar.HOUR_OF_DAY, 0);
calendarToSanitize.set(Calendar.MINUTE, 0);
calendarToSanitize.set(Calendar.SECOND, 0);
calendarToSanitize.set(Calendar.MILLISECOND, 0);
return calendarToSanitize.getTime();
}
public static Date getEndDateFromStartDate(AggregationType aggregationType, Date aggregationStartDate, int offset) {
Calendar aggregationEndDate = getUTCCalendarInstance();

View File

@ -19,7 +19,7 @@ public class AccountingAggregatorPluginTest extends ScopedTest {
@Test
public void aggregate() throws Exception {
setContext(ROOT);
//setContext(ROOT);
Map<String, Object> inputs = new HashMap<String, Object>();
@ -37,12 +37,16 @@ public class AccountingAggregatorPluginTest extends ScopedTest {
inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER, false);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.NOVEMBER, 1);
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, false);
inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, false);
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.SEPTEMBER, 1);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2017, Calendar.DECEMBER, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2017, Calendar.SEPTEMBER, 30);
String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_END_DATE_INPUT_PARAMETER, aggregationEndDate);
@ -54,6 +58,14 @@ public class AccountingAggregatorPluginTest extends ScopedTest {
}
@Test
public void cycle() throws Exception {
for(int i=0; i<20; i++) {
aggregate();
logger.debug("---------------------------------------------\n\n");
}
}
// @Test
public void testRecovery() throws Exception {
setContext(ROOT);
@ -64,12 +76,12 @@ public class AccountingAggregatorPluginTest extends ScopedTest {
inputs.put(AccountingAggregatorPlugin.PERSIST_START_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(8, 0));
inputs.put(AccountingAggregatorPlugin.PERSIST_END_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(20, 30));
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2016, Calendar.JANUARY, 1);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.SEPTEMBER, 1);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2016, Calendar.FEBRUARY, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2017, Calendar.SEPTEMBER, 30);
String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_END_DATE_INPUT_PARAMETER, aggregationEndDate);

View File

@ -90,26 +90,26 @@ public class CouchBaseConnectorTest extends ScopedTest {
boolean sync = true;
Calendar startedStart = Utility.getUTCCalendarInstance();
aggregationStatus.setState(AggregationState.STARTED, startedStart, sync);
aggregationStatus.setAggregationState(AggregationState.STARTED, startedStart, sync);
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
aggregationStatus.setRecordNumbers(100, 72, 0);
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
Calendar aggregatedStart = Utility.getUTCCalendarInstance();
aggregationStatus.setState(AggregationState.AGGREGATED, aggregatedStart, sync);
aggregationStatus.setAggregationState(AggregationState.AGGREGATED, aggregatedStart, sync);
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
Calendar addedStart = Utility.getUTCCalendarInstance();
aggregationStatus.setState(AggregationState.ADDED, addedStart, sync);
aggregationStatus.setAggregationState(AggregationState.ADDED, addedStart, sync);
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
Calendar deletedStart = Utility.getUTCCalendarInstance();
aggregationStatus.setState(AggregationState.DELETED, deletedStart, sync);
aggregationStatus.setAggregationState(AggregationState.DELETED, deletedStart, sync);
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
Calendar completedStart = Utility.getUTCCalendarInstance();
aggregationStatus.setState(AggregationState.COMPLETED, completedStart, sync);
aggregationStatus.setAggregationState(AggregationState.COMPLETED, completedStart, sync);
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}
@ -141,7 +141,7 @@ public class CouchBaseConnectorTest extends ScopedTest {
boolean sync = true;
Calendar startedStart = Utility.getUTCCalendarInstance();
aggregationStatus.setState(AggregationState.STARTED, startedStart, sync);
aggregationStatus.setAggregationState(AggregationState.STARTED, startedStart, sync);
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}