refs #200: Create accouting-lib library

https://support.d4science.org/issues/200
Implementing buffer strategy

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115632 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-06-30 09:16:08 +00:00
parent ba405646d3
commit fd5bce26c2
4 changed files with 52 additions and 30 deletions

View File

@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.persistence.PersistenceExecutor;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -28,13 +29,13 @@ public abstract class AggregationScheduler {
public List<UsageRecord> getRecords() {
return records;
}
protected AggregationScheduler(){
this.records = new ArrayList<UsageRecord>();
}
protected void madeAggregation(UsageRecord UsageRecord){
// TODO
}
/**
@ -42,10 +43,14 @@ public abstract class AggregationScheduler {
* Usage Record.
* @param usageRecord the Usage Record To Buffer
* @return true if is time to persist the buffered Usage Record
* @throws Exception if fails
*/
public synchronized boolean aggregate(UsageRecord UsageRecord) {
public synchronized void aggregate(UsageRecord UsageRecord, PersistenceExecutor persistenceExecutor) throws Exception {
madeAggregation(UsageRecord);
return isTimeToPersist();
if(isTimeToPersist()){
persistenceExecutor.persist(records.toArray(new UsageRecord[records.size()]));
}
}

View File

@ -40,7 +40,7 @@ public class BufferAggregationScheduler extends AggregationScheduler {
* {@inheritDoc}
*/
@Override
public synchronized boolean isTimeToPersist(){
public boolean isTimeToPersist(){
long now = Calendar.getInstance().getTimeInMillis();
if(firstOfBuffer){

View File

@ -4,7 +4,6 @@
package org.gcube.accounting.persistence;
import java.io.File;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -129,27 +128,26 @@ public abstract class Persistence {
* This method contains the code to save the {@link #UsageRecord}
*
*/
protected abstract void reallyAccount(UsageRecord usageRecord) throws Exception;
protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception;
private void accountWithFallback(UsageRecord usageRecord) throws Exception {
private void accountWithFallback(UsageRecord... usageRecords) {
String persistenceName = getInstance().getClass().getSimpleName();
try {
//logger.debug("Going to account {} using {}", usageRecord, persistenceName);
persistence.reallyAccount(usageRecord);
logger.debug("{} accounted succesfully from {}.", usageRecord, persistenceName);
} catch (Exception e) {
String fallabackPersistenceName = fallback.getClass().getSimpleName();
for(UsageRecord usageRecord : usageRecords){
try {
logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
usageRecord, persistenceName, fallabackPersistenceName);
fallback.reallyAccount(usageRecord);
logger.debug("{} accounted succesfully from {}",
usageRecord, fallabackPersistenceName);
}catch(Exception ex){
logger.error("{} was not accounted at all", usageRecord);
throw e;
//logger.debug("Going to account {} using {}", usageRecord, persistenceName);
persistence.reallyAccount(usageRecord);
logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName);
} catch (Exception e) {
String fallabackPersistenceName = fallback.getClass().getSimpleName();
try {
logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
usageRecord.toString(), persistenceName, fallabackPersistenceName);
fallback.reallyAccount(usageRecord);
logger.debug("{} accounted succesfully from {}",
usageRecord.toString(), fallabackPersistenceName);
}catch(Exception ex){
logger.error("{} was not accounted at all", usageRecord.toString());
}
}
}
}
@ -169,13 +167,16 @@ public abstract class Persistence {
public void run(){
try {
usageRecord.validate();
boolean timeToPersist = aggregationScheduler.aggregate(usageRecord);
if(timeToPersist){
List<UsageRecord> records = aggregationScheduler.getRecords();
for(UsageRecord record : records){
persistence.accountWithFallback(record);
aggregationScheduler.aggregate(usageRecord, new PersistenceExecutor(){
@Override
public void persist(UsageRecord... usageRecords) throws Exception {
persistence.accountWithFallback(usageRecords);
}
}
});
} catch (InvalidValueException e) {
logger.error("Error validating UsageRecord", e.getCause());
} catch (Exception e) {

View File

@ -0,0 +1,16 @@
/**
*
*/
package org.gcube.accounting.persistence;
import org.gcube.accounting.datamodel.UsageRecord;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public interface PersistenceExecutor {
public void persist(UsageRecord... usageRecords)throws Exception;
}