refs #200: Create accouting-lib library

https://support.d4science.org/issues/200
Implementing buffer strategy

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115674 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-06-30 12:35:40 +00:00
parent fd5bce26c2
commit e392879728
2 changed files with 105 additions and 14 deletions

View File

@ -8,7 +8,7 @@ import org.gcube.accounting.exception.InvalidValueException;
/** /**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/ */
public interface AggregatedUsageRecord<T extends AggregatedUsageRecord<T, B>, B extends SingleUsageRecord> { public interface AggregatedUsageRecord<T extends AggregatedUsageRecord<T, B>, B extends SingleUsageRecord> extends UsageRecord {
public T getAggregatedUsageRecord(B usageRecord) throws InvalidValueException ; public T getAggregatedUsageRecord(B usageRecord) throws InvalidValueException ;

View File

@ -1,10 +1,15 @@
package org.gcube.accounting.datamodel.aggregation.scheduler; package org.gcube.accounting.datamodel.aggregation.scheduler;
import java.util.ArrayList; import java.lang.reflect.Constructor;
import java.util.List; import java.util.HashMap;
import java.util.Map;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.AggregationStrategy;
import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.persistence.PersistenceExecutor; import org.gcube.accounting.persistence.PersistenceExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -12,6 +17,8 @@ import org.gcube.accounting.persistence.PersistenceExecutor;
*/ */
public abstract class AggregationScheduler { public abstract class AggregationScheduler {
private static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
protected static AggregationScheduler aggregationScheduler; protected static AggregationScheduler aggregationScheduler;
public static AggregationScheduler getInstance(){ public static AggregationScheduler getInstance(){
@ -21,20 +28,104 @@ public abstract class AggregationScheduler {
return aggregationScheduler; return aggregationScheduler;
} }
protected List<UsageRecord> records; protected Map<String, Map<String, UsageRecord>> records;
/**
* @return the records
*/
public List<UsageRecord> getRecords() {
return records;
}
protected AggregationScheduler(){ protected AggregationScheduler(){
this.records = new ArrayList<UsageRecord>(); this.records = new HashMap<String, Map<String, UsageRecord>>();
} }
protected void madeAggregation(UsageRecord UsageRecord){ @SuppressWarnings({ "rawtypes", "unchecked" })
protected Class<? extends AggregatedUsageRecord> getAggregatedUsageRecordClass(String usageRecordName) throws ClassNotFoundException {
Class<? extends AggregatedUsageRecord> clz = null;
String aggregatedURFullyQualifiedName = null;
try {
Package aggregatedPackage = org.gcube.accounting.datamodel.aggregation.ServiceUsageRecord.class.getPackage();
aggregatedURFullyQualifiedName = String.format("%s%s", aggregatedPackage.getName(), usageRecordName);
clz = (Class<? extends AggregatedUsageRecord>) Class.forName(aggregatedURFullyQualifiedName);
} catch (ClassNotFoundException e) {
logger.error("Unable To find the Aggregation Class for {}", usageRecordName);
throw e;
}
return clz;
}
protected UsageRecord instantiateAggregatedUsageRecord(UsageRecord usageRecord) throws Exception{
String usageRecordName = usageRecord.getClass().getSimpleName();
@SuppressWarnings("rawtypes")
Class<? extends AggregatedUsageRecord> clz = getAggregatedUsageRecordClass(usageRecordName);
@SuppressWarnings("rawtypes")
Class[] argTypes = { usageRecord.getClass() };
@SuppressWarnings("rawtypes")
Constructor<? extends AggregatedUsageRecord> constructor = clz.getDeclaredConstructor(argTypes);
Object[] arguments = {usageRecord};
@SuppressWarnings("rawtypes")
AggregatedUsageRecord aggregatedUsageRecord = constructor.newInstance(arguments);
return aggregatedUsageRecord;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected Class<? extends AggregationStrategy> getAggregattionStrategyUsageRecordClass(String usageRecordName) throws ClassNotFoundException {
Class<? extends AggregationStrategy> clz = null;
String aggregationStrategyName = null;
try {
Package aggregationStrategyPackage = org.gcube.accounting.datamodel.aggregation.aggregationstrategy.ServiceUsageRecordAggregationStrategy.class.getPackage();
aggregationStrategyName = String.format("%s%s", aggregationStrategyPackage.getName(), usageRecordName);
clz = (Class<? extends AggregationStrategy>) Class.forName(aggregationStrategyName);
} catch (Exception e) {
logger.error("Unable To find the Aggregation Strategy Class for {}", usageRecordName);
throw e;
}
return clz;
}
@SuppressWarnings("rawtypes")
protected AggregationStrategy instantiateAggregationStrategy(AggregatedUsageRecord aggregatedUsageRecord) throws Exception{
String usageRecordName = aggregatedUsageRecord.getClass().getSimpleName();
Class<? extends AggregationStrategy> clz = getAggregattionStrategyUsageRecordClass(usageRecordName);
Class[] argTypes = { aggregatedUsageRecord.getClass() };
Constructor<? extends AggregationStrategy> constructor = clz.getDeclaredConstructor(argTypes);
Object[] arguments = {aggregatedUsageRecord};
AggregationStrategy aggregationStrategy = constructor.newInstance(arguments);
return aggregationStrategy;
}
// TODO
protected void madeAggregation(UsageRecord usageRecord){
String usageRecordName = usageRecord.getClass().getSimpleName();
UsageRecord aggregatedUsageRecord;
Map<String, UsageRecord> perUsageRecordNameMap;
if(this.records.containsKey(usageRecordName)){
perUsageRecordNameMap = this.records.get(usageRecordName);
// TODO
}else{
perUsageRecordNameMap = new HashMap<String, UsageRecord>();
this.records.put(usageRecordName, perUsageRecordNameMap);
if(usageRecord instanceof AggregatedUsageRecord){
// the record is already an aggregated version
aggregatedUsageRecord = usageRecord;
}else{
try {
// Instantiate the aggregated usage record using reflection with the
// simple name of the new UsageRecord to aggregate.
// The instantiated UsageRecord is the aggregated version from
// org.gcube.accounting.datamodel.aggregation package
aggregatedUsageRecord = instantiateAggregatedUsageRecord(usageRecord);
} catch (Exception e) {
logger.error("Unable to Istantiate the Aggregation Class for {}. The Record will be persisted as is (Better than nothing).", usageRecordName);
aggregatedUsageRecord = usageRecord;
}
}
perUsageRecordNameMap.put(usageRecordName, aggregatedUsageRecord);
}
} }
@ -48,7 +139,7 @@ public abstract class AggregationScheduler {
public synchronized void aggregate(UsageRecord UsageRecord, PersistenceExecutor persistenceExecutor) throws Exception { public synchronized void aggregate(UsageRecord UsageRecord, PersistenceExecutor persistenceExecutor) throws Exception {
madeAggregation(UsageRecord); madeAggregation(UsageRecord);
if(isTimeToPersist()){ if(isTimeToPersist()){
persistenceExecutor.persist(records.toArray(new UsageRecord[records.size()])); persistenceExecutor.persist(records.values().toArray(new UsageRecord[records.size()]));
} }
} }