fix reload configuration and optimize the number of thread for each scope
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@134689 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
6866cd6afb
commit
1ec3508771
|
@ -9,9 +9,9 @@ import org.gcube.documentstore.records.Record;
|
|||
* @author Luca Frosini (ISTI - CNR)
|
||||
*
|
||||
*/
|
||||
class DefaultPersitenceExecutor implements PersistenceExecutor {
|
||||
public class DefaultPersitenceExecutor implements PersistenceExecutor {
|
||||
|
||||
final PersistenceBackend persistenceBackend;
|
||||
public final PersistenceBackend persistenceBackend;
|
||||
|
||||
DefaultPersitenceExecutor(PersistenceBackend persistenceBackend){
|
||||
this.persistenceBackend = persistenceBackend;
|
||||
|
|
|
@ -41,7 +41,7 @@ public abstract class PersistenceBackend {
|
|||
this();
|
||||
logger.trace("PersistenceBackend-");
|
||||
this.fallbackPersistence = fallback;
|
||||
this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this));
|
||||
this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this), "FALLBACK");
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -116,12 +116,12 @@ public abstract class PersistenceBackendFactory {
|
|||
File fallbackFile = getFallbackFile(context);
|
||||
logger.trace("{} for context {} is {}", FallbackPersistenceBackend.class.getSimpleName(), context, fallbackFile.getAbsolutePath());
|
||||
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
|
||||
|
||||
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
|
||||
//TODO: VERIFY (Aggregation creation for fallback is already done on FallbackPersistenceBackend constructor)
|
||||
//fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
|
||||
return fallbackPersistence;
|
||||
}
|
||||
|
||||
protected static PersistenceBackend discoverPersistenceBackend(String context){
|
||||
protected static PersistenceBackend discoverPersistenceBackend(String context, FallbackPersistenceBackend fallback){
|
||||
context = sanitizeContext(context);
|
||||
logger.debug("Discovering {} for scope {}",
|
||||
PersistenceBackend.class.getSimpleName(), context);
|
||||
|
@ -141,10 +141,12 @@ public abstract class PersistenceBackendFactory {
|
|||
found.setOpen();
|
||||
|
||||
logger.trace("{} will be used.", foundClassName);
|
||||
|
||||
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration));
|
||||
found.setFallback(createFallback(context));
|
||||
|
||||
|
||||
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration,"NON FALLBACK"));
|
||||
if (fallback!=null)
|
||||
found.setFallback(fallback);
|
||||
else
|
||||
found.setFallback(createFallback(context));
|
||||
return found;
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
@ -164,7 +166,7 @@ public abstract class PersistenceBackendFactory {
|
|||
logger.trace("Going to synchronized block in getPersistenceBackend");
|
||||
synchronized (persistenceBackends) {
|
||||
persistence = persistenceBackends.get(context);
|
||||
logger.trace("{} {}", PersistenceBackend.class.getSimpleName(), persistence);
|
||||
logger.trace("[getPersistenceBackend]{} {} in context {}", PersistenceBackend.class.getSimpleName(), persistence,context);
|
||||
if(persistence==null){
|
||||
|
||||
/*
|
||||
|
@ -176,8 +178,12 @@ public abstract class PersistenceBackendFactory {
|
|||
persistenceBackends.put(context, persistence);
|
||||
|
||||
if(forceImmediateRediscovery){
|
||||
persistence = discoverPersistenceBackend(context);
|
||||
persistenceBackends.put(context, persistence);
|
||||
PersistenceBackend p = discoverPersistenceBackend(context, (FallbackPersistenceBackend) persistence);
|
||||
|
||||
if (p!=null){
|
||||
persistence=p;
|
||||
persistenceBackends.put(context, persistence);
|
||||
}
|
||||
}else{
|
||||
new PersistenceBackendRediscover(context,
|
||||
(FallbackPersistenceBackend) persistence, INITIAL_DELAY,
|
||||
|
@ -190,7 +196,7 @@ public abstract class PersistenceBackendFactory {
|
|||
}
|
||||
|
||||
|
||||
protected static PersistenceBackend rediscoverPersistenceBackend(PersistenceBackend actual, String context){
|
||||
protected static PersistenceBackend rediscoverPersistenceBackend(FallbackPersistenceBackend actual, String context){
|
||||
context = sanitizeContext(context);
|
||||
logger.debug("The {} for context {} is {}. "
|
||||
+ "Is time to rediscover if there is another possibility.",
|
||||
|
@ -198,18 +204,18 @@ public abstract class PersistenceBackendFactory {
|
|||
actual.getClass().getSimpleName());
|
||||
|
||||
PersistenceBackend discoveredPersistenceBackend =
|
||||
PersistenceBackendFactory.discoverPersistenceBackend(context);
|
||||
PersistenceBackendFactory.discoverPersistenceBackend(context, actual);
|
||||
|
||||
if(discoveredPersistenceBackend!=null){
|
||||
synchronized (persistenceBackends) {
|
||||
|
||||
|
||||
/*
|
||||
* Passing the aggregator to the new PersistenceBackend
|
||||
* so that the buffered records will be persisted with the
|
||||
* new method
|
||||
*
|
||||
*/
|
||||
discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
|
||||
//discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
|
||||
|
||||
persistenceBackends.put(context, discoveredPersistenceBackend);
|
||||
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
package org.gcube.documentstore.records.aggregation;
|
||||
|
||||
public class AggregationConfig {
|
||||
/**
|
||||
* Define the MAX number of Record to buffer. TODO Get from configuration
|
||||
*/
|
||||
protected static final int MAX_RECORDS_NUMBER = 1000;
|
||||
/**
|
||||
* The Max amount of time elapsed form last record before after that the
|
||||
* buffered record are persisted even if TODO Get from configuration
|
||||
*/
|
||||
protected static final long OLD_RECORD_MAX_TIME_ELAPSED = 1000 * 60 * 30; // 30 min
|
||||
public static final int INITIAL_DELAY = 30;
|
||||
public static final int DELAY = 30;
|
||||
|
||||
public static AggregationConfig getDefaultConfiguration(){
|
||||
return new AggregationConfig(INITIAL_DELAY, DELAY, MAX_RECORDS_NUMBER, OLD_RECORD_MAX_TIME_ELAPSED);
|
||||
}
|
||||
|
||||
private int initialDelaySet;
|
||||
private int delaySet;
|
||||
private int maxRecordsNumberSet;
|
||||
private long oldRecordMaxTimeElapsedSet;
|
||||
|
||||
|
||||
public AggregationConfig(Integer initialDelaySet, Integer delaySet,
|
||||
int maxRecordsNumberSet, long oldRecordMaxTimeElapsedSet) {
|
||||
super();
|
||||
this.initialDelaySet = initialDelaySet;
|
||||
this.delaySet = delaySet;
|
||||
this.maxRecordsNumberSet = maxRecordsNumberSet;
|
||||
this.oldRecordMaxTimeElapsedSet = oldRecordMaxTimeElapsedSet;
|
||||
}
|
||||
|
||||
public Integer getInitialDelaySet() {
|
||||
return initialDelaySet;
|
||||
}
|
||||
public void setInitialDelaySet(int initialDelaySet) {
|
||||
this.initialDelaySet = initialDelaySet;
|
||||
}
|
||||
public Integer getDelaySet() {
|
||||
return delaySet;
|
||||
}
|
||||
public void setDelaySet(int delaySet) {
|
||||
this.delaySet = delaySet;
|
||||
}
|
||||
public int getMaxRecordsNumberSet() {
|
||||
return maxRecordsNumberSet;
|
||||
}
|
||||
public void setMaxRecordsNumberSet(int maxRecordsNumberSet) {
|
||||
this.maxRecordsNumberSet = maxRecordsNumberSet;
|
||||
}
|
||||
public long getOldRecordMaxTimeElapsedSet() {
|
||||
return oldRecordMaxTimeElapsedSet;
|
||||
}
|
||||
public void setOldRecordMaxTimeElapsedSet(long oldRecordMaxTimeElapsedSet) {
|
||||
this.oldRecordMaxTimeElapsedSet = oldRecordMaxTimeElapsedSet;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + delaySet;
|
||||
result = prime * result + initialDelaySet;
|
||||
result = prime * result + maxRecordsNumberSet;
|
||||
result = prime
|
||||
* result
|
||||
+ (int) (oldRecordMaxTimeElapsedSet ^ (oldRecordMaxTimeElapsedSet >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
AggregationConfig other = (AggregationConfig) obj;
|
||||
if (delaySet != other.delaySet)
|
||||
return false;
|
||||
if (initialDelaySet != other.initialDelaySet)
|
||||
return false;
|
||||
if (maxRecordsNumberSet != other.maxRecordsNumberSet)
|
||||
return false;
|
||||
if (oldRecordMaxTimeElapsedSet != other.oldRecordMaxTimeElapsedSet)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AggregationConfig [initialDelaySet=" + initialDelaySet
|
||||
+ ", delaySet=" + delaySet + ", maxRecordsNumberSet="
|
||||
+ maxRecordsNumberSet + ", oldRecordMaxTimeElapsedSet="
|
||||
+ oldRecordMaxTimeElapsedSet + "]";
|
||||
}
|
||||
|
||||
}
|
|
@ -1,22 +1,21 @@
|
|||
package org.gcube.documentstore.records.aggregation;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
|
||||
import org.gcube.documentstore.persistence.ExecutorUtils;
|
||||
//import org.gcube.documentstore.persistence.DefaultPersitenceExecutor;
|
||||
import org.gcube.documentstore.persistence.PersistenceBackend;
|
||||
import org.gcube.documentstore.persistence.PersistenceBackendConfiguration;
|
||||
import org.gcube.documentstore.persistence.PersistenceExecutor;
|
||||
|
@ -26,113 +25,97 @@ import org.gcube.documentstore.records.RecordUtility;
|
|||
import org.gcube.documentstore.records.implementation.ConfigurationGetPropertyValues;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* @author Luca Frosini (ISTI - CNR)
|
||||
*
|
||||
*/
|
||||
public abstract class AggregationScheduler implements Runnable {
|
||||
|
||||
|
||||
public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
|
||||
|
||||
|
||||
protected int totalBufferedRecords;
|
||||
protected Map<String, List<Record>> bufferedRecords;
|
||||
|
||||
|
||||
protected final PersistenceExecutor persistenceExecutor;
|
||||
|
||||
public static final int INITIAL_DELAY = 30;
|
||||
public static Integer initialDelaySet;
|
||||
|
||||
public static final int DELAY = 30;
|
||||
public static Integer delaySet;
|
||||
|
||||
|
||||
|
||||
public final static TimeUnit TIME_UNIT = TimeUnit.MINUTES;
|
||||
|
||||
|
||||
public static final String AGGREGATION_SCHEDULER_TIME = "AggregationSchedulerTime";
|
||||
|
||||
|
||||
public static final String BUFFER_RECORD_TIME = "BufferRecordTime";
|
||||
public static final String BUFFER_RECORD_NUMBER = "BufferRecordNumber";
|
||||
|
||||
|
||||
|
||||
public static final Integer RANDOM_INIT_START=5;
|
||||
|
||||
/**
|
||||
* The Max amount of time for reload a configuration TODO Get from
|
||||
* The Max amount of time for reload a configuration Get from
|
||||
* configuration
|
||||
*/
|
||||
public static long TIME_RELOAD_CONFIGURATION = 1000 * 60 * 60 * 12; // 12
|
||||
|
||||
/**
|
||||
* The time for first
|
||||
*/
|
||||
public static long timeLoadConfiguration = 0L;
|
||||
|
||||
/**
|
||||
* Define the MAX number of Record to buffer. TODO Get from configuration
|
||||
*/
|
||||
protected static final int MAX_RECORDS_NUMBER = 100;
|
||||
protected static int maxRecordsNumberSet;
|
||||
public static boolean changeConfiguration = false;
|
||||
|
||||
/**
|
||||
* The Max amount of time elapsed form last record before after that the
|
||||
* buffered record are persisted even if TODO Get from configuration
|
||||
*/
|
||||
protected static final long OLD_RECORD_MAX_TIME_ELAPSED = 1000 * 60 * 30; // 30 min
|
||||
protected static long OldRecordMaxTimeElapsedSet;
|
||||
|
||||
protected ScheduledFuture<?> future = null;
|
||||
|
||||
public static long TIME_RELOAD_CONFIGURATION = 720 ; //720 minutes (12 Hours)
|
||||
|
||||
public boolean changeConfiguration = false;
|
||||
|
||||
private String name;
|
||||
|
||||
private AggregationConfig config;
|
||||
|
||||
//Schedule for flush and reload configuration
|
||||
protected ScheduledFuture<?> futureFlush = null;
|
||||
protected ScheduledFuture<?> futureReload = null;
|
||||
|
||||
public static AggregationScheduler newInstance(
|
||||
PersistenceExecutor persistenceExecutor) {
|
||||
return new BufferAggregationScheduler(persistenceExecutor);
|
||||
PersistenceExecutor persistenceExecutor, String name) {
|
||||
return new BufferAggregationScheduler(persistenceExecutor, name);
|
||||
}
|
||||
|
||||
|
||||
public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor,
|
||||
PersistenceBackendConfiguration configuration)throws NumberFormatException, Exception {
|
||||
|
||||
CheckConfiguration(configuration,false);
|
||||
return new BufferAggregationScheduler(persistenceExecutor);
|
||||
PersistenceBackendConfiguration configuration, String name)throws NumberFormatException, Exception {
|
||||
AggregationConfig config = CheckConfiguration(configuration);
|
||||
BufferAggregationScheduler bas = new BufferAggregationScheduler(persistenceExecutor, config, name );
|
||||
return bas;
|
||||
}
|
||||
|
||||
protected AggregationScheduler(PersistenceExecutor persistenceExecutor) {
|
||||
|
||||
protected AggregationScheduler(PersistenceExecutor persistenceExecutor, String name) {
|
||||
this(persistenceExecutor, AggregationConfig.getDefaultConfiguration(), name);
|
||||
|
||||
}
|
||||
|
||||
protected AggregationScheduler(PersistenceExecutor persistenceExecutor, AggregationConfig config, String name) {
|
||||
this.config = config;
|
||||
this.name = name;
|
||||
|
||||
timeLoadConfiguration = Calendar.getInstance().getTimeInMillis();
|
||||
this.bufferedRecords = new HashMap<String, List<Record>>();
|
||||
this.totalBufferedRecords = 0;
|
||||
this.persistenceExecutor = persistenceExecutor;
|
||||
if (initialDelaySet == null)
|
||||
initialDelaySet = INITIAL_DELAY;
|
||||
if (delaySet == null)
|
||||
delaySet = DELAY;
|
||||
if ((initialDelaySet == 0) || (delaySet == 0)) {
|
||||
future = ExecutorUtils.scheduler.scheduleAtFixedRate(this, 1,1, TIME_UNIT);
|
||||
schedule();
|
||||
reloadConfiguration();
|
||||
}
|
||||
|
||||
private void schedule() {
|
||||
//reschedule event because change configuration
|
||||
if (futureFlush!=null)
|
||||
futureFlush.cancel(false);
|
||||
if ((config.getInitialDelaySet() == 0) || (config.getDelaySet() == 0)) {
|
||||
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this, 1,1, TIME_UNIT);
|
||||
}
|
||||
else{
|
||||
future = ExecutorUtils.scheduler.scheduleAtFixedRate(this,initialDelaySet, delaySet, TIME_UNIT);
|
||||
Random random = new Random();
|
||||
Integer randStart= Math.abs(random.nextInt(RANDOM_INIT_START));
|
||||
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this,config.getInitialDelaySet()+randStart, config.getDelaySet(), TIME_UNIT);
|
||||
}
|
||||
logger.trace("AggregationScheduler- Thread scheduler created in {} ", this.toString());
|
||||
logger.trace("AggregationScheduler- Load configuration every {}", TIME_RELOAD_CONFIGURATION);
|
||||
logger.trace("AggregationScheduler- Aggregated for max record {}", maxRecordsNumberSet);
|
||||
logger.trace("AggregationScheduler- Aggregated for max time {}", OldRecordMaxTimeElapsedSet);
|
||||
|
||||
logger.trace("[{}] AggregationScheduler- Thread scheduler created in {} ",name, this.toString());
|
||||
logger.trace("[{}] AggregationScheduler- Load configuration every {}",name, TIME_RELOAD_CONFIGURATION);
|
||||
logger.trace("[{}] AggregationScheduler- Aggregated for max record {}", name, config.getMaxRecordsNumberSet());
|
||||
logger.trace("[{}] AggregationScheduler- Aggregated for max time {}", name, config.getOldRecordMaxTimeElapsedSet());
|
||||
}
|
||||
|
||||
private void reSchedule() {
|
||||
//reschedule event because change configuration
|
||||
future.cancel(false);
|
||||
if ((initialDelaySet == 0) || (delaySet == 0)) {
|
||||
future = ExecutorUtils.scheduler.scheduleAtFixedRate(this, 1,1, TIME_UNIT);
|
||||
}else
|
||||
future = ExecutorUtils.scheduler.scheduleAtFixedRate(this, delaySet,delaySet, TIME_UNIT);
|
||||
changeConfiguration=false;
|
||||
logger.trace("reSchedule - Thread scheduler created in {} ", this.toString());
|
||||
logger.trace("reSchedule - Load configuration every {}", TIME_RELOAD_CONFIGURATION);
|
||||
logger.trace("reSchedule - Aggregated for max record {}", maxRecordsNumberSet);
|
||||
logger.trace("reSchedule - Aggregated for max time {}", OldRecordMaxTimeElapsedSet);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
protected static AggregatedRecord instantiateAggregatedRecord(Record record)
|
||||
throws Exception {
|
||||
|
||||
|
||||
String recordType = record.getRecordType();
|
||||
Class<? extends AggregatedRecord> clz = RecordUtility.getAggregatedRecordClass(recordType);
|
||||
Class[] argTypes = { record.getClass() };
|
||||
|
@ -140,11 +123,11 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
Object[] arguments = { record };
|
||||
return constructor.newInstance(arguments);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static AggregatedRecord getAggregatedRecord(Record record)
|
||||
throws Exception {
|
||||
|
||||
|
||||
AggregatedRecord aggregatedRecord;
|
||||
if (record instanceof AggregatedRecord) {
|
||||
// the record is already an aggregated version
|
||||
|
@ -152,41 +135,41 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
} else {
|
||||
aggregatedRecord = instantiateAggregatedRecord(record);
|
||||
}
|
||||
|
||||
|
||||
return aggregatedRecord;
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
protected void madeAggregation(Record record) {
|
||||
|
||||
|
||||
String recordType = record.getRecordType();
|
||||
List<Record> records;
|
||||
|
||||
|
||||
if (this.bufferedRecords.containsKey(recordType)) {
|
||||
records = this.bufferedRecords.get(recordType);
|
||||
boolean found = false;
|
||||
|
||||
|
||||
for (Record bufferedRecord : records) {
|
||||
if (!(bufferedRecord instanceof AggregatedRecord)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
try {
|
||||
AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord;
|
||||
|
||||
|
||||
if (record instanceof AggregatedRecord) {
|
||||
// TODO check compatibility using getAggregable
|
||||
bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
|
||||
} else {
|
||||
bufferedAggregatedRecord.aggregate((Record) record);
|
||||
}
|
||||
logger.trace("Aggregated Record is {}",bufferedAggregatedRecord);
|
||||
logger.trace("Aggregated Record is {}",bufferedAggregatedRecord);
|
||||
found = true;
|
||||
break;
|
||||
} catch (NotAggregatableRecordsExceptions e) {
|
||||
logger.trace("{} is not usable for aggregation",bufferedRecord);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!found) {
|
||||
try {
|
||||
records.add(getAggregatedRecord(record));
|
||||
|
@ -196,7 +179,7 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
totalBufferedRecords++;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
records = new ArrayList<Record>();
|
||||
try {
|
||||
|
@ -207,41 +190,35 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
totalBufferedRecords++;
|
||||
this.bufferedRecords.put(recordType, records);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void flush(PersistenceExecutor persistenceExecutor) throws Exception {
|
||||
|
||||
public void flush(PersistenceExecutor persistenceExecutor) throws Exception {
|
||||
aggregate(null, persistenceExecutor, true);
|
||||
}
|
||||
|
||||
|
||||
protected abstract void schedulerSpecificClear();
|
||||
|
||||
|
||||
protected void clear() {
|
||||
totalBufferedRecords = 0;
|
||||
bufferedRecords.clear();
|
||||
schedulerSpecificClear();
|
||||
}
|
||||
|
||||
|
||||
protected synchronized void aggregate(Record record,
|
||||
PersistenceExecutor persistenceExecutor, boolean forceFlush)
|
||||
throws Exception {
|
||||
|
||||
|
||||
if (record != null) {
|
||||
madeAggregation(record);
|
||||
}
|
||||
if (isTimeToPersist(maxRecordsNumberSet, OldRecordMaxTimeElapsedSet)|| forceFlush) {
|
||||
if (isTimeToPersist(this.config.getMaxRecordsNumberSet(), this.config.getOldRecordMaxTimeElapsedSet())|| forceFlush) {
|
||||
reallyFlush(persistenceExecutor);
|
||||
}
|
||||
/**
|
||||
* check if reload a configuration
|
||||
*/
|
||||
long now = Calendar.getInstance().getTimeInMillis();
|
||||
if ((now - timeLoadConfiguration) >= TIME_RELOAD_CONFIGURATION) {
|
||||
reloadConfiguration();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
protected void reallyFlush(PersistenceExecutor persistenceExecutor)
|
||||
throws Exception {
|
||||
if (totalBufferedRecords == 0) {
|
||||
|
@ -256,11 +233,11 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
i++;
|
||||
}
|
||||
}
|
||||
logger.trace("reallyFlush It is time to persist buffered records {}",Arrays.toString(recordToPersist));
|
||||
logger.trace("[{}]reallyFlush It is time to persist buffered records {}",name, Arrays.toString(recordToPersist));
|
||||
persistenceExecutor.persist(recordToPersist);
|
||||
clear();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get an usage records and try to aggregate with other buffered Usage
|
||||
* Record.
|
||||
|
@ -275,38 +252,20 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
throws Exception {
|
||||
aggregate(record, persistenceExecutor, false);
|
||||
}
|
||||
|
||||
|
||||
protected abstract boolean isTimeToPersist(int maxRecordNumber,
|
||||
long oldRecordMaxTime);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* reloadConfiguration
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void reloadConfiguration() throws Exception {
|
||||
final AggregationScheduler thisAG = this;
|
||||
new Thread() {
|
||||
|
||||
public void run() {
|
||||
|
||||
PersistenceBackendConfiguration configuration=getConfiguration();
|
||||
try {
|
||||
CheckConfiguration(configuration, true);
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
if (changeConfiguration) {
|
||||
thisAG.run();
|
||||
thisAG.reSchedule();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}.start();
|
||||
|
||||
protected void reloadConfiguration() {
|
||||
Random random = new Random();
|
||||
Integer randStart= Math.abs(random.nextInt(RANDOM_INIT_START));
|
||||
futureReload = ExecutorUtils.scheduler.scheduleAtFixedRate(new ReloaderThread(this),TIME_RELOAD_CONFIGURATION+randStart, TIME_RELOAD_CONFIGURATION, TIME_UNIT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Configuration (used from reload configuration)
|
||||
* @return
|
||||
|
@ -332,17 +291,16 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
}
|
||||
return configuration;
|
||||
}
|
||||
|
||||
|
||||
protected static void CheckConfiguration(PersistenceBackendConfiguration configuration, Boolean reload) throws IOException{
|
||||
logger.trace("CheckConfiguration reload:{}",reload);
|
||||
|
||||
|
||||
protected static AggregationConfig CheckConfiguration(PersistenceBackendConfiguration configuration) throws IOException{
|
||||
Integer delay = null;
|
||||
Integer maxRecordNumber = null;
|
||||
Integer maxRecordTime = null;
|
||||
try {
|
||||
ConfigurationGetPropertyValues properties = new ConfigurationGetPropertyValues();
|
||||
Properties prop = properties.getPropValues();
|
||||
|
||||
|
||||
if (prop != null) {
|
||||
// get value from properties file
|
||||
logger.trace("Configuration from properties file");
|
||||
|
@ -362,7 +320,7 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
} catch (Exception e) {
|
||||
logger.trace("Configuration from properties file, not found a maxRecordTime value");
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
if (configuration != null) {
|
||||
// get value from service end point
|
||||
|
@ -387,40 +345,30 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
} catch (Exception e) {
|
||||
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.",e.getLocalizedMessage()), e);
|
||||
}
|
||||
|
||||
if(reload){
|
||||
if ((delay != delaySet) ||(maxRecordNumber!=maxRecordsNumberSet)||((maxRecordTime )!=OldRecordMaxTimeElapsedSet)){
|
||||
logger.trace("reloadConfiguration delay/delaySet:{}/{},maxRecordNumber/maxRecordsNumberSet:{}/{}),maxRecordTime/OldRecordMaxTimeElapsedSet:{}/{}"
|
||||
,delay,delaySet,
|
||||
maxRecordNumber,maxRecordsNumberSet,
|
||||
maxRecordTime,OldRecordMaxTimeElapsedSet);
|
||||
changeConfiguration = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
AggregationConfig config = AggregationConfig.getDefaultConfiguration();
|
||||
if (delay != null) {
|
||||
delaySet = delay;
|
||||
initialDelaySet = delay;
|
||||
} else {
|
||||
delaySet = DELAY;
|
||||
initialDelaySet = INITIAL_DELAY;
|
||||
}
|
||||
|
||||
config.setDelaySet(delay);
|
||||
config.setInitialDelaySet(delay);
|
||||
}
|
||||
|
||||
if (maxRecordNumber != null) {
|
||||
maxRecordsNumberSet = maxRecordNumber;
|
||||
} else {
|
||||
maxRecordsNumberSet = MAX_RECORDS_NUMBER;
|
||||
}
|
||||
config.setMaxRecordsNumberSet(maxRecordNumber);
|
||||
}
|
||||
if (maxRecordTime != null) {
|
||||
OldRecordMaxTimeElapsedSet = maxRecordTime;
|
||||
} else {
|
||||
OldRecordMaxTimeElapsedSet = OLD_RECORD_MAX_TIME_ELAPSED;
|
||||
}
|
||||
|
||||
timeLoadConfiguration = Calendar.getInstance().getTimeInMillis();
|
||||
config.setOldRecordMaxTimeElapsedSet(maxRecordTime);
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public AggregationConfig getConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
public void setConfig(AggregationConfig newConfig) {
|
||||
this.config = newConfig;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
|
@ -433,7 +381,43 @@ public abstract class AggregationScheduler implements Runnable {
|
|||
} catch (Exception e) {
|
||||
logger.error("Error flushing Buffered Records", e);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
public class ReloaderThread extends Thread {
|
||||
|
||||
private AggregationScheduler agScheduler;
|
||||
|
||||
public ReloaderThread(AggregationScheduler agScheduler) {
|
||||
super();
|
||||
this.agScheduler = agScheduler;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
logger.trace("[{}] reloadConfiguration",agScheduler.name );
|
||||
PersistenceBackendConfiguration configuration=getConfiguration();
|
||||
try {
|
||||
AggregationConfig agConf = CheckConfiguration(configuration);
|
||||
if (!agScheduler.config.equals(agConf)) {
|
||||
logger.trace("[{}] reloadConfiguration changeConfiguration "
|
||||
+ "old config:{} newconfig:{}",agScheduler.name,agScheduler.config.toString(),agConf.toString());
|
||||
agScheduler.setConfig(agConf);
|
||||
agScheduler.run();
|
||||
agScheduler.schedule();
|
||||
}
|
||||
else{
|
||||
logger.trace("[{}] reloadConfiguration no changeConfiguration",agScheduler.name );
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("error retrieving configuration",e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void shutdown(){
|
||||
futureReload.cancel(false);
|
||||
this.run();
|
||||
futureFlush.cancel(true);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,11 +19,16 @@ public class BufferAggregationScheduler extends AggregationScheduler {
|
|||
protected long firstBufferedTime;
|
||||
|
||||
|
||||
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor){
|
||||
super(persistenceExecutor);
|
||||
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor, String name){
|
||||
super(persistenceExecutor, name);
|
||||
this.firstOfBuffer = true;
|
||||
}
|
||||
|
||||
|
||||
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor, AggregationConfig config, String name){
|
||||
super(persistenceExecutor, config, name);
|
||||
this.firstOfBuffer = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void schedulerSpecificClear(){
|
||||
firstOfBuffer = true;
|
||||
|
|
|
@ -10,17 +10,19 @@ import org.slf4j.LoggerFactory;
|
|||
public class ConfigurationGetPropertyValues {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConfigurationGetPropertyValues.class);
|
||||
|
||||
public Properties getPropValues() throws IOException {
|
||||
Properties prop = null;
|
||||
String propFileName = "./config/accounting.properties";
|
||||
logger.trace("property file search"+propFileName);
|
||||
|
||||
try (FileInputStream inputStream= new FileInputStream(propFileName)){
|
||||
if (inputStream != null) {
|
||||
prop=new Properties();
|
||||
prop.load(inputStream);
|
||||
} else
|
||||
logger.trace("property file not Found"+propFileName);
|
||||
} else {
|
||||
logger.trace("property file not Found"+propFileName);
|
||||
}
|
||||
|
||||
}catch (Exception e) {
|
||||
logger.trace("property file error on input stream"+e.getLocalizedMessage());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue