Properly terminating RegexRulesAggregator scheduled thread.

This commit is contained in:
Luca Frosini 2020-03-06 17:57:44 +01:00
parent 04b2b6e89b
commit c4b6808b36
2 changed files with 37 additions and 13 deletions

View File

@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit;
import org.gcube.accounting.datamodel.validations.validators.MatcherReplace; import org.gcube.accounting.datamodel.validations.validators.MatcherReplace;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.documentstore.persistence.ExecutorUtils;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -21,19 +22,19 @@ public class RegexRulesAggregator implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(RegexRulesAggregator.class); private static final Logger logger = LoggerFactory.getLogger(RegexRulesAggregator.class);
private static final ScheduledExecutorService REGEX_REDISCOVERY_POOL; public static final ScheduledExecutorService REGEX_REDISCOVERY_POOL;
protected static final String DELAY = "delay"; protected static final String DELAY = "delay";
protected static final String TIME_UNIT = "timeUnit"; protected static final String TIME_UNIT = "timeUnit";
protected static final String REPLACE_RULES = "MatcherReplaceRules"; protected static final String MATCHER_REPLACE_RULES = "MatcherReplaceRules";
static { static {
REGEX_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() { REGEX_REDISCOVERY_POOL = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0; private int counter = 0;
private static final String prefix = "RegexRediscoveryThread"; private static final String prefix = "AggregationRegexRediscoveryThread";
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++); return new Thread(r, prefix + "-" + counter++);
@ -53,6 +54,10 @@ public class RegexRulesAggregator implements Runnable {
protected static RegexRulesAggregator instance; protected static RegexRulesAggregator instance;
protected boolean changeRate;
protected ScheduledFuture<?> reloadAggregatorRules;
public synchronized static RegexRulesAggregator getInstance() { public synchronized static RegexRulesAggregator getInstance() {
if(instance==null) { if(instance==null) {
instance = new RegexRulesAggregator(); instance = new RegexRulesAggregator();
@ -62,6 +67,7 @@ public class RegexRulesAggregator implements Runnable {
protected RegexRulesAggregator() { protected RegexRulesAggregator() {
matcherReplaceList = new ArrayList<>(); matcherReplaceList = new ArrayList<>();
changeRate = false;
readConfiguration(); readConfiguration();
} }
@ -79,23 +85,35 @@ public class RegexRulesAggregator implements Runnable {
} }
protected ScheduledFuture<?> reloadAggregatorRules;
public void readConfiguration() { public void readConfiguration() {
try { try {
accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(this.getClass()); accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(this.getClass());
try { try {
long oldDelay = delay;
String delayString = accountingPersistenceConfiguration.getProperty(DELAY); String delayString = accountingPersistenceConfiguration.getProperty(DELAY);
delay = Long.parseLong(delayString); delay = Long.parseLong(delayString);
if(oldDelay!=delay) {
changeRate = true;
}
TimeUnit oldTimeUnit = timeUnit;
String timeUnitString = accountingPersistenceConfiguration.getProperty(TIME_UNIT); String timeUnitString = accountingPersistenceConfiguration.getProperty(TIME_UNIT);
timeUnit = TimeUnit.valueOf(timeUnitString.toUpperCase()); timeUnit = TimeUnit.valueOf(timeUnitString.toUpperCase());
if(oldTimeUnit.ordinal() != timeUnit.ordinal()) {
changeRate = true;
}
}catch (Exception e) { }catch (Exception e) {
logger.warn("Unable to retrieve regex reload delay. Goign to use last known delay {} {}", delay, timeUnit.name().toLowerCase()); logger.warn("Unable to retrieve regex reload delay. Goign to use last known delay {} {}", delay, timeUnit.name().toLowerCase());
} }
String rulesString = accountingPersistenceConfiguration.getProperty(REPLACE_RULES); String rulesString = accountingPersistenceConfiguration.getProperty(MATCHER_REPLACE_RULES);
ObjectMapper mapper = DSMapper.getObjectMapper(); ObjectMapper mapper = DSMapper.getObjectMapper();
JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, MatcherReplace.class); JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, MatcherReplace.class);
@ -112,9 +130,9 @@ public class RegexRulesAggregator implements Runnable {
@Override @Override
public void run() { public void run() {
readConfiguration(); readConfiguration();
} if(changeRate) {
stop();
public void start() { }
if(reloadAggregatorRules == null) { if(reloadAggregatorRules == null) {
reloadAggregatorRules = REGEX_REDISCOVERY_POOL.scheduleAtFixedRate(this, delay, delay, timeUnit); reloadAggregatorRules = REGEX_REDISCOVERY_POOL.scheduleAtFixedRate(this, delay, delay, timeUnit);
} }
@ -134,4 +152,9 @@ public class RegexRulesAggregator implements Runnable {
} }
} }
public static void shutdown() {
RegexRulesAggregator.getInstance().stop();
ExecutorUtils.shutDownPool(RegexRulesAggregator.REGEX_REDISCOVERY_POOL);
}
} }

View File

@ -7,6 +7,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.RegexRulesAggregator;
import org.gcube.accounting.datamodel.BasicUsageRecord; import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
@ -16,7 +17,7 @@ import org.gcube.documentstore.persistence.PersistenceBackendFactory;
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
public class AccountingPersistenceFactory { public class AccountingPersistenceFactory {
private AccountingPersistenceFactory(){} private AccountingPersistenceFactory(){}
protected final static Map<String, AccountingPersistence> persistences; protected final static Map<String, AccountingPersistence> persistences;
@ -24,7 +25,6 @@ public class AccountingPersistenceFactory {
public static void initAccountingPackages(){ public static void initAccountingPackages(){
PersistenceBackendFactory.addRecordPackage(ServiceUsageRecord.class.getPackage()); PersistenceBackendFactory.addRecordPackage(ServiceUsageRecord.class.getPackage());
PersistenceBackendFactory.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage()); PersistenceBackendFactory.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage());
} }
static { static {
@ -60,12 +60,13 @@ public class AccountingPersistenceFactory {
AccountingPersistenceFactory.flushAll(); AccountingPersistenceFactory.flushAll();
} }
/**
* Flush all accounting data and shutdown connection and scheduled thread
*/
public static void shutDown(){ public static void shutDown(){
//flush all and shutdown connection and thread
PersistenceBackendFactory.flushAll(); PersistenceBackendFactory.flushAll();
PersistenceBackendFactory.shutdown(); PersistenceBackendFactory.shutdown();
RegexRulesAggregator.shutdown();
} }
/** /**