209 lines
7.6 KiB
Java
209 lines
7.6 KiB
Java
package org.gcube.accounting.aggregation.scheduler;
|
|
|
|
import java.lang.reflect.Constructor;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
|
|
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
|
|
import org.gcube.accounting.datamodel.AggregationStrategy;
|
|
import org.gcube.accounting.datamodel.SingleUsageRecord;
|
|
import org.gcube.accounting.datamodel.UsageRecord;
|
|
import org.gcube.accounting.exception.NotAggregatableRecordsExceptions;
|
|
import org.gcube.accounting.persistence.PersistenceExecutor;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
/**
|
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
|
*
|
|
*/
|
|
public abstract class AggregationScheduler {
|
|
|
|
private static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
|
|
|
|
protected static AggregationScheduler aggregationScheduler;
|
|
|
|
static {
|
|
aggregationScheduler = new BufferAggregationScheduler();
|
|
}
|
|
|
|
public static AggregationScheduler getInstance(){
|
|
return aggregationScheduler;
|
|
}
|
|
|
|
protected int totalBufferedRecords;
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
protected Map<String, List<AggregationStrategy>> records;
|
|
|
|
protected List<UsageRecord> unaggregableRecords;
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
protected AggregationScheduler(){
|
|
this.records = new HashMap<String, List<AggregationStrategy>>();
|
|
unaggregableRecords = new ArrayList<UsageRecord>();
|
|
totalBufferedRecords = 0;
|
|
}
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
protected Class<? extends AggregatedUsageRecord> getAggregatedUsageRecordClass(String usageRecordName) throws ClassNotFoundException {
|
|
Class<? extends AggregatedUsageRecord> clz;
|
|
String aggregatedURFullyQualifiedName;
|
|
try {
|
|
Package aggregatedPackage = org.gcube.accounting.aggregation.ServiceUsageRecord.class.getPackage();
|
|
aggregatedURFullyQualifiedName = String.format("%s.%s", aggregatedPackage.getName(), usageRecordName);
|
|
clz = (Class<? extends AggregatedUsageRecord>) Class.forName(aggregatedURFullyQualifiedName);
|
|
} catch (ClassNotFoundException e) {
|
|
logger.error("Unable To find the Aggregation Class for {}", usageRecordName);
|
|
throw e;
|
|
}
|
|
return clz;
|
|
}
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
protected AggregatedUsageRecord instantiateAggregatedUsageRecord(UsageRecord usageRecord) throws Exception{
|
|
String usageRecordName = usageRecord.getClass().getSimpleName();
|
|
Class<? extends AggregatedUsageRecord> clz = getAggregatedUsageRecordClass(usageRecordName);
|
|
Class[] argTypes = { usageRecord.getClass() };
|
|
Constructor<? extends AggregatedUsageRecord> constructor = clz.getDeclaredConstructor(argTypes);
|
|
Object[] arguments = {usageRecord};
|
|
return constructor.newInstance(arguments);
|
|
}
|
|
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
protected Class<? extends AggregationStrategy> getAggregattionStrategyUsageRecordClass(String usageRecordName) throws ClassNotFoundException {
|
|
Class<? extends AggregationStrategy> clz;
|
|
String aggregationStrategyName;
|
|
try {
|
|
Package aggregationStrategyPackage = org.gcube.accounting.aggregation.strategy.ServiceUsageRecordAggregationStrategy.class.getPackage();
|
|
aggregationStrategyName = String.format("%s.%s%s", aggregationStrategyPackage.getName(), usageRecordName, AggregationStrategy.class.getSimpleName());
|
|
clz = (Class<? extends AggregationStrategy>) Class.forName(aggregationStrategyName);
|
|
} catch (Exception e) {
|
|
logger.error("Unable To find the Aggregation Strategy Class for {}", usageRecordName);
|
|
throw e;
|
|
}
|
|
return clz;
|
|
}
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
protected AggregationStrategy instantiateAggregationStrategy(AggregatedUsageRecord aggregatedUsageRecord) throws Exception{
|
|
String usageRecordName = aggregatedUsageRecord.getClass().getSimpleName();
|
|
Class<? extends AggregationStrategy> clz = getAggregattionStrategyUsageRecordClass(usageRecordName);
|
|
Class[] argTypes = { aggregatedUsageRecord.getClass() };
|
|
Constructor<? extends AggregationStrategy> constructor = clz.getDeclaredConstructor(argTypes);
|
|
Object[] arguments = {aggregatedUsageRecord};
|
|
return constructor.newInstance(arguments);
|
|
}
|
|
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
protected void madeAggregation(UsageRecord usageRecord){
|
|
String usageRecordName = usageRecord.getClass().getSimpleName();
|
|
|
|
List<AggregationStrategy> aggregationStrategies;
|
|
|
|
if(this.records.containsKey(usageRecordName)){
|
|
aggregationStrategies = this.records.get(usageRecordName);
|
|
boolean found = false;
|
|
for(AggregationStrategy aggregationStrategy : aggregationStrategies){
|
|
try {
|
|
aggregationStrategy.aggregate((SingleUsageRecord) usageRecord);
|
|
found = true;
|
|
break;
|
|
} catch(NotAggregatableRecordsExceptions e) {
|
|
logger.trace("{} is not usable for aggregation", aggregationStrategy);
|
|
}
|
|
}
|
|
if(found){
|
|
return;
|
|
}
|
|
}else{
|
|
aggregationStrategies = new ArrayList<AggregationStrategy>();
|
|
this.records.put(usageRecordName, aggregationStrategies);
|
|
}
|
|
|
|
|
|
AggregatedUsageRecord aggregatedUsageRecord;
|
|
if(usageRecord instanceof AggregatedUsageRecord){
|
|
// the record is already an aggregated version
|
|
aggregatedUsageRecord = (AggregatedUsageRecord) usageRecord;
|
|
}else{
|
|
try {
|
|
// Instantiate the aggregated usage record using reflection with the
|
|
// simple name of the new UsageRecord to aggregate.
|
|
// The instantiated UsageRecord is the aggregated version from
|
|
// org.gcube.accounting.datamodel.aggregation package
|
|
aggregatedUsageRecord = instantiateAggregatedUsageRecord(usageRecord);
|
|
} catch (Exception e) {
|
|
logger.error("Unable to Istantiate the Aggregation Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName);
|
|
unaggregableRecords.add(usageRecord);
|
|
totalBufferedRecords++;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
try {
|
|
AggregationStrategy aggregationStrategy = instantiateAggregationStrategy(aggregatedUsageRecord);
|
|
aggregationStrategies.add(aggregationStrategy);
|
|
totalBufferedRecords++;
|
|
} catch (Exception e) {
|
|
logger.error("Unable to Istantiate the Aggregation Strategy Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName);
|
|
unaggregableRecords.add(usageRecord);
|
|
}
|
|
|
|
}
|
|
|
|
public void flush(PersistenceExecutor persistenceExecutor) throws Exception{
|
|
aggregate(null, persistenceExecutor, true);
|
|
}
|
|
|
|
protected synchronized void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
|
|
if(usageRecord!=null){
|
|
madeAggregation(usageRecord);
|
|
}
|
|
|
|
if(isTimeToPersist() || forceFlush){
|
|
UsageRecord[] recordToPersist = new UsageRecord[totalBufferedRecords];
|
|
int i = 0;
|
|
@SuppressWarnings("rawtypes")
|
|
Collection<List<AggregationStrategy>> values = records.values();
|
|
for(@SuppressWarnings("rawtypes") List<AggregationStrategy> startegies : values){
|
|
for(@SuppressWarnings("rawtypes") AggregationStrategy startegy : startegies){
|
|
recordToPersist[i] = startegy.getAggregatedUsageRecord();
|
|
i++;
|
|
}
|
|
}
|
|
for(UsageRecord record : unaggregableRecords){
|
|
recordToPersist[i] = record;
|
|
i++;
|
|
}
|
|
|
|
persistenceExecutor.persist(recordToPersist);
|
|
|
|
totalBufferedRecords=0;
|
|
records.clear();
|
|
unaggregableRecords.clear();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get an usage records and try to aggregate with other buffered
|
|
* Usage Record.
|
|
* @param usageRecord the Usage Record To Buffer
|
|
* @return true if is time to persist the buffered Usage Record
|
|
* @throws Exception if fails
|
|
*/
|
|
public void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor) throws Exception {
|
|
aggregate(usageRecord, persistenceExecutor, false);
|
|
}
|
|
|
|
|
|
protected abstract boolean isTimeToPersist();
|
|
|
|
}
|