refs #200: Create accouting-lib library
https://support.d4science.org/issues/200 Implementing buffer strategy git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115720 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
bbf40324f8
commit
9318f46c28
|
@ -111,6 +111,8 @@ public abstract class AggregationScheduler {
|
|||
for(AggregationStrategy aggregationStrategy : aggregationStrategies){
|
||||
try {
|
||||
aggregationStrategy.aggregate((SingleUsageRecord) usageRecord);
|
||||
found = true;
|
||||
break;
|
||||
} catch(NotAggregatableRecordsExceptions e) {
|
||||
logger.trace("{} is not usable for aggregation", aggregationStrategy);
|
||||
}
|
||||
|
@ -155,16 +157,16 @@ public abstract class AggregationScheduler {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an usage records and try to aggregate with other buffered
|
||||
* Usage Record.
|
||||
* @param usageRecord the Usage Record To Buffer
|
||||
* @return true if is time to persist the buffered Usage Record
|
||||
* @throws Exception if fails
|
||||
*/
|
||||
public synchronized void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor) throws Exception {
|
||||
public void flush(PersistenceExecutor persistenceExecutor) throws Exception{
|
||||
aggregate(null, persistenceExecutor, true);
|
||||
}
|
||||
|
||||
protected synchronized void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
|
||||
if(usageRecord!=null){
|
||||
madeAggregation(usageRecord);
|
||||
if(isTimeToPersist()){
|
||||
}
|
||||
|
||||
if(isTimeToPersist() || forceFlush){
|
||||
UsageRecord[] recordToPersist = new UsageRecord[totalBufferedRecords];
|
||||
int i = 0;
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
@ -186,7 +188,17 @@ public abstract class AggregationScheduler {
|
|||
records.clear();
|
||||
unaggregableRecords.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an usage records and try to aggregate with other buffered
|
||||
* Usage Record.
|
||||
* @param usageRecord the Usage Record To Buffer
|
||||
* @return true if is time to persist the buffered Usage Record
|
||||
* @throws Exception if fails
|
||||
*/
|
||||
public void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor) throws Exception {
|
||||
aggregate(usageRecord, persistenceExecutor, false);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -207,6 +207,18 @@ public abstract class Persistence {
|
|||
}
|
||||
}
|
||||
|
||||
public void flush() throws Exception {
|
||||
aggregationScheduler.flush(new PersistenceExecutor(){
|
||||
|
||||
@Override
|
||||
public void persist(UsageRecord... usageRecords) throws Exception {
|
||||
persistence.accountWithFallback(usageRecords);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public abstract void close() throws Exception;
|
||||
|
||||
}
|
||||
|
|
|
@ -44,6 +44,8 @@ public class PersistenceTest {
|
|||
double average = (duration/quantity);
|
||||
logger.debug("Duration (in millisec) : " + duration);
|
||||
logger.debug("Average (in millisec) : " + average);
|
||||
|
||||
persistence.flush();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue