210 lines
8.6 KiB
Java
210 lines
8.6 KiB
Java
package org.gcube.accounting.aggregator.elaboration;
|
|
|
|
import java.io.File;
|
|
import java.text.DateFormat;
|
|
import java.util.Calendar;
|
|
import java.util.Date;
|
|
|
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
|
import org.gcube.accounting.aggregator.aggregation.Aggregator;
|
|
import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure;
|
|
import org.gcube.accounting.aggregator.persist.Persist;
|
|
import org.gcube.accounting.aggregator.status.AggregationState;
|
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
|
import org.gcube.accounting.aggregator.utility.Constant;
|
|
import org.gcube.accounting.aggregator.utility.Utility;
|
|
import org.gcube.documentstore.records.DSMapper;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
/**
|
|
* @author Luca Frosini (ISTI - CNR)
|
|
*/
|
|
public class Elaborator {
|
|
|
|
private static Logger logger = LoggerFactory.getLogger(Elaborator.class);
|
|
|
|
public final static String ORIGINAL_SUFFIX = ".original.json";
|
|
public final static String AGGREGATED_SUFFIX = ".aggregated.json";
|
|
|
|
protected AggregationStatus aggregationStatus;
|
|
// protected final Date persistStartTime;
|
|
// protected final Date persistEndTime;
|
|
|
|
public Elaborator(AggregationStatus aggregationStatus) throws Exception {
|
|
this.aggregationStatus = aggregationStatus;
|
|
}
|
|
|
|
public boolean isAggregationAllowed(){
|
|
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
|
Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
|
|
AggregationType aggregationType = aggregationInfo.getAggregationType();
|
|
|
|
boolean allowed = false;
|
|
|
|
Calendar calendar = Utility.getUTCCalendarInstance();
|
|
switch (aggregationType) {
|
|
case DAILY:
|
|
break;
|
|
|
|
case MONTHLY:
|
|
calendar.set(Calendar.DAY_OF_MONTH, 1);
|
|
break;
|
|
|
|
case YEARLY:
|
|
calendar.set(Calendar.DAY_OF_MONTH, 1);
|
|
calendar.set(Calendar.MONTH, Calendar.JANUARY);
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
calendar.set(Calendar.HOUR_OF_DAY, 0);
|
|
calendar.set(Calendar.MINUTE, 0);
|
|
calendar.set(Calendar.SECOND, 0);
|
|
calendar.set(Calendar.MILLISECOND, 0);
|
|
|
|
calendar.add(aggregationType.getCalendarField(), -aggregationType.getNotAggregableBefore());
|
|
|
|
logger.trace("Checking if {} is before {}",
|
|
aggregationType.getDateFormat().format(aggregationStartDate),
|
|
aggregationType.getDateFormat().format(calendar.getTime()));
|
|
|
|
if(aggregationStartDate.before(calendar.getTime())){
|
|
allowed = true;
|
|
}
|
|
|
|
return allowed;
|
|
}
|
|
|
|
public void elaborate(boolean forceEarlyAggregation, boolean forceRerun, boolean forceRestart, boolean skipAggregation) throws Exception {
|
|
Calendar startTime = Utility.getUTCCalendarInstance();
|
|
|
|
final AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
|
final Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
|
|
final AggregationType aggregationType = aggregationInfo.getAggregationType();
|
|
|
|
if(!isAggregationAllowed()){
|
|
if(!forceEarlyAggregation) {
|
|
logger.info("Too early to start aggregation {}. {} Aggregation is not allowed for the last {} {}",
|
|
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
|
|
aggregationType,
|
|
aggregationType.getNotAggregableBefore(),
|
|
aggregationType.name().toLowerCase().replace("ly", "s").replaceAll("dais", "days"));
|
|
return;
|
|
}else {
|
|
logger.info("The aggregation has been forced even is too early to start it {}. {} aggregation should not be made for the last {} {}",
|
|
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
|
|
aggregationType,
|
|
aggregationType.getNotAggregableBefore(),
|
|
aggregationType.name().toLowerCase().replace("ly", "s").replace("dais", "days"));
|
|
}
|
|
}
|
|
|
|
if(aggregationStatus.getAggregationState()==null){
|
|
aggregationStatus.setAggregationState(AggregationState.STARTED, startTime, true);
|
|
}else{
|
|
if(aggregationStatus.getAggregationState()==AggregationState.COMPLETED){
|
|
if(!forceRerun) {
|
|
logger.info("{} is {}. Nothing to do :-). \n Details {}",
|
|
AggregationStatus.class.getSimpleName(),
|
|
aggregationStatus.getAggregationState(),
|
|
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
|
return;
|
|
}else {
|
|
logger.info("Last {} is {} and the aggreation should not be needed but it has been forced. Details {}.",
|
|
AggregationStatus.class.getSimpleName(),
|
|
aggregationStatus.getAggregationState(),
|
|
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
|
|
|
|
|
|
|
aggregationStatus = new AggregationStatus(aggregationStatus);
|
|
|
|
|
|
aggregationStatus.setAggregationState(AggregationState.RESTARTED, startTime, false);
|
|
aggregationStatus.setAggregationState(AggregationState.STARTED, startTime, true);
|
|
|
|
/*
|
|
* Last update time has been just modified restart must be forced otherwise the the aggregation
|
|
* cannot continue
|
|
*/
|
|
forceRestart = true;
|
|
}
|
|
}
|
|
|
|
Calendar now = Utility.getUTCCalendarInstance();
|
|
now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED);
|
|
|
|
if(aggregationStatus.getLastUpdateTime().after(now)){
|
|
if(!forceRestart) {
|
|
logger.info("Cannot elaborate {} because has been modified in the last {} ",
|
|
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
|
|
Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED==Calendar.HOUR_OF_DAY? "hours" : "unit");
|
|
return;
|
|
}else {
|
|
logger.info("Normally {} cannot be elaborated because has been modified in the last {} but the restart has been forced",
|
|
DSMapper.getObjectMapper().writeValueAsString(aggregationStatus),
|
|
Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED==Calendar.HOUR_OF_DAY? "hours" : "unit");
|
|
}
|
|
}
|
|
|
|
aggregationStatus.updateLastUpdateTime(true);
|
|
|
|
}
|
|
|
|
String recordType = aggregationInfo.getRecordType();
|
|
|
|
FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure();
|
|
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
|
|
|
|
File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType);
|
|
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
|
|
|
|
Aggregator aggregator = new Aggregator(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile);
|
|
aggregator.setSkipAggregation(skipAggregation);
|
|
aggregator.aggregate();
|
|
|
|
Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
|
|
persist.recover();
|
|
|
|
|
|
// Calendar now = Utility.getUTCCalendarInstance();
|
|
// /*
|
|
// * now is passed as argument to isTimeElapsed function to avoid situation
|
|
// * (even rare) where both check are valid because the first invocation happen
|
|
// * before midnight and the second after midnight (so in the next day).
|
|
// */
|
|
// if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
|
|
// Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
|
|
// persist.recover();
|
|
// }else{
|
|
// logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime));
|
|
// }
|
|
|
|
}
|
|
|
|
protected File getOriginalRecordsBackupFile(File elaborationDirectory, String name) throws Exception {
|
|
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
|
Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
|
|
AggregationType aggregationType = aggregationInfo.getAggregationType();
|
|
|
|
DateFormat dateFormat = aggregationType.getDateFormat();
|
|
String dateString = dateFormat.format(aggregationStartDate);
|
|
String[] splittedDate = dateString.split(AggregationType.DATE_SEPARATOR);
|
|
|
|
String backupFileName = splittedDate[splittedDate.length-1] + "-" +name;
|
|
File originalRecordsbackupFile = new File(elaborationDirectory, backupFileName + ORIGINAL_SUFFIX);
|
|
return originalRecordsbackupFile;
|
|
}
|
|
|
|
protected File getAggregatedRecordsBackupFile(File originalRecordsbackupFile) throws Exception {
|
|
File aggregateRecordsBackupFile = new File(originalRecordsbackupFile.getParentFile(),
|
|
originalRecordsbackupFile.getName().replace(ORIGINAL_SUFFIX, AGGREGATED_SUFFIX));
|
|
return aggregateRecordsBackupFile;
|
|
}
|
|
|
|
}
|