/** * */ package org.gcube.documentstore.records.aggregation; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.datamodel.basetypes.TestUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.documentstore.persistence.PersistenceExecutor; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.Record; import org.gcube.utils.StressTestUtility; import org.gcube.utils.TestOperation; import org.gcube.utils.TestUtility; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * */ public class AggregationSchedulerTest { private static final Logger logger = LoggerFactory.getLogger(AggregationSchedulerTest.class); @Before public void before() throws Exception{ SecurityTokenProvider.instance.set(TestUtility.TOKEN); } @After public void after() throws Exception{ SecurityTokenProvider.instance.reset(); } public static AggregationScheduler getAggregationScheduler(){ return AggregationScheduler.newInstance(); } public static PersistenceExecutor persistenceExecutor = new PersistenceExecutor(){ @Override public void persist(Record... records) throws Exception { for(Record record : records){ logger.debug("Storing : {}", record.toString()); } } }; protected void madeAssertion(Map> bufferedRecords, List types, int size, int count) { for(String type : types){ Assert.assertTrue(bufferedRecords.containsKey(type)); List records = bufferedRecords.get(type); Assert.assertTrue(records.size()==size); for(Record record : records){ Assert.assertTrue(record.getRecordType().compareTo(type)==0); Assert.assertTrue(record instanceof AggregatedRecord); @SuppressWarnings("rawtypes") AggregatedRecord aggregatedRecord = (AggregatedRecord) record; Assert.assertTrue(aggregatedRecord.getOperationCount()==count); } } } @Test public void stressTestAggregableURSingleType() throws Exception { final AggregationScheduler aggregationScheduler = getAggregationScheduler(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) throws Exception { UsageRecord usageRecord = TestUsageRecord.createTestServiceUsageRecord(); aggregationScheduler.aggregate(usageRecord, persistenceExecutor); } }); List types = new ArrayList(); String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecord().getRecordType(); types.add(serviceUsageRecordtype); madeAssertion(aggregationScheduler.bufferedRecords, types, 1, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS); aggregationScheduler.flush(persistenceExecutor); } public static final String ALTERNATIVE_SERVICE_CLASS = "AlternativeServiceClass"; @Test public void stressTestDifferentAggregableURSingleType() throws Exception { final AggregationScheduler aggregationScheduler = getAggregationScheduler(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) throws Exception { ServiceUsageRecord usageRecord = TestUsageRecord.createTestServiceUsageRecord(); if(i%2==0){ usageRecord.setServiceClass(ALTERNATIVE_SERVICE_CLASS); } aggregationScheduler.aggregate(usageRecord, persistenceExecutor); } }); List types = new ArrayList(); String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecord().getRecordType(); types.add(serviceUsageRecordtype); madeAssertion(aggregationScheduler.bufferedRecords, types, 2, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS/2); aggregationScheduler.flush(persistenceExecutor); } @Test public void stressTestDifferentAggregableURTwoType() throws Exception { final AggregationScheduler aggregationScheduler = getAggregationScheduler(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) throws Exception { UsageRecord usageRecord; if(i%2==0){ usageRecord = TestUsageRecord.createTestServiceUsageRecord(); }else{ usageRecord = TestUsageRecord.createTestStorageUsageRecord(); } aggregationScheduler.aggregate(usageRecord, persistenceExecutor); } }); List types = new ArrayList(); String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecord().getRecordType(); String storageUsageRecordtype = TestUsageRecord.createTestStorageUsageRecord().getRecordType(); types.add(serviceUsageRecordtype); types.add(storageUsageRecordtype); madeAssertion(aggregationScheduler.bufferedRecords, types, 1, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS/2); aggregationScheduler.flush(persistenceExecutor); } @Test public void stressTestDifferentAggregableURMultipleType() throws Exception { final AggregationScheduler aggregationScheduler = getAggregationScheduler(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) throws Exception { UsageRecord usageRecord; switch (i%3) { case 0: usageRecord = TestUsageRecord.createTestServiceUsageRecord(); break; case 1: usageRecord = TestUsageRecord.createTestStorageUsageRecord(); break; case 2: usageRecord = TestUsageRecord.createTestJobUsageRecord(); break; default: usageRecord = TestUsageRecord.createTestJobUsageRecord(); } aggregationScheduler.aggregate(usageRecord, persistenceExecutor); } }); /* List types = new ArrayList(); String serviceUsageRecordtype = TestUsageRecord.createTestServiceUsageRecordAutomaticScope().getRecordType(); String storageUsageRecordtype = TestUsageRecord.createTestStorageUsageRecordAutomaticScope().getRecordType(); types.add(serviceUsageRecordtype); types.add(storageUsageRecordtype); madeAssertion(aggregationScheduler.bufferedRecords, types, 1, StressTestUtility.DEFAULT_NUMBER_OF_RECORDS/4); */ aggregationScheduler.flush(persistenceExecutor); } }