diff --git a/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java new file mode 100644 index 0000000..a7e8784 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/ExecutorUtils.java @@ -0,0 +1,32 @@ +package org.gcube.documentstore.persistence; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; + +public class ExecutorUtils { + + public static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50, new ThreadFactory() { + + private int counter = 0; + private static final String prefix = "AccountingScheduledThread"; + + + public Thread newThread(Runnable r) { + return new Thread(r, prefix + "-" + counter++); + } + }); + + public static ExecutorService threadPool = Executors.newFixedThreadPool(100, new ThreadFactory() { + + private int counter = 0; + private static final String prefix = "AccountingAggregationThread"; + + + public Thread newThread(Runnable r) { + return new Thread(r, prefix + "-" + counter++); + } + }); + +} diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java index 27ec6f0..b1ee8fc 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java @@ -4,8 +4,6 @@ package org.gcube.documentstore.persistence; import java.util.Arrays; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.gcube.documentstore.exception.InvalidValueException; @@ -26,13 +24,8 @@ public abstract class PersistenceBackend { protected PersistenceBackendMonitor persistenceBackendMonitor; - /** - * Pool for thread execution - */ - private ExecutorService pool; protected PersistenceBackend(){ - this.pool = Executors.newCachedThreadPool(); if(!(this instanceof FallbackPersistenceBackend)){ this.persistenceBackendMonitor = new PersistenceBackendMonitor(this); } @@ -97,14 +90,14 @@ public abstract class PersistenceBackend { try { logger.trace("Going to account {} using {} : {}", record, persistenceName, this); this.reallyAccount(record); - logger.debug("{} accounted succesfully from {}.", record.toString(), persistenceName); + logger.trace("{} accounted succesfully from {}.", record.toString(), persistenceName); } catch (Exception e) { try { String fallabackPersistenceName = FallbackPersistenceBackend.class.getSimpleName(); logger.error("{} was not accounted succesfully from {}. Trying to use {}.", record.toString(), persistenceName, fallabackPersistenceName, e); fallbackPersistence.reallyAccount(record); - logger.debug("{} accounted succesfully from {}", + logger.trace("{} accounted succesfully from {}", record.toString(), fallabackPersistenceName); }catch(Exception ex){ logger.error("{} was not accounted at all", record.toString(), e); @@ -116,7 +109,7 @@ public abstract class PersistenceBackend { protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate){ try { - logger.debug("Received {} to account : {}", record.getClass().getSimpleName(), record); + logger.trace("Received {} to account : {}", record.getClass().getSimpleName(), record); if(validate){ record.validate(); logger.trace("{} {} valid", record.getClass().getSimpleName(), record); @@ -154,12 +147,12 @@ public abstract class PersistenceBackend { accountValidateAggregate(record, true, true); } }; - pool.execute(runnable); - + //pool.execute(runnable); + ExecutorUtils.threadPool.execute(runnable); } public void flush(long timeout, TimeUnit timeUnit) throws Exception { - pool.awaitTermination(timeout, timeUnit); + //pool.awaitTermination(timeout, timeUnit); aggregationScheduler.flush(new DefaultPersitenceExecutor(this)); } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java index 26f592f..ed9f4d5 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java @@ -29,7 +29,7 @@ public abstract class PersistenceBackendConfiguration { for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) { Class configClass = foundConfiguration.getClass(); String foundConfigurationClassName = configClass.getSimpleName(); - logger.debug("{} will be used.", foundConfigurationClassName); + logger.trace("{} will be used.", foundConfigurationClassName); return foundConfiguration; } @@ -42,7 +42,7 @@ public abstract class PersistenceBackendConfiguration { try { Class configClass = foundConfiguration.getClass(); String foundConfigurationClassName = configClass.getSimpleName(); - logger.debug("Testing {}", foundConfigurationClassName); + logger.trace("Testing {}", foundConfigurationClassName); @SuppressWarnings("rawtypes") Class[] configArgTypes = { Class.class }; @@ -50,7 +50,7 @@ public abstract class PersistenceBackendConfiguration { Object[] configArguments = {clz}; PersistenceBackendConfiguration configuration = configurationConstructor.newInstance(configArguments); - logger.debug("{} will be used.", foundConfigurationClassName); + logger.trace("{} will be used.", foundConfigurationClassName); return configuration; } catch (Exception e) { diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java index 138c3f1..d313b90 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java @@ -102,8 +102,6 @@ public abstract class PersistenceBackendFactory { FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile); fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence))); - //Modify for configuration aggregation - //fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence))); return fallbackPersistence; } @@ -116,7 +114,7 @@ public abstract class PersistenceBackendFactory { Class foundClass = found.getClass(); try { String foundClassName = foundClass.getSimpleName(); - logger.debug("Testing {}", foundClassName); + logger.trace("Testing {}", foundClassName); PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass); if(configuration==null){ @@ -124,7 +122,7 @@ public abstract class PersistenceBackendFactory { } found.prepareConnection(configuration); - logger.debug("{} will be used.", foundClassName); + logger.trace("{} will be used.", foundClassName); found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration)); found.setFallback(createFallback(context)); @@ -142,10 +140,10 @@ public abstract class PersistenceBackendFactory { context = sanitizeContext(context); PersistenceBackend persistence = null; - logger.debug("Going to synchronized block in getPersistenceBackend"); + logger.trace("Going to synchronized block in getPersistenceBackend"); synchronized (persistenceBackends) { persistence = persistenceBackends.get(context); - logger.debug("{} {}", PersistenceBackend.class.getSimpleName(), persistence); + logger.trace("{} {}", PersistenceBackend.class.getSimpleName(), persistence); if(persistence==null){ /* * Setting FallbackPersistence and unlocking. diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java index a7be5b5..d845b73 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java @@ -9,8 +9,6 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.Calendar; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.gcube.documentstore.records.Record; @@ -29,7 +27,7 @@ class PersistenceBackendMonitor implements Runnable { private final static String ELABORATION_FILE_SUFFIX = ".ELABORATION"; private final static String ELABORATION_FILE_NOT_DELETED_SUFFIX = ".ELABORATION.NOT-DELETED"; - protected final ScheduledExecutorService scheduler; + //protected final ScheduledExecutorService scheduler; protected final PersistenceBackend persistenceBackend; @@ -39,8 +37,9 @@ class PersistenceBackendMonitor implements Runnable { public PersistenceBackendMonitor(PersistenceBackend persistenceBackend){ this.persistenceBackend = persistenceBackend; - this.scheduler = Executors.newSingleThreadScheduledExecutor(); - this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES); + //this.scheduler = Executors.newSingleThreadScheduledExecutor(); + //this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES); + ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES); } protected void elaborateFile(File elaborationFile){ @@ -102,7 +101,7 @@ class PersistenceBackendMonitor implements Runnable { elaborateFile(elaborationFile); boolean deleted = elaborationFile.delete(); if(!deleted){ - logger.debug("Failed to delete file {}", elaborationFile.getAbsolutePath()); + logger.trace("Failed to delete file {}", elaborationFile.getAbsolutePath()); File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX); elaborationFile.renameTo(elaborationFileNotDeleted); } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java index 260bcf1..5e69790 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendRediscover.java @@ -3,8 +3,7 @@ */ package org.gcube.documentstore.persistence; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -18,35 +17,39 @@ class PersistenceBackendRediscover implements Runnable { private final static Logger logger = LoggerFactory.getLogger(PersistenceBackendRediscover.class); - protected final ScheduledExecutorService scheduler; + //protected final ScheduledExecutorService scheduler; protected final String context; protected final FallbackPersistenceBackend fallbackPersistenceBackend; + private ScheduledFuture scheduledThread; + public PersistenceBackendRediscover(String context, FallbackPersistenceBackend fallbackPersistenceBackend, long initialDelay, long delay, TimeUnit timeUnit){ this.context = context; this.fallbackPersistenceBackend = fallbackPersistenceBackend; - this.scheduler = Executors.newSingleThreadScheduledExecutor(); - this.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit); + //this.scheduler = Executors.newSingleThreadScheduledExecutor(); + //this.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit); + scheduledThread = ExecutorUtils.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit); } @Override public void run() { - logger.debug("Going to rediscover {}", PersistenceBackend.class.getSimpleName()); + logger.trace("Going to rediscover {}", PersistenceBackend.class.getSimpleName()); PersistenceBackend rediscovered = PersistenceBackendFactory. rediscoverPersistenceBackend(fallbackPersistenceBackend, context); if(rediscovered!=fallbackPersistenceBackend){ - logger.debug("Another {} was found : {}. " + logger.trace("Another {} was found : {}. " + "Shutting down {} Thread for context {}", PersistenceBackend.class.getSimpleName(), rediscovered.getClass().getSimpleName(), PersistenceBackendRediscover.class.getSimpleName(), context); - scheduler.shutdown(); + //scheduler.shutdown(); + scheduledThread.cancel(true); }else{ - logger.debug("{} for contaxt {} is still a {}. We will see if next time we will be more lucky.", + logger.trace("{} for contaxt {} is still a {}. We will see if next time we will be more lucky.", PersistenceBackend.class.getSimpleName(), context, FallbackPersistenceBackend.class.getSimpleName()); diff --git a/src/main/java/org/gcube/documentstore/records/RecordUtility.java b/src/main/java/org/gcube/documentstore/records/RecordUtility.java index 1eee44e..96ea239 100644 --- a/src/main/java/org/gcube/documentstore/records/RecordUtility.java +++ b/src/main/java/org/gcube/documentstore/records/RecordUtility.java @@ -149,6 +149,9 @@ public class RecordUtility { public static Class> getAggregatedRecordClass(String recordType) throws ClassNotFoundException { + + + if(getAggregatedRecordClassesFound().containsKey(recordType)){ logger.trace("record type {},getAggregatedRecordClassesFound {}",recordType,getAggregatedRecordClassesFound(),getAggregatedRecordClassesFound().get(recordType)); return getAggregatedRecordClassesFound().get(recordType); @@ -160,7 +163,7 @@ public class RecordUtility { - logger.trace("getAggregatedRecordClass getAggregatedRecordClassesFound:"+getAggregatedRecordClassesFound()); + logger.debug("getAggregatedRecordClass getAggregatedRecordClassesFound:"+getAggregatedRecordClassesFound()); throw new ClassNotFoundException(); } diff --git a/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java b/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java index 9aef4de..9326dc6 100644 --- a/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java +++ b/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java @@ -10,11 +10,11 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions; +import org.gcube.documentstore.persistence.ExecutorUtils; //import org.gcube.documentstore.persistence.DefaultPersitenceExecutor; import org.gcube.documentstore.persistence.PersistenceBackend; import org.gcube.documentstore.persistence.PersistenceBackendConfiguration; @@ -38,7 +38,8 @@ public abstract class AggregationScheduler implements Runnable { protected Map> bufferedRecords; protected final PersistenceExecutor persistenceExecutor; - protected final ScheduledExecutorService scheduler; + + //protected final ScheduledExecutorService scheduler; public static int INITIAL_DELAY = 30; public static int DELAY = 30; @@ -54,7 +55,7 @@ public abstract class AggregationScheduler implements Runnable { * TODO Get from configuration */ public static long TIME_RELOAD_CONFIGURATION =1000*60*60*12; // 12 hour - + /** * The time for first @@ -93,7 +94,8 @@ public abstract class AggregationScheduler implements Runnable { Integer delay=0; Integer maxRecordNumber=0; Integer maxRecordTime=0; - + + if (prop==null){ //get value from service end point logger.trace("Configuration from service end point"); @@ -120,7 +122,10 @@ public abstract class AggregationScheduler implements Runnable { OLD_RECORD_MAX_TIME_ELAPSED=maxRecordTime*1000*60; TIME_LOAD_CONFIGURATION = Calendar.getInstance().getTimeInMillis(); - + logger.trace("Start Instance for time load configuration {}", TIME_LOAD_CONFIGURATION); + + + return new BufferAggregationScheduler(persistenceExecutor); } @@ -129,8 +134,11 @@ public abstract class AggregationScheduler implements Runnable { this.bufferedRecords = new HashMap>(); this.totalBufferedRecords = 0; this.persistenceExecutor = persistenceExecutor; - this.scheduler = Executors.newScheduledThreadPool(1); - this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT); + //this.scheduler = Executors.newScheduledThreadPool(1); + //this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT); + ScheduledFuture future =ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT); + logger.trace("Thread scheduler created in {} ", this.toString()); + logger.trace("Reload configuration every {}", TIME_RELOAD_CONFIGURATION); logger.trace("Aggregated for max record {}", MAX_RECORDS_NUMBER); logger.trace("Aggregated for max time {}", OLD_RECORD_MAX_TIME_ELAPSED); @@ -238,6 +246,7 @@ public abstract class AggregationScheduler implements Runnable { } if(isTimeToPersist( MAX_RECORDS_NUMBER , OLD_RECORD_MAX_TIME_ELAPSED ) || forceFlush){ reallyFlush(persistenceExecutor); + } /** @@ -276,7 +285,7 @@ public abstract class AggregationScheduler implements Runnable { Class foundClass = found.getClass(); try { String foundClassName = foundClass.getSimpleName(); - logger.debug("Testing {}", foundClassName); + logger.trace("Testing {}", foundClassName); configuration = PersistenceBackendConfiguration.getInstance(foundClass); if(configuration==null){ @@ -371,7 +380,7 @@ public abstract class AggregationScheduler implements Runnable { try { this.flush(persistenceExecutor); } catch (Exception e) { - logger.error("Error flushing Buffered Records"); + logger.error("Error flushing Buffered Records",e); } } diff --git a/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java b/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java index e35f0b5..ae1ebe2 100644 --- a/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java +++ b/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java @@ -18,6 +18,7 @@ public class BufferAggregationScheduler extends AggregationScheduler { protected boolean firstOfBuffer; protected long firstBufferedTime; + public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor){ super(persistenceExecutor); this.firstOfBuffer = true; @@ -34,12 +35,13 @@ public class BufferAggregationScheduler extends AggregationScheduler { */ @Override protected boolean isTimeToPersist(int maxRecordNumber, long oldRecordMaxTime) { + + long now = Calendar.getInstance().getTimeInMillis(); if(firstOfBuffer){ firstOfBuffer = false; firstBufferedTime = now; } - if(totalBufferedRecords >= maxRecordNumber){ logger.trace("Time persist from maxRecordNumber:"+maxRecordNumber+" max totalBufferedRecords:"+totalBufferedRecords); return true; diff --git a/src/main/java/org/gcube/documentstore/records/implementation/ConfigurationGetPropertyValues.java b/src/main/java/org/gcube/documentstore/records/implementation/ConfigurationGetPropertyValues.java index ce7a4f0..5e5599a 100644 --- a/src/main/java/org/gcube/documentstore/records/implementation/ConfigurationGetPropertyValues.java +++ b/src/main/java/org/gcube/documentstore/records/implementation/ConfigurationGetPropertyValues.java @@ -2,7 +2,6 @@ package org.gcube.documentstore.records.implementation; import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Properties; import org.slf4j.Logger;