Add new method shutdown

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@144456 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Alessandro Pieve 2017-03-01 10:24:34 +00:00
parent b73f53cd50
commit 5c3fdbbe6b
3 changed files with 82 additions and 44 deletions

View File

@ -9,7 +9,7 @@
<groupId>org.gcube.data.publishing</groupId> <groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib</artifactId> <artifactId>document-store-lib</artifactId>
<version>1.5.0-SNAPSHOT</version> <version>1.6.0-SNAPSHOT</version>
<name>Document Store Lib</name> <name>Document Store Lib</name>
<description>Allow to persist data in NoSQL Document Store Databases. <description>Allow to persist data in NoSQL Document Store Databases.
Discover Model dynamically. Discover Model dynamically.

View File

@ -27,4 +27,6 @@ public class ExecutorUtils {
} }
}); });
} }

View File

@ -21,23 +21,23 @@ import org.slf4j.LoggerFactory;
* *
*/ */
public abstract class PersistenceBackendFactory { public abstract class PersistenceBackendFactory {
private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendFactory.class); private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendFactory.class);
public static final String DEFAULT_CONTEXT = "DEFAULT_CONTEXT"; public static final String DEFAULT_CONTEXT = "DEFAULT_CONTEXT";
public final static String HOME_SYSTEM_PROPERTY = "user.home"; public final static String HOME_SYSTEM_PROPERTY = "user.home";
protected static final String FALLBACK_FILENAME = "fallback.log"; protected static final String FALLBACK_FILENAME = "fallback.log";
private static String fallbackLocation; private static String fallbackLocation;
private static Map<String, PersistenceBackend> persistenceBackends; private static Map<String, PersistenceBackend> persistenceBackends;
private static Map<String, Boolean> forceImmediateRediscoveries; private static Map<String, Boolean> forceImmediateRediscoveries;
public static final long INITIAL_DELAY = 1000; // 1 min public static final long INITIAL_DELAY = 1000; // 1 min
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
static { static {
persistenceBackends = new HashMap<String, PersistenceBackend>(); persistenceBackends = new HashMap<String, PersistenceBackend>();
forceImmediateRediscoveries = new HashMap<>(); forceImmediateRediscoveries = new HashMap<>();
@ -46,7 +46,7 @@ public abstract class PersistenceBackendFactory {
public static void forceImmediateRediscovery(String context){ public static void forceImmediateRediscovery(String context){
forceImmediateRediscoveries.put(context, new Boolean(true)); forceImmediateRediscoveries.put(context, new Boolean(true));
} }
public static Boolean getForceImmediateRediscovery(String context){ public static Boolean getForceImmediateRediscovery(String context){
Boolean force = forceImmediateRediscoveries.get(context); Boolean force = forceImmediateRediscoveries.get(context);
if(force==null){ if(force==null){
@ -54,11 +54,11 @@ public abstract class PersistenceBackendFactory {
} }
return force; return force;
} }
public static void addRecordPackage(Package packageObject) { public static void addRecordPackage(Package packageObject) {
RecordUtility.addRecordPackage(packageObject); RecordUtility.addRecordPackage(packageObject);
} }
private static File file(File file) throws IllegalArgumentException { private static File file(File file) throws IllegalArgumentException {
if(!file.isDirectory()){ if(!file.isDirectory()){
file = file.getParentFile(); file = file.getParentFile();
@ -69,7 +69,7 @@ public abstract class PersistenceBackendFactory {
} }
return file; return file;
} }
public synchronized static void setFallbackLocation(String path){ public synchronized static void setFallbackLocation(String path){
if(fallbackLocation == null){ if(fallbackLocation == null){
if(path==null){ if(path==null){
@ -79,7 +79,7 @@ public abstract class PersistenceBackendFactory {
fallbackLocation = path; fallbackLocation = path;
} }
} }
protected synchronized static String getFallbackLocation(){ protected synchronized static String getFallbackLocation(){
if(fallbackLocation==null){ if(fallbackLocation==null){
try { try {
@ -90,18 +90,18 @@ public abstract class PersistenceBackendFactory {
} }
return fallbackLocation; return fallbackLocation;
} }
protected static String sanitizeContext(final String context){ protected static String sanitizeContext(final String context){
if(context==null || context.compareTo("")==0){ if(context==null || context.compareTo("")==0){
return DEFAULT_CONTEXT; return DEFAULT_CONTEXT;
} }
return context; return context;
} }
protected static String removeSlashFromContext(String context){ protected static String removeSlashFromContext(String context){
return context.replace("/", "_"); return context.replace("/", "_");
} }
public static File getFallbackFile(String context){ public static File getFallbackFile(String context){
context = sanitizeContext(context); context = sanitizeContext(context);
String slashLessContext = removeSlashFromContext(context); String slashLessContext = removeSlashFromContext(context);
@ -109,7 +109,7 @@ public abstract class PersistenceBackendFactory {
File fallbackFile = new File(getFallbackLocation(), String.format("%s.%s", slashLessContext, FALLBACK_FILENAME)); File fallbackFile = new File(getFallbackLocation(), String.format("%s.%s", slashLessContext, FALLBACK_FILENAME));
return fallbackFile; return fallbackFile;
} }
protected static FallbackPersistenceBackend createFallback(String context){ protected static FallbackPersistenceBackend createFallback(String context){
context = sanitizeContext(context); context = sanitizeContext(context);
logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context); logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context);
@ -120,7 +120,7 @@ public abstract class PersistenceBackendFactory {
//fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence))); //fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
return fallbackPersistence; return fallbackPersistence;
} }
protected static PersistenceBackend discoverPersistenceBackend(String context, FallbackPersistenceBackend fallback){ protected static PersistenceBackend discoverPersistenceBackend(String context, FallbackPersistenceBackend fallback){
context = sanitizeContext(context); context = sanitizeContext(context);
logger.debug("Discovering {} for scope {}", logger.debug("Discovering {} for scope {}",
@ -131,17 +131,17 @@ public abstract class PersistenceBackendFactory {
try { try {
String foundClassName = foundClass.getSimpleName(); String foundClassName = foundClass.getSimpleName();
logger.trace("Testing {}", foundClassName); logger.trace("Testing {}", foundClassName);
PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass); PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass);
if(configuration==null){ if(configuration==null){
continue; continue;
} }
found.prepareConnection(configuration); found.prepareConnection(configuration);
found.setOpen(); found.setOpen();
logger.trace("{} will be used.", foundClassName); logger.trace("{} will be used.", foundClassName);
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration,"NON FALLBACK")); found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration,"NON FALLBACK"));
if (fallback!=null) if (fallback!=null)
found.setFallback(fallback); found.setFallback(fallback);
@ -155,13 +155,13 @@ public abstract class PersistenceBackendFactory {
} }
return null; return null;
}; };
public static PersistenceBackend getPersistenceBackend(String context) { public static PersistenceBackend getPersistenceBackend(String context) {
context = sanitizeContext(context); context = sanitizeContext(context);
Boolean forceImmediateRediscovery = getForceImmediateRediscovery(context); Boolean forceImmediateRediscovery = getForceImmediateRediscovery(context);
PersistenceBackend persistence = null; PersistenceBackend persistence = null;
logger.trace("Going to synchronized block in getPersistenceBackend"); logger.trace("Going to synchronized block in getPersistenceBackend");
synchronized (persistenceBackends) { synchronized (persistenceBackends) {
@ -176,10 +176,10 @@ public abstract class PersistenceBackendFactory {
*/ */
persistence = createFallback(context); persistence = createFallback(context);
persistenceBackends.put(context, persistence); persistenceBackends.put(context, persistence);
if(forceImmediateRediscovery){ if(forceImmediateRediscovery){
PersistenceBackend p = discoverPersistenceBackend(context, (FallbackPersistenceBackend) persistence); PersistenceBackend p = discoverPersistenceBackend(context, (FallbackPersistenceBackend) persistence);
if (p!=null){ if (p!=null){
persistence=p; persistence=p;
persistenceBackends.put(context, persistence); persistenceBackends.put(context, persistence);
@ -191,24 +191,24 @@ public abstract class PersistenceBackendFactory {
} }
} }
} }
return persistence; return persistence;
} }
protected static PersistenceBackend rediscoverPersistenceBackend(FallbackPersistenceBackend actual, String context){ protected static PersistenceBackend rediscoverPersistenceBackend(FallbackPersistenceBackend actual, String context){
context = sanitizeContext(context); context = sanitizeContext(context);
logger.debug("The {} for context {} is {}. " logger.debug("The {} for context {} is {}. "
+ "Is time to rediscover if there is another possibility.", + "Is time to rediscover if there is another possibility.",
PersistenceBackend.class.getSimpleName(), context, PersistenceBackend.class.getSimpleName(), context,
actual.getClass().getSimpleName()); actual.getClass().getSimpleName());
PersistenceBackend discoveredPersistenceBackend = PersistenceBackend discoveredPersistenceBackend =
PersistenceBackendFactory.discoverPersistenceBackend(context, actual); PersistenceBackendFactory.discoverPersistenceBackend(context, actual);
if(discoveredPersistenceBackend!=null){ if(discoveredPersistenceBackend!=null){
synchronized (persistenceBackends) { synchronized (persistenceBackends) {
/* /*
* Passing the aggregator to the new PersistenceBackend * Passing the aggregator to the new PersistenceBackend
* so that the buffered records will be persisted with the * so that the buffered records will be persisted with the
@ -217,7 +217,7 @@ public abstract class PersistenceBackendFactory {
*/ */
//discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler()); //discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
persistenceBackends.put(context, discoveredPersistenceBackend); persistenceBackends.put(context, discoveredPersistenceBackend);
/* /*
* Not needed because close has no effect. Removed to * Not needed because close has no effect. Removed to
* prevent problem in cases of future changes. * prevent problem in cases of future changes.
@ -234,11 +234,11 @@ public abstract class PersistenceBackendFactory {
} }
} }
return actual; return actual;
} }
/** /**
* Not used * Not used
* @param persistenceBackend * @param persistenceBackend
@ -246,7 +246,7 @@ public abstract class PersistenceBackendFactory {
protected synchronized static void renew(PersistenceBackend persistenceBackend){ protected synchronized static void renew(PersistenceBackend persistenceBackend){
String context = null; String context = null;
logger.trace("Renew a configuration : {}", PersistenceBackend.class.getSimpleName()); logger.trace("Renew a configuration : {}", PersistenceBackend.class.getSimpleName());
Set<Entry<String, PersistenceBackend>> entrySet = persistenceBackends.entrySet(); Set<Entry<String, PersistenceBackend>> entrySet = persistenceBackends.entrySet();
for(Entry<String, PersistenceBackend> entry : entrySet){ for(Entry<String, PersistenceBackend> entry : entrySet){
if(entry.getValue() == persistenceBackend){ if(entry.getValue() == persistenceBackend){
@ -269,16 +269,16 @@ public abstract class PersistenceBackendFactory {
fallbackPersistenceBackend, INITIAL_DELAY, fallbackPersistenceBackend, INITIAL_DELAY,
FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS); FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS);
} }
public static void flush(String context, long timeout, TimeUnit timeUnit){ public static void flush(String context, long timeout, TimeUnit timeUnit){
context = sanitizeContext(context); context = sanitizeContext(context);
PersistenceBackend apb; PersistenceBackend apb;
synchronized (persistenceBackends) { synchronized (persistenceBackends) {
apb = persistenceBackends.get(context); apb = persistenceBackends.get(context);
} }
try { try {
logger.debug("Flushing records in context {}", context); logger.debug("Flushing records in context {}", context);
apb.flush(timeout, timeUnit); apb.flush(timeout, timeUnit);
@ -297,6 +297,42 @@ public abstract class PersistenceBackendFactory {
flush(context, timeout, timeUnit); flush(context, timeout, timeUnit);
} }
} }
/**
*
*/
public static void shutdown() {
//shutdown the scheduler
ExecutorUtils.scheduler.shutdown();
try {
ExecutorUtils.scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Unable to shutdown the scheduler", e);
}
//shutdown the threadPool
ExecutorUtils.threadPool.shutdown();
try {
ExecutorUtils.threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Unable to shutdown the threadPool", e);
}
//disconnect the persistence and clean
for(String context : persistenceBackends.keySet()){
context = sanitizeContext(context);
PersistenceBackend apb;
synchronized (persistenceBackends) {
apb = persistenceBackends.get(context);
}
try {
logger.debug("Flushing records in context {}", context);
apb.closeAndClean();
}catch(Exception e){
logger.error("Unable to flush records in context {} with {}", context, apb, e);
}
}
}
} }