From 9318f46c281b900f1995a08e6d204573f865af62 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Tue, 30 Jun 2015 15:32:30 +0000 Subject: [PATCH] refs #200: Create accouting-lib library https://support.d4science.org/issues/200 Implementing buffer strategy git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115720 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../scheduler/AggregationScheduler.java | 34 +++++++++++++------ .../accounting/persistence/Persistence.java | 12 +++++++ .../persistence/PersistenceTest.java | 2 ++ 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java index 820cf19..bad7b19 100644 --- a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java +++ b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java @@ -111,6 +111,8 @@ public abstract class AggregationScheduler { for(AggregationStrategy aggregationStrategy : aggregationStrategies){ try { aggregationStrategy.aggregate((SingleUsageRecord) usageRecord); + found = true; + break; } catch(NotAggregatableRecordsExceptions e) { logger.trace("{} is not usable for aggregation", aggregationStrategy); } @@ -155,16 +157,16 @@ public abstract class AggregationScheduler { } - /** - * Get an usage records and try to aggregate with other buffered - * Usage Record. - * @param usageRecord the Usage Record To Buffer - * @return true if is time to persist the buffered Usage Record - * @throws Exception if fails - */ - public synchronized void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor) throws Exception { - madeAggregation(usageRecord); - if(isTimeToPersist()){ + public void flush(PersistenceExecutor persistenceExecutor) throws Exception{ + aggregate(null, persistenceExecutor, true); + } + + protected synchronized void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception { + if(usageRecord!=null){ + madeAggregation(usageRecord); + } + + if(isTimeToPersist() || forceFlush){ UsageRecord[] recordToPersist = new UsageRecord[totalBufferedRecords]; int i = 0; @SuppressWarnings("rawtypes") @@ -186,7 +188,17 @@ public abstract class AggregationScheduler { records.clear(); unaggregableRecords.clear(); } - + } + + /** + * Get an usage records and try to aggregate with other buffered + * Usage Record. + * @param usageRecord the Usage Record To Buffer + * @return true if is time to persist the buffered Usage Record + * @throws Exception if fails + */ + public void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor) throws Exception { + aggregate(usageRecord, persistenceExecutor, false); } diff --git a/src/main/java/org/gcube/accounting/persistence/Persistence.java b/src/main/java/org/gcube/accounting/persistence/Persistence.java index ef801e7..a4529d7 100644 --- a/src/main/java/org/gcube/accounting/persistence/Persistence.java +++ b/src/main/java/org/gcube/accounting/persistence/Persistence.java @@ -207,6 +207,18 @@ public abstract class Persistence { } } + public void flush() throws Exception { + aggregationScheduler.flush(new PersistenceExecutor(){ + + @Override + public void persist(UsageRecord... usageRecords) throws Exception { + persistence.accountWithFallback(usageRecords); + } + + }); + } + + public abstract void close() throws Exception; } diff --git a/src/test/java/org/gcube/accounting/datamodel/persistence/PersistenceTest.java b/src/test/java/org/gcube/accounting/datamodel/persistence/PersistenceTest.java index 8f3d62c..433e807 100644 --- a/src/test/java/org/gcube/accounting/datamodel/persistence/PersistenceTest.java +++ b/src/test/java/org/gcube/accounting/datamodel/persistence/PersistenceTest.java @@ -44,6 +44,8 @@ public class PersistenceTest { double average = (duration/quantity); logger.debug("Duration (in millisec) : " + duration); logger.debug("Average (in millisec) : " + average); + + persistence.flush(); } }