2017-09-05 17:13:50 +02:00
|
|
|
package org.gcube.accounting.aggregator.elaboration;
|
|
|
|
|
|
|
|
import java.util.Date;
|
|
|
|
|
|
|
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
|
|
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
|
|
|
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
|
|
|
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
|
|
|
import org.gcube.accounting.aggregator.utility.Constant;
|
|
|
|
import org.gcube.accounting.aggregator.utility.Utility;
|
|
|
|
import org.gcube.accounting.datamodel.UsageRecord;
|
|
|
|
import org.gcube.documentstore.records.DSMapper;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
public class AggregatorManager {
|
|
|
|
|
|
|
|
private static Logger logger = LoggerFactory.getLogger(AggregatorManager.class);
|
|
|
|
|
|
|
|
public final static String ACCOUNTING_MANAGER_BUCKET_NAME = "AccountingManager";
|
|
|
|
|
2017-09-22 14:30:48 +02:00
|
|
|
protected final AggregationType aggregationType;
|
|
|
|
|
2017-09-05 17:13:50 +02:00
|
|
|
protected Date aggregationStartDate;
|
2017-09-22 14:30:48 +02:00
|
|
|
protected Date aggregationEndDate;
|
2017-09-05 17:13:50 +02:00
|
|
|
|
2017-09-22 14:30:48 +02:00
|
|
|
protected final boolean restartFromLastAggregationDate;
|
2017-09-05 17:13:50 +02:00
|
|
|
|
2017-09-22 14:30:48 +02:00
|
|
|
public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate,
|
|
|
|
Date aggregationStartDate, Date aggregationEndDate) throws Exception {
|
2017-09-05 17:13:50 +02:00
|
|
|
this.aggregationType = aggregationType;
|
2017-09-22 14:30:48 +02:00
|
|
|
this.aggregationStartDate = aggregationStartDate;
|
|
|
|
this.aggregationEndDate = aggregationEndDate;
|
2017-09-05 17:13:50 +02:00
|
|
|
this.restartFromLastAggregationDate = restartFromLastAggregationDate;
|
|
|
|
}
|
|
|
|
|
|
|
|
protected Date getEndDateFromStartDate() {
|
|
|
|
return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1);
|
|
|
|
}
|
|
|
|
|
|
|
|
protected AggregationStatus createAggregationStatus(String recordType) throws Exception {
|
|
|
|
Date aggregationEndDate = getEndDateFromStartDate();
|
|
|
|
AggregationInfo aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate,
|
|
|
|
aggregationEndDate);
|
|
|
|
AggregationStatus aggregationStatus = new AggregationStatus(aggregationInfo);
|
|
|
|
return aggregationStatus;
|
|
|
|
}
|
|
|
|
|
2017-09-22 14:30:48 +02:00
|
|
|
public void elaborate(Date persistStartTime, Date persistEndTime, Class<? extends UsageRecord> usageRecordClass)
|
|
|
|
throws Exception {
|
2017-09-05 17:13:50 +02:00
|
|
|
|
|
|
|
CouchBaseConnector couchBaseConnector = CouchBaseConnector.getInstance();
|
|
|
|
|
|
|
|
for (String recordType : couchBaseConnector.getRecordTypes()) {
|
2017-09-22 14:30:48 +02:00
|
|
|
|
2017-09-05 17:13:50 +02:00
|
|
|
if (usageRecordClass != null && usageRecordClass.newInstance().getRecordType().compareTo(recordType) != 0) {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (recordType.compareTo(ACCOUNTING_MANAGER_BUCKET_NAME) == 0) {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
AggregationStatus aggregationStatus = null;
|
|
|
|
if (restartFromLastAggregationDate) {
|
2017-09-22 14:30:48 +02:00
|
|
|
AggregationStatus lastAggregationStatus = AggregationStatus.getLast(recordType, aggregationType,
|
|
|
|
aggregationStartDate, aggregationEndDate);
|
|
|
|
|
2017-09-05 17:13:50 +02:00
|
|
|
// I don't check if this aggregation is COMPLETED because this
|
|
|
|
// is responsibility of Recovery Process
|
2017-09-22 14:30:48 +02:00
|
|
|
if (lastAggregationStatus != null) {
|
2017-09-05 17:13:50 +02:00
|
|
|
this.aggregationStartDate = lastAggregationStatus.getAggregationInfo().getAggregationEndDate();
|
|
|
|
logger.info("Last got AggregationStatus is {}. Restarting from {}",
|
|
|
|
DSMapper.getObjectMapper().writeValueAsString(lastAggregationStatus),
|
|
|
|
Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate));
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2017-09-22 14:30:48 +02:00
|
|
|
aggregationStatus = AggregationStatus.getAggregationStatus(recordType, aggregationType,
|
|
|
|
aggregationStartDate);
|
|
|
|
|
2017-09-05 17:13:50 +02:00
|
|
|
if (aggregationStatus == null) {
|
|
|
|
aggregationStatus = createAggregationStatus(recordType);
|
|
|
|
}
|
|
|
|
|
2017-09-22 14:30:48 +02:00
|
|
|
if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) {
|
|
|
|
logger.info("StartDate {} is after last Aggregation End Date allowed {}. Nothing to do.",
|
|
|
|
Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate),
|
|
|
|
Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate));
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2017-09-05 17:13:50 +02:00
|
|
|
Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
|
|
|
|
elaborator.elaborate();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|