refs #380: Implements a different behaviour for buffering account records

https://support.d4science.org/issues/380

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@124193 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2016-02-15 14:30:22 +00:00
parent ddb66f1ead
commit ad305923d2
5 changed files with 49 additions and 43 deletions

View File

@@ -0,0 +1,25 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.util.Objects;

import org.gcube.documentstore.records.Record;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
/**
 * Default {@link PersistenceExecutor} that delegates persistence of records
 * to a {@link PersistenceBackend} via its {@code accountWithFallback} method.
 *
 * NOTE(review): the class name misspells "Persistence"; it is deliberately
 * kept as-is because other classes in this package reference it by this name.
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 */
class DefaultPersitenceExecutor implements PersistenceExecutor {

	/** Backend every {@link #persist(Record...)} call is delegated to. */
	final PersistenceBackend persistenceBackend;

	/**
	 * @param persistenceBackend the backend to delegate to; must not be {@code null}
	 * @throws NullPointerException if {@code persistenceBackend} is {@code null}
	 */
	DefaultPersitenceExecutor(PersistenceBackend persistenceBackend){
		// Fail fast on construction instead of surfacing a delayed NPE
		// the first time persist() is invoked.
		this.persistenceBackend = Objects.requireNonNull(persistenceBackend, "persistenceBackend must not be null");
	}

	/**
	 * {@inheritDoc}
	 *
	 * Delegates to the backend's accountWithFallback(...) path.
	 */
	@Override
	public void persist(Record... records) throws Exception {
		persistenceBackend.accountWithFallback(records);
	}
}

View File

@@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public abstract class PersistenceBackend {
protected PersistenceBackend(FallbackPersistenceBackend fallback){
this();
this.fallbackPersistence = fallback;
this.aggregationScheduler = AggregationScheduler.newInstance(this);
this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this));
}
/**
@@ -118,15 +119,7 @@ public abstract class PersistenceBackend {
logger.trace("{} {} valid", record.getClass().getSimpleName(), record);
}
if(aggregate){
final PersistenceBackend persistence = this;
aggregationScheduler.aggregate(record, new PersistenceExecutor(){
@Override
public void persist(Record... records) throws Exception {
persistence.accountWithFallback(records);
}
});
aggregationScheduler.aggregate(record, new DefaultPersitenceExecutor(this));
}else{
this.accountWithFallback(record);
}
@@ -160,17 +153,7 @@ public abstract class PersistenceBackend {
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
pool.awaitTermination(timeout, timeUnit);
final PersistenceBackend persistence = this;
aggregationScheduler.flush(new PersistenceExecutor(){
@Override
public void persist(Record... records) throws Exception {
persistence.accountWithFallback(records);
}
});
aggregationScheduler.flush(new DefaultPersitenceExecutor(this));
}
public abstract void close() throws Exception;

View File

@@ -10,6 +10,7 @@ import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,8 +70,7 @@ public abstract class PersistenceBackendFactory {
return context.replace("/", "_");
}
protected static FallbackPersistenceBackend createFallback(String context){
logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context);
public static File getFallbackFile(String context){
File fallbackFile = null;
if(context!=null){
String sanitized = sanitizeContext(context);
@@ -78,8 +78,14 @@
}else{
fallbackFile = new File(fallbackLocation, FALLBACK_FILENAME);
}
return fallbackFile;
}
protected static FallbackPersistenceBackend createFallback(String context){
logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context);
File fallbackFile = getFallbackFile(context);
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(fallbackPersistence));
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
return fallbackPersistence;
}
@@ -100,7 +106,7 @@ public abstract class PersistenceBackendFactory {
found.prepareConnection(configuration);
logger.debug("{} will be used.", foundClassName);
found.setAggregationScheduler(AggregationScheduler.newInstance(found));
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found)));
found.setFallback(createFallback(context));
return found;
} catch (Exception e) {

View File

@@ -1,4 +1,4 @@
package org.gcube.documentstore.persistence;
package org.gcube.documentstore.records.aggregation;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
@@ -12,10 +12,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
import org.gcube.documentstore.persistence.PersistenceExecutor;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.documentstore.records.aggregation.BufferAggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,20 +27,20 @@ public abstract class AggregationScheduler implements Runnable {
public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
public static AggregationScheduler newInstance(PersistenceBackend persistenceBackend){
return new BufferAggregationScheduler(persistenceBackend);
public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor){
return new BufferAggregationScheduler(persistenceExecutor);
}
protected int totalBufferedRecords;
protected Map<String, List<Record>> bufferedRecords;
protected final PersistenceBackend persistenceBackend;
protected final PersistenceExecutor persistenceExecutor;
protected final ScheduledExecutorService scheduler;
protected AggregationScheduler(PersistenceBackend persistenceBackend){
protected AggregationScheduler(PersistenceExecutor persistenceExecutor){
this.bufferedRecords = new HashMap<String, List<Record>>();
this.totalBufferedRecords = 0;
this.persistenceBackend = persistenceBackend;
this.persistenceExecutor = persistenceExecutor;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES);
}
@@ -186,13 +186,6 @@ public abstract class AggregationScheduler implements Runnable {
*/
@Override
public void run() {
final PersistenceBackend pb = persistenceBackend;
PersistenceExecutor persistenceExecutor = new PersistenceExecutor() {
@Override
public void persist(Record... records) throws Exception {
pb.accountWithFallback(records);
}
};
try {
this.flush(persistenceExecutor);
} catch (Exception e) {

View File

@@ -5,8 +5,7 @@ package org.gcube.documentstore.records.aggregation;
import java.util.Calendar;
import org.gcube.documentstore.persistence.AggregationScheduler;
import org.gcube.documentstore.persistence.PersistenceBackend;
import org.gcube.documentstore.persistence.PersistenceExecutor;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@@ -32,8 +31,8 @@ public class BufferAggregationScheduler extends AggregationScheduler {
protected boolean firstOfBuffer;
protected long firstBufferedTime;
public BufferAggregationScheduler(PersistenceBackend persistenceBackend){
super(persistenceBackend);
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor){
super(persistenceExecutor);
this.firstOfBuffer = true;
}