This commit is contained in:
Alessandro Pieve 2016-07-06 07:56:12 +00:00
parent 1409bcffcd
commit 757c589f4c
10 changed files with 88 additions and 50 deletions

View File

@ -0,0 +1,32 @@
package org.gcube.documentstore.persistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
public class ExecutorUtils {
public static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingScheduledThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
public static ExecutorService threadPool = Executors.newFixedThreadPool(100, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingAggregationThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
}

View File

@ -4,8 +4,6 @@
package org.gcube.documentstore.persistence;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.InvalidValueException;
@ -26,13 +24,8 @@ public abstract class PersistenceBackend {
protected PersistenceBackendMonitor persistenceBackendMonitor;
/**
* Pool for thread execution
*/
private ExecutorService pool;
protected PersistenceBackend(){
this.pool = Executors.newCachedThreadPool();
if(!(this instanceof FallbackPersistenceBackend)){
this.persistenceBackendMonitor = new PersistenceBackendMonitor(this);
}
@ -97,14 +90,14 @@ public abstract class PersistenceBackend {
try {
logger.trace("Going to account {} using {} : {}", record, persistenceName, this);
this.reallyAccount(record);
logger.debug("{} accounted succesfully from {}.", record.toString(), persistenceName);
logger.trace("{} accounted succesfully from {}.", record.toString(), persistenceName);
} catch (Exception e) {
try {
String fallabackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
record.toString(), persistenceName, fallabackPersistenceName, e);
fallbackPersistence.reallyAccount(record);
logger.debug("{} accounted succesfully from {}",
logger.trace("{} accounted succesfully from {}",
record.toString(), fallabackPersistenceName);
}catch(Exception ex){
logger.error("{} was not accounted at all", record.toString(), e);
@ -116,7 +109,7 @@ public abstract class PersistenceBackend {
protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate){
try {
logger.debug("Received {} to account : {}", record.getClass().getSimpleName(), record);
logger.trace("Received {} to account : {}", record.getClass().getSimpleName(), record);
if(validate){
record.validate();
logger.trace("{} {} valid", record.getClass().getSimpleName(), record);
@ -154,12 +147,12 @@ public abstract class PersistenceBackend {
accountValidateAggregate(record, true, true);
}
};
pool.execute(runnable);
//pool.execute(runnable);
ExecutorUtils.threadPool.execute(runnable);
}
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
pool.awaitTermination(timeout, timeUnit);
//pool.awaitTermination(timeout, timeUnit);
aggregationScheduler.flush(new DefaultPersitenceExecutor(this));
}

View File

@ -29,7 +29,7 @@ public abstract class PersistenceBackendConfiguration {
for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) {
Class<? extends PersistenceBackendConfiguration> configClass = foundConfiguration.getClass();
String foundConfigurationClassName = configClass.getSimpleName();
logger.debug("{} will be used.", foundConfigurationClassName);
logger.trace("{} will be used.", foundConfigurationClassName);
return foundConfiguration;
}
@ -42,7 +42,7 @@ public abstract class PersistenceBackendConfiguration {
try {
Class<? extends PersistenceBackendConfiguration> configClass = foundConfiguration.getClass();
String foundConfigurationClassName = configClass.getSimpleName();
logger.debug("Testing {}", foundConfigurationClassName);
logger.trace("Testing {}", foundConfigurationClassName);
@SuppressWarnings("rawtypes")
Class[] configArgTypes = { Class.class };
@ -50,7 +50,7 @@ public abstract class PersistenceBackendConfiguration {
Object[] configArguments = {clz};
PersistenceBackendConfiguration configuration = configurationConstructor.newInstance(configArguments);
logger.debug("{} will be used.", foundConfigurationClassName);
logger.trace("{} will be used.", foundConfigurationClassName);
return configuration;
} catch (Exception e) {

View File

@ -102,8 +102,6 @@ public abstract class PersistenceBackendFactory {
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
//Modify for configuration aggregation
//fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
return fallbackPersistence;
}
@ -116,7 +114,7 @@ public abstract class PersistenceBackendFactory {
Class<? extends PersistenceBackend> foundClass = found.getClass();
try {
String foundClassName = foundClass.getSimpleName();
logger.debug("Testing {}", foundClassName);
logger.trace("Testing {}", foundClassName);
PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass);
if(configuration==null){
@ -124,7 +122,7 @@ public abstract class PersistenceBackendFactory {
}
found.prepareConnection(configuration);
logger.debug("{} will be used.", foundClassName);
logger.trace("{} will be used.", foundClassName);
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration));
found.setFallback(createFallback(context));
@ -142,10 +140,10 @@ public abstract class PersistenceBackendFactory {
context = sanitizeContext(context);
PersistenceBackend persistence = null;
logger.debug("Going to synchronized block in getPersistenceBackend");
logger.trace("Going to synchronized block in getPersistenceBackend");
synchronized (persistenceBackends) {
persistence = persistenceBackends.get(context);
logger.debug("{} {}", PersistenceBackend.class.getSimpleName(), persistence);
logger.trace("{} {}", PersistenceBackend.class.getSimpleName(), persistence);
if(persistence==null){
/*
* Setting FallbackPersistence and unlocking.

View File

@ -9,8 +9,6 @@ import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Calendar;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.records.Record;
@ -29,7 +27,7 @@ class PersistenceBackendMonitor implements Runnable {
private final static String ELABORATION_FILE_SUFFIX = ".ELABORATION";
private final static String ELABORATION_FILE_NOT_DELETED_SUFFIX = ".ELABORATION.NOT-DELETED";
protected final ScheduledExecutorService scheduler;
//protected final ScheduledExecutorService scheduler;
protected final PersistenceBackend persistenceBackend;
@ -39,8 +37,9 @@ class PersistenceBackendMonitor implements Runnable {
public PersistenceBackendMonitor(PersistenceBackend persistenceBackend){
this.persistenceBackend = persistenceBackend;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
//this.scheduler = Executors.newSingleThreadScheduledExecutor();
//this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
}
protected void elaborateFile(File elaborationFile){
@ -102,7 +101,7 @@ class PersistenceBackendMonitor implements Runnable {
elaborateFile(elaborationFile);
boolean deleted = elaborationFile.delete();
if(!deleted){
logger.debug("Failed to delete file {}", elaborationFile.getAbsolutePath());
logger.trace("Failed to delete file {}", elaborationFile.getAbsolutePath());
File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
elaborationFile.renameTo(elaborationFileNotDeleted);
}

View File

@ -3,8 +3,7 @@
*/
package org.gcube.documentstore.persistence;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@ -18,35 +17,39 @@ class PersistenceBackendRediscover implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(PersistenceBackendRediscover.class);
protected final ScheduledExecutorService scheduler;
//protected final ScheduledExecutorService scheduler;
protected final String context;
protected final FallbackPersistenceBackend fallbackPersistenceBackend;
private ScheduledFuture<?> scheduledThread;
public PersistenceBackendRediscover(String context,
FallbackPersistenceBackend fallbackPersistenceBackend,
long initialDelay, long delay, TimeUnit timeUnit){
this.context = context;
this.fallbackPersistenceBackend = fallbackPersistenceBackend;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
this.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
//this.scheduler = Executors.newSingleThreadScheduledExecutor();
//this.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
scheduledThread = ExecutorUtils.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
}
@Override
public void run() {
logger.debug("Going to rediscover {}", PersistenceBackend.class.getSimpleName());
logger.trace("Going to rediscover {}", PersistenceBackend.class.getSimpleName());
PersistenceBackend rediscovered = PersistenceBackendFactory.
rediscoverPersistenceBackend(fallbackPersistenceBackend, context);
if(rediscovered!=fallbackPersistenceBackend){
logger.debug("Another {} was found : {}. "
logger.trace("Another {} was found : {}. "
+ "Shutting down {} Thread for context {}",
PersistenceBackend.class.getSimpleName(),
rediscovered.getClass().getSimpleName(),
PersistenceBackendRediscover.class.getSimpleName(),
context);
scheduler.shutdown();
//scheduler.shutdown();
scheduledThread.cancel(true);
}else{
logger.debug("{} for contaxt {} is still a {}. We will see if next time we will be more lucky.",
logger.trace("{} for contaxt {} is still a {}. We will see if next time we will be more lucky.",
PersistenceBackend.class.getSimpleName(),
context,
FallbackPersistenceBackend.class.getSimpleName());

View File

@ -149,6 +149,9 @@ public class RecordUtility {
public static Class<? extends AggregatedRecord<?,?>> getAggregatedRecordClass(String recordType) throws ClassNotFoundException {
if(getAggregatedRecordClassesFound().containsKey(recordType)){
logger.trace("record type {},getAggregatedRecordClassesFound {}",recordType,getAggregatedRecordClassesFound(),getAggregatedRecordClassesFound().get(recordType));
return getAggregatedRecordClassesFound().get(recordType);
@ -160,7 +163,7 @@ public class RecordUtility {
logger.trace("getAggregatedRecordClass getAggregatedRecordClassesFound:"+getAggregatedRecordClassesFound());
logger.debug("getAggregatedRecordClass getAggregatedRecordClassesFound:"+getAggregatedRecordClassesFound());
throw new ClassNotFoundException();
}

View File

@ -10,11 +10,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
import org.gcube.documentstore.persistence.ExecutorUtils;
//import org.gcube.documentstore.persistence.DefaultPersitenceExecutor;
import org.gcube.documentstore.persistence.PersistenceBackend;
import org.gcube.documentstore.persistence.PersistenceBackendConfiguration;
@ -38,7 +38,8 @@ public abstract class AggregationScheduler implements Runnable {
protected Map<String, List<Record>> bufferedRecords;
protected final PersistenceExecutor persistenceExecutor;
protected final ScheduledExecutorService scheduler;
//protected final ScheduledExecutorService scheduler;
public static int INITIAL_DELAY = 30;
public static int DELAY = 30;
@ -54,7 +55,7 @@ public abstract class AggregationScheduler implements Runnable {
* TODO Get from configuration
*/
public static long TIME_RELOAD_CONFIGURATION =1000*60*60*12; // 12 hour
/**
* The time for first
@ -93,7 +94,8 @@ public abstract class AggregationScheduler implements Runnable {
Integer delay=0;
Integer maxRecordNumber=0;
Integer maxRecordTime=0;
if (prop==null){
//get value from service end point
logger.trace("Configuration from service end point");
@ -120,7 +122,10 @@ public abstract class AggregationScheduler implements Runnable {
OLD_RECORD_MAX_TIME_ELAPSED=maxRecordTime*1000*60;
TIME_LOAD_CONFIGURATION = Calendar.getInstance().getTimeInMillis();
logger.trace("Start Instance for time load configuration {}", TIME_LOAD_CONFIGURATION);
return new BufferAggregationScheduler(persistenceExecutor);
}
@ -129,8 +134,11 @@ public abstract class AggregationScheduler implements Runnable {
this.bufferedRecords = new HashMap<String, List<Record>>();
this.totalBufferedRecords = 0;
this.persistenceExecutor = persistenceExecutor;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT);
//this.scheduler = Executors.newScheduledThreadPool(1);
//this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT);
ScheduledFuture<?> future =ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT);
logger.trace("Thread scheduler created in {} ", this.toString());
logger.trace("Reload configuration every {}", TIME_RELOAD_CONFIGURATION);
logger.trace("Aggregated for max record {}", MAX_RECORDS_NUMBER);
logger.trace("Aggregated for max time {}", OLD_RECORD_MAX_TIME_ELAPSED);
@ -238,6 +246,7 @@ public abstract class AggregationScheduler implements Runnable {
}
if(isTimeToPersist( MAX_RECORDS_NUMBER , OLD_RECORD_MAX_TIME_ELAPSED ) || forceFlush){
reallyFlush(persistenceExecutor);
}
/**
@ -276,7 +285,7 @@ public abstract class AggregationScheduler implements Runnable {
Class<? extends PersistenceBackend> foundClass = found.getClass();
try {
String foundClassName = foundClass.getSimpleName();
logger.debug("Testing {}", foundClassName);
logger.trace("Testing {}", foundClassName);
configuration = PersistenceBackendConfiguration.getInstance(foundClass);
if(configuration==null){
@ -371,7 +380,7 @@ public abstract class AggregationScheduler implements Runnable {
try {
this.flush(persistenceExecutor);
} catch (Exception e) {
logger.error("Error flushing Buffered Records");
logger.error("Error flushing Buffered Records",e);
}
}

View File

@ -18,6 +18,7 @@ public class BufferAggregationScheduler extends AggregationScheduler {
protected boolean firstOfBuffer;
protected long firstBufferedTime;
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor){
super(persistenceExecutor);
this.firstOfBuffer = true;
@ -34,12 +35,13 @@ public class BufferAggregationScheduler extends AggregationScheduler {
*/
@Override
protected boolean isTimeToPersist(int maxRecordNumber, long oldRecordMaxTime) {
long now = Calendar.getInstance().getTimeInMillis();
if(firstOfBuffer){
firstOfBuffer = false;
firstBufferedTime = now;
}
if(totalBufferedRecords >= maxRecordNumber){
logger.trace("Time persist from maxRecordNumber:"+maxRecordNumber+" max totalBufferedRecords:"+totalBufferedRecords);
return true;

View File

@ -2,7 +2,6 @@ package org.gcube.documentstore.records.implementation;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.slf4j.Logger;