refs #380: Implements different behaviour to buffer account records

https://support.d4science.org/issues/380

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@124183 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2016-02-15 11:18:30 +00:00
parent 0df6fda7bc
commit ddb66f1ead
5 changed files with 62 additions and 33 deletions

View File

@ -1,4 +1,4 @@
package org.gcube.documentstore.records.aggregation;
package org.gcube.documentstore.persistence;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
@ -7,12 +7,15 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
import org.gcube.documentstore.persistence.PersistenceExecutor;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.documentstore.records.aggregation.BufferAggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,20 +23,26 @@ import org.slf4j.LoggerFactory;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public abstract class AggregationScheduler {
public abstract class AggregationScheduler implements Runnable {
public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
public static AggregationScheduler newInstance(){
return new BufferAggregationScheduler();
public static AggregationScheduler newInstance(PersistenceBackend persistenceBackend){
return new BufferAggregationScheduler(persistenceBackend);
}
protected int totalBufferedRecords;
protected Map<String, List<Record>> bufferedRecords;
protected AggregationScheduler(){
protected final PersistenceBackend persistenceBackend;
protected final ScheduledExecutorService scheduler;
protected AggregationScheduler(PersistenceBackend persistenceBackend){
this.bufferedRecords = new HashMap<String, List<Record>>();
this.totalBufferedRecords = 0;
this.persistenceBackend = persistenceBackend;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES);
}
@SuppressWarnings("rawtypes")
@ -136,24 +145,26 @@ public abstract class AggregationScheduler {
}
if(isTimeToPersist() || forceFlush){
Record[] recordToPersist = new Record[totalBufferedRecords];
int i = 0;
Collection<List<Record>> values = bufferedRecords.values();
for(List<Record> records : values){
for(Record thisRecord: records){
recordToPersist[i] = thisRecord;
i++;
}
}
logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist));
persistenceExecutor.persist(recordToPersist);
clear();
reallyFlush(persistenceExecutor);
}
}
/**
 * Unconditionally persists every buffered record through the given
 * executor in a single batch, then empties the buffer.
 *
 * @param persistenceExecutor target used to persist the collected batch
 * @throws Exception if the executor fails to persist the records
 */
protected void reallyFlush(PersistenceExecutor persistenceExecutor) throws Exception{
	// totalBufferedRecords is kept in sync with the map contents, so it
	// sizes the batch array exactly.
	Record[] recordToPersist = new Record[totalBufferedRecords];
	int index = 0;
	for(Map.Entry<String, List<Record>> entry : bufferedRecords.entrySet()){
		for(Record buffered : entry.getValue()){
			recordToPersist[index++] = buffered;
		}
	}
	logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist));
	persistenceExecutor.persist(recordToPersist);
	clear();
}
/**
* Get an usage records and try to aggregate with other buffered
@ -169,5 +180,24 @@ public abstract class AggregationScheduler {
protected abstract boolean isTimeToPersist();
/**
 * Scheduled entry point (see the fixed-rate task registered in the
 * constructor): flushes all buffered records to the underlying
 * {@code PersistenceBackend}, falling back on failure via
 * {@code accountWithFallback}.
 */
@Override
public void run() {
	// Effectively-final local so the anonymous class can capture it.
	final PersistenceBackend pb = persistenceBackend;
	PersistenceExecutor persistenceExecutor = new PersistenceExecutor() {
		@Override
		public void persist(Record... records) throws Exception {
			pb.accountWithFallback(records);
		}
	};
	try {
		this.flush(persistenceExecutor);
	} catch (Exception e) {
		// Fixed message typo and attached the exception: previously the
		// cause was silently dropped, making flush failures undiagnosable.
		logger.error("Error flushing buffered records", e);
	}
}
}

View File

@ -10,7 +10,6 @@ import java.io.IOException;
import java.io.PrintWriter;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -27,7 +26,7 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
}
protected FallbackPersistenceBackend(File fallbackFile) {
super(null, AggregationScheduler.newInstance());
super(null);
this.fallbackFile = fallbackFile;
}

View File

@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,11 +37,10 @@ public abstract class PersistenceBackend {
}
}
protected PersistenceBackend(FallbackPersistenceBackend fallback, AggregationScheduler aggregationScheduler){
protected PersistenceBackend(FallbackPersistenceBackend fallback){
this();
this.fallbackPersistence = fallback;
this.aggregationScheduler = aggregationScheduler;
this.aggregationScheduler = AggregationScheduler.newInstance(this);
}
/**

View File

@ -10,7 +10,6 @@ import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,7 +79,7 @@ public abstract class PersistenceBackendFactory {
fallbackFile = new File(fallbackLocation, FALLBACK_FILENAME);
}
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance());
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(fallbackPersistence));
return fallbackPersistence;
}
@ -101,7 +100,7 @@ public abstract class PersistenceBackendFactory {
found.prepareConnection(configuration);
logger.debug("{} will be used.", foundClassName);
found.setAggregationScheduler(AggregationScheduler.newInstance());
found.setAggregationScheduler(AggregationScheduler.newInstance(found));
found.setFallback(createFallback(context));
return found;
} catch (Exception e) {

View File

@ -5,6 +5,9 @@ package org.gcube.documentstore.records.aggregation;
import java.util.Calendar;
import org.gcube.documentstore.persistence.AggregationScheduler;
import org.gcube.documentstore.persistence.PersistenceBackend;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
@ -29,8 +32,8 @@ public class BufferAggregationScheduler extends AggregationScheduler {
protected boolean firstOfBuffer;
protected long firstBufferedTime;
protected BufferAggregationScheduler(){
super();
/**
 * Creates a buffer-based aggregation scheduler that persists its buffered
 * records through the given {@code PersistenceBackend}.
 *
 * @param persistenceBackend backend handed to the superclass and used when
 *        the buffer is flushed
 */
public BufferAggregationScheduler(PersistenceBackend persistenceBackend){
super(persistenceBackend);
// Marks the buffer as untouched so the first buffered record can record
// its timestamp (firstBufferedTime bookkeeping).
this.firstOfBuffer = true;
}