From 5c3fdbbe6b85bc024ef3091ba90426bba402ae4b Mon Sep 17 00:00:00 2001 From: Alessandro Pieve Date: Wed, 1 Mar 2017 10:24:34 +0000 Subject: [PATCH] Add new method shutdown git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@144456 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 2 +- .../persistence/ExecutorUtils.java | 2 + .../PersistenceBackendFactory.java | 122 ++++++++++++------ 3 files changed, 82 insertions(+), 44 deletions(-) diff --git a/pom.xml b/pom.xml index 76de238..34693ad 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.gcube.data.publishing document-store-lib - 1.5.0-SNAPSHOT + 1.6.0-SNAPSHOT Document Store Lib Allow to persist data in NoSQL Document Store Databases. Discover Model dynamically. diff --git a/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java index d6f6abd..7dd7f77 100644 --- a/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java +++ b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java @@ -27,4 +27,6 @@ public class ExecutorUtils { } }); + + } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java index 6e3a92c..95f657c 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java @@ -21,23 +21,23 @@ import org.slf4j.LoggerFactory; * */ public abstract class PersistenceBackendFactory { - + private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendFactory.class); - + public static final String DEFAULT_CONTEXT = "DEFAULT_CONTEXT"; - + public final static String HOME_SYSTEM_PROPERTY = "user.home"; - + protected static final String FALLBACK_FILENAME = "fallback.log"; - + private static String fallbackLocation; private static Map persistenceBackends; private static Map forceImmediateRediscoveries; - + public static final long INITIAL_DELAY = 1000; // 1 min public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min - + static { persistenceBackends = new HashMap(); forceImmediateRediscoveries = new HashMap<>(); @@ -46,7 +46,7 @@ public abstract class PersistenceBackendFactory { public static void forceImmediateRediscovery(String context){ forceImmediateRediscoveries.put(context, new Boolean(true)); } - + public static Boolean getForceImmediateRediscovery(String context){ Boolean force = forceImmediateRediscoveries.get(context); if(force==null){ @@ -54,11 +54,11 @@ public abstract class PersistenceBackendFactory { } return force; } - + public static void addRecordPackage(Package packageObject) { RecordUtility.addRecordPackage(packageObject); } - + private static File file(File file) throws IllegalArgumentException { if(!file.isDirectory()){ file = file.getParentFile(); @@ -69,7 +69,7 @@ public abstract class PersistenceBackendFactory { } return file; } - + public synchronized static void setFallbackLocation(String path){ if(fallbackLocation == null){ if(path==null){ @@ -79,7 +79,7 @@ public abstract class PersistenceBackendFactory { fallbackLocation = path; } } - + protected synchronized static String getFallbackLocation(){ if(fallbackLocation==null){ try { @@ -90,18 +90,18 @@ public abstract class PersistenceBackendFactory { } return fallbackLocation; } - + protected static String sanitizeContext(final String context){ if(context==null || context.compareTo("")==0){ return DEFAULT_CONTEXT; } return context; } - + protected static String removeSlashFromContext(String context){ return context.replace("/", "_"); } - + public static File getFallbackFile(String context){ context = sanitizeContext(context); String slashLessContext = removeSlashFromContext(context); @@ -109,7 +109,7 @@ public abstract class PersistenceBackendFactory { File fallbackFile = new File(getFallbackLocation(), String.format("%s.%s", slashLessContext, FALLBACK_FILENAME)); return fallbackFile; } - + protected static FallbackPersistenceBackend createFallback(String context){ context = sanitizeContext(context); logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context); @@ -120,7 +120,7 @@ public abstract class PersistenceBackendFactory { //fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence))); return fallbackPersistence; } - + protected static PersistenceBackend discoverPersistenceBackend(String context, FallbackPersistenceBackend fallback){ context = sanitizeContext(context); logger.debug("Discovering {} for scope {}", @@ -131,17 +131,17 @@ public abstract class PersistenceBackendFactory { try { String foundClassName = foundClass.getSimpleName(); logger.trace("Testing {}", foundClassName); - + PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass); if(configuration==null){ continue; } - + found.prepareConnection(configuration); found.setOpen(); - + logger.trace("{} will be used.", foundClassName); - + found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration,"NON FALLBACK")); if (fallback!=null) found.setFallback(fallback); @@ -155,13 +155,13 @@ public abstract class PersistenceBackendFactory { } return null; }; - - + + public static PersistenceBackend getPersistenceBackend(String context) { context = sanitizeContext(context); Boolean forceImmediateRediscovery = getForceImmediateRediscovery(context); - + PersistenceBackend persistence = null; logger.trace("Going to synchronized block in getPersistenceBackend"); synchronized (persistenceBackends) { @@ -176,10 +176,10 @@ public abstract class PersistenceBackendFactory { */ persistence = createFallback(context); persistenceBackends.put(context, persistence); - + if(forceImmediateRediscovery){ PersistenceBackend p = discoverPersistenceBackend(context, (FallbackPersistenceBackend) persistence); - + if (p!=null){ persistence=p; persistenceBackends.put(context, persistence); @@ -191,24 +191,24 @@ public abstract class PersistenceBackendFactory { } } } - + return persistence; } - - + + protected static PersistenceBackend rediscoverPersistenceBackend(FallbackPersistenceBackend actual, String context){ context = sanitizeContext(context); logger.debug("The {} for context {} is {}. " + "Is time to rediscover if there is another possibility.", PersistenceBackend.class.getSimpleName(), context, actual.getClass().getSimpleName()); - + PersistenceBackend discoveredPersistenceBackend = PersistenceBackendFactory.discoverPersistenceBackend(context, actual); - + if(discoveredPersistenceBackend!=null){ synchronized (persistenceBackends) { - + /* * Passing the aggregator to the new PersistenceBackend * so that the buffered records will be persisted with the @@ -217,7 +217,7 @@ public abstract class PersistenceBackendFactory { */ //discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler()); persistenceBackends.put(context, discoveredPersistenceBackend); - + /* * Not needed because close has no effect. Removed to * prevent problem in cases of future changes. @@ -234,11 +234,11 @@ public abstract class PersistenceBackendFactory { } } - + return actual; } - - + + /** * Not used * @param persistenceBackend @@ -246,7 +246,7 @@ public abstract class PersistenceBackendFactory { protected synchronized static void renew(PersistenceBackend persistenceBackend){ String context = null; logger.trace("Renew a configuration : {}", PersistenceBackend.class.getSimpleName()); - + Set> entrySet = persistenceBackends.entrySet(); for(Entry entry : entrySet){ if(entry.getValue() == persistenceBackend){ @@ -269,16 +269,16 @@ public abstract class PersistenceBackendFactory { fallbackPersistenceBackend, INITIAL_DELAY, FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS); } - - + + public static void flush(String context, long timeout, TimeUnit timeUnit){ context = sanitizeContext(context); - + PersistenceBackend apb; synchronized (persistenceBackends) { apb = persistenceBackends.get(context); } - + try { logger.debug("Flushing records in context {}", context); apb.flush(timeout, timeUnit); @@ -297,6 +297,42 @@ public abstract class PersistenceBackendFactory { flush(context, timeout, timeUnit); } } - - + + + /** + * + */ + public static void shutdown() { + //shutdown the scheduler + ExecutorUtils.scheduler.shutdown(); + try { + ExecutorUtils.scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + logger.error("Unable to shutdown the scheduler", e); + } + + //shutdown the threadPool + ExecutorUtils.threadPool.shutdown(); + try { + ExecutorUtils.threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + logger.error("Unable to shutdown the threadPool", e); + } + //disconnect the persistence and clean + for(String context : persistenceBackends.keySet()){ + context = sanitizeContext(context); + PersistenceBackend apb; + synchronized (persistenceBackends) { + apb = persistenceBackends.get(context); + } + try { + logger.debug("Flushing records in context {}", context); + apb.closeAndClean(); + }catch(Exception e){ + logger.error("Unable to flush records in context {} with {}", context, apb, e); + } + } + + } + }