diff --git a/pom.xml b/pom.xml
index 76de238..34693ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
org.gcube.data.publishing
document-store-lib
- 1.5.0-SNAPSHOT
+ 1.6.0-SNAPSHOT
Document Store Lib
Allow to persist data in NoSQL Document Store Databases.
Discover Model dynamically.
diff --git a/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java
index d6f6abd..7dd7f77 100644
--- a/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java
+++ b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java
@@ -27,4 +27,6 @@ public class ExecutorUtils {
}
});
+
+
}
diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java
index 6e3a92c..95f657c 100644
--- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java
+++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java
@@ -21,23 +21,23 @@ import org.slf4j.LoggerFactory;
*
*/
public abstract class PersistenceBackendFactory {
-
+
private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendFactory.class);
-
+
public static final String DEFAULT_CONTEXT = "DEFAULT_CONTEXT";
-
+
public final static String HOME_SYSTEM_PROPERTY = "user.home";
-
+
protected static final String FALLBACK_FILENAME = "fallback.log";
-
+
private static String fallbackLocation;
private static Map persistenceBackends;
private static Map forceImmediateRediscoveries;
-
+
public static final long INITIAL_DELAY = 1000; // 1 min
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
-
+
static {
persistenceBackends = new HashMap();
forceImmediateRediscoveries = new HashMap<>();
@@ -46,7 +46,7 @@ public abstract class PersistenceBackendFactory {
public static void forceImmediateRediscovery(String context){
forceImmediateRediscoveries.put(context, new Boolean(true));
}
-
+
public static Boolean getForceImmediateRediscovery(String context){
Boolean force = forceImmediateRediscoveries.get(context);
if(force==null){
@@ -54,11 +54,11 @@ public abstract class PersistenceBackendFactory {
}
return force;
}
-
+
public static void addRecordPackage(Package packageObject) {
RecordUtility.addRecordPackage(packageObject);
}
-
+
private static File file(File file) throws IllegalArgumentException {
if(!file.isDirectory()){
file = file.getParentFile();
@@ -69,7 +69,7 @@ public abstract class PersistenceBackendFactory {
}
return file;
}
-
+
public synchronized static void setFallbackLocation(String path){
if(fallbackLocation == null){
if(path==null){
@@ -79,7 +79,7 @@ public abstract class PersistenceBackendFactory {
fallbackLocation = path;
}
}
-
+
protected synchronized static String getFallbackLocation(){
if(fallbackLocation==null){
try {
@@ -90,18 +90,18 @@ public abstract class PersistenceBackendFactory {
}
return fallbackLocation;
}
-
+
protected static String sanitizeContext(final String context){
if(context==null || context.compareTo("")==0){
return DEFAULT_CONTEXT;
}
return context;
}
-
+
protected static String removeSlashFromContext(String context){
return context.replace("/", "_");
}
-
+
public static File getFallbackFile(String context){
context = sanitizeContext(context);
String slashLessContext = removeSlashFromContext(context);
@@ -109,7 +109,7 @@ public abstract class PersistenceBackendFactory {
File fallbackFile = new File(getFallbackLocation(), String.format("%s.%s", slashLessContext, FALLBACK_FILENAME));
return fallbackFile;
}
-
+
protected static FallbackPersistenceBackend createFallback(String context){
context = sanitizeContext(context);
logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context);
@@ -120,7 +120,7 @@ public abstract class PersistenceBackendFactory {
//fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
return fallbackPersistence;
}
-
+
protected static PersistenceBackend discoverPersistenceBackend(String context, FallbackPersistenceBackend fallback){
context = sanitizeContext(context);
logger.debug("Discovering {} for scope {}",
@@ -131,17 +131,17 @@ public abstract class PersistenceBackendFactory {
try {
String foundClassName = foundClass.getSimpleName();
logger.trace("Testing {}", foundClassName);
-
+
PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass);
if(configuration==null){
continue;
}
-
+
found.prepareConnection(configuration);
found.setOpen();
-
+
logger.trace("{} will be used.", foundClassName);
-
+
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration,"NON FALLBACK"));
if (fallback!=null)
found.setFallback(fallback);
@@ -155,13 +155,13 @@ public abstract class PersistenceBackendFactory {
}
return null;
};
-
-
+
+
public static PersistenceBackend getPersistenceBackend(String context) {
context = sanitizeContext(context);
Boolean forceImmediateRediscovery = getForceImmediateRediscovery(context);
-
+
PersistenceBackend persistence = null;
logger.trace("Going to synchronized block in getPersistenceBackend");
synchronized (persistenceBackends) {
@@ -176,10 +176,10 @@ public abstract class PersistenceBackendFactory {
*/
persistence = createFallback(context);
persistenceBackends.put(context, persistence);
-
+
if(forceImmediateRediscovery){
PersistenceBackend p = discoverPersistenceBackend(context, (FallbackPersistenceBackend) persistence);
-
+
if (p!=null){
persistence=p;
persistenceBackends.put(context, persistence);
@@ -191,24 +191,24 @@ public abstract class PersistenceBackendFactory {
}
}
}
-
+
return persistence;
}
-
-
+
+
protected static PersistenceBackend rediscoverPersistenceBackend(FallbackPersistenceBackend actual, String context){
context = sanitizeContext(context);
logger.debug("The {} for context {} is {}. "
+ "Is time to rediscover if there is another possibility.",
PersistenceBackend.class.getSimpleName(), context,
actual.getClass().getSimpleName());
-
+
PersistenceBackend discoveredPersistenceBackend =
PersistenceBackendFactory.discoverPersistenceBackend(context, actual);
-
+
if(discoveredPersistenceBackend!=null){
synchronized (persistenceBackends) {
-
+
/*
* Passing the aggregator to the new PersistenceBackend
* so that the buffered records will be persisted with the
@@ -217,7 +217,7 @@ public abstract class PersistenceBackendFactory {
*/
//discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
persistenceBackends.put(context, discoveredPersistenceBackend);
-
+
/*
* Not needed because close has no effect. Removed to
* prevent problem in cases of future changes.
@@ -234,11 +234,11 @@ public abstract class PersistenceBackendFactory {
}
}
-
+
return actual;
}
-
-
+
+
/**
* Not used
* @param persistenceBackend
@@ -246,7 +246,7 @@ public abstract class PersistenceBackendFactory {
protected synchronized static void renew(PersistenceBackend persistenceBackend){
String context = null;
logger.trace("Renew a configuration : {}", PersistenceBackend.class.getSimpleName());
-
+
Set> entrySet = persistenceBackends.entrySet();
for(Entry entry : entrySet){
if(entry.getValue() == persistenceBackend){
@@ -269,16 +269,16 @@ public abstract class PersistenceBackendFactory {
fallbackPersistenceBackend, INITIAL_DELAY,
FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS);
}
-
-
+
+
public static void flush(String context, long timeout, TimeUnit timeUnit){
context = sanitizeContext(context);
-
+
PersistenceBackend apb;
synchronized (persistenceBackends) {
apb = persistenceBackends.get(context);
}
-
+
try {
logger.debug("Flushing records in context {}", context);
apb.flush(timeout, timeUnit);
@@ -297,6 +297,42 @@ public abstract class PersistenceBackendFactory {
flush(context, timeout, timeUnit);
}
}
-
-
+
+
+ /**
+ *
+ */
+ public static void shutdown() {
+ //shutdown the scheduler
+ ExecutorUtils.scheduler.shutdown();
+ try {
+ ExecutorUtils.scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Unable to shutdown the scheduler", e);
+ }
+
+ //shutdown the threadPool
+ ExecutorUtils.threadPool.shutdown();
+ try {
+ ExecutorUtils.threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Unable to shutdown the threadPool", e);
+ }
+ //disconnect the persistence and clean
+ for(String context : persistenceBackends.keySet()){
+ context = sanitizeContext(context);
+ PersistenceBackend apb;
+ synchronized (persistenceBackends) {
+ apb = persistenceBackends.get(context);
+ }
+ try {
+ logger.debug("Flushing records in context {}", context);
+ apb.closeAndClean();
+ }catch(Exception e){
+ logger.error("Unable to flush records in context {} with {}", context, apb, e);
+ }
+ }
+
+ }
+
}