From ddb66f1ead4e43a874e88a801095fbf26c2447c6 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Mon, 15 Feb 2016 11:18:30 +0000 Subject: [PATCH] refs #380: Implements different behaviour to buffer account record https://support.d4science.org/issues/380 git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@124183 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../AggregationScheduler.java | 74 +++++++++++++------ .../FallbackPersistenceBackend.java | 3 +- .../persistence/PersistenceBackend.java | 6 +- .../PersistenceBackendFactory.java | 5 +- .../BufferAggregationScheduler.java | 7 +- 5 files changed, 62 insertions(+), 33 deletions(-) rename src/main/java/org/gcube/documentstore/{records/aggregation => persistence}/AggregationScheduler.java (70%) diff --git a/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java b/src/main/java/org/gcube/documentstore/persistence/AggregationScheduler.java similarity index 70% rename from src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java rename to src/main/java/org/gcube/documentstore/persistence/AggregationScheduler.java index 45cd1a1..61b5f44 100644 --- a/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java +++ b/src/main/java/org/gcube/documentstore/persistence/AggregationScheduler.java @@ -1,4 +1,4 @@ -package org.gcube.documentstore.records.aggregation; +package org.gcube.documentstore.persistence; import java.lang.reflect.Constructor; import java.util.ArrayList; @@ -7,12 +7,15 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions; -import org.gcube.documentstore.persistence.PersistenceExecutor; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; +import org.gcube.documentstore.records.aggregation.BufferAggregationScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,20 +23,26 @@ import org.slf4j.LoggerFactory; * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * */ -public abstract class AggregationScheduler { +public abstract class AggregationScheduler implements Runnable { public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class); - public static AggregationScheduler newInstance(){ - return new BufferAggregationScheduler(); + public static AggregationScheduler newInstance(PersistenceBackend persistenceBackend){ + return new BufferAggregationScheduler(persistenceBackend); } protected int totalBufferedRecords; protected Map> bufferedRecords; - - protected AggregationScheduler(){ + + protected final PersistenceBackend persistenceBackend; + protected final ScheduledExecutorService scheduler; + + protected AggregationScheduler(PersistenceBackend persistenceBackend){ this.bufferedRecords = new HashMap>(); this.totalBufferedRecords = 0; + this.persistenceBackend = persistenceBackend; + this.scheduler = Executors.newScheduledThreadPool(1); + this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES); } @SuppressWarnings("rawtypes") @@ -136,24 +145,26 @@ public abstract class AggregationScheduler { } if(isTimeToPersist() || forceFlush){ - Record[] recordToPersist = new Record[totalBufferedRecords]; - int i = 0; - Collection> values = bufferedRecords.values(); - for(List records : values){ - for(Record thisRecord: records){ - recordToPersist[i] = thisRecord; - i++; - } - } - - logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist)); - persistenceExecutor.persist(recordToPersist); - - clear(); + reallyFlush(persistenceExecutor); } } - + protected void reallyFlush(PersistenceExecutor persistenceExecutor) throws Exception{ + Record[] recordToPersist = new Record[totalBufferedRecords]; + int i = 0; + Collection> values = bufferedRecords.values(); + for(List records : values){ + for(Record thisRecord: records){ + recordToPersist[i] = thisRecord; + i++; + } + } + + logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist)); + persistenceExecutor.persist(recordToPersist); + + clear(); + } /** * Get an usage records and try to aggregate with other buffered @@ -169,5 +180,24 @@ public abstract class AggregationScheduler { protected abstract boolean isTimeToPersist(); + + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + final PersistenceBackend pb = persistenceBackend; + PersistenceExecutor persistenceExecutor = new PersistenceExecutor() { + @Override + public void persist(Record... records) throws Exception { + pb.accountWithFallback(records); + } + }; + try { + this.flush(persistenceExecutor); + } catch (Exception e) { + logger.error("Error flushin Buffered Records"); + } + } } diff --git a/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java index 68898ba..f747f2e 100644 --- a/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java +++ b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java @@ -10,7 +10,6 @@ import java.io.IOException; import java.io.PrintWriter; import org.gcube.documentstore.records.Record; -import org.gcube.documentstore.records.aggregation.AggregationScheduler; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ @@ -27,7 +26,7 @@ public class FallbackPersistenceBackend extends PersistenceBackend { } protected FallbackPersistenceBackend(File fallbackFile) { - super(null, AggregationScheduler.newInstance()); + super(null); this.fallbackFile = fallbackFile; } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java index 8e18fc8..81f4be4 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java @@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit; import org.gcube.documentstore.exception.InvalidValueException; import org.gcube.documentstore.records.Record; -import org.gcube.documentstore.records.aggregation.AggregationScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,11 +37,10 @@ public abstract class PersistenceBackend { } } - protected PersistenceBackend(FallbackPersistenceBackend fallback, AggregationScheduler aggregationScheduler){ + protected PersistenceBackend(FallbackPersistenceBackend fallback){ this(); this.fallbackPersistence = fallback; - this.aggregationScheduler = aggregationScheduler; - + this.aggregationScheduler = AggregationScheduler.newInstance(this); } /** diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java index 78de96e..de86ac0 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java @@ -10,7 +10,6 @@ import java.util.Map; import java.util.ServiceLoader; import java.util.concurrent.TimeUnit; -import org.gcube.documentstore.records.aggregation.AggregationScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +79,7 @@ public abstract class PersistenceBackendFactory { fallbackFile = new File(fallbackLocation, FALLBACK_FILENAME); } FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile); - fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance()); + fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(fallbackPersistence)); return fallbackPersistence; } @@ -101,7 +100,7 @@ public abstract class PersistenceBackendFactory { found.prepareConnection(configuration); logger.debug("{} will be used.", foundClassName); - found.setAggregationScheduler(AggregationScheduler.newInstance()); + found.setAggregationScheduler(AggregationScheduler.newInstance(found)); found.setFallback(createFallback(context)); return found; } catch (Exception e) { diff --git a/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java b/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java index 98aa7e2..f89656d 100644 --- a/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java +++ b/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java @@ -5,6 +5,9 @@ package org.gcube.documentstore.records.aggregation; import java.util.Calendar; +import org.gcube.documentstore.persistence.AggregationScheduler; +import org.gcube.documentstore.persistence.PersistenceBackend; + /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * @@ -29,8 +32,8 @@ public class BufferAggregationScheduler extends AggregationScheduler { protected boolean firstOfBuffer; protected long firstBufferedTime; - protected BufferAggregationScheduler(){ - super(); + public BufferAggregationScheduler(PersistenceBackend persistenceBackend){ + super(persistenceBackend); this.firstOfBuffer = true; }