From ba405646d3fc77e8683f204a8e859c5ee839e304 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Mon, 29 Jun 2015 15:41:16 +0000 Subject: [PATCH] refs #200: Create accouting-lib library https://support.d4science.org/issues/200 Implementing buffer strategy git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115626 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../scheduler/AggregationScheduler.java | 54 ++++++++++++++++ .../scheduler/BufferAggregationScheduler.java | 63 +++++++++++++++++++ .../accounting/persistence/Persistence.java | 20 +++++- 3 files changed, 134 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java create mode 100644 src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java diff --git a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java new file mode 100644 index 0000000..a677cf2 --- /dev/null +++ b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java @@ -0,0 +1,54 @@ +package org.gcube.accounting.datamodel.aggregation.scheduler; + +import java.util.ArrayList; +import java.util.List; + +import org.gcube.accounting.datamodel.UsageRecord; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public abstract class AggregationScheduler { + + protected static AggregationScheduler aggregationScheduler; + + public static AggregationScheduler getInstance(){ + if(aggregationScheduler==null){ + aggregationScheduler = new BufferAggregationScheduler(); + } + return aggregationScheduler; + } + + protected List records; + + /** + * @return the records + */ + public List getRecords() { + return records; + } + + protected AggregationScheduler(){ + this.records = new ArrayList(); + } + + protected void madeAggregation(UsageRecord UsageRecord){ + // TODO + } + + /** + * Get an usage records and try to aggregate with other buffered + * Usage Record. + * @param usageRecord the Usage Record To Buffer + * @return true if is time to persist the buffered Usage Record + */ + public synchronized boolean aggregate(UsageRecord UsageRecord) { + madeAggregation(UsageRecord); + return isTimeToPersist(); + } + + + protected abstract boolean isTimeToPersist(); + +} diff --git a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java new file mode 100644 index 0000000..8eaeb38 --- /dev/null +++ b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java @@ -0,0 +1,63 @@ +/** + * + */ +package org.gcube.accounting.datamodel.aggregation.scheduler; + +import java.util.Calendar; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + * This class implements a Simple Buffer with timeout strategy. + * It buffer a predefined number of Records before invoking a persistence. + * + * + */ +public class BufferAggregationScheduler extends AggregationScheduler { + + /** + * Define the MAX number of Record to buffer. + * TODO Get from configuration + */ + protected final static int MAX_RECORDS_NUMBER = 15; + + /** + * The Max amount of time elapsed form last record before after that + * the buffered record are persisted even if + * TODO Get from configuration + */ + protected final static long OLD_RECORD_MAX_TIME_ELAPSED = 1000*60*5; // 5 min + + protected boolean firstOfBuffer; + protected long firstBufferedTime; + + protected BufferAggregationScheduler(){ + super(); + this.firstOfBuffer = true; + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized boolean isTimeToPersist(){ + long now = Calendar.getInstance().getTimeInMillis(); + + if(firstOfBuffer){ + firstOfBuffer = false; + firstBufferedTime = now; + } + + if(records.size() >= MAX_RECORDS_NUMBER){ + return true; + } + + if((now - firstBufferedTime) >= OLD_RECORD_MAX_TIME_ELAPSED){ + return true; + } + + return false; + } + + +} diff --git a/src/main/java/org/gcube/accounting/persistence/Persistence.java b/src/main/java/org/gcube/accounting/persistence/Persistence.java index cb3f505..3a10774 100644 --- a/src/main/java/org/gcube/accounting/persistence/Persistence.java +++ b/src/main/java/org/gcube/accounting/persistence/Persistence.java @@ -4,6 +4,7 @@ package org.gcube.accounting.persistence; import java.io.File; +import java.util.List; import java.util.ServiceLoader; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -11,6 +12,7 @@ import java.util.concurrent.Executors; import org.gcube.accounting.datamodel.SingleUsageRecord; import org.gcube.accounting.datamodel.TestUsageRecord; import org.gcube.accounting.datamodel.UsageRecord; +import org.gcube.accounting.datamodel.aggregation.scheduler.AggregationScheduler; import org.gcube.accounting.exception.InvalidValueException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +31,7 @@ public abstract class Persistence { */ protected static Persistence persistence; protected static FallbackPersistence fallback; + protected static AggregationScheduler aggregationScheduler; private static File file(File file) throws IllegalArgumentException { @@ -79,6 +82,7 @@ public abstract class Persistence { e.getCause()); persistence = fallback; } + aggregationScheduler = AggregationScheduler.getInstance(); } /** @@ -127,7 +131,9 @@ public abstract class Persistence { */ protected abstract void reallyAccount(UsageRecord usageRecord) throws Exception; - private void accountWithFallback(SingleUsageRecord usageRecord) throws Exception { + + + private void accountWithFallback(UsageRecord usageRecord) throws Exception { String persistenceName = getInstance().getClass().getSimpleName(); try { //logger.debug("Going to account {} using {}", usageRecord, persistenceName); @@ -163,9 +169,17 @@ public abstract class Persistence { public void run(){ try { usageRecord.validate(); - persistence.accountWithFallback(usageRecord); + boolean timeToPersist = aggregationScheduler.aggregate(usageRecord); + if(timeToPersist){ + List records = aggregationScheduler.getRecords(); + for(UsageRecord record : records){ + persistence.accountWithFallback(record); + } + } + } catch (InvalidValueException e) { + logger.error("Error validating UsageRecord", e.getCause()); } catch (Exception e) { - logger.error("Error accouting UsageRecod", e.getCause()); + logger.error("Error accounting UsageRecord", e.getCause()); } } };