refs #200: Create accouting-lib library

https://support.d4science.org/issues/200
Implementing buffer strategy

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115717 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-06-30 14:44:30 +00:00
parent e392879728
commit b5d001383b
2 changed files with 86 additions and 42 deletions

View File

@ -1,12 +1,17 @@
package org.gcube.accounting.datamodel.aggregation.scheduler; package org.gcube.accounting.datamodel.aggregation.scheduler;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.AggregationStrategy; import org.gcube.accounting.datamodel.AggregationStrategy;
import org.gcube.accounting.datamodel.SingleUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.exception.NotAggregatableRecordsExceptions;
import org.gcube.accounting.persistence.PersistenceExecutor; import org.gcube.accounting.persistence.PersistenceExecutor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -28,10 +33,17 @@ public abstract class AggregationScheduler {
return aggregationScheduler; return aggregationScheduler;
} }
protected Map<String, Map<String, UsageRecord>> records; protected int totalBufferedRecords;
@SuppressWarnings("rawtypes")
protected Map<String, List<AggregationStrategy>> records;
protected List<UsageRecord> unaggregableRecords;
@SuppressWarnings("rawtypes")
protected AggregationScheduler(){ protected AggregationScheduler(){
this.records = new HashMap<String, Map<String, UsageRecord>>(); this.records = new HashMap<String, List<AggregationStrategy>>();
totalBufferedRecords = 0;
} }
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
@ -49,18 +61,14 @@ public abstract class AggregationScheduler {
return clz; return clz;
} }
protected UsageRecord instantiateAggregatedUsageRecord(UsageRecord usageRecord) throws Exception{ @SuppressWarnings("rawtypes")
protected AggregatedUsageRecord instantiateAggregatedUsageRecord(UsageRecord usageRecord) throws Exception{
String usageRecordName = usageRecord.getClass().getSimpleName(); String usageRecordName = usageRecord.getClass().getSimpleName();
@SuppressWarnings("rawtypes")
Class<? extends AggregatedUsageRecord> clz = getAggregatedUsageRecordClass(usageRecordName); Class<? extends AggregatedUsageRecord> clz = getAggregatedUsageRecordClass(usageRecordName);
@SuppressWarnings("rawtypes")
Class[] argTypes = { usageRecord.getClass() }; Class[] argTypes = { usageRecord.getClass() };
@SuppressWarnings("rawtypes")
Constructor<? extends AggregatedUsageRecord> constructor = clz.getDeclaredConstructor(argTypes); Constructor<? extends AggregatedUsageRecord> constructor = clz.getDeclaredConstructor(argTypes);
Object[] arguments = {usageRecord}; Object[] arguments = {usageRecord};
@SuppressWarnings("rawtypes") return constructor.newInstance(arguments);
AggregatedUsageRecord aggregatedUsageRecord = constructor.newInstance(arguments);
return aggregatedUsageRecord;
} }
@ -86,45 +94,62 @@ public abstract class AggregationScheduler {
Class[] argTypes = { aggregatedUsageRecord.getClass() }; Class[] argTypes = { aggregatedUsageRecord.getClass() };
Constructor<? extends AggregationStrategy> constructor = clz.getDeclaredConstructor(argTypes); Constructor<? extends AggregationStrategy> constructor = clz.getDeclaredConstructor(argTypes);
Object[] arguments = {aggregatedUsageRecord}; Object[] arguments = {aggregatedUsageRecord};
AggregationStrategy aggregationStrategy = constructor.newInstance(arguments); return constructor.newInstance(arguments);
return aggregationStrategy;
} }
// TODO @SuppressWarnings({ "rawtypes", "unchecked" })
protected void madeAggregation(UsageRecord usageRecord){ protected void madeAggregation(UsageRecord usageRecord){
String usageRecordName = usageRecord.getClass().getSimpleName(); String usageRecordName = usageRecord.getClass().getSimpleName();
UsageRecord aggregatedUsageRecord;
Map<String, UsageRecord> perUsageRecordNameMap; List<AggregationStrategy> aggregationStrategies;
if(this.records.containsKey(usageRecordName)){ if(this.records.containsKey(usageRecordName)){
perUsageRecordNameMap = this.records.get(usageRecordName); aggregationStrategies = this.records.get(usageRecordName);
boolean found = false;
// TODO for(AggregationStrategy aggregationStrategy : aggregationStrategies){
}else{
perUsageRecordNameMap = new HashMap<String, UsageRecord>();
this.records.put(usageRecordName, perUsageRecordNameMap);
if(usageRecord instanceof AggregatedUsageRecord){
// the record is already an aggregated version
aggregatedUsageRecord = usageRecord;
}else{
try { try {
// Instantiate the aggregated usage record using reflection with the aggregationStrategy.aggregate((SingleUsageRecord) usageRecord);
// simple name of the new UsageRecord to aggregate. } catch(NotAggregatableRecordsExceptions e) {
// The instantiated UsageRecord is the aggregated version from logger.trace("{} is not usable for aggregation", aggregationStrategy);
// org.gcube.accounting.datamodel.aggregation package }
aggregatedUsageRecord = instantiateAggregatedUsageRecord(usageRecord);
} catch (Exception e) {
logger.error("Unable to Istantiate the Aggregation Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName);
aggregatedUsageRecord = usageRecord;
}
} }
if(found){
return;
perUsageRecordNameMap.put(usageRecordName, aggregatedUsageRecord); }
}else{
aggregationStrategies = new ArrayList<AggregationStrategy>();
this.records.put(usageRecordName, aggregationStrategies);
}
AggregatedUsageRecord aggregatedUsageRecord;
if(usageRecord instanceof AggregatedUsageRecord){
// the record is already an aggregated version
aggregatedUsageRecord = (AggregatedUsageRecord) usageRecord;
}else{
try {
// Instantiate the aggregated usage record using reflection with the
// simple name of the new UsageRecord to aggregate.
// The instantiated UsageRecord is the aggregated version from
// org.gcube.accounting.datamodel.aggregation package
aggregatedUsageRecord = instantiateAggregatedUsageRecord(usageRecord);
} catch (Exception e) {
logger.error("Unable to Istantiate the Aggregation Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName);
unaggregableRecords.add(usageRecord);
totalBufferedRecords++;
return;
}
}
try {
AggregationStrategy aggregationStrategy = instantiateAggregationStrategy(aggregatedUsageRecord);
aggregationStrategies.add(aggregationStrategy);
totalBufferedRecords++;
} catch (Exception e) {
logger.error("Unable to Istantiate the Aggregation Strategy Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName);
unaggregableRecords.add(usageRecord);
} }
} }
@ -136,10 +161,29 @@ public abstract class AggregationScheduler {
* @return true if is time to persist the buffered Usage Record * @return true if is time to persist the buffered Usage Record
* @throws Exception if fails * @throws Exception if fails
*/ */
public synchronized void aggregate(UsageRecord UsageRecord, PersistenceExecutor persistenceExecutor) throws Exception { public synchronized void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor) throws Exception {
madeAggregation(UsageRecord); madeAggregation(usageRecord);
if(isTimeToPersist()){ if(isTimeToPersist()){
persistenceExecutor.persist(records.values().toArray(new UsageRecord[records.size()])); UsageRecord[] recordToPersist = new UsageRecord[totalBufferedRecords];
int i = 0;
@SuppressWarnings("rawtypes")
Collection<List<AggregationStrategy>> values = records.values();
for(@SuppressWarnings("rawtypes") List<AggregationStrategy> startegies : values){
for(@SuppressWarnings("rawtypes") AggregationStrategy startegy : startegies){
recordToPersist[i] = startegy.getAggregatedUsageRecord();
i++;
}
}
for(UsageRecord record : unaggregableRecords){
recordToPersist[i] = record;
i++;
}
persistenceExecutor.persist(recordToPersist);
totalBufferedRecords=0;
records.clear();
unaggregableRecords.clear();
} }
} }

View File

@ -48,7 +48,7 @@ public class BufferAggregationScheduler extends AggregationScheduler {
firstBufferedTime = now; firstBufferedTime = now;
} }
if(records.size() >= MAX_RECORDS_NUMBER){ if(totalBufferedRecords >= MAX_RECORDS_NUMBER){
return true; return true;
} }