From 976f9f21f747a9cce3c5d4c7d91b334e719ca794 Mon Sep 17 00:00:00 2001 From: Alessandro Pieve Date: Tue, 22 Nov 2016 09:50:29 +0000 Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@134489 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 2 +- .../aggregation/AggregationScheduler.java | 549 ++++++++++-------- 2 files changed, 301 insertions(+), 250 deletions(-) diff --git a/pom.xml b/pom.xml index 18f9282..5c34444 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.gcube.data.publishing document-store-lib - 1.3.0-SNAPSHOT + 1.3.1-SNAPSHOT Document Store Lib Allow to persist data in NoSQL Document Store Databases. Discover Model dynamically. diff --git a/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java b/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java index 7531de2..ededb5f 100644 --- a/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java +++ b/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java @@ -1,5 +1,6 @@ package org.gcube.documentstore.records.aggregation; - + +import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Arrays; @@ -12,7 +13,7 @@ import java.util.Properties; import java.util.ServiceLoader; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; - + import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions; import org.gcube.documentstore.persistence.ExecutorUtils; //import org.gcube.documentstore.persistence.DefaultPersitenceExecutor; @@ -25,185 +26,168 @@ import org.gcube.documentstore.records.RecordUtility; import org.gcube.documentstore.records.implementation.ConfigurationGetPropertyValues; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - + /** * @author Luca Frosini (ISTI - CNR) * */ public abstract class AggregationScheduler implements Runnable { - + public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class); - + protected int totalBufferedRecords; protected Map> bufferedRecords; - + protected final PersistenceExecutor persistenceExecutor; - - //protected final ScheduledExecutorService scheduler; - - public static int INITIAL_DELAY = 30; - public static int DELAY = 30; + + public static final int INITIAL_DELAY = 30; + public static Integer initialDelaySet; + + public static final int DELAY = 30; + public static Integer delaySet; + public final static TimeUnit TIME_UNIT = TimeUnit.MINUTES; - - public static final String AGGREGATION_SCHEDULER_TIME="AggregationSchedulerTime"; - - public static final String BUFFER_RECORD_TIME="BufferRecordTime"; - public static final String BUFFER_RECORD_NUMBER="BufferRecordNumber"; - + + public static final String AGGREGATION_SCHEDULER_TIME = "AggregationSchedulerTime"; + + public static final String BUFFER_RECORD_TIME = "BufferRecordTime"; + public static final String BUFFER_RECORD_NUMBER = "BufferRecordNumber"; + /** - * The Max amount of time for reload a configuration - * TODO Get from configuration + * The Max amount of time for reload a configuration TODO Get from + * configuration */ - public static long TIME_RELOAD_CONFIGURATION =1000*60*60*12; // 12 hour - - + public static long TIME_RELOAD_CONFIGURATION = 1000 * 60 * 60 * 12; // 12 + /** - * The time for first - */ - public static long TIME_LOAD_CONFIGURATION=0L; - + * The time for first + */ + public static long timeLoadConfiguration = 0L; + /** - * Define the MAX number of Record to buffer. - * TODO Get from configuration + * Define the MAX number of Record to buffer. TODO Get from configuration */ - protected static int MAX_RECORDS_NUMBER = 100; - - - + protected static final int MAX_RECORDS_NUMBER = 100; + protected static int maxRecordsNumberSet; + public static boolean changeConfiguration = false; + /** - * The Max amount of time elapsed form last record before after that - * the buffered record are persisted even if - * TODO Get from configuration + * The Max amount of time elapsed form last record before after that the + * buffered record are persisted even if TODO Get from configuration */ - protected static long OLD_RECORD_MAX_TIME_ELAPSED = 1000*60*30; // 10 min - - - - public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor){ - - + protected static final long OLD_RECORD_MAX_TIME_ELAPSED = 1000 * 60 * 30; // 30 min + protected static long OldRecordMaxTimeElapsedSet; + + protected ScheduledFuture future = null; + + public static AggregationScheduler newInstance( + PersistenceExecutor persistenceExecutor) { return new BufferAggregationScheduler(persistenceExecutor); } - - - - public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor, PersistenceBackendConfiguration configuration) throws NumberFormatException, Exception{ - - ConfigurationGetPropertyValues properties = new ConfigurationGetPropertyValues(); - Properties prop =properties.getPropValues(); - Integer delay=0; - Integer maxRecordNumber=0; - Integer maxRecordTime=0; - - - if (prop==null){ - //get value from service end point - logger.trace("Configuration from service end point"); - delay=Integer.parseInt(configuration.getProperty(AGGREGATION_SCHEDULER_TIME)); - maxRecordTime=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_TIME)); - maxRecordNumber=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_NUMBER)); - } - else{ - //get value from properties file - logger.trace("Configuration from properties file"); - delay=Integer.parseInt(prop.getProperty("delay")); - maxRecordNumber=Integer.parseInt(prop.getProperty("maxrecordnumber")); - maxRecordTime=Integer.parseInt(prop.getProperty("maxtimenumber")); - } - - if (delay != 0){ - DELAY=delay; - INITIAL_DELAY=delay; - } - if (maxRecordNumber != 0) - MAX_RECORDS_NUMBER=maxRecordNumber; - - if (maxRecordTime != 0) - OLD_RECORD_MAX_TIME_ELAPSED=maxRecordTime*1000*60; - - TIME_LOAD_CONFIGURATION = Calendar.getInstance().getTimeInMillis(); - logger.trace("Start Instance for time load configuration {}", TIME_LOAD_CONFIGURATION); - - + + public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor, + PersistenceBackendConfiguration configuration)throws NumberFormatException, Exception { + CheckConfiguration(configuration,false); return new BufferAggregationScheduler(persistenceExecutor); } - - - protected AggregationScheduler(PersistenceExecutor persistenceExecutor){ + + protected AggregationScheduler(PersistenceExecutor persistenceExecutor) { + + timeLoadConfiguration = Calendar.getInstance().getTimeInMillis(); this.bufferedRecords = new HashMap>(); this.totalBufferedRecords = 0; this.persistenceExecutor = persistenceExecutor; - //this.scheduler = Executors.newScheduledThreadPool(1); - //this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT); - ScheduledFuture future =ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT); - logger.trace("Thread scheduler created in {} ", this.toString()); - logger.trace("Reload configuration every {}", TIME_RELOAD_CONFIGURATION); - logger.trace("Aggregated for max record {}", MAX_RECORDS_NUMBER); - logger.trace("Aggregated for max time {}", OLD_RECORD_MAX_TIME_ELAPSED); - + if (initialDelaySet == null) + initialDelaySet = INITIAL_DELAY; + if (delaySet == null) + delaySet = DELAY; + if ((initialDelaySet == 0) || (delaySet == 0)) { + future = ExecutorUtils.scheduler.scheduleAtFixedRate(this, 1,1, TIME_UNIT); + } + else{ + future = ExecutorUtils.scheduler.scheduleAtFixedRate(this,initialDelaySet, delaySet, TIME_UNIT); + } + logger.trace("AggregationScheduler- Thread scheduler created in {} ", this.toString()); + logger.trace("AggregationScheduler- Load configuration every {}", TIME_RELOAD_CONFIGURATION); + logger.trace("AggregationScheduler- Aggregated for max record {}", maxRecordsNumberSet); + logger.trace("AggregationScheduler- Aggregated for max time {}", OldRecordMaxTimeElapsedSet); + } - + + private void reSchedule() { + //reschedule event because change configuration + future.cancel(false); + if ((initialDelaySet == 0) || (delaySet == 0)) { + future = ExecutorUtils.scheduler.scheduleAtFixedRate(this, 1,1, TIME_UNIT); + }else + future = ExecutorUtils.scheduler.scheduleAtFixedRate(this, delaySet,delaySet, TIME_UNIT); + changeConfiguration=false; + logger.trace("reSchedule - Thread scheduler created in {} ", this.toString()); + logger.trace("reSchedule - Load configuration every {}", TIME_RELOAD_CONFIGURATION); + logger.trace("reSchedule - Aggregated for max record {}", maxRecordsNumberSet); + logger.trace("reSchedule - Aggregated for max time {}", OldRecordMaxTimeElapsedSet); + } + @SuppressWarnings("rawtypes") - protected static AggregatedRecord instantiateAggregatedRecord(Record record) throws Exception{ - + protected static AggregatedRecord instantiateAggregatedRecord(Record record) + throws Exception { + String recordType = record.getRecordType(); Class clz = RecordUtility.getAggregatedRecordClass(recordType); Class[] argTypes = { record.getClass() }; Constructor constructor = clz.getDeclaredConstructor(argTypes); - Object[] arguments = {record}; + Object[] arguments = { record }; return constructor.newInstance(arguments); } - + @SuppressWarnings("rawtypes") - public static AggregatedRecord getAggregatedRecord(Record record) throws Exception { - + public static AggregatedRecord getAggregatedRecord(Record record) + throws Exception { + AggregatedRecord aggregatedRecord; - if(record instanceof AggregatedRecord){ - // the record is already an aggregated version + if (record instanceof AggregatedRecord) { + // the record is already an aggregated version aggregatedRecord = (AggregatedRecord) record; - }else{ + } else { aggregatedRecord = instantiateAggregatedRecord(record); } - + return aggregatedRecord; } - + @SuppressWarnings({ "rawtypes", "unchecked" }) - protected void madeAggregation(Record record){ - - + protected void madeAggregation(Record record) { + String recordType = record.getRecordType(); List records; - - if(this.bufferedRecords.containsKey(recordType)){ + + if (this.bufferedRecords.containsKey(recordType)) { records = this.bufferedRecords.get(recordType); boolean found = false; - - for(Record bufferedRecord : records){ - if(!(bufferedRecord instanceof AggregatedRecord)){ + + for (Record bufferedRecord : records) { + if (!(bufferedRecord instanceof AggregatedRecord)) { continue; } - - try { + try { AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord; - - if(record instanceof AggregatedRecord){ + + if (record instanceof AggregatedRecord) { // TODO check compatibility using getAggregable bufferedAggregatedRecord.aggregate((AggregatedRecord) record); - }else{ + } else { bufferedAggregatedRecord.aggregate((Record) record); } - - logger.trace("Aggregated Record is {}", bufferedAggregatedRecord); + logger.trace("Aggregated Record is {}",bufferedAggregatedRecord); found = true; break; - } catch(NotAggregatableRecordsExceptions e) { - logger.trace("{} is not usable for aggregation", bufferedRecord); - } + } catch (NotAggregatableRecordsExceptions e) { + logger.trace("{} is not usable for aggregation",bufferedRecord); + } } - - if(!found){ + + if (!found) { try { records.add(getAggregatedRecord(record)); } catch (Exception e) { @@ -212,9 +196,8 @@ public abstract class AggregationScheduler implements Runnable { totalBufferedRecords++; return; } - - - }else{ + + } else { records = new ArrayList(); try { records.add(getAggregatedRecord(record)); @@ -224,155 +207,223 @@ public abstract class AggregationScheduler implements Runnable { totalBufferedRecords++; this.bufferedRecords.put(recordType, records); } - + } - - public void flush(PersistenceExecutor persistenceExecutor) throws Exception{ + + public void flush(PersistenceExecutor persistenceExecutor) throws Exception { aggregate(null, persistenceExecutor, true); } - + protected abstract void schedulerSpecificClear(); - - protected void clear(){ - totalBufferedRecords=0; + + protected void clear() { + totalBufferedRecords = 0; bufferedRecords.clear(); schedulerSpecificClear(); } - - protected synchronized void aggregate(Record record, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception { - - if(record!=null){ + + protected synchronized void aggregate(Record record, + PersistenceExecutor persistenceExecutor, boolean forceFlush) + throws Exception { + + if (record != null) { madeAggregation(record); } - if(isTimeToPersist( MAX_RECORDS_NUMBER , OLD_RECORD_MAX_TIME_ELAPSED ) || forceFlush){ + if (isTimeToPersist(maxRecordsNumberSet, OldRecordMaxTimeElapsedSet)|| forceFlush) { reallyFlush(persistenceExecutor); - } - /** - * reload a configuration - */ + * check if reload a configuration + */ long now = Calendar.getInstance().getTimeInMillis(); - if((now - TIME_LOAD_CONFIGURATION) >= TIME_RELOAD_CONFIGURATION){ - ReloadConfiguration(); + if ((now - timeLoadConfiguration) >= TIME_RELOAD_CONFIGURATION) { + reloadConfiguration(); } } - - protected void ReloadConfiguration()throws Exception{ - - new Thread(){ - - public void run(){ - - Integer delay=0; - Integer maxRecordNumber=0; - Integer maxRecordTime=0; - try { - - ConfigurationGetPropertyValues properties = new ConfigurationGetPropertyValues(); - Properties prop =properties.getPropValues(); - if (prop!=null){ - //get value from properties file - logger.trace("Reload Configuration from properties file"); - delay=Integer.parseInt(prop.getProperty("delay")); - maxRecordNumber=Integer.parseInt(prop.getProperty("maxrecordnumber")); - maxRecordTime=Integer.parseInt(prop.getProperty("maxtimenumber")); - } - else{ - ServiceLoader serviceLoader = ServiceLoader.load(PersistenceBackend.class); - PersistenceBackendConfiguration configuration =null; - for (PersistenceBackend found : serviceLoader) { - Class foundClass = found.getClass(); - try { - String foundClassName = foundClass.getSimpleName(); - logger.trace("Testing {}", foundClassName); - - configuration = PersistenceBackendConfiguration.getInstance(foundClass); - if(configuration==null){ - continue; - } - logger.debug("{} will be used.", foundClassName); - - - } catch (Exception e) { - logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e); - } - } - - if (configuration!=null){ - //get value from service end point - logger.trace("Reload Configuration from service end point"); - delay=Integer.parseInt(configuration.getProperty(AGGREGATION_SCHEDULER_TIME)); - maxRecordTime=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_TIME)); - maxRecordNumber=Integer.parseInt(configuration.getProperty(BUFFER_RECORD_NUMBER)); - - } - } - - } catch (Exception e) { - // TODO Auto-generated catch block - logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", e.getLocalizedMessage()), e); - - } - //configure new value - if (delay != 0){ - DELAY=delay; - INITIAL_DELAY=delay; - } - if (maxRecordNumber != 0) - MAX_RECORDS_NUMBER=maxRecordNumber; - - if (maxRecordTime != 0) - OLD_RECORD_MAX_TIME_ELAPSED=maxRecordTime*1000*60; - //reset a timer - TIME_LOAD_CONFIGURATION = Calendar.getInstance().getTimeInMillis(); - - logger.trace("Aggregated for max record {}", MAX_RECORDS_NUMBER); - logger.trace("Aggregated for max time {}", OLD_RECORD_MAX_TIME_ELAPSED); - } - - - }.start(); - - - } - - - - protected void reallyFlush(PersistenceExecutor persistenceExecutor) throws Exception{ - if(totalBufferedRecords==0){ + + + protected void reallyFlush(PersistenceExecutor persistenceExecutor) + throws Exception { + if (totalBufferedRecords == 0) { return; } Record[] recordToPersist = new Record[totalBufferedRecords]; int i = 0; Collection> values = bufferedRecords.values(); - for(List records : values){ - for(Record thisRecord: records){ + for (List records : values) { + for (Record thisRecord : records) { recordToPersist[i] = thisRecord; i++; } } - logger.trace("reallyFlush It is time to persist buffered records {}", Arrays.toString(recordToPersist)); + logger.trace("reallyFlush It is time to persist buffered records {}",Arrays.toString(recordToPersist)); persistenceExecutor.persist(recordToPersist); - clear(); } - + /** - * Get an usage records and try to aggregate with other buffered - * Usage Record. - * @param singleRecord the Usage Record To Buffer + * Get an usage records and try to aggregate with other buffered Usage + * Record. + * + * @param singleRecord + * the Usage Record To Buffer * @return true if is time to persist the buffered Usage Record - * @throws Exception if fails + * @throws Exception + * if fails */ - public void aggregate(Record record, PersistenceExecutor persistenceExecutor) throws Exception { + public void aggregate(Record record, PersistenceExecutor persistenceExecutor) + throws Exception { aggregate(record, persistenceExecutor, false); } - - - protected abstract boolean isTimeToPersist(int maxRecordNumber, long oldRecordMaxTime); - - /* (non-Javadoc) + + protected abstract boolean isTimeToPersist(int maxRecordNumber, + long oldRecordMaxTime); + + + /** + * reloadConfiguration + * @throws Exception + */ + protected void reloadConfiguration() throws Exception { + final AggregationScheduler thisAG = this; + new Thread() { + + public void run() { + + PersistenceBackendConfiguration configuration=getConfiguration(); + try { + CheckConfiguration(configuration, true); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (changeConfiguration) { + thisAG.run(); + thisAG.reSchedule(); + } + + } + + }.start(); + + } + /** + * Get Configuration (used from reload configuration) + * @return + */ + protected PersistenceBackendConfiguration getConfiguration(){ + ServiceLoader serviceLoader = ServiceLoader + .load(PersistenceBackend.class); + PersistenceBackendConfiguration configuration = null; + for (PersistenceBackend found : serviceLoader) { + Class foundClass = found + .getClass(); + try { + String foundClassName = foundClass.getSimpleName(); + logger.trace("getConfiguration - foundClassName {}", foundClassName); + configuration = PersistenceBackendConfiguration.getInstance(foundClass); + if (configuration == null) { + continue; + } + logger.debug("{} will be used.", foundClassName); + } catch (Exception e) { + logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.",foundClass.getSimpleName()), e); + } + } + return configuration; + } + + + protected static void CheckConfiguration(PersistenceBackendConfiguration configuration, Boolean reload) throws IOException{ + logger.trace("CheckConfiguration reload:{}",reload); + Integer delay = null; + Integer maxRecordNumber = null; + Integer maxRecordTime = null; + try { + ConfigurationGetPropertyValues properties = new ConfigurationGetPropertyValues(); + Properties prop = properties.getPropValues(); + + if (prop != null) { + // get value from properties file + logger.trace("Configuration from properties file"); + try { + delay = Integer.parseInt(prop.getProperty("delay")); + } catch (Exception e) { + logger.trace("Configuration from properties file, not found a delay value"); + } + try { + maxRecordNumber = Integer.parseInt(prop + .getProperty("maxrecordnumber")); + } catch (Exception e) { + logger.trace("Configuration from properties file, not found a maxRecordNumber value"); + } + try { + maxRecordTime = Integer.parseInt(prop.getProperty("maxtimenumber")) * 1000 * 60; + } catch (Exception e) { + logger.trace("Configuration from properties file, not found a maxRecordTime value"); + } + + } else { + if (configuration != null) { + // get value from service end point + logger.trace("Configuration from service end point"); + try { + delay = Integer.parseInt(configuration.getProperty(AGGREGATION_SCHEDULER_TIME)); + } catch (Exception e) { + logger.trace("Configuration from service end point, not found delay value"); + } + try { + maxRecordTime = Integer.parseInt(configuration.getProperty(BUFFER_RECORD_TIME)) * 1000 * 60; + } catch (Exception e) { + logger.trace("Configuration from service end point, not found maxRecordTime value"); + } + try { + maxRecordNumber = Integer.parseInt(configuration.getProperty(BUFFER_RECORD_NUMBER)); + } catch (Exception e) { + logger.trace("Configuration from service end point, not found maxRecordNumber value"); + } + } + } + } catch (Exception e) { + logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.",e.getLocalizedMessage()), e); + } + + if(reload){ + if ((delay != delaySet) ||(maxRecordNumber!=maxRecordsNumberSet)||((maxRecordTime )!=OldRecordMaxTimeElapsedSet)){ + logger.trace("reloadConfiguration delay/delaySet:{}/{},maxRecordNumber/maxRecordsNumberSet:{}/{}),maxRecordTime/OldRecordMaxTimeElapsedSet:{}/{}" + ,delay,delaySet, + maxRecordNumber,maxRecordsNumberSet, + maxRecordTime,OldRecordMaxTimeElapsedSet); + changeConfiguration = true; + } + } + + if (delay != null) { + delaySet = delay; + initialDelaySet = delay; + } else { + delaySet = DELAY; + initialDelaySet = INITIAL_DELAY; + } + + if (maxRecordNumber != null) { + maxRecordsNumberSet = maxRecordNumber; + } else { + maxRecordsNumberSet = MAX_RECORDS_NUMBER; + } + if (maxRecordTime != null) { + OldRecordMaxTimeElapsedSet = maxRecordTime; + } else { + OldRecordMaxTimeElapsedSet = OLD_RECORD_MAX_TIME_ELAPSED; + } + + timeLoadConfiguration = Calendar.getInstance().getTimeInMillis(); + } + + + /* + * (non-Javadoc) + * * @see java.lang.Runnable#run() */ @Override @@ -380,9 +431,9 @@ public abstract class AggregationScheduler implements Runnable { try { this.flush(persistenceExecutor); } catch (Exception e) { - logger.error("Error flushing Buffered Records",e); + logger.error("Error flushing Buffered Records", e); } + } - + } -