accounting-lib/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBacken...

166 lines
4.9 KiB
Java

/**
*
*/
package org.gcube.accounting.persistence;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.common.scope.impl.ScopeBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Static factory that hands out one {@link AccountingPersistenceBackend} per
 * scope, as reported by {@link ScopeProvider}.
 *
 * <p>
 * Backends are discovered through {@link ServiceLoader}. The first discovered
 * implementation whose {@code prepareConnection} call completes without
 * throwing is cached for the scope; when none qualifies, a
 * {@link FallbackPersistence} writing to a file under the configured fallback
 * location is used instead.
 * </p>
 *
 * <p>
 * Access to the per-scope cache is guarded by an internal lock. Backend
 * discovery itself runs outside of that lock (see
 * {@link #getPersistenceBackend()}).
 * </p>
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 *
 */
public abstract class AccountingPersistenceBackendFactory {

	private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendFactory.class);

	/** Name of the system property used to locate the default fallback directory. */
	public final static String HOME_SYSTEM_PROPERTY = "user.home";

	/** File name (or file name suffix, for scoped files) of the fallback log. */
	private static final String ACCOUNTING_FALLBACK_FILENAME = "accountingFallback.log";

	/** Directory hosting the fallback files; assigned at most once by {@link #setFallbackLocation(String)}. */
	private static String fallbackLocation;

	/** Guards the one-time initialisation of {@link #fallbackLocation}. */
	private static final ReentrantLock lock = new ReentrantLock();

	/** Guards every access to {@link #persistencePersistenceBackends}. */
	private static final ReentrantLock persistenceLock = new ReentrantLock();

	/** Cache of the backend selected for each scope, keyed by scope. */
	private static final Map<String, AccountingPersistenceBackend> persistencePersistenceBackends =
			new HashMap<String, AccountingPersistenceBackend>();

	/**
	 * Resolves the directory associated with the given path and creates it
	 * (with any missing ancestors) when it does not exist yet.
	 *
	 * <p>
	 * When {@code file} is not an existing directory, its parent is used
	 * instead. A path without a parent component is resolved against its
	 * absolute form so that no {@code NullPointerException} is raised.
	 * </p>
	 *
	 * @param file the path to inspect
	 * @return the directory that was resolved, or {@code file} itself when no
	 *         parent directory can be determined
	 */
	private static File file(File file) throws IllegalArgumentException {
		if (!file.isDirectory()) {
			File parent = file.getParentFile();
			if (parent == null) {
				// e.g. a relative, single-component path such as "accounting"
				parent = file.getAbsoluteFile().getParentFile();
			}
			if (parent == null) {
				// Nothing above this path: there is no folder structure to create
				return file;
			}
			file = parent;
		}
		// create folder structure if it does not exist
		if (!file.exists()) {
			file.mkdirs();
		}
		return file;
	}

	/**
	 * Sets the location where fallback files are written. Only the first
	 * invocation has an effect; once a location has been set, later calls are
	 * ignored.
	 *
	 * @param path the desired location, or {@code null} to use the value of
	 *             the {@value #HOME_SYSTEM_PROPERTY} system property
	 */
	protected static void setFallbackLocation(String path) {
		lock.lock();
		try {
			if (fallbackLocation == null) {
				if (path == null) {
					path = System.getProperty(HOME_SYSTEM_PROPERTY);
				}
				file(new File(path));
				fallbackLocation = path;
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Returns the persistence backend for the current scope, discovering and
	 * caching it on first use.
	 *
	 * <p>
	 * When no scope is available a new, uncached {@link FallbackPersistence}
	 * is returned. Otherwise, on a cache miss, a scope-specific
	 * {@link FallbackPersistence} is registered first and the lock is released
	 * before discovery starts, so that any lookup for the same scope performed
	 * while discovery is in progress receives the fallback instead of
	 * blocking. The discovered backend then replaces the fallback in the
	 * cache.
	 * </p>
	 *
	 * @return the backend to use in the current scope, never {@code null}
	 */
	protected static AccountingPersistenceBackend getPersistenceBackend() {
		String scope = ScopeProvider.instance.get();
		if (scope == null) {
			logger.error("No Scope available. FallbackPersistence will be used");
			File fallbackFile = new File(fallbackLocation, ACCOUNTING_FALLBACK_FILENAME);
			return new FallbackPersistence(fallbackFile);
		}

		FallbackPersistence fallbackPersistence;
		persistenceLock.lock();
		try {
			AccountingPersistenceBackend cached = persistencePersistenceBackends.get(scope);
			if (cached != null) {
				return cached;
			}

			// The scope is used as is. A previous (disabled) variant replaced a
			// VRE scope with its enclosing scope before computing the name.
			ScopeBean bean = new ScopeBean(scope);
			String name = bean.name();
			File fallbackFile = new File(fallbackLocation,
					String.format("%s.%s", name, ACCOUNTING_FALLBACK_FILENAME));
			fallbackPersistence = new FallbackPersistence(fallbackFile);

			// set only to avoid deadlock
			persistencePersistenceBackends.put(scope, fallbackPersistence);
		} finally {
			persistenceLock.unlock();
		}

		AccountingPersistenceBackend persistence = discoverPersistenceBackend(fallbackPersistence);
		persistence.setAggregationScheduler(AggregationScheduler.newInstance());
		persistence.setFallback(fallbackPersistence);

		// The cache is a plain HashMap: the final registration must be guarded too
		persistenceLock.lock();
		try {
			persistencePersistenceBackends.put(scope, persistence);
		} finally {
			persistenceLock.unlock();
		}
		return persistence;
	}

	/**
	 * Looks for the first usable backend among the implementations exposed
	 * through {@link ServiceLoader}, skipping {@link FallbackPersistence}.
	 *
	 * @param fallbackPersistence the backend returned when no discovered
	 *                            implementation can be prepared
	 * @return the first implementation whose connection was prepared without
	 *         errors, or {@code fallbackPersistence} when there is none
	 */
	private static AccountingPersistenceBackend discoverPersistenceBackend(FallbackPersistence fallbackPersistence) {
		try {
			ServiceLoader<AccountingPersistenceBackend> serviceLoader =
					ServiceLoader.load(AccountingPersistenceBackend.class);
			for (AccountingPersistenceBackend foundPersistence : serviceLoader) {
				// The fallback is never a candidate for the primary backend
				if (foundPersistence instanceof FallbackPersistence) {
					continue;
				}
				String foundPersistenceClassName = foundPersistence.getClass().getSimpleName();
				try {
					logger.debug("Testing {}", foundPersistenceClassName);
					AccountingPersistenceConfiguration configuration =
							new AccountingPersistenceConfiguration(foundPersistenceClassName);
					foundPersistence.prepareConnection(configuration);
					/*
					 * To validate the backend with a test UsageRecord before
					 * selecting it, account one here, e.g.
					 * foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord());
					 */
					logger.debug("{} will be used.", foundPersistenceClassName);
					return foundPersistence;
				} catch (Exception e) {
					logger.error(String.format(
							"%s not initialized correctly. It will not be used. Trying the next one if any.",
							foundPersistenceClassName), e);
				}
			}
		} catch (Exception e) {
			logger.error("Unable to instance a Persistence Implementation. Using fallback as default", e);
		}
		return fallbackPersistence;
	}

	/**
	 * Flushes the records of every cached backend. A failure while flushing
	 * one scope is logged and does not prevent the remaining scopes from being
	 * flushed.
	 *
	 * @param timeout  the timeout forwarded to each backend
	 * @param timeUnit the unit of {@code timeout}
	 */
	public static void flush(long timeout, TimeUnit timeUnit) {
		// Work on a snapshot so that the lock is not held while the backends
		// flush and so that concurrent registrations cannot break the iteration
		Map<String, AccountingPersistenceBackend> snapshot;
		persistenceLock.lock();
		try {
			snapshot = new HashMap<String, AccountingPersistenceBackend>(persistencePersistenceBackends);
		} finally {
			persistenceLock.unlock();
		}

		for (Map.Entry<String, AccountingPersistenceBackend> entry : snapshot.entrySet()) {
			String scope = entry.getKey();
			AccountingPersistenceBackend apb = entry.getValue();
			try {
				logger.debug("Flushing records in scope {}", scope);
				apb.flush(timeout, timeUnit);
			} catch (Exception e) {
				logger.error(String.format("Unable to flush records in scope %s with %s", scope, apb), e);
			}
		}
	}
}