Refs #12184: Create different pools for Scheduled Thread in accounting stack

Task-Url: https://support.d4science.org/issues/12184

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@169818 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2018-07-18 14:19:47 +00:00
parent e97003c74a
commit a01fb52fe8
6 changed files with 82 additions and 18 deletions

View File

@ -7,25 +7,63 @@ import java.util.concurrent.ThreadFactory;
public class ExecutorUtils {
public static final ScheduledExecutorService scheduler;
public static final ScheduledExecutorService PERSISTENCE_BACKEND_REDISCOVERY_POOL;
public static final ScheduledExecutorService CONFIGURATION_REDISCOVERY_POOL;
public static final ExecutorService threadPool;
public static final ScheduledExecutorService FUTURE_FLUSH_POOL;
public static final ScheduledExecutorService FALLBACK_REDISCOVERY_POOL;
public static final ExecutorService ASYNC_AGGREGATION_POOL;
static {
scheduler = Executors.newScheduledThreadPool(50, new ThreadFactory() {
PERSISTENCE_BACKEND_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingScheduledThread";
private static final String prefix = "PersistenceBackendRediscoveryThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
threadPool = Executors.newFixedThreadPool(100, new ThreadFactory() {
CONFIGURATION_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingAggregationThread";
private static final String prefix = "ConfigurationRediscoveryThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
FUTURE_FLUSH_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "FlushThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
FALLBACK_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "FallbackRediscoveryThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
ASYNC_AGGREGATION_POOL = Executors.newFixedThreadPool(100, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AsyncAggregationThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);

View File

@ -40,7 +40,7 @@ public class FallbackMonitor implements Runnable {
public FallbackMonitor(PersistenceBackend persistenceBackend, boolean schedule){
this.persistenceBackend = persistenceBackend;
if(schedule){
ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
ExecutorUtils.FALLBACK_REDISCOVERY_POOL.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
}
}

View File

@ -284,7 +284,7 @@ public abstract class PersistenceBackend {
accountValidateAggregate(record, true, true);
}
};
ExecutorUtils.threadPool.execute(runnable);
ExecutorUtils.ASYNC_AGGREGATION_POOL.execute(runnable);
}
/**

View File

@ -332,22 +332,48 @@ public abstract class PersistenceBackendFactory {
}
}
//shutdown the scheduler
ExecutorUtils.scheduler.shutdown();
//shutdown the persitentBackendRediscoveryThread
ExecutorUtils.PERSISTENCE_BACKEND_REDISCOVERY_POOL.shutdown();
try {
ExecutorUtils.scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
ExecutorUtils.PERSISTENCE_BACKEND_REDISCOVERY_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Unable to shutdown the scheduler", e);
}
//shutdown the threadPool
ExecutorUtils.threadPool.shutdown();
//shutdown the configurationRediscoveryThread
ExecutorUtils.CONFIGURATION_REDISCOVERY_POOL.shutdown();
try {
ExecutorUtils.threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
ExecutorUtils.CONFIGURATION_REDISCOVERY_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Unable to shutdown the threadPool", e);
}
//shutdown the configurationRediscoveryThread
ExecutorUtils.FUTURE_FLUSH_POOL.shutdown();
try {
ExecutorUtils.FUTURE_FLUSH_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Unable to shutdown the threadPool", e);
}
ExecutorUtils.FALLBACK_REDISCOVERY_POOL.shutdown();
try {
ExecutorUtils.FALLBACK_REDISCOVERY_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Unable to shutdown the threadPool", e);
}
//shutdown the threadPool
ExecutorUtils.ASYNC_AGGREGATION_POOL.shutdown();
try {
ExecutorUtils.ASYNC_AGGREGATION_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Unable to shutdown the threadPool", e);
}
}
}

View File

@ -29,7 +29,7 @@ class PersistenceBackendRediscover implements Runnable {
long initialDelay, long delay, TimeUnit timeUnit){
this.context = context;
this.fallbackPersistenceBackend = fallbackPersistenceBackend;
scheduledThread = ExecutorUtils.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
scheduledThread = ExecutorUtils.PERSISTENCE_BACKEND_REDISCOVERY_POOL.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
}
@Override

View File

@ -103,7 +103,7 @@ public abstract class AggregationScheduler implements Runnable {
if (futureFlush != null) {
futureFlush.cancel(false);
}
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this, config.getInitialDelay(), config.getDelay(), AggregationConfiguration.TIME_UNIT);
futureFlush = ExecutorUtils.FUTURE_FLUSH_POOL.scheduleAtFixedRate(this, config.getInitialDelay(), config.getDelay(), AggregationConfiguration.TIME_UNIT);
}
@SuppressWarnings("rawtypes")
@ -249,7 +249,7 @@ public abstract class AggregationScheduler implements Runnable {
protected void reloadConfiguration() {
Random random = new Random();
Integer randStart = Math.abs(random.nextInt(RANDOM_INIT_START));
futureReload = ExecutorUtils.scheduler.scheduleAtFixedRate(new ReloaderThread(this),
futureReload = ExecutorUtils.CONFIGURATION_REDISCOVERY_POOL.scheduleAtFixedRate(new ReloaderThread(this),
TIME_RELOAD_CONFIGURATION + randStart, TIME_RELOAD_CONFIGURATION, TimeUnit.MINUTES);
}