This commit is contained in:
Alessandro Pieve 2016-06-28 13:09:40 +00:00
parent 5da3783125
commit be994ed95b
6 changed files with 265 additions and 75 deletions

View File

@ -9,15 +9,14 @@
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.2-SNAPSHOT</version>
<name>Document Store Lib</name>
<description>
Allow to persist data in NoSQL Document Store Databases.
<description>Allow to persist data in NoSQL Document Store Databases.
Discover Model dynamically.
Discover Database Backend connector dynamically.
Discover Configuration implementation dynamically.
Provide aggregation and fallback facilities.
</description>
Discover Max Buffer Record dynamically.
Provide aggregation and fallback facilities.</description>
<packaging>jar</packaging>
<properties>

View File

@ -38,10 +38,13 @@ public abstract class PersistenceBackend {
}
}
protected PersistenceBackend(FallbackPersistenceBackend fallback){
protected PersistenceBackend(FallbackPersistenceBackend fallback){
this();
logger.trace("PersistenceBackend-");
this.fallbackPersistence = fallback;
this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this));
}
/**

View File

@ -89,6 +89,7 @@ public abstract class PersistenceBackendFactory {
public static File getFallbackFile(String context){
context = sanitizeContext(context);
String slashLessContext = removeSlashFromContext(context);
logger.trace("getFallbackFile location:"+getFallbackLocation()+" context:"+slashLessContext+"-"+FALLBACK_FILENAME);
File fallbackFile = new File(getFallbackLocation(), String.format("%s.%s", slashLessContext, FALLBACK_FILENAME));
return fallbackFile;
}
@ -99,7 +100,10 @@ public abstract class PersistenceBackendFactory {
File fallbackFile = getFallbackFile(context);
logger.trace("{} for context {} is {}", FallbackPersistenceBackend.class.getSimpleName(), context, fallbackFile.getAbsolutePath());
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
//Modify for configuration aggregation
//fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
return fallbackPersistence;
}
@ -121,8 +125,10 @@ public abstract class PersistenceBackendFactory {
found.prepareConnection(configuration);
logger.debug("{} will be used.", foundClassName);
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found)));
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration));
found.setFallback(createFallback(context));
return found;
} catch (Exception e) {
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e);

View File

@ -94,6 +94,7 @@ public class RecordUtility {
AggregatedRecord<?,?> instance = cls.newInstance();
discoveredRecordType = instance.getRecordType();
if(!aggregatedRecordClassesFound.containsKey(discoveredRecordType)){
logger.trace("discoveredRecordType not found"+discoveredRecordType+" with cls:"+cls.getName());
aggregatedRecordClassesFound.put(discoveredRecordType, cls);
Class<? extends Record> recordClass = instance.getAggregable();
@ -147,11 +148,20 @@ public class RecordUtility {
}
public static Class<? extends AggregatedRecord<?,?>> getAggregatedRecordClass(String recordType) throws ClassNotFoundException {
if(getAggregatedRecordClassesFound().containsKey(recordType)){
logger.trace("record type {},getAggregatedRecordClassesFound {}",recordType,getAggregatedRecordClassesFound(),getAggregatedRecordClassesFound().get(recordType));
return getAggregatedRecordClassesFound().get(recordType);
}
logger.error("Unable to find {} class for {}.",
AggregatedRecord.class.getSimpleName(), recordType);
logger.trace("getAggregatedRecordClass getAggregatedRecordClassesFound:"+getAggregatedRecordClassesFound());
throw new ClassNotFoundException();
}

View File

@ -3,19 +3,26 @@ package org.gcube.documentstore.records.aggregation;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
//import org.gcube.documentstore.persistence.DefaultPersitenceExecutor;
import org.gcube.documentstore.persistence.PersistenceBackend;
import org.gcube.documentstore.persistence.PersistenceBackendConfiguration;
import org.gcube.documentstore.persistence.PersistenceExecutor;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.documentstore.records.implementation.ConfigurationGetPropertyValues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,33 +31,118 @@ import org.slf4j.LoggerFactory;
*
*/
public abstract class AggregationScheduler implements Runnable {
public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor){
return new BufferAggregationScheduler(persistenceExecutor);
}
protected int totalBufferedRecords;
protected Map<String, List<Record>> bufferedRecords;
protected final PersistenceExecutor persistenceExecutor;
protected final ScheduledExecutorService scheduler;
public final static int INITIAL_DELAY = 10;
public final static int DELAY = 10;
public static int INITIAL_DELAY = 10;
public static int DELAY = 10;
public final static TimeUnit TIME_UNIT = TimeUnit.MINUTES;
public static final String AGGREGATION_SCHEDULER_TIME="AggregationSchedulerTime";
public static final String BUFFER_RECORD_TIME="BufferRecordTime";
public static final String BUFFER_RECORD_NUMBER="BufferRecordNumber";
/**
* The Max amount of time for reload a configuration
* TODO Get from configuration
*/
public static long TIME_RELOAD_CONFIGURATION =1000*60*60*1; // 5 hour
/**
* The time for first
*/
public static long TIME_LOAD_CONFIGURATION=0L;
/**
* Define the MAX number of Record to buffer.
* TODO Get from configuration
*/
protected static int MAX_RECORDS_NUMBER = 15;
/**
* The Max amount of time elapsed form last record before after that
* the buffered record are persisted even if
* TODO Get from configuration
*/
protected static long OLD_RECORD_MAX_TIME_ELAPSED = 1000*60*10; // 10 min
public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor){
return new BufferAggregationScheduler(persistenceExecutor);
}
public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor, PersistenceBackendConfiguration configuration) throws NumberFormatException, Exception{
ConfigurationGetPropertyValues properties = new ConfigurationGetPropertyValues();
Properties prop =properties.getPropValues();
Integer delay=0;
Integer maxRecordNumber=0;
Integer maxRecordTime=0;
if (prop==null){
//get value from service end point
logger.trace("Configuration from service end point");
delay=Integer.parseInt(configuration.getProperty(AGGREGATION_SCHEDULER_TIME));
maxRecordTime=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_TIME));
maxRecordNumber=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_NUMBER));
}
else{
//get value from properties file
logger.trace("Configuration from properties file");
delay=Integer.parseInt(prop.getProperty("delay"));
maxRecordNumber=Integer.parseInt(prop.getProperty("maxrecordnumber"));
maxRecordTime=Integer.parseInt(prop.getProperty("maxtimenumber"));
}
if (delay != 0){
DELAY=delay;
INITIAL_DELAY=delay;
}
if (maxRecordNumber != 0)
MAX_RECORDS_NUMBER=maxRecordNumber;
if (maxRecordTime != 0)
OLD_RECORD_MAX_TIME_ELAPSED=maxRecordTime*1000*60;
TIME_LOAD_CONFIGURATION = Calendar.getInstance().getTimeInMillis();
logger.trace("AggregationScheduler--NewInstance TIME_LOAD_CONFIGURATION:"+TIME_LOAD_CONFIGURATION);
logger.trace("AggregationScheduler--NewInstance TIME_RELOAD_CONFIGURATION:"+TIME_RELOAD_CONFIGURATION);
return new BufferAggregationScheduler(persistenceExecutor);
}
protected AggregationScheduler(PersistenceExecutor persistenceExecutor){
this.bufferedRecords = new HashMap<String, List<Record>>();
this.totalBufferedRecords = 0;
this.persistenceExecutor = persistenceExecutor;
this.scheduler = Executors.newScheduledThreadPool(1);
logger.trace("AggregationScheduler-- OLD_RECORD_MAX_TIME_ELAPSED:"+OLD_RECORD_MAX_TIME_ELAPSED);
logger.trace("AggregationScheduler-- DELAY:"+DELAY);
logger.trace("AggregationScheduler-- MAX_RECORDS_NUMBER:"+MAX_RECORDS_NUMBER);
this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT);
}
@SuppressWarnings("rawtypes")
protected static AggregatedRecord instantiateAggregatedRecord(Record record) throws Exception{
String recordType = record.getRecordType();
Class<? extends AggregatedRecord> clz = RecordUtility.getAggregatedRecordClass(recordType);
Class[] argTypes = { record.getClass() };
@ -58,9 +150,10 @@ public abstract class AggregationScheduler implements Runnable {
Object[] arguments = {record};
return constructor.newInstance(arguments);
}
@SuppressWarnings("rawtypes")
public static AggregatedRecord getAggregatedRecord(Record record) throws Exception {
AggregatedRecord aggregatedRecord;
if(record instanceof AggregatedRecord){
// the record is already an aggregated version
@ -68,36 +161,36 @@ public abstract class AggregationScheduler implements Runnable {
}else{
aggregatedRecord = instantiateAggregatedRecord(record);
}
return aggregatedRecord;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void madeAggregation(Record record){
String recordType = record.getRecordType();
List<Record> records;
if(this.bufferedRecords.containsKey(recordType)){
records = this.bufferedRecords.get(recordType);
boolean found = false;
for(Record bufferedRecord : records){
if(!(bufferedRecord instanceof AggregatedRecord)){
continue;
}
try {
AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord;
logger.trace("Trying to use {} for aggregation.", bufferedAggregatedRecord);
if(record instanceof AggregatedRecord){
// TODO check compatibility using getAggregable
bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
}else{
bufferedAggregatedRecord.aggregate((Record) record);
}
logger.trace("Aggregated Record is {}", bufferedAggregatedRecord);
found = true;
break;
@ -105,7 +198,7 @@ public abstract class AggregationScheduler implements Runnable {
logger.trace("{} is not usable for aggregation", bufferedRecord);
}
}
if(!found){
try {
records.add(getAggregatedRecord(record));
@ -115,8 +208,8 @@ public abstract class AggregationScheduler implements Runnable {
totalBufferedRecords++;
return;
}
}else{
records = new ArrayList<Record>();
try {
@ -127,32 +220,124 @@ public abstract class AggregationScheduler implements Runnable {
totalBufferedRecords++;
this.bufferedRecords.put(recordType, records);
}
}
public void flush(PersistenceExecutor persistenceExecutor) throws Exception{
aggregate(null, persistenceExecutor, true);
}
protected abstract void schedulerSpecificClear();
protected void clear(){
totalBufferedRecords=0;
bufferedRecords.clear();
schedulerSpecificClear();
}
protected synchronized void aggregate(Record record, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
if(record!=null){
logger.trace("Trying to aggregate {}", record);
madeAggregation(record);
}
if(isTimeToPersist() || forceFlush){
if(isTimeToPersist( MAX_RECORDS_NUMBER , OLD_RECORD_MAX_TIME_ELAPSED ) || forceFlush){
reallyFlush(persistenceExecutor);
}
/**
* reload a configuration
*/
long now = Calendar.getInstance().getTimeInMillis();
if((now - TIME_LOAD_CONFIGURATION) >= TIME_RELOAD_CONFIGURATION){
ReloadConfiguration();
}
}
protected void ReloadConfiguration()throws Exception{
new Thread(){
public void run(){
Integer delay=0;
Integer maxRecordNumber=0;
Integer maxRecordTime=0;
try {
ConfigurationGetPropertyValues properties = new ConfigurationGetPropertyValues();
Properties prop =properties.getPropValues();
if (prop!=null){
//get value from properties file
logger.trace("Reload Configuration from properties file");
delay=Integer.parseInt(prop.getProperty("delay"));
maxRecordNumber=Integer.parseInt(prop.getProperty("maxrecordnumber"));
maxRecordTime=Integer.parseInt(prop.getProperty("maxtimenumber"));
}
else{
ServiceLoader<PersistenceBackend> serviceLoader = ServiceLoader.load(PersistenceBackend.class);
PersistenceBackendConfiguration configuration =null;
for (PersistenceBackend found : serviceLoader) {
Class<? extends PersistenceBackend> foundClass = found.getClass();
try {
String foundClassName = foundClass.getSimpleName();
logger.debug("Testing {}", foundClassName);
configuration = PersistenceBackendConfiguration.getInstance(foundClass);
if(configuration==null){
continue;
}
logger.debug("{} will be used.", foundClassName);
} catch (Exception e) {
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e);
}
}
if (configuration!=null){
//get value from service end point
logger.trace("Reload Configuration from service end point");
delay=Integer.parseInt(configuration.getProperty(AGGREGATION_SCHEDULER_TIME));
maxRecordTime=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_TIME));
maxRecordNumber=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_NUMBER));
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", e.getLocalizedMessage()), e);
//e.printStackTrace();
}
//configure new value
if (delay != 0){
DELAY=delay;
INITIAL_DELAY=delay;
}
if (maxRecordNumber != 0)
MAX_RECORDS_NUMBER=maxRecordNumber;
if (maxRecordTime != 0)
OLD_RECORD_MAX_TIME_ELAPSED=maxRecordTime*1000*60;
//reset a timer
TIME_LOAD_CONFIGURATION = Calendar.getInstance().getTimeInMillis();
logger.trace("AggregationScheduler-- OLD_RECORD_MAX_TIME_ELAPSED:"+OLD_RECORD_MAX_TIME_ELAPSED);
logger.trace("AggregationScheduler-- DELAY:"+DELAY);
logger.trace("AggregationScheduler-- MAX_RECORDS_NUMBER:"+MAX_RECORDS_NUMBER);
}
}.start();
}
protected void reallyFlush(PersistenceExecutor persistenceExecutor) throws Exception{
if(totalBufferedRecords==0){
return;
@ -166,13 +351,12 @@ public abstract class AggregationScheduler implements Runnable {
i++;
}
}
logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist));
logger.trace("reallyFlush It is time to persist buffered records {}", Arrays.toString(recordToPersist));
persistenceExecutor.persist(recordToPersist);
clear();
}
/**
* Get an usage records and try to aggregate with other buffered
* Usage Record.
@ -181,12 +365,11 @@ public abstract class AggregationScheduler implements Runnable {
* @throws Exception if fails
*/
public void aggregate(Record record, PersistenceExecutor persistenceExecutor) throws Exception {
logger.trace("Going to aggregate {}", record);
aggregate(record, persistenceExecutor, false);
}
protected abstract boolean isTimeToPersist();
protected abstract boolean isTimeToPersist(int maxRecordNumber, long oldRecordMaxTime);
/* (non-Javadoc)
* @see java.lang.Runnable#run()
@ -199,5 +382,5 @@ public abstract class AggregationScheduler implements Runnable {
logger.error("Error flushing Buffered Records");
}
}
}

View File

@ -15,22 +15,9 @@ import org.gcube.documentstore.persistence.PersistenceExecutor;
*/
public class BufferAggregationScheduler extends AggregationScheduler {
/**
* Define the MAX number of Record to buffer.
* TODO Get from configuration
*/
protected final static int MAX_RECORDS_NUMBER = 15;
/**
* The Max amount of time elapsed form last record before after that
* the buffered record are persisted even if
* TODO Get from configuration
*/
protected final static long OLD_RECORD_MAX_TIME_ELAPSED = 1000*60*5; // 5 min
protected boolean firstOfBuffer;
protected long firstBufferedTime;
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor){
super(persistenceExecutor);
this.firstOfBuffer = true;
@ -40,30 +27,32 @@ public class BufferAggregationScheduler extends AggregationScheduler {
protected void schedulerSpecificClear(){
firstOfBuffer = true;
}
/**
* {@inheritDoc}
*/
@Override
public boolean isTimeToPersist(){
protected boolean isTimeToPersist(int maxRecordNumber, long oldRecordMaxTime) {
long now = Calendar.getInstance().getTimeInMillis();
if(firstOfBuffer){
firstOfBuffer = false;
firstBufferedTime = now;
}
if(totalBufferedRecords >= MAX_RECORDS_NUMBER){
if(totalBufferedRecords >= maxRecordNumber){
logger.trace("Time perisist from maxRecordNumber:"+maxRecordNumber+" max totalBufferedRecords:"+totalBufferedRecords);
return true;
}
if((now - firstBufferedTime) >= OLD_RECORD_MAX_TIME_ELAPSED){
if((now - firstBufferedTime) >= oldRecordMaxTime){
logger.trace("Time perisist from oldRecordMaxTime:"+oldRecordMaxTime+" firstBufferedTime:"+firstBufferedTime);
return true;
}
return false;
}
}