accounting-lib/src/main/java/org/gcube/accounting/aggregation/scheduler/AggregationScheduler.java

218 lines
7.8 KiB
Java

package org.gcube.accounting.aggregation.scheduler;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.AggregationStrategy;
import org.gcube.accounting.datamodel.SingleUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.exception.NotAggregatableRecordsExceptions;
import org.gcube.accounting.persistence.AccountingPersistenceExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Buffers usage records, merges the ones that can be aggregated together and
 * hands the resulting batch to an {@link AccountingPersistenceExecutor} when
 * the concrete scheduler decides it is time (see {@link #isTimeToPersist()})
 * or when a flush is forced.
 *
 * The shared instance returned by {@link #getInstance()} is a
 * BufferAggregationScheduler created in the static initializer.
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 *
 */
public abstract class AggregationScheduler {

	private static final Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);

	/** Shared scheduler instance handed out by {@link #getInstance()}. */
	protected static AggregationScheduler aggregationScheduler;

	static {
		aggregationScheduler = new BufferAggregationScheduler();
	}

	/**
	 * @return the shared scheduler instance
	 */
	public static AggregationScheduler getInstance(){
		return aggregationScheduler;
	}

	/**
	 * Number of entries currently buffered: one per aggregation strategy
	 * created plus one per record kept in {@link #unaggregableRecords}.
	 * A record merged into an already existing strategy does not increase it.
	 */
	protected int totalBufferedRecords;

	/**
	 * Aggregation strategies in use, keyed by the simple class name of the
	 * usage record that caused their creation.
	 */
	@SuppressWarnings("rawtypes")
	protected Map<String, List<AggregationStrategy>> records;

	/** Records that could not be wrapped for aggregation and are persisted as they are. */
	protected List<UsageRecord> unaggregableRecords;

	/**
	 * Creates a scheduler with empty buffers.
	 */
	@SuppressWarnings("rawtypes")
	protected AggregationScheduler(){
		this.records = new HashMap<String, List<AggregationStrategy>>();
		unaggregableRecords = new ArrayList<UsageRecord>();
		totalBufferedRecords = 0;
	}

	/**
	 * Looks up, by simple name, the aggregated usage record class located in
	 * the same package as org.gcube.accounting.aggregation.ServiceUsageRecord.
	 *
	 * @param usageRecordName simple class name of the usage record
	 * @return the aggregated usage record class with that simple name
	 * @throws ClassNotFoundException if no such class exists in that package
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	protected Class<? extends AggregatedUsageRecord> getAggregatedUsageRecordClass(String usageRecordName) throws ClassNotFoundException {
		Class<? extends AggregatedUsageRecord> clz;
		String aggregatedURFullyQualifiedName;
		try {
			Package aggregatedPackage = org.gcube.accounting.aggregation.ServiceUsageRecord.class.getPackage();
			aggregatedURFullyQualifiedName = String.format("%s.%s", aggregatedPackage.getName(), usageRecordName);
			clz = (Class<? extends AggregatedUsageRecord>) Class.forName(aggregatedURFullyQualifiedName);
		} catch (ClassNotFoundException e) {
			logger.error("Unable To find the Aggregation Class for {}", usageRecordName);
			throw e;
		}
		return clz;
	}

	/**
	 * Reflectively builds the aggregated counterpart of the given record by
	 * invoking the constructor of the aggregated class that accepts the
	 * record's runtime class.
	 *
	 * @param usageRecord the record to wrap
	 * @return the newly created aggregated usage record
	 * @throws Exception if the class or the constructor cannot be found or
	 * the instantiation fails
	 */
	@SuppressWarnings("rawtypes")
	protected AggregatedUsageRecord instantiateAggregatedUsageRecord(UsageRecord usageRecord) throws Exception{
		String usageRecordName = usageRecord.getClass().getSimpleName();
		Class<? extends AggregatedUsageRecord> clz = getAggregatedUsageRecordClass(usageRecordName);
		Class[] argTypes = { usageRecord.getClass() };
		Constructor<? extends AggregatedUsageRecord> constructor = clz.getDeclaredConstructor(argTypes);
		Object[] arguments = {usageRecord};
		return constructor.newInstance(arguments);
	}

	/**
	 * Looks up the aggregation strategy class named
	 * {@code <usageRecordName>AggregationStrategy} in the same package as
	 * org.gcube.accounting.aggregation.strategy.ServiceUsageRecordAggregationStrategy.
	 *
	 * @param usageRecordName simple class name of the usage record
	 * @return the matching aggregation strategy class
	 * @throws ClassNotFoundException if no such class exists in that package
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	protected Class<? extends AggregationStrategy> getAggregattionStrategyUsageRecordClass(String usageRecordName) throws ClassNotFoundException {
		Class<? extends AggregationStrategy> clz;
		String aggregationStrategyName;
		try {
			Package aggregationStrategyPackage = org.gcube.accounting.aggregation.strategy.ServiceUsageRecordAggregationStrategy.class.getPackage();
			aggregationStrategyName = String.format("%s.%s%s", aggregationStrategyPackage.getName(), usageRecordName, AggregationStrategy.class.getSimpleName());
			clz = (Class<? extends AggregationStrategy>) Class.forName(aggregationStrategyName);
		} catch (ClassNotFoundException e) {
			// Same handling as getAggregatedUsageRecordClass: it is the only
			// checked exception the lookup can raise.
			logger.error("Unable To find the Aggregation Strategy Class for {}", usageRecordName);
			throw e;
		}
		return clz;
	}

	/**
	 * Reflectively builds the aggregation strategy for the given aggregated
	 * record by invoking the strategy constructor that accepts the record's
	 * runtime class.
	 *
	 * @param aggregatedUsageRecord the aggregated record the strategy will hold
	 * @return the newly created aggregation strategy
	 * @throws Exception if the class or the constructor cannot be found or
	 * the instantiation fails
	 */
	@SuppressWarnings("rawtypes")
	protected AggregationStrategy instantiateAggregationStrategy(AggregatedUsageRecord aggregatedUsageRecord) throws Exception{
		String usageRecordName = aggregatedUsageRecord.getClass().getSimpleName();
		Class<? extends AggregationStrategy> clz = getAggregattionStrategyUsageRecordClass(usageRecordName);
		Class[] argTypes = { aggregatedUsageRecord.getClass() };
		Constructor<? extends AggregationStrategy> constructor = clz.getDeclaredConstructor(argTypes);
		Object[] arguments = {aggregatedUsageRecord};
		return constructor.newInstance(arguments);
	}

	/**
	 * Stores a record that cannot be aggregated so that it is persisted as
	 * it is, keeping {@link #totalBufferedRecords} aligned with the buffers.
	 */
	private void bufferUnaggregableRecord(UsageRecord usageRecord){
		unaggregableRecords.add(usageRecord);
		totalBufferedRecords++;
	}

	/**
	 * Adds the record to the buffer. The record is first offered to every
	 * strategy already registered for its simple class name; the first one
	 * that accepts it absorbs it. Otherwise a new aggregated record and a new
	 * strategy are created for it. If either cannot be instantiated the
	 * record is buffered unchanged.
	 *
	 * @param usageRecord the record to buffer
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	protected void madeAggregation(UsageRecord usageRecord){
		String usageRecordName = usageRecord.getClass().getSimpleName();

		List<AggregationStrategy> aggregationStrategies = this.records.get(usageRecordName);
		if(aggregationStrategies == null){
			aggregationStrategies = new ArrayList<AggregationStrategy>();
			this.records.put(usageRecordName, aggregationStrategies);
		}else{
			for(AggregationStrategy aggregationStrategy : aggregationStrategies){
				try {
					aggregationStrategy.aggregate((SingleUsageRecord) usageRecord);
					// Merged into an existing entry: nothing new is buffered.
					return;
				} catch(NotAggregatableRecordsExceptions e) {
					logger.trace("{} is not usable for aggregation", aggregationStrategy);
				}
			}
		}

		AggregatedUsageRecord aggregatedUsageRecord;
		if(usageRecord instanceof AggregatedUsageRecord){
			// the record is already an aggregated version
			aggregatedUsageRecord = (AggregatedUsageRecord) usageRecord;
		}else{
			try {
				// Instantiate the aggregated usage record using reflection with
				// the simple name of the new UsageRecord to aggregate.
				// The instantiated UsageRecord is the aggregated version from
				// the org.gcube.accounting.aggregation package
				aggregatedUsageRecord = instantiateAggregatedUsageRecord(usageRecord);
			} catch (Exception e) {
				logger.error("Unable to Istantiate the Aggregation Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName, e);
				bufferUnaggregableRecord(usageRecord);
				return;
			}
		}

		try {
			AggregationStrategy aggregationStrategy = instantiateAggregationStrategy(aggregatedUsageRecord);
			aggregationStrategies.add(aggregationStrategy);
			totalBufferedRecords++;
		} catch (Exception e) {
			logger.error("Unable to Istantiate the Aggregation Strategy Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName, e);
			// The counter must be incremented here too, otherwise it falls
			// behind the real number of buffered entries.
			bufferUnaggregableRecord(usageRecord);
		}
	}

	/**
	 * Persists everything currently buffered, regardless of
	 * {@link #isTimeToPersist()}.
	 *
	 * @param persistenceExecutor the executor used to persist the records
	 * @throws Exception if the persistence fails
	 */
	public void flush(AccountingPersistenceExecutor persistenceExecutor) throws Exception{
		aggregate(null, persistenceExecutor, true);
	}

	/**
	 * Hook invoked at the end of {@link #clear()} so that the concrete
	 * scheduler can reset its own state.
	 */
	protected abstract void specificClear();

	/**
	 * Empties the buffers, resets the counter and invokes
	 * {@link #specificClear()}.
	 */
	protected void clear(){
		totalBufferedRecords=0;
		records.clear();
		unaggregableRecords.clear();
		specificClear();
	}

	/**
	 * Buffers the given record (when not null) and, if it is time to persist
	 * or a flush is forced, persists every buffered entry and clears the
	 * buffers. The buffers are cleared only after the executor returns
	 * normally; if it throws, the buffered entries are left in place.
	 *
	 * @param usageRecord the record to buffer, or null to only evaluate the flush
	 * @param persistenceExecutor the executor used to persist the records
	 * @param forceFlush true to persist even if it is not time to persist
	 * @throws Exception if the persistence fails
	 */
	protected synchronized void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
		if(usageRecord!=null){
			madeAggregation(usageRecord);
		}

		if(isTimeToPersist() || forceFlush){
			// The batch is built from the buffers themselves rather than
			// sized from totalBufferedRecords, so a counter that is out of
			// step can never overflow the array.
			List<UsageRecord> toPersist = new ArrayList<UsageRecord>();
			for(@SuppressWarnings("rawtypes") List<AggregationStrategy> strategies : records.values()){
				for(@SuppressWarnings("rawtypes") AggregationStrategy strategy : strategies){
					toPersist.add(strategy.getAggregatedUsageRecord());
				}
			}
			toPersist.addAll(unaggregableRecords);

			UsageRecord[] recordToPersist = toPersist.toArray(new UsageRecord[toPersist.size()]);
			persistenceExecutor.persist(recordToPersist);

			clear();
		}
	}

	/**
	 * Gets a usage record and tries to aggregate it with the other buffered
	 * usage records. The buffered records are persisted through the given
	 * executor when it is time to persist.
	 * @param usageRecord the Usage Record To Buffer
	 * @param persistenceExecutor the executor used to persist the buffered
	 * records
	 * @throws Exception if fails
	 */
	public void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor) throws Exception {
		aggregate(usageRecord, persistenceExecutor, false);
	}

	/**
	 * @return true if the buffered records have to be persisted now
	 */
	protected abstract boolean isTimeToPersist();
}