Reorganizing library

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@119712 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-10-14 08:46:13 +00:00
parent 623f6955e3
commit dcf3e3ec1c
14 changed files with 181 additions and 53 deletions

View File

@ -2,6 +2,7 @@ package org.gcube.accounting.aggregation.scheduler;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@ -26,14 +27,8 @@ public abstract class AggregationScheduler {
private static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
protected static AggregationScheduler aggregationScheduler;
static {
aggregationScheduler = new BufferAggregationScheduler();
}
public static AggregationScheduler getInstance(){
return aggregationScheduler;
public static AggregationScheduler newInstance(){
return new BufferAggregationScheduler();
}
protected int totalBufferedRecords;
@ -114,6 +109,7 @@ public abstract class AggregationScheduler {
for(AggregationStrategy aggregationStrategy : aggregationStrategies){
try {
aggregationStrategy.aggregate((SingleUsageRecord) usageRecord);
logger.trace("{} has been used for aggregation. Aggregated Record is {}", aggregationStrategy, aggregationStrategy.getAggregatedUsageRecord());
found = true;
break;
} catch(NotAggregatableRecordsExceptions e) {
@ -175,6 +171,7 @@ public abstract class AggregationScheduler {
protected synchronized void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
if(usageRecord!=null){
logger.trace("Trying to aggregate {}", usageRecord);
madeAggregation(usageRecord);
}
@ -194,6 +191,7 @@ public abstract class AggregationScheduler {
i++;
}
logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist));
persistenceExecutor.persist(recordToPersist);
clear();
@ -210,6 +208,7 @@ public abstract class AggregationScheduler {
* @throws Exception if fails
*/
public void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor) throws Exception {
logger.trace("Going to aggregate {}", usageRecord);
aggregate(usageRecord, persistenceExecutor, false);
}

View File

@ -4,26 +4,17 @@
package org.gcube.accounting.datamodel.basetypes;
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Calendar;
import java.util.Map;
import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.decorators.ComputedField;
import org.gcube.accounting.datamodel.decorators.FieldAction;
import org.gcube.accounting.datamodel.decorators.FieldDecorator;
import org.gcube.accounting.datamodel.decorators.RequiredField;
import org.gcube.accounting.datamodel.deprecationmanagement.annotations.MoveToOperationResult;
import org.gcube.accounting.datamodel.validations.annotations.NotEmpty;
import org.gcube.accounting.datamodel.validations.annotations.ValidInteger;
import org.gcube.accounting.datamodel.validations.annotations.ValidLong;
import org.gcube.accounting.exception.InvalidValueException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -31,7 +22,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractJobUsageRecord extends BasicUsageRecord {
private static Logger logger = LoggerFactory.getLogger(AbstractJobUsageRecord.class);
//private static final Logger logger = LoggerFactory.getLogger(AbstractJobUsageRecord.class);
/**
* Generated Serial Version UID
@ -59,27 +50,6 @@ public abstract class AbstractJobUsageRecord extends BasicUsageRecord {
@ComputedField @ValidLong @CalculateWallDuration
protected static final String WALL_DURATION = "wallDuration";
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@FieldDecorator(managed=CalculateWallDurationAction.class)
protected @interface CalculateWallDuration {}
protected class CalculateWallDurationAction implements FieldAction {
@Override
public Serializable validate(String key, Serializable value, UsageRecord usageRecord) throws InvalidValueException {
try {
long wallDuration = calculateWallDuration();
if(key.compareTo(WALL_DURATION)==0){
logger.warn("{} is automatically computed using {} and {}. This invocation has the only effect of recalculating the value. Any provided value is ignored.",
WALL_DURATION, JOB_START_TIME, JOB_END_TIME);
value = wallDuration;
}
}catch(InvalidValueException e){ }
return value;
}
}
public AbstractJobUsageRecord(){
super();
}

View File

@ -0,0 +1,16 @@
/**
*
*/
package org.gcube.accounting.datamodel.basetypes;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.gcube.accounting.datamodel.decorators.FieldDecorator;
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@FieldDecorator(managed=CalculateWallDurationAction.class)
public @interface CalculateWallDuration {}

View File

@ -0,0 +1,30 @@
/**
*
*/
package org.gcube.accounting.datamodel.basetypes;
import java.io.Serializable;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.decorators.FieldAction;
import org.gcube.accounting.exception.InvalidValueException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CalculateWallDurationAction implements FieldAction {
private static final Logger logger = LoggerFactory.getLogger(CalculateWallDurationAction.class);
@Override
public Serializable validate(String key, Serializable value, UsageRecord usageRecord) throws InvalidValueException {
try {
long wallDuration = ((AbstractJobUsageRecord) usageRecord).calculateWallDuration();
if(key.compareTo(AbstractJobUsageRecord.WALL_DURATION)==0){
logger.warn("{} is automatically computed using {} and {}. This invocation has the only effect of recalculating the value. Any provided value is ignored.",
AbstractJobUsageRecord.WALL_DURATION, AbstractJobUsageRecord.JOB_START_TIME, AbstractJobUsageRecord.JOB_END_TIME);
value = wallDuration;
}
}catch(InvalidValueException e){ }
return value;
}
}

View File

@ -172,6 +172,7 @@ public class TestUsageRecord {
try {
usageRecord.setConsumerId(TEST_CONSUMER_ID);
usageRecord.setOperationResult(OperationResult.SUCCESS);
//usageRecord.setJobId(JOB_ID);
} catch (InvalidValueException e) {

View File

@ -40,7 +40,7 @@ public class AccountingPersistence {
}
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
AccountingPersistenceBackendFactory.getPersistenceBackend().flush(timeout, timeUnit);
AccountingPersistenceBackendFactory.flush(timeout, timeUnit);
}
public void close() throws Exception{

View File

@ -3,6 +3,7 @@
*/
package org.gcube.accounting.persistence;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -70,9 +71,10 @@ public abstract class AccountingPersistenceBackend {
private void accountWithFallback(UsageRecord... usageRecords) {
String persistenceName = this.getClass().getSimpleName();
logger.trace("Going to account {} using {} : {}", Arrays.toString(usageRecords), persistenceName, this);
for(UsageRecord usageRecord : usageRecords){
try {
//logger.debug("Going to account {} using {}", usageRecord, persistenceName);
logger.trace("Going to account {} using {} : {}", usageRecord, persistenceName, this);
this.reallyAccount(usageRecord);
logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName);
} catch (Exception e) {
@ -93,8 +95,10 @@ public abstract class AccountingPersistenceBackend {
protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){
try {
logger.debug("Received record to account : {}", usageRecord);
if(validate){
usageRecord.validate();
logger.trace("Record {} valid", usageRecord);
}
if(aggregate){
final AccountingPersistenceBackend persistence = this;

View File

@ -7,11 +7,11 @@ import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.common.scope.impl.ScopeBean;
import org.gcube.common.scope.impl.ScopeBean.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,9 +71,11 @@ public abstract class AccountingPersistenceBackendFactory {
if(persistence==null){
ScopeBean bean = new ScopeBean(scope);
/*
if(bean.is(Type.VRE)){
bean = bean.enclosingScope();
}
*/
String name = bean.name();
File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
@ -110,12 +112,31 @@ public abstract class AccountingPersistenceBackendFactory {
e);
persistence = fallbackPersistence;
}
persistence.setAggregationScheduler(AggregationScheduler.getInstance());
persistence.setAggregationScheduler(AggregationScheduler.newInstance());
persistence.setFallback(fallbackPersistence);
persistencePersistenceBackends.put(scope, persistence);
}
return persistence;
}
/**
* @param timeout
* @param timeUnit
* @throws Exception
*/
public static void flush(long timeout, TimeUnit timeUnit) {
for(String scope : persistencePersistenceBackends.keySet()){
AccountingPersistenceBackend apb =
persistencePersistenceBackends.get(scope);
try {
logger.debug("Flushing records in scope {}", scope);
apb.flush(timeout, timeUnit);
}catch(Exception e){
logger.error("Unable to flush records in scope {} with {}", scope, apb);
}
}
}
}

View File

@ -20,7 +20,7 @@ public class FallbackPersistence extends AccountingPersistenceBackend {
private File accountingFallbackFile;
protected FallbackPersistence(File accountingFallbackFile) {
super(null, AggregationScheduler.getInstance());
super(null, AggregationScheduler.newInstance());
this.accountingFallbackFile = accountingFallbackFile;
}

View File

@ -6,6 +6,7 @@ package org.gcube.accounting.aggregation;
import java.util.Set;
import org.gcube.accounting.datamodel.basetypes.TestUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
import org.junit.Assert;
import org.junit.Test;
@ -22,7 +23,7 @@ public class ServiceUsageRecordTest {
@Test
public void testRequiredFields() throws InvalidValueException {
org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord serviceUsageRecord = TestUsageRecord.createTestServiceUsageRecordAutomaticScope();
ServiceUsageRecord serviceUsageRecord = TestUsageRecord.createTestServiceUsageRecordAutomaticScope();
AggregatedServiceUsageRecord aggregatedServiceUsageRecord = new AggregatedServiceUsageRecord(serviceUsageRecord);

View File

@ -10,6 +10,7 @@ import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceExecutor;
import org.gcube.accounting.testutility.StressTestUtility;
import org.gcube.accounting.testutility.TestOperation;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,7 +24,7 @@ public class AggregationSchedulerTest {
private static final Logger logger = LoggerFactory.getLogger(AggregationSchedulerTest.class);
public static AggregationScheduler getAggregationScheduler(){
return AggregationScheduler.getInstance();
return AggregationScheduler.newInstance();
}
public static AccountingPersistenceExecutor persistenceExecutor = new AccountingPersistenceExecutor(){
@ -31,7 +32,7 @@ public class AggregationSchedulerTest {
@Override
public void persist(UsageRecord... usageRecords) throws Exception {
for(UsageRecord usageRecord : usageRecords){
logger.debug(usageRecord.toString());
logger.debug("Storing : {}", usageRecord.toString());
}
}
@ -39,6 +40,7 @@ public class AggregationSchedulerTest {
@Test
public void stressTestAggregableURSingleType() throws Exception {
ScopeProvider.instance.set(TestUsageRecord.TEST_SCOPE);
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override
@ -54,6 +56,7 @@ public class AggregationSchedulerTest {
@Test
public void stressTestDifferentAggregableURSingleType() throws Exception {
ScopeProvider.instance.set(TestUsageRecord.TEST_SCOPE);
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override
@ -70,6 +73,7 @@ public class AggregationSchedulerTest {
@Test
public void stressTestDifferentAggregableURTwoType() throws Exception {
ScopeProvider.instance.set(TestUsageRecord.TEST_SCOPE);
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override
@ -88,6 +92,7 @@ public class AggregationSchedulerTest {
@Test
public void stressTestDifferentAggregableURMultipleType() throws Exception {
ScopeProvider.instance.set(TestUsageRecord.TEST_SCOPE);
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override

View File

@ -8,6 +8,7 @@ import org.gcube.accounting.datamodel.basetypes.TestUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
import org.gcube.accounting.exception.NotAggregatableRecordsExceptions;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@ -22,9 +23,11 @@ public class StorageUsageRecordAggregationStrategyTest {
private static Logger logger = LoggerFactory.getLogger(StorageUsageRecordAggregationStrategyTest.class);
@Test
public void secondAsNotAggregated() throws InvalidValueException, NotAggregatableRecordsExceptions {
ScopeProvider.instance.set(TestUsageRecord.TEST_SCOPE);
StorageUsageRecord storageUsageRecord = TestUsageRecord.createTestStorageUsageRecordAutomaticScope();
storageUsageRecord.setResourceProperty(TestUsageRecord.TEST_PROPERTY_NAME, TestUsageRecord.TEST_PROPERTY_VALUE);
storageUsageRecord.validate();
@ -55,7 +58,7 @@ public class StorageUsageRecordAggregationStrategyTest {
@Test
public void secondAsAggregated() throws InvalidValueException, NotAggregatableRecordsExceptions {
ScopeProvider.instance.set(TestUsageRecord.TEST_SCOPE);
StorageUsageRecord storageUsageRecord = TestUsageRecord.createTestStorageUsageRecordAutomaticScope();
storageUsageRecord.validate();
logger.debug("StorageUsageRecord : {}", storageUsageRecord);
@ -91,7 +94,7 @@ public class StorageUsageRecordAggregationStrategyTest {
@Test
public void aggregationStressTest() throws InvalidValueException, NotAggregatableRecordsExceptions {
ScopeProvider.instance.set(TestUsageRecord.TEST_SCOPE);
StorageUsageRecord storageUsageRecord = TestUsageRecord.createTestStorageUsageRecordAutomaticScope();
storageUsageRecord.setResourceProperty(TestUsageRecord.TEST_PROPERTY_NAME, TestUsageRecord.TEST_PROPERTY_VALUE);
storageUsageRecord.validate();

View File

@ -0,0 +1,69 @@
/**
*
*/
package org.gcube.accounting.persistence;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.datamodel.SingleUsageRecord;
import org.gcube.accounting.datamodel.basetypes.TestUsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
import org.gcube.accounting.testutility.StressTestUtility;
import org.gcube.accounting.testutility.TestOperation;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class AccontingPersistence {
private static final Logger logger = LoggerFactory.getLogger(AccontingPersistence.class);
public static final String[] SCOPES = new String[]{
"/gcube",
"/gcube/devsec", "/gcube/devsec/devVRE",
"/gcube/devNext", "/gcube/devNext/NextNext"
};
public static final long timeout = 5000;
public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@Test
public void stressTest() throws Exception {
final AccountingPersistence persistence = AccountingPersistence.getInstance();
AccountingPersistenceBackendFactory.setFallbackLocation(null);
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) {
int randomNumber = ThreadLocalRandom.current().nextInt(0, 5);
ScopeProvider.instance.set(SCOPES[randomNumber]);
SingleUsageRecord usageRecord = null;
switch (i%2) {
case 0:
usageRecord = TestUsageRecord.createTestServiceUsageRecordAutomaticScope();
break;
case 1:
usageRecord = TestUsageRecord.createTestStorageUsageRecordAutomaticScope();
break;
}
try {
usageRecord.setConsumerId(UUID.randomUUID().toString());
persistence.account(usageRecord);
} catch (InvalidValueException e) {
throw new RuntimeException(e);
}
}
});
logger.debug(" START -----------------------------------------------");
logger.debug("Flushing the buffered records");
persistence.flush(timeout, timeUnit);
logger.debug(" END -----------------------------------------------");
}
}

View File

@ -7,6 +7,7 @@ import java.util.concurrent.TimeUnit;
import org.gcube.accounting.datamodel.SingleUsageRecord;
import org.gcube.accounting.datamodel.basetypes.TestUsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
import org.gcube.accounting.testutility.StressTestUtility;
import org.gcube.accounting.testutility.TestOperation;
import org.gcube.common.scope.api.ScopeProvider;
@ -17,7 +18,7 @@ import org.junit.Test;
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class AccountingPersistenceTest {
public class AccountingPersistenceBackendTest {
public static final String[] SCOPES = new String[]{"/gcube", "/gcube/devNext"};
public static final String GCUBE_SCOPE = SCOPES[0];
@ -40,8 +41,13 @@ public class AccountingPersistenceTest {
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) {
SingleUsageRecord usageRecord = TestUsageRecord.createTestServiceUsageRecordAutomaticScope();
persistence.validateAccountAggregate(usageRecord, true, false);
SingleUsageRecord usageRecord;
try {
usageRecord = TestUsageRecord.createTestServiceUsageRecordExplicitScope();
persistence.validateAccountAggregate(usageRecord, true, false);
} catch (InvalidValueException e) {
throw new RuntimeException(e);
}
}
}, 1);
@ -88,4 +94,7 @@ public class AccountingPersistenceTest {
persistence.flush(timeout, timeUnit);
}
}