document-store-lib/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java

107 lines
3.2 KiB
Java

package org.gcube.documentstore.persistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Holder of the statically shared thread pools and of the helpers used to
 * shut them down.
 *
 * <p>Every pool is created once, when this class is initialized, and its
 * threads are named {@code <prefix>-<n>} where {@code n} starts at 0 and is
 * incremented for each thread the pool creates.
 */
public class ExecutorUtils {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorUtils.class);

    /** Core pool size used for every scheduled pool declared below. */
    private static final int SCHEDULED_POOL_SIZE = 20;

    /** Fixed number of threads of {@link #ASYNC_AGGREGATION_POOL}. */
    private static final int ASYNC_AGGREGATION_POOL_SIZE = 30;

    /** Time, in milliseconds, granted to each of the two termination waits in {@link #shutDownPool}. */
    private static final long TERMINATION_TIMEOUT_MILLIS = 1000;

    /** Scheduled pool whose threads are named {@code PersistenceBackendRediscoveryThread-<n>}. */
    public static final ScheduledExecutorService PERSISTENCE_BACKEND_REDISCOVERY_POOL =
            Executors.newScheduledThreadPool(
                    SCHEDULED_POOL_SIZE, new NamedThreadFactory("PersistenceBackendRediscoveryThread"));

    /** Scheduled pool whose threads are named {@code ConfigurationRediscoveryThread-<n>}. */
    public static final ScheduledExecutorService CONFIGURATION_REDISCOVERY_POOL =
            Executors.newScheduledThreadPool(
                    SCHEDULED_POOL_SIZE, new NamedThreadFactory("ConfigurationRediscoveryThread"));

    /** Scheduled pool whose threads are named {@code FlushThread-<n>}. */
    public static final ScheduledExecutorService FUTURE_FLUSH_POOL =
            Executors.newScheduledThreadPool(
                    SCHEDULED_POOL_SIZE, new NamedThreadFactory("FlushThread"));

    /** Scheduled pool whose threads are named {@code FallbackElaboratorThread-<n>}. */
    public static final ScheduledExecutorService FALLBACK_ELABORATOR_POOL =
            Executors.newScheduledThreadPool(
                    SCHEDULED_POOL_SIZE, new NamedThreadFactory("FallbackElaboratorThread"));

    /** Fixed-size pool whose threads are named {@code AsyncAggregationThread-<n>}. */
    public static final ExecutorService ASYNC_AGGREGATION_POOL =
            Executors.newFixedThreadPool(
                    ASYNC_AGGREGATION_POOL_SIZE, new NamedThreadFactory("AsyncAggregationThread"));

    /**
     * Thread factory producing threads named {@code <prefix>-<n>}.
     *
     * <p>The sequence number is kept in an {@link AtomicInteger} because a pool
     * may ask for new threads from several threads at once (for instance from
     * concurrent task submitters); a plain {@code int} incremented with
     * {@code ++} could then hand out the same number twice.
     */
    private static final class NamedThreadFactory implements ThreadFactory {

        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(0);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "-" + counter.getAndIncrement());
        }
    }

    /**
     * Shuts down the given pool, escalating if it does not terminate in time.
     *
     * <p>The pool is first asked to shut down in an orderly way. If it has not
     * terminated after {@value #TERMINATION_TIMEOUT_MILLIS} ms,
     * {@link ExecutorService#shutdownNow()} is invoked and the method waits for
     * the same amount of time again; if the pool is still not terminated an
     * error is logged. If the calling thread is interrupted while waiting,
     * {@code shutdownNow()} is invoked and the interrupt status is restored.
     *
     * @param pool the pool to shut down
     */
    public static void shutDownPool(ExecutorService pool) {
        // Stop accepting new tasks; already submitted ones may still complete
        pool.shutdown();
        try {
            if (!pool.awaitTermination(TERMINATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(TERMINATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    logger.error("Unable to shutdown the pool {}", pool);
                }
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts down, one after the other and in declaration order, every pool
     * declared in this class by means of {@link #shutDownPool(ExecutorService)}.
     */
    public static void shutDownAll() {
        shutDownPool(ExecutorUtils.PERSISTENCE_BACKEND_REDISCOVERY_POOL);
        shutDownPool(ExecutorUtils.CONFIGURATION_REDISCOVERY_POOL);
        shutDownPool(ExecutorUtils.FUTURE_FLUSH_POOL);
        shutDownPool(ExecutorUtils.FALLBACK_ELABORATOR_POOL);
        shutDownPool(ExecutorUtils.ASYNC_AGGREGATION_POOL);
    }
}