From fd5bce26c2310ce58c988a5b362d5227d45b15a9 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Tue, 30 Jun 2015 09:16:08 +0000 Subject: [PATCH] refs #200: Create accouting-lib library https://support.d4science.org/issues/200 Implementing buffer strategy git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115632 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../scheduler/AggregationScheduler.java | 13 +++-- .../scheduler/BufferAggregationScheduler.java | 2 +- .../accounting/persistence/Persistence.java | 51 ++++++++++--------- .../persistence/PersistenceExecutor.java | 16 ++++++ 4 files changed, 52 insertions(+), 30 deletions(-) create mode 100644 src/main/java/org/gcube/accounting/persistence/PersistenceExecutor.java diff --git a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java index a677cf2..8a8ccb6 100644 --- a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java +++ b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/AggregationScheduler.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import org.gcube.accounting.datamodel.UsageRecord; +import org.gcube.accounting.persistence.PersistenceExecutor; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ @@ -28,13 +29,13 @@ public abstract class AggregationScheduler { public List getRecords() { return records; } - + protected AggregationScheduler(){ this.records = new ArrayList(); } protected void madeAggregation(UsageRecord UsageRecord){ - // TODO + } /** @@ -42,10 +43,14 @@ public abstract class AggregationScheduler { * Usage Record. * @param usageRecord the Usage Record To Buffer * @return true if is time to persist the buffered Usage Record + * @throws Exception if fails */ - public synchronized boolean aggregate(UsageRecord UsageRecord) { + public synchronized void aggregate(UsageRecord UsageRecord, PersistenceExecutor persistenceExecutor) throws Exception { madeAggregation(UsageRecord); - return isTimeToPersist(); + if(isTimeToPersist()){ + persistenceExecutor.persist(records.toArray(new UsageRecord[records.size()])); + } + } diff --git a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java index 8eaeb38..d92cff6 100644 --- a/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java +++ b/src/main/java/org/gcube/accounting/datamodel/aggregation/scheduler/BufferAggregationScheduler.java @@ -40,7 +40,7 @@ public class BufferAggregationScheduler extends AggregationScheduler { * {@inheritDoc} */ @Override - public synchronized boolean isTimeToPersist(){ + public boolean isTimeToPersist(){ long now = Calendar.getInstance().getTimeInMillis(); if(firstOfBuffer){ diff --git a/src/main/java/org/gcube/accounting/persistence/Persistence.java b/src/main/java/org/gcube/accounting/persistence/Persistence.java index 3a10774..c95360c 100644 --- a/src/main/java/org/gcube/accounting/persistence/Persistence.java +++ b/src/main/java/org/gcube/accounting/persistence/Persistence.java @@ -4,7 +4,6 @@ package org.gcube.accounting.persistence; import java.io.File; -import java.util.List; import java.util.ServiceLoader; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -129,27 +128,26 @@ public abstract class Persistence { * This method contains the code to save the {@link #UsageRecord} * */ - protected abstract void reallyAccount(UsageRecord usageRecord) throws Exception; + protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception; - - - private void accountWithFallback(UsageRecord usageRecord) throws Exception { + private void accountWithFallback(UsageRecord... usageRecords) { String persistenceName = getInstance().getClass().getSimpleName(); - try { - //logger.debug("Going to account {} using {}", usageRecord, persistenceName); - persistence.reallyAccount(usageRecord); - logger.debug("{} accounted succesfully from {}.", usageRecord, persistenceName); - } catch (Exception e) { - String fallabackPersistenceName = fallback.getClass().getSimpleName(); + for(UsageRecord usageRecord : usageRecords){ try { - logger.error("{} was not accounted succesfully from {}. Trying to use {}.", - usageRecord, persistenceName, fallabackPersistenceName); - fallback.reallyAccount(usageRecord); - logger.debug("{} accounted succesfully from {}", - usageRecord, fallabackPersistenceName); - }catch(Exception ex){ - logger.error("{} was not accounted at all", usageRecord); - throw e; + //logger.debug("Going to account {} using {}", usageRecord, persistenceName); + persistence.reallyAccount(usageRecord); + logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName); + } catch (Exception e) { + String fallabackPersistenceName = fallback.getClass().getSimpleName(); + try { + logger.error("{} was not accounted succesfully from {}. Trying to use {}.", + usageRecord.toString(), persistenceName, fallabackPersistenceName); + fallback.reallyAccount(usageRecord); + logger.debug("{} accounted succesfully from {}", + usageRecord.toString(), fallabackPersistenceName); + }catch(Exception ex){ + logger.error("{} was not accounted at all", usageRecord.toString()); + } } } } @@ -169,13 +167,16 @@ public abstract class Persistence { public void run(){ try { usageRecord.validate(); - boolean timeToPersist = aggregationScheduler.aggregate(usageRecord); - if(timeToPersist){ - List records = aggregationScheduler.getRecords(); - for(UsageRecord record : records){ - persistence.accountWithFallback(record); + aggregationScheduler.aggregate(usageRecord, new PersistenceExecutor(){ + + @Override + public void persist(UsageRecord... usageRecords) throws Exception { + persistence.accountWithFallback(usageRecords); } - } + + }); + + } catch (InvalidValueException e) { logger.error("Error validating UsageRecord", e.getCause()); } catch (Exception e) { diff --git a/src/main/java/org/gcube/accounting/persistence/PersistenceExecutor.java b/src/main/java/org/gcube/accounting/persistence/PersistenceExecutor.java new file mode 100644 index 0000000..95d1552 --- /dev/null +++ b/src/main/java/org/gcube/accounting/persistence/PersistenceExecutor.java @@ -0,0 +1,16 @@ +/** + * + */ +package org.gcube.accounting.persistence; + +import org.gcube.accounting.datamodel.UsageRecord; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public interface PersistenceExecutor { + + public void persist(UsageRecord... usageRecords)throws Exception; + +}