diff --git a/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java index 63b5490..5336bd6 100644 --- a/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java +++ b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java @@ -11,14 +11,14 @@ public class ExecutorUtils { public static final ScheduledExecutorService CONFIGURATION_REDISCOVERY_POOL; public static final ScheduledExecutorService FUTURE_FLUSH_POOL; - public static final ScheduledExecutorService FALLBACK_REDISCOVERY_POOL; + public static final ScheduledExecutorService FALLBACK_ELABORATOR_POOL; public static final ExecutorService ASYNC_AGGREGATION_POOL; static { - PERSISTENCE_BACKEND_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() { + PERSISTENCE_BACKEND_REDISCOVERY_POOL = Executors.newScheduledThreadPool(20, new ThreadFactory() { private int counter = 0; private static final String prefix = "PersistenceBackendRediscoveryThread"; @@ -29,7 +29,7 @@ public class ExecutorUtils { }); - CONFIGURATION_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() { + CONFIGURATION_REDISCOVERY_POOL = Executors.newScheduledThreadPool(20, new ThreadFactory() { private int counter = 0; private static final String prefix = "ConfigurationRediscoveryThread"; @@ -39,7 +39,7 @@ public class ExecutorUtils { } }); - FUTURE_FLUSH_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() { + FUTURE_FLUSH_POOL = Executors.newScheduledThreadPool(20, new ThreadFactory() { private int counter = 0; private static final String prefix = "FlushThread"; @@ -49,10 +49,10 @@ public class ExecutorUtils { } }); - FALLBACK_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() { + FALLBACK_ELABORATOR_POOL = Executors.newScheduledThreadPool(20, new ThreadFactory() { private int counter = 0; - private static final String prefix = "FallbackRediscoveryThread"; + private static final String prefix = "FallbackElaboratorThread"; public Thread newThread(Runnable r) { return new Thread(r, prefix + "-" + counter++); @@ -60,7 +60,7 @@ public class ExecutorUtils { }); - ASYNC_AGGREGATION_POOL = Executors.newFixedThreadPool(100, new ThreadFactory() { + ASYNC_AGGREGATION_POOL = Executors.newFixedThreadPool(30, new ThreadFactory() { private int counter = 0; private static final String prefix = "AsyncAggregationThread"; diff --git a/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java b/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java index 01feb2b..d0bc643 100644 --- a/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java +++ b/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java @@ -40,7 +40,7 @@ public class FallbackMonitor implements Runnable { public FallbackMonitor(PersistenceBackend persistenceBackend, boolean schedule){ this.persistenceBackend = persistenceBackend; if(schedule){ - ExecutorUtils.FALLBACK_REDISCOVERY_POOL.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES); + ExecutorUtils.FALLBACK_ELABORATOR_POOL.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES); } } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java index a9a05fa..737b2ea 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java @@ -206,35 +206,37 @@ public abstract class PersistenceBackend { protected void accountWithFallback(Record... records) throws Exception { String persistenceName = this.getClass().getSimpleName(); String fallbackPersistenceName = FallbackPersistenceBackend.class.getSimpleName(); - - this.openConnection(); - - for (Record record : records) { - String recordString = null; - try { - recordString = record.toString(); - - if (isFallbackForced()) { - logger.trace("Forcing the use of {} to account {}", fallbackPersistenceName, record.toString()); - fallbackPersistence.reallyAccount(record); - } else { - this.reallyAccount(record); - logger.trace("{} accounted succesfully from {}.", recordString, persistenceName); - } - - } catch (Throwable t) { + try { + this.openConnection(); + + for (Record record : records) { + String recordString = null; try { - logger.warn("{} was not accounted succesfully using {}. Trying to use {}.", recordString, - persistenceName, fallbackPersistenceName, t); - fallbackPersistence.reallyAccount(record); - }finally { - registerUseOfFallback(); + recordString = record.toString(); + + if (isFallbackForced()) { + logger.trace("Forcing the use of {} to account {}", fallbackPersistenceName, record.toString()); + fallbackPersistence.reallyAccount(record); + } else { + this.reallyAccount(record); + logger.trace("{} accounted succesfully from {}.", recordString, persistenceName); + } + + } catch (Throwable t) { + try { + logger.warn("{} was not accounted succesfully using {}. Trying to use {}.", recordString, + persistenceName, fallbackPersistenceName, t); + fallbackPersistence.reallyAccount(record); + }finally { + registerUseOfFallback(); + } + } - } + }finally { + logger.trace("{} is going to close the connection (if any)", this.getClass().getSimpleName()); + this.closeConnection(); } - - this.closeConnection(); } /** @@ -258,11 +260,9 @@ public abstract class PersistenceBackend { } else { this.accountWithFallback(record); } - } catch (InvalidValueException e) { logger.error("Error validating {}", record.getClass().getSimpleName(), e); } catch (Exception e) { - logger.error("Error recording {}", record.getClass().getSimpleName(), e); } } @@ -281,7 +281,12 @@ public abstract class PersistenceBackend { Runnable runnable = new Runnable() { @Override public void run() { - accountValidateAggregate(record, true, true); + try { + accountValidateAggregate(record, true, true); + logger.trace("Record {} validated and aggregated were possible", record); + }catch (Throwable t) { + logger.error("Unable to account record {}", record, t); + } } }; ExecutorUtils.ASYNC_AGGREGATION_POOL.execute(runnable); diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java index 9daca51..eff9c4d 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java @@ -33,8 +33,8 @@ public abstract class PersistenceBackendFactory { private static Map persistenceBackends; private static Map forceImmediateRediscoveries; - public static final long INITIAL_DELAY = 1000; // 1 min - public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min + public static final long INITIAL_DELAY = TimeUnit.MINUTES.toMillis(1); // 1 min + public static final long FALLBACK_RETRY_TIME = TimeUnit.MINUTES.toMillis(10); // 10 min static { persistenceBackends = new HashMap(); @@ -125,28 +125,26 @@ public abstract class PersistenceBackendFactory { ServiceLoader serviceLoader = ServiceLoader.load(PersistenceBackend.class); - logger.trace("discoverPersistenceBackend Found a service loader {}", serviceLoader.toString()); - logger.trace("discoverPersistenceBackend Found a service loader with {}", PersistenceBackend.class.toString()); - + logger.trace("Created service loader for {}", PersistenceBackend.class.toString()); for (PersistenceBackend found : serviceLoader) { - logger.trace("for PersistenceBackend"); - logger.trace("Testing before cast {}", found.toString()); Class foundClass = found.getClass(); - + logger.trace("ServiceLoader found {}", foundClass.toString()); try { String foundClassName = foundClass.getSimpleName(); - logger.trace("Testing {}", foundClassName); + logger.trace("Going to look for configuration for {} on IS", foundClassName); PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass); if(configuration==null){ + logger.trace("No configuration found for {} on IS. Trying another persistence if any.", foundClassName); continue; } + logger.trace("Going to prepare connection for {} with discoverd configuration", foundClassName); found.prepareConnection(configuration); - logger.trace("{} will be used.", foundClassName); + logger.trace("The connection has been configured properly for {} so it will be used as {}.", foundClassName, PersistenceBackend.class.getSimpleName()); found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found))); @@ -162,7 +160,7 @@ public abstract class PersistenceBackendFactory { logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e); } } - logger.trace("Not Found any service loader"); + logger.trace("No valid {} found.", PersistenceBackend.class.getSimpleName()); return null; }; @@ -176,10 +174,8 @@ public abstract class PersistenceBackendFactory { logger.trace("Going to synchronized block in getPersistenceBackend"); synchronized (persistenceBackends) { persistence = persistenceBackends.get(context); - //logger.trace("[getPersistenceBackend]{} {} in context {}", PersistenceBackend.class.getSimpleName(), persistence,context); + logger.trace("{} in context {} is {}", PersistenceBackend.class.getSimpleName(), context, persistence); if(persistence==null){ - logger.trace("[getPersistenceBackend]{} {} in context {}", PersistenceBackend.class.getSimpleName(), persistence,context); - /* * Setting FallbackPersistence and unlocking. * There will be another thread which will try to discover the @@ -189,13 +185,19 @@ public abstract class PersistenceBackendFactory { persistenceBackends.put(context, persistence); if(forceImmediateRediscovery){ + logger.trace("Immediate Rediscovery has been forced"); PersistenceBackend p = discoverPersistenceBackend(context, (FallbackPersistenceBackend) persistence); - if (p!=null){ persistence=p; persistenceBackends.put(context, persistence); } - }else{ + } + + if(persistence instanceof FallbackPersistenceBackend) { + logger.trace("{} is {}. Going to schedule a thread with inital delay {} and period {} (in {}) to retry to discover and configure another {}", + PersistenceBackend.class.getSimpleName(), FallbackPersistenceBackend.class.getSimpleName(), + INITIAL_DELAY, FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS.name(), PersistenceBackend.class.getSimpleName()); + new PersistenceBackendRediscover(context, (FallbackPersistenceBackend) persistence, INITIAL_DELAY, FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS); @@ -264,9 +266,8 @@ public abstract class PersistenceBackendFactory { PersistenceBackendFactory.discoverPersistenceBackend(context, actual); if(discoveredPersistenceBackend!=null){ - switchPersistenceBackend(actual, discoveredPersistenceBackend, context); - + return discoveredPersistenceBackend; } return actual; @@ -356,9 +357,9 @@ public abstract class PersistenceBackendFactory { logger.error("Unable to shutdown the threadPool", e); } - ExecutorUtils.FALLBACK_REDISCOVERY_POOL.shutdown(); + ExecutorUtils.FALLBACK_ELABORATOR_POOL.shutdown(); try { - ExecutorUtils.FALLBACK_REDISCOVERY_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + ExecutorUtils.FALLBACK_ELABORATOR_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { logger.error("Unable to shutdown the threadPool", e); } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java index 8529e02..4d327cb 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java @@ -37,7 +37,7 @@ class PersistenceBackendRediscover implements Runnable { logger.trace("Going to rediscover {}", PersistenceBackend.class.getSimpleName()); PersistenceBackend rediscovered = PersistenceBackendFactory. rediscoverPersistenceBackend(fallbackPersistenceBackend, context); - if(rediscovered!=fallbackPersistenceBackend){ + if(!rediscovered.getClass().equals(fallbackPersistenceBackend.getClass())){ logger.trace("Another {} was found : {}. " + "Shutting down {} Thread for context {}", PersistenceBackend.class.getSimpleName(),