accounting-lib/src/main/java/org/gcube/accounting/aggregation/scheduler/AggregationScheduler.java

219 lines
8.1 KiB
Java

package org.gcube.accounting.aggregation.scheduler;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.gcube.accounting.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.aggregation.strategy.ServiceUsageRecordAggregationStrategy;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.AggregationStrategy;
import org.gcube.accounting.datamodel.SingleUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.exception.NotAggregatableRecordsExceptions;
import org.gcube.accounting.persistence.AccountingPersistenceExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public abstract class AggregationScheduler {
private static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
public static AggregationScheduler newInstance(){
return new BufferAggregationScheduler();
}
protected int totalBufferedRecords;
@SuppressWarnings("rawtypes")
protected Map<String, List<AggregationStrategy>> records;
protected List<UsageRecord> unaggregableRecords;
@SuppressWarnings("rawtypes")
protected AggregationScheduler(){
this.records = new HashMap<String, List<AggregationStrategy>>();
unaggregableRecords = new ArrayList<UsageRecord>();
totalBufferedRecords = 0;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected Class<? extends AggregatedUsageRecord> getAggregatedUsageRecordClass(String usageRecordType) throws ClassNotFoundException {
Class<? extends AggregatedUsageRecord> clz;
String aggregatedURFullyQualifiedName;
try {
Package aggregatedPackage = AggregatedServiceUsageRecord.class.getPackage();
aggregatedURFullyQualifiedName = String.format("%s.Aggregated%s", aggregatedPackage.getName(), usageRecordType);
clz = (Class<? extends AggregatedUsageRecord>) Class.forName(aggregatedURFullyQualifiedName);
} catch (ClassNotFoundException e) {
logger.error("Unable To find the Aggregation Class for {}", usageRecordType);
throw e;
}
return clz;
}
@SuppressWarnings("rawtypes")
protected AggregatedUsageRecord instantiateAggregatedUsageRecord(UsageRecord usageRecord) throws Exception{
String usageRecordType = usageRecord.getUsageRecordType();
Class<? extends AggregatedUsageRecord> clz = getAggregatedUsageRecordClass(usageRecordType);
Class[] argTypes = { usageRecord.getClass() };
Constructor<? extends AggregatedUsageRecord> constructor = clz.getDeclaredConstructor(argTypes);
Object[] arguments = {usageRecord};
return constructor.newInstance(arguments);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected Class<? extends AggregationStrategy> getAggregattionStrategyUsageRecordClass(String usageRecordType) throws ClassNotFoundException {
Class<? extends AggregationStrategy> clz;
String aggregationStrategyName;
try {
Package aggregationStrategyPackage = ServiceUsageRecordAggregationStrategy.class.getPackage();
aggregationStrategyName = String.format("%s.%s%s", aggregationStrategyPackage.getName(), usageRecordType, AggregationStrategy.class.getSimpleName());
clz = (Class<? extends AggregationStrategy>) Class.forName(aggregationStrategyName);
} catch (Exception e) {
logger.error("Unable To find the Aggregation Strategy Class for {}", usageRecordType);
throw e;
}
return clz;
}
@SuppressWarnings("rawtypes")
protected AggregationStrategy instantiateAggregationStrategy(AggregatedUsageRecord aggregatedUsageRecord) throws Exception{
String usageRecordType = aggregatedUsageRecord.getUsageRecordType();
Class<? extends AggregationStrategy> clz = getAggregattionStrategyUsageRecordClass(usageRecordType);
Class[] argTypes = { aggregatedUsageRecord.getClass() };
Constructor<? extends AggregationStrategy> constructor = clz.getDeclaredConstructor(argTypes);
Object[] arguments = {aggregatedUsageRecord};
return constructor.newInstance(arguments);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void madeAggregation(UsageRecord usageRecord){
String usageRecordType = usageRecord.getUsageRecordType();
List<AggregationStrategy> aggregationStrategies;
if(this.records.containsKey(usageRecordType)){
aggregationStrategies = this.records.get(usageRecordType);
boolean found = false;
for(AggregationStrategy aggregationStrategy : aggregationStrategies){
try {
aggregationStrategy.aggregate((SingleUsageRecord) usageRecord);
logger.trace("{} has been used for aggregation. Aggregated Record is {}", aggregationStrategy, aggregationStrategy.getAggregatedUsageRecord());
found = true;
break;
} catch(NotAggregatableRecordsExceptions e) {
logger.trace("{} is not usable for aggregation", aggregationStrategy);
}
}
if(found){
return;
}
}else{
aggregationStrategies = new ArrayList<AggregationStrategy>();
this.records.put(usageRecordType, aggregationStrategies);
}
AggregatedUsageRecord aggregatedUsageRecord;
if(usageRecord instanceof AggregatedUsageRecord){
// the record is already an aggregated version
aggregatedUsageRecord = (AggregatedUsageRecord) usageRecord;
}else{
try {
// Instantiate the aggregated usage record using reflection with the
// simple name of the new UsageRecord to aggregate.
// The instantiated UsageRecord is the aggregated version from
// org.gcube.accounting.datamodel.aggregation package
aggregatedUsageRecord = instantiateAggregatedUsageRecord(usageRecord);
} catch (Exception e) {
logger.error("Unable to Istantiate the Aggregation Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordType);
unaggregableRecords.add(usageRecord);
totalBufferedRecords++;
return;
}
}
try {
AggregationStrategy aggregationStrategy = instantiateAggregationStrategy(aggregatedUsageRecord);
aggregationStrategies.add(aggregationStrategy);
totalBufferedRecords++;
} catch (Exception e) {
logger.error("Unable to Istantiate the Aggregation Strategy Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordType);
unaggregableRecords.add(usageRecord);
}
}
public void flush(AccountingPersistenceExecutor persistenceExecutor) throws Exception{
aggregate(null, persistenceExecutor, true);
}
protected abstract void specificClear();
protected void clear(){
totalBufferedRecords=0;
records.clear();
unaggregableRecords.clear();
specificClear();
}
protected synchronized void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
if(usageRecord!=null){
logger.trace("Trying to aggregate {}", usageRecord);
madeAggregation(usageRecord);
}
if(isTimeToPersist() || forceFlush){
UsageRecord[] recordToPersist = new UsageRecord[totalBufferedRecords];
int i = 0;
@SuppressWarnings("rawtypes")
Collection<List<AggregationStrategy>> values = records.values();
for(@SuppressWarnings("rawtypes") List<AggregationStrategy> startegies : values){
for(@SuppressWarnings("rawtypes") AggregationStrategy startegy : startegies){
recordToPersist[i] = startegy.getAggregatedUsageRecord();
i++;
}
}
for(UsageRecord record : unaggregableRecords){
recordToPersist[i] = record;
i++;
}
logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist));
persistenceExecutor.persist(recordToPersist);
clear();
}
}
/**
* Get an usage records and try to aggregate with other buffered
* Usage Record.
* @param usageRecord the Usage Record To Buffer
* @return true if is time to persist the buffered Usage Record
* @throws Exception if fails
*/
public void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor) throws Exception {
logger.trace("Going to aggregate {}", usageRecord);
aggregate(usageRecord, persistenceExecutor, false);
}
protected abstract boolean isTimeToPersist();
}