accounting-lib/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java

159 lines
4.8 KiB
Java

/**
*
*/
package org.gcube.accounting.persistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
import org.gcube.accounting.datamodel.SingleUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for accounting persistence back-ends.
 * <p>
 * Records submitted through {@link #account(SingleUsageRecord)} are processed
 * on a background thread pool: each record is validated, handed to the
 * {@link AggregationScheduler} and eventually stored through
 * {@link #reallyAccount(UsageRecord)}. When the concrete persistence fails to
 * store a record, the record is passed to the configured
 * {@link FallbackPersistence}.
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 */
public abstract class AccountingPersistence {

    private static final Logger logger = LoggerFactory.getLogger(AccountingPersistence.class);

    /** Persistence used when {@link #reallyAccount(UsageRecord)} fails; may be {@code null} until set. */
    protected FallbackPersistence fallback;

    /** Scheduler used to aggregate records before they are persisted; may be {@code null} until set. */
    protected AggregationScheduler aggregationScheduler;

    /**
     * Pool for thread execution
     */
    private final ExecutorService pool;

    /** Monitor guarding {@link #pendingTasks}; also used to wake up {@link #flush(long, TimeUnit)}. */
    private final Object pendingLock = new Object();

    /** Number of tasks submitted by {@link #account(SingleUsageRecord)} that have not finished yet. */
    private int pendingTasks;

    /**
     * Creates a persistence without fallback and aggregation scheduler.
     * They are expected to be provided later through the setters.
     */
    protected AccountingPersistence(){
        this(null, null);
    }

    /**
     * @param fallback the persistence to use when this one fails
     * @param aggregationScheduler the scheduler used to aggregate records
     */
    protected AccountingPersistence(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){
        this.fallback = fallback;
        this.aggregationScheduler = aggregationScheduler;
        this.pool = Executors.newCachedThreadPool();
    }

    /**
     * @param fallback the fallback to set
     */
    protected void setFallback(FallbackPersistence fallback) {
        this.fallback = fallback;
    }

    /**
     * @param aggregationScheduler the aggregationScheduler to set
     */
    protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) {
        this.aggregationScheduler = aggregationScheduler;
    }

    /**
     * Prepare the connection to persistence.
     * This method must be used by implementation class to open
     * the connection with the persistence storage, DB, file etc.
     * @param configuration The configuration to create the connection
     * @throws Exception if fails
     */
    protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception;

    /**
     * This method contains the code to save the {@link UsageRecord}.
     * @param usageRecords the record to save
     * @throws Exception if the record cannot be saved; the caller then
     * tries the fallback persistence
     */
    protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception;

    /**
     * Stores every record with {@link #reallyAccount(UsageRecord)}, switching
     * to the fallback persistence for each record that fails. A failure on
     * one record never prevents the following records from being processed.
     * @param usageRecords the records to store
     */
    private void accountWithFallback(UsageRecord... usageRecords) {
        String persistenceName = this.getClass().getSimpleName();
        for(UsageRecord usageRecord : usageRecords){
            try {
                this.reallyAccount(usageRecord);
                logger.debug("{} accounted succesfully from {}.", usageRecord, persistenceName);
            } catch (Exception e) {
                accountOnFallback(usageRecord, persistenceName, e);
            }
        }
    }

    /**
     * Stores a record on the fallback persistence after the primary one failed.
     * @param usageRecord the record the primary persistence could not store
     * @param persistenceName simple class name of the primary persistence, for logging
     * @param failure the exception raised by the primary persistence
     */
    private void accountOnFallback(UsageRecord usageRecord, String persistenceName, Exception failure) {
        if(fallback == null){
            // Without this guard the NullPointerException would abort the
            // remaining records and hide the real failure.
            logger.error("{} was not accounted by {} and no fallback persistence is configured.",
                    usageRecord, persistenceName, failure);
            return;
        }
        String fallbackPersistenceName = fallback.getClass().getSimpleName();
        try {
            // The trailing throwable is logged with its stack trace.
            logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
                    usageRecord, persistenceName, fallbackPersistenceName, failure);
            fallback.reallyAccount(usageRecord);
            logger.debug("{} accounted succesfully from {}",
                    usageRecord, fallbackPersistenceName);
        }catch(Exception ex){
            logger.error("{} was not accounted at all", usageRecord, ex);
        }
    }

    /**
     * Creates the callback handed to the aggregation scheduler: it persists
     * the records it receives through {@link #accountWithFallback(UsageRecord...)}.
     */
    private AccountingPersistenceExecutor newPersistenceExecutor() {
        final AccountingPersistence persistence = this;
        return new AccountingPersistenceExecutor(){
            @Override
            public void persist(UsageRecord... usageRecords) throws Exception {
                persistence.accountWithFallback(usageRecords);
            }
        };
    }

    /**
     * Optionally validates the record, then either hands it to the
     * aggregation scheduler or persists it directly. Runs synchronously on
     * the calling thread. Failures are logged and never propagated.
     * @param usageRecord the record to process
     * @param validate whether {@code usageRecord.validate()} must be invoked first
     * @param aggregate {@code true} to pass the record to the aggregation
     * scheduler, {@code false} to persist it immediately
     */
    protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){
        try {
            if(validate){
                usageRecord.validate();
            }
            if(aggregate){
                aggregationScheduler.aggregate(usageRecord, newPersistenceExecutor());
            }else{
                this.accountWithFallback(usageRecord);
            }
        } catch (InvalidValueException e) {
            // Log the exception itself: its cause is frequently null.
            logger.error("Error validating UsageRecord", e);
        } catch (Exception e) {
            logger.error("Error accounting UsageRecord", e);
        }
    }

    /**
     * Persist the {@link UsageRecord}.
     * The record is validated first, then accounted, in a separate thread,
     * so that the caller can continue its execution.
     * If the persistence fails the record is handed to the fallback
     * persistence so that the {@link UsageRecord} can be recorded later.
     * <p>
     * Because validation happens asynchronously, a validation failure is
     * only logged; the declared exception is kept for backward compatibility.
     * @param usageRecord the {@link UsageRecord} to persist
     * @throws InvalidValueException declared for compatibility, not thrown
     * by this implementation
     */
    public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{
        Runnable runnable = new Runnable(){
            @Override
            public void run(){
                try {
                    validateAccountAggregate(usageRecord, true, true);
                } finally {
                    taskFinished();
                }
            }
        };
        taskSubmitted();
        try {
            pool.execute(runnable);
        } catch (RuntimeException e) {
            // The task was rejected and will never run: undo the bookkeeping.
            taskFinished();
            throw e;
        }
    }

    /** Records that one more task has been handed to the pool. */
    private void taskSubmitted() {
        synchronized(pendingLock){
            pendingTasks++;
        }
    }

    /** Records that a task is done and wakes up waiters once none is left. */
    private void taskFinished() {
        synchronized(pendingLock){
            pendingTasks--;
            if(pendingTasks <= 0){
                pendingLock.notifyAll();
            }
        }
    }

    /**
     * Waits until every task submitted by {@link #account(SingleUsageRecord)}
     * has finished or the timeout elapses.
     * @return {@code true} if no task is pending, {@code false} on timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private boolean awaitPendingTasks(long timeout, TimeUnit timeUnit) throws InterruptedException {
        final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
        synchronized(pendingLock){
            while(pendingTasks > 0){
                long remaining = deadline - System.nanoTime();
                if(remaining <= 0){
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(pendingLock, remaining);
            }
        }
        return true;
    }

    /**
     * Waits for the records already submitted to be processed, then asks the
     * aggregation scheduler to flush its buffered records to the persistence.
     * <p>
     * The pool is never shut down, so waiting on its termination would always
     * consume the whole timeout; pending tasks are tracked explicitly instead
     * and the method returns as soon as they are done.
     * @param timeout the maximum time to wait for pending records
     * @param timeUnit the unit of {@code timeout}
     * @throws Exception if the wait is interrupted or the scheduler fails to flush
     */
    public void flush(long timeout, TimeUnit timeUnit) throws Exception {
        if(!awaitPendingTasks(timeout, timeUnit)){
            logger.warn("Some records are still being processed after the flush timeout; flushing anyway.");
        }
        aggregationScheduler.flush(newPersistenceExecutor());
    }

    /**
     * Closes the persistence; what is released is up to the implementation.
     * NOTE(review): the internal thread pool is not shut down by this class.
     * @throws Exception if the persistence cannot be closed
     */
    public abstract void close() throws Exception;
}