180 lines
6.3 KiB
Java
180 lines
6.3 KiB
Java
/**
|
|
*
|
|
*/
|
|
package org.gcube.accounting.persistence;
|
|
|
|
import java.io.File;
|
|
import java.util.Calendar;
|
|
import java.util.HashMap;
|
|
import java.util.Map;
|
|
import java.util.ServiceLoader;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
|
|
import org.gcube.common.scope.api.ScopeProvider;
|
|
import org.gcube.common.scope.impl.ScopeBean;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
/**
|
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
|
*
|
|
*/
|
|
public abstract class AccountingPersistenceBackendFactory {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendFactory.class);
|
|
|
|
public final static String HOME_SYSTEM_PROPERTY = "user.home";
|
|
|
|
private static final String ACCOUTING_FALLBACK_FILENAME = "accountingFallback.log";
|
|
|
|
private static String fallbackLocation;
|
|
|
|
private static Map<String, AccountingPersistenceBackend> persistencePersistenceBackends;
|
|
|
|
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
|
|
|
|
protected static Map<String,Long> fallbackLastCheck;
|
|
|
|
static {
|
|
persistencePersistenceBackends = new HashMap<String, AccountingPersistenceBackend>();
|
|
fallbackLastCheck = new HashMap<String, Long>();
|
|
}
|
|
|
|
private static File file(File file) throws IllegalArgumentException {
|
|
if(!file.isDirectory()){
|
|
file = file.getParentFile();
|
|
}
|
|
// Create folder structure if not exist
|
|
if (!file.exists()) {
|
|
file.mkdirs();
|
|
}
|
|
return file;
|
|
}
|
|
|
|
protected synchronized static void setFallbackLocation(String path){
|
|
if(fallbackLocation == null){
|
|
if(path==null){
|
|
path = System.getProperty(HOME_SYSTEM_PROPERTY);
|
|
}
|
|
file(new File(path));
|
|
fallbackLocation = path;
|
|
}
|
|
}
|
|
|
|
protected static AccountingPersistenceBackend discoverAccountingPersistenceBackend(String scope){
|
|
ServiceLoader<AccountingPersistenceBackend> serviceLoader = ServiceLoader.load(AccountingPersistenceBackend.class);
|
|
for (AccountingPersistenceBackend foundPersistence : serviceLoader) {
|
|
if(foundPersistence instanceof FallbackPersistence){
|
|
continue;
|
|
}
|
|
try {
|
|
String foundPersistenceClassName = foundPersistence.getClass().getSimpleName();
|
|
logger.debug("Testing {}", foundPersistenceClassName);
|
|
AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration(foundPersistenceClassName);
|
|
foundPersistence.prepareConnection(configuration);
|
|
/*
|
|
* Uncomment the following line of code if you want to try
|
|
* to create a test UsageRecord before setting the
|
|
* foundPersistence as default
|
|
*
|
|
* foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord());
|
|
*/
|
|
logger.debug("{} will be used.", foundPersistenceClassName);
|
|
foundPersistence.setAggregationScheduler(AggregationScheduler.newInstance());
|
|
return foundPersistence;
|
|
} catch (Exception e) {
|
|
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundPersistence.getClass().getSimpleName()), e);
|
|
}
|
|
}
|
|
return null;
|
|
};
|
|
|
|
protected static FallbackPersistence createFallback(String scope){
|
|
File fallbackFile = null;
|
|
if(scope!=null){
|
|
ScopeBean bean = new ScopeBean(scope);
|
|
/* if(bean.is(Type.VRE)){ bean = bean.enclosingScope(); } */
|
|
String name = bean.name();
|
|
fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
|
|
}else{
|
|
fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
|
|
}
|
|
FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile);
|
|
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance());
|
|
return fallbackPersistence;
|
|
}
|
|
|
|
|
|
protected static synchronized AccountingPersistenceBackend getPersistenceBackend() {
|
|
String scope = ScopeProvider.instance.get();
|
|
|
|
if(scope==null){
|
|
logger.error("No Scope available. FallbackPersistence will be used");
|
|
return createFallback(null);
|
|
}
|
|
|
|
AccountingPersistenceBackend persistence = persistencePersistenceBackends.get(scope);
|
|
if(persistence==null){
|
|
persistence = discoverAccountingPersistenceBackend(scope);
|
|
|
|
if(persistence==null){
|
|
logger.warn("Unable to find a usable {}. {} will be used.", AccountingPersistenceBackend.class.getSimpleName(), FallbackPersistence.class.getSimpleName());
|
|
long now = Calendar.getInstance().getTimeInMillis();
|
|
fallbackLastCheck.put(scope, now);
|
|
persistence = createFallback(scope);
|
|
}
|
|
|
|
persistencePersistenceBackends.put(scope, persistence);
|
|
|
|
} else {
|
|
if(persistence instanceof FallbackPersistence && fallbackLastCheck.get(scope)!=null){
|
|
long now = Calendar.getInstance().getTimeInMillis();
|
|
Long lastCheckTimestamp = fallbackLastCheck.get(scope);
|
|
if(lastCheckTimestamp <= (now + FALLBACK_RETRY_TIME)){
|
|
logger.debug("The {} for scope {} is {}. Is time to rediscover if there is another possibility.",
|
|
AccountingPersistenceBackend.class.getSimpleName(), scope, persistence.getClass().getSimpleName());
|
|
|
|
AccountingPersistenceBackend discoveredPersistenceBackend = discoverAccountingPersistenceBackend(scope);
|
|
if(discoveredPersistenceBackend!=null){
|
|
discoveredPersistenceBackend.setAggregationScheduler(persistence.getAggregationScheduler());
|
|
fallbackLastCheck.remove(scope);
|
|
persistencePersistenceBackends.put(scope, discoveredPersistenceBackend);
|
|
try {
|
|
persistence.close();
|
|
} catch (Exception e) {
|
|
logger.error("Error closing {} for scope {} which has been substituted with {}.",
|
|
persistence.getClass().getSimpleName(), scope,
|
|
discoveredPersistenceBackend.getClass().getSimpleName(), e);
|
|
}
|
|
persistence = discoveredPersistenceBackend;
|
|
}else{
|
|
fallbackLastCheck.put(scope, now);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return persistence;
|
|
}
|
|
|
|
/**
|
|
* @param timeout
|
|
* @param timeUnit
|
|
* @throws Exception
|
|
*/
|
|
public static void flushAll(long timeout, TimeUnit timeUnit) {
|
|
for(String scope : persistencePersistenceBackends.keySet()){
|
|
AccountingPersistenceBackend apb = persistencePersistenceBackends.get(scope);
|
|
try {
|
|
logger.debug("Flushing records in scope {}", scope);
|
|
apb.flush(timeout, timeUnit);
|
|
}catch(Exception e){
|
|
logger.error("Unable to flush records in scope {} with {}", scope, apb);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
}
|