infrastructure-tests/src/test/java/org/gcube/documentstore/records/aggregation/AggregationSchedulerTest.java

166 lines
5.9 KiB
Java

/**
*
*/
package org.gcube.documentstore.records.aggregation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.documentstore.persistence.PersistenceExecutor;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.testutility.ScopedTest;
import org.gcube.testutility.StressTestUtility;
import org.gcube.testutility.TestOperation;
import org.gcube.testutility.TestUsageRecord;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public class AggregationSchedulerTest extends ScopedTest {
private static final Logger logger = LoggerFactory.getLogger(AggregationSchedulerTest.class);
public static AggregationScheduler getAggregationScheduler(){
return new BufferAggregationScheduler(persistenceExecutor, "TEST");
}
public static PersistenceExecutor persistenceExecutor = new PersistenceExecutor(){
@Override
public void persist(Record... records) throws Exception {
for(Record record : records){
logger.debug("Storing : {}", record.toString());
}
}
};
protected void madeAssertion(Map<String, List<Record>> bufferedRecords, List<String> types, int size, int count) {
for(String type : types){
Assert.assertTrue(bufferedRecords.containsKey(type));
List<Record> records = bufferedRecords.get(type);
Assert.assertTrue(records.size()==size);
for(Record record : records){
Assert.assertTrue(record.getRecordType().compareTo(type)==0);
Assert.assertTrue(record instanceof AggregatedRecord);
@SuppressWarnings("rawtypes")
AggregatedRecord aggregatedRecord = (AggregatedRecord) record;
Assert.assertTrue(aggregatedRecord.getOperationCount()==count);
}
}
}
@Test
public void stressTestAggregableURSingleType() throws Exception {
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) throws Exception {
UsageRecord usageRecord = TestUsageRecord.createTestServiceUsageRecord();
aggregationScheduler.aggregate(usageRecord, persistenceExecutor);
}
});
List<String> types = new ArrayList<String>();
String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecord().getRecordType();
types.add(serviceUsageRecordtype);
madeAssertion(aggregationScheduler.bufferedRecords, types, 1, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS);
aggregationScheduler.flush(persistenceExecutor);
}
public static final String ALTERNATIVE_SERVICE_CLASS = "AlternativeServiceClass";
@Test
public void stressTestDifferentAggregableURSingleType() throws Exception {
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) throws Exception {
ServiceUsageRecord usageRecord = TestUsageRecord.createTestServiceUsageRecord();
if(i%2==0){
usageRecord.setServiceClass(ALTERNATIVE_SERVICE_CLASS);
}
aggregationScheduler.aggregate(usageRecord, persistenceExecutor);
}
});
List<String> types = new ArrayList<String>();
String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecord().getRecordType();
types.add(serviceUsageRecordtype);
madeAssertion(aggregationScheduler.bufferedRecords, types, 2, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS/2);
aggregationScheduler.flush(persistenceExecutor);
}
@Test
public void stressTestDifferentAggregableURTwoType() throws Exception {
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) throws Exception {
UsageRecord usageRecord;
if(i%2==0){
usageRecord = TestUsageRecord.createTestServiceUsageRecord();
}else{
usageRecord = TestUsageRecord.createTestStorageUsageRecord();
}
aggregationScheduler.aggregate(usageRecord, persistenceExecutor);
}
});
List<String> types = new ArrayList<String>();
String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecord().getRecordType();
String storageUsageRecordtype = TestUsageRecord.createTestStorageUsageRecord().getRecordType();
types.add(serviceUsageRecordtype);
types.add(storageUsageRecordtype);
madeAssertion(aggregationScheduler.bufferedRecords, types, 1, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS/2);
aggregationScheduler.flush(persistenceExecutor);
}
@Test
public void stressTestDifferentAggregableURMultipleType() throws Exception {
final AggregationScheduler aggregationScheduler = getAggregationScheduler();
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) throws Exception {
UsageRecord usageRecord;
switch (i%3) {
case 0:
usageRecord = TestUsageRecord.createTestServiceUsageRecord();
break;
case 1:
usageRecord = TestUsageRecord.createTestStorageUsageRecord();
break;
case 2:
usageRecord = TestUsageRecord.createTestJobUsageRecord();
break;
default:
usageRecord = TestUsageRecord.createTestJobUsageRecord();
}
aggregationScheduler.aggregate(usageRecord, persistenceExecutor);
}
});
/*
List<String> types = new ArrayList<String>();
String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecordAutomaticScope().getRecordType();
String storageUsageRecordtype = TestUsageRecord.createTestStorageUsageRecordAutomaticScope().getRecordType();
types.add(serviceUsageRecordtype);
types.add(storageUsageRecordtype);
madeAssertion(aggregationScheduler.bufferedRecords, types, 1, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS/4);
*/
aggregationScheduler.flush(persistenceExecutor);
}
}