refs #200: Create accouting-lib library
https://support.d4science.org/issues/200 Implementing buffer strategy git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@115626 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
907d31bb4f
commit
ba405646d3
|
@ -0,0 +1,54 @@
|
||||||
|
package org.gcube.accounting.datamodel.aggregation.scheduler;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.gcube.accounting.datamodel.UsageRecord;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class AggregationScheduler {
|
||||||
|
|
||||||
|
protected static AggregationScheduler aggregationScheduler;
|
||||||
|
|
||||||
|
public static AggregationScheduler getInstance(){
|
||||||
|
if(aggregationScheduler==null){
|
||||||
|
aggregationScheduler = new BufferAggregationScheduler();
|
||||||
|
}
|
||||||
|
return aggregationScheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<UsageRecord> records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the records
|
||||||
|
*/
|
||||||
|
public List<UsageRecord> getRecords() {
|
||||||
|
return records;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected AggregationScheduler(){
|
||||||
|
this.records = new ArrayList<UsageRecord>();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void madeAggregation(UsageRecord UsageRecord){
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an usage records and try to aggregate with other buffered
|
||||||
|
* Usage Record.
|
||||||
|
* @param usageRecord the Usage Record To Buffer
|
||||||
|
* @return true if is time to persist the buffered Usage Record
|
||||||
|
*/
|
||||||
|
public synchronized boolean aggregate(UsageRecord UsageRecord) {
|
||||||
|
madeAggregation(UsageRecord);
|
||||||
|
return isTimeToPersist();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected abstract boolean isTimeToPersist();
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.gcube.accounting.datamodel.aggregation.scheduler;
|
||||||
|
|
||||||
|
import java.util.Calendar;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
|
*
|
||||||
|
* This class implements a Simple Buffer with timeout strategy.
|
||||||
|
* It buffer a predefined number of Records before invoking a persistence.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class BufferAggregationScheduler extends AggregationScheduler {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Define the MAX number of Record to buffer.
|
||||||
|
* TODO Get from configuration
|
||||||
|
*/
|
||||||
|
protected final static int MAX_RECORDS_NUMBER = 15;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Max amount of time elapsed form last record before after that
|
||||||
|
* the buffered record are persisted even if
|
||||||
|
* TODO Get from configuration
|
||||||
|
*/
|
||||||
|
protected final static long OLD_RECORD_MAX_TIME_ELAPSED = 1000*60*5; // 5 min
|
||||||
|
|
||||||
|
protected boolean firstOfBuffer;
|
||||||
|
protected long firstBufferedTime;
|
||||||
|
|
||||||
|
protected BufferAggregationScheduler(){
|
||||||
|
super();
|
||||||
|
this.firstOfBuffer = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized boolean isTimeToPersist(){
|
||||||
|
long now = Calendar.getInstance().getTimeInMillis();
|
||||||
|
|
||||||
|
if(firstOfBuffer){
|
||||||
|
firstOfBuffer = false;
|
||||||
|
firstBufferedTime = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(records.size() >= MAX_RECORDS_NUMBER){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if((now - firstBufferedTime) >= OLD_RECORD_MAX_TIME_ELAPSED){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -4,6 +4,7 @@
|
||||||
package org.gcube.accounting.persistence;
|
package org.gcube.accounting.persistence;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.List;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -11,6 +12,7 @@ import java.util.concurrent.Executors;
|
||||||
import org.gcube.accounting.datamodel.SingleUsageRecord;
|
import org.gcube.accounting.datamodel.SingleUsageRecord;
|
||||||
import org.gcube.accounting.datamodel.TestUsageRecord;
|
import org.gcube.accounting.datamodel.TestUsageRecord;
|
||||||
import org.gcube.accounting.datamodel.UsageRecord;
|
import org.gcube.accounting.datamodel.UsageRecord;
|
||||||
|
import org.gcube.accounting.datamodel.aggregation.scheduler.AggregationScheduler;
|
||||||
import org.gcube.accounting.exception.InvalidValueException;
|
import org.gcube.accounting.exception.InvalidValueException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -29,6 +31,7 @@ public abstract class Persistence {
|
||||||
*/
|
*/
|
||||||
protected static Persistence persistence;
|
protected static Persistence persistence;
|
||||||
protected static FallbackPersistence fallback;
|
protected static FallbackPersistence fallback;
|
||||||
|
protected static AggregationScheduler aggregationScheduler;
|
||||||
|
|
||||||
private static File file(File file) throws IllegalArgumentException {
|
private static File file(File file) throws IllegalArgumentException {
|
||||||
|
|
||||||
|
@ -79,6 +82,7 @@ public abstract class Persistence {
|
||||||
e.getCause());
|
e.getCause());
|
||||||
persistence = fallback;
|
persistence = fallback;
|
||||||
}
|
}
|
||||||
|
aggregationScheduler = AggregationScheduler.getInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -127,7 +131,9 @@ public abstract class Persistence {
|
||||||
*/
|
*/
|
||||||
protected abstract void reallyAccount(UsageRecord usageRecord) throws Exception;
|
protected abstract void reallyAccount(UsageRecord usageRecord) throws Exception;
|
||||||
|
|
||||||
private void accountWithFallback(SingleUsageRecord usageRecord) throws Exception {
|
|
||||||
|
|
||||||
|
private void accountWithFallback(UsageRecord usageRecord) throws Exception {
|
||||||
String persistenceName = getInstance().getClass().getSimpleName();
|
String persistenceName = getInstance().getClass().getSimpleName();
|
||||||
try {
|
try {
|
||||||
//logger.debug("Going to account {} using {}", usageRecord, persistenceName);
|
//logger.debug("Going to account {} using {}", usageRecord, persistenceName);
|
||||||
|
@ -163,9 +169,17 @@ public abstract class Persistence {
|
||||||
public void run(){
|
public void run(){
|
||||||
try {
|
try {
|
||||||
usageRecord.validate();
|
usageRecord.validate();
|
||||||
persistence.accountWithFallback(usageRecord);
|
boolean timeToPersist = aggregationScheduler.aggregate(usageRecord);
|
||||||
|
if(timeToPersist){
|
||||||
|
List<UsageRecord> records = aggregationScheduler.getRecords();
|
||||||
|
for(UsageRecord record : records){
|
||||||
|
persistence.accountWithFallback(record);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InvalidValueException e) {
|
||||||
|
logger.error("Error validating UsageRecord", e.getCause());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Error accouting UsageRecod", e.getCause());
|
logger.error("Error accounting UsageRecord", e.getCause());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue