diff --git a/src/main/java/org/gcube/accounting/aggregation/scheduler/AggregationScheduler.java b/src/main/java/org/gcube/accounting/aggregation/scheduler/AggregationScheduler.java index b31c6dc..4ecd7fe 100644 --- a/src/main/java/org/gcube/accounting/aggregation/scheduler/AggregationScheduler.java +++ b/src/main/java/org/gcube/accounting/aggregation/scheduler/AggregationScheduler.java @@ -12,7 +12,7 @@ import org.gcube.accounting.datamodel.AggregationStrategy; import org.gcube.accounting.datamodel.SingleUsageRecord; import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.exception.NotAggregatableRecordsExceptions; -import org.gcube.accounting.persistence.PersistenceExecutor; +import org.gcube.accounting.persistence.AccountingPersistenceExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,11 +158,20 @@ public abstract class AggregationScheduler { } - public void flush(PersistenceExecutor persistenceExecutor) throws Exception{ + public void flush(AccountingPersistenceExecutor persistenceExecutor) throws Exception{ aggregate(null, persistenceExecutor, true); } - protected synchronized void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception { + protected abstract void specificClear(); + + protected void clear(){ + totalBufferedRecords=0; + records.clear(); + unaggregableRecords.clear(); + specificClear(); + } + + protected synchronized void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception { if(usageRecord!=null){ madeAggregation(usageRecord); } @@ -185,12 +194,12 @@ public abstract class AggregationScheduler { persistenceExecutor.persist(recordToPersist); - totalBufferedRecords=0; - records.clear(); - unaggregableRecords.clear(); + clear(); } } + + /** * Get an usage records and try to aggregate with other buffered * Usage Record. @@ -198,7 +207,7 @@ public abstract class AggregationScheduler { * @return true if is time to persist the buffered Usage Record * @throws Exception if fails */ - public void aggregate(SingleUsageRecord usageRecord, PersistenceExecutor persistenceExecutor) throws Exception { + public void aggregate(SingleUsageRecord usageRecord, AccountingPersistenceExecutor persistenceExecutor) throws Exception { aggregate(usageRecord, persistenceExecutor, false); } diff --git a/src/main/java/org/gcube/accounting/aggregation/scheduler/BufferAggregationScheduler.java b/src/main/java/org/gcube/accounting/aggregation/scheduler/BufferAggregationScheduler.java index 692dfc9..f9c0bf4 100644 --- a/src/main/java/org/gcube/accounting/aggregation/scheduler/BufferAggregationScheduler.java +++ b/src/main/java/org/gcube/accounting/aggregation/scheduler/BufferAggregationScheduler.java @@ -36,6 +36,12 @@ public class BufferAggregationScheduler extends AggregationScheduler { this.firstOfBuffer = true; } + @Override + protected void specificClear(){ + firstOfBuffer = true; + } + + /** * {@inheritDoc} */ diff --git a/src/main/java/org/gcube/accounting/messaging/ResourceAccounting.java b/src/main/java/org/gcube/accounting/messaging/ResourceAccounting.java index c33d3c1..9137eb8 100644 --- a/src/main/java/org/gcube/accounting/messaging/ResourceAccounting.java +++ b/src/main/java/org/gcube/accounting/messaging/ResourceAccounting.java @@ -2,8 +2,8 @@ package org.gcube.accounting.messaging; import org.gcube.accounting.datamodel.RawUsageRecord; import org.gcube.accounting.exception.InvalidValueException; -import org.gcube.accounting.persistence.Persistence; -import org.gcube.accounting.persistence.PersistenceFactory; +import org.gcube.accounting.persistence.AccountingPersistence; +import org.gcube.accounting.persistence.AccountingPersistenceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,11 +19,11 @@ public class ResourceAccounting { private static final Logger logger = LoggerFactory.getLogger(ResourceAccounting.class); @Deprecated - protected Persistence persistence; + protected AccountingPersistence persistence; @Deprecated public ResourceAccounting() { - persistence = PersistenceFactory.getPersistence(); + persistence = AccountingPersistenceFactory.getPersistence(); } @Deprecated diff --git a/src/main/java/org/gcube/accounting/persistence/Persistence.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java similarity index 87% rename from src/main/java/org/gcube/accounting/persistence/Persistence.java rename to src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java index f1d29fc..4939b18 100644 --- a/src/main/java/org/gcube/accounting/persistence/Persistence.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java @@ -16,9 +16,9 @@ import org.slf4j.LoggerFactory; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ */ -public abstract class Persistence { +public abstract class AccountingPersistence { - private static final Logger logger = LoggerFactory.getLogger(Persistence.class); + private static final Logger logger = LoggerFactory.getLogger(AccountingPersistence.class); protected FallbackPersistence fallback; protected AggregationScheduler aggregationScheduler; @@ -28,11 +28,11 @@ public abstract class Persistence { */ private ExecutorService pool; - protected Persistence(){ + protected AccountingPersistence(){ this.pool = Executors.newCachedThreadPool(); } - protected Persistence(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){ + protected AccountingPersistence(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){ this.fallback = fallback; this.aggregationScheduler = aggregationScheduler; this.pool = Executors.newCachedThreadPool(); @@ -59,7 +59,7 @@ public abstract class Persistence { * @param configuration The configuration to create the connection * @throws Exception if fails */ - protected abstract void prepareConnection(PersistenceConfiguration configuration) throws Exception; + protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception; /* * * Prepare the connection and try to write a test record on default @@ -108,8 +108,8 @@ public abstract class Persistence { usageRecord.validate(); } if(aggregate){ - final Persistence persistence = this; - aggregationScheduler.aggregate(usageRecord, new PersistenceExecutor(){ + final AccountingPersistence persistence = this; + aggregationScheduler.aggregate(usageRecord, new AccountingPersistenceExecutor(){ @Override public void persist(UsageRecord... usageRecords) throws Exception { @@ -149,8 +149,8 @@ public abstract class Persistence { } public void flush() throws Exception { - final Persistence persistence = this; - aggregationScheduler.flush(new PersistenceExecutor(){ + final AccountingPersistence persistence = this; + aggregationScheduler.flush(new AccountingPersistenceExecutor(){ @Override public void persist(UsageRecord... usageRecords) throws Exception { diff --git a/src/main/java/org/gcube/accounting/persistence/PersistenceConfiguration.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceConfiguration.java similarity index 87% rename from src/main/java/org/gcube/accounting/persistence/PersistenceConfiguration.java rename to src/main/java/org/gcube/accounting/persistence/AccountingPersistenceConfiguration.java index 05dd324..b6996b2 100644 --- a/src/main/java/org/gcube/accounting/persistence/PersistenceConfiguration.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceConfiguration.java @@ -21,7 +21,7 @@ import org.gcube.resources.discovery.icclient.ICFactory; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ */ -public class PersistenceConfiguration { +public class AccountingPersistenceConfiguration { protected final static String SERVICE_ENDPOINT_CATEGORY = "Accounting"; protected final static String SERVICE_ENDPOINT_NAME = "Persistence"; @@ -34,11 +34,11 @@ public class PersistenceConfiguration { protected Map propertyMap; - public PersistenceConfiguration(){ + public AccountingPersistenceConfiguration(){ propertyMap = new HashMap(); } - public PersistenceConfiguration(URI uri, String username, String password){ + public AccountingPersistenceConfiguration(URI uri, String username, String password){ this.uri = uri; this.username = username; this.password = password; @@ -117,8 +117,8 @@ public class PersistenceConfiguration { return StringEncrypter.getEncrypter().decrypt(encrypted); } - protected static PersistenceConfiguration createPersistenceConfiguration(ServiceEndpoint serviceEndpoint) throws Exception{ - PersistenceConfiguration persistenceConfiguration = new PersistenceConfiguration(); + protected static AccountingPersistenceConfiguration createPersistenceConfiguration(ServiceEndpoint serviceEndpoint) throws Exception{ + AccountingPersistenceConfiguration persistenceConfiguration = new AccountingPersistenceConfiguration(); Group accessPoints = serviceEndpoint.profile().accessPoints(); for(AccessPoint accessPoint : accessPoints){ persistenceConfiguration.uri = new URI(accessPoint.address()); @@ -140,7 +140,7 @@ public class PersistenceConfiguration { * @return * @throws Exception */ - public static PersistenceConfiguration getPersistenceConfiguration(String persistenceClassName) throws Exception { + public static AccountingPersistenceConfiguration getPersistenceConfiguration(String persistenceClassName) throws Exception { ServiceEndpoint serviceEndpoint = getServiceEndpoint(persistenceClassName); return createPersistenceConfiguration(serviceEndpoint); } diff --git a/src/main/java/org/gcube/accounting/persistence/PersistenceExecutor.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceExecutor.java similarity index 83% rename from src/main/java/org/gcube/accounting/persistence/PersistenceExecutor.java rename to src/main/java/org/gcube/accounting/persistence/AccountingPersistenceExecutor.java index 95d1552..ad691c8 100644 --- a/src/main/java/org/gcube/accounting/persistence/PersistenceExecutor.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceExecutor.java @@ -9,7 +9,7 @@ import org.gcube.accounting.datamodel.UsageRecord; * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * */ -public interface PersistenceExecutor { +public interface AccountingPersistenceExecutor { public void persist(UsageRecord... usageRecords)throws Exception; diff --git a/src/main/java/org/gcube/accounting/persistence/PersistenceFactory.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java similarity index 81% rename from src/main/java/org/gcube/accounting/persistence/PersistenceFactory.java rename to src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java index bea8602..cf83031 100644 --- a/src/main/java/org/gcube/accounting/persistence/PersistenceFactory.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java @@ -19,9 +19,9 @@ import org.slf4j.LoggerFactory; * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * */ -public abstract class PersistenceFactory { +public abstract class AccountingPersistenceFactory { - private static final Logger logger = LoggerFactory.getLogger(PersistenceFactory.class); + private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceFactory.class); public final static String HOME_SYSTEM_PROPERTY = "user.home"; @@ -29,10 +29,10 @@ public abstract class PersistenceFactory { private static String fallbackLocation; - private static Map persistences; + private static Map persistences; static { - persistences = new HashMap(); + persistences = new HashMap(); } private static File file(File file) throws IllegalArgumentException { @@ -59,7 +59,7 @@ public abstract class PersistenceFactory { } } - public static Persistence getPersistence() { + public static AccountingPersistence getPersistence() { String scope = ScopeProvider.instance.get(); if(scope==null){ logger.error("No Scope available. FallbackPersistence will be used"); @@ -67,7 +67,7 @@ public abstract class PersistenceFactory { return new FallbackPersistence(fallbackFile); } - Persistence persistence = persistences.get(scope); + AccountingPersistence persistence = persistences.get(scope); if(persistence==null){ ScopeBean bean = new ScopeBean(scope); @@ -79,15 +79,15 @@ public abstract class PersistenceFactory { File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME)); FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile); try { - ServiceLoader serviceLoader = ServiceLoader.load(Persistence.class); - for (Persistence foundPersistence : serviceLoader) { + ServiceLoader serviceLoader = ServiceLoader.load(AccountingPersistence.class); + for (AccountingPersistence foundPersistence : serviceLoader) { if(foundPersistence.getClass().isInstance(FallbackPersistence.class)){ continue; } try { String foundPersistenceClassName = foundPersistence.getClass().getSimpleName(); logger.debug("Testing {}", foundPersistenceClassName); - PersistenceConfiguration configuration = PersistenceConfiguration.getPersistenceConfiguration(foundPersistenceClassName); + AccountingPersistenceConfiguration configuration = AccountingPersistenceConfiguration.getPersistenceConfiguration(foundPersistenceClassName); foundPersistence.prepareConnection(configuration); /* * Uncomment the following line of code if you want to try diff --git a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java b/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java index 3a8b21d..aee68ba 100644 --- a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java +++ b/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java @@ -15,7 +15,7 @@ import org.gcube.accounting.datamodel.UsageRecord; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ */ -public class FallbackPersistence extends Persistence { +public class FallbackPersistence extends AccountingPersistence { private File accountingFallbackFile; @@ -28,7 +28,7 @@ public class FallbackPersistence extends Persistence { * {@inheritDoc} */ @Override - public void prepareConnection(PersistenceConfiguration configuration) { + public void prepareConnection(AccountingPersistenceConfiguration configuration) { // Nothing TO DO } diff --git a/src/test/java/org/gcube/accounting/aggregation/scheduler/AggregationSchedulerTest.java b/src/test/java/org/gcube/accounting/aggregation/scheduler/AggregationSchedulerTest.java index fdf170b..284a015 100644 --- a/src/test/java/org/gcube/accounting/aggregation/scheduler/AggregationSchedulerTest.java +++ b/src/test/java/org/gcube/accounting/aggregation/scheduler/AggregationSchedulerTest.java @@ -7,7 +7,7 @@ import org.gcube.accounting.datamodel.SingleUsageRecord; import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.datamodel.basetypes.TestUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; -import org.gcube.accounting.persistence.PersistenceExecutor; +import org.gcube.accounting.persistence.AccountingPersistenceExecutor; import org.gcube.accounting.testutility.StressTestUtility; import org.gcube.accounting.testutility.TestOperation; import org.junit.Test; @@ -26,7 +26,7 @@ public class AggregationSchedulerTest { return AggregationScheduler.getInstance(); } - public static PersistenceExecutor persistenceExecutor = new PersistenceExecutor(){ + public static AccountingPersistenceExecutor persistenceExecutor = new AccountingPersistenceExecutor(){ @Override public void persist(UsageRecord... usageRecords) throws Exception { diff --git a/src/test/java/org/gcube/accounting/persistence/PersistenceConfigurationTest.java b/src/test/java/org/gcube/accounting/persistence/PersistenceConfigurationTest.java index 06361e3..a1cafd6 100644 --- a/src/test/java/org/gcube/accounting/persistence/PersistenceConfigurationTest.java +++ b/src/test/java/org/gcube/accounting/persistence/PersistenceConfigurationTest.java @@ -117,8 +117,8 @@ public class PersistenceConfigurationTest { logger.debug("Creating ServiceEndpoint to publish on IS available plugins and their own supported capabilities"); ServiceEndpoint serviceEndpoint = new ServiceEndpoint(); Profile profile = serviceEndpoint.newProfile(); - profile.category(PersistenceConfiguration.SERVICE_ENDPOINT_CATEGORY); - profile.name(PersistenceConfiguration.SERVICE_ENDPOINT_NAME); + profile.category(AccountingPersistenceConfiguration.SERVICE_ENDPOINT_CATEGORY); + profile.name(AccountingPersistenceConfiguration.SERVICE_ENDPOINT_NAME); profile.version(TEST_VERSION); profile.description(PROFILE_DESCRIPTION); @@ -148,7 +148,7 @@ public class PersistenceConfigurationTest { Group properties = accessPointElement.properties(); Property className = new Property(); - className.nameAndValue(PersistenceConfiguration.PERSISTENCE_CLASS_NAME, COUCHDB_CLASS_NAME); + className.nameAndValue(AccountingPersistenceConfiguration.PERSISTENCE_CLASS_NAME, COUCHDB_CLASS_NAME); properties.add(className); Property dbName = new Property(); @@ -166,8 +166,8 @@ public class PersistenceConfigurationTest { ScopeProvider.instance.set(GCUBE_DEVNEXT_SCOPE); SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class) - .addCondition(String.format("$resource/Profile/Category/text() eq '%s'", PersistenceConfiguration.SERVICE_ENDPOINT_CATEGORY)) - .addCondition(String.format("$resource/Profile/Name/text() eq '%s'", PersistenceConfiguration.SERVICE_ENDPOINT_NAME)) + .addCondition(String.format("$resource/Profile/Category/text() eq '%s'", AccountingPersistenceConfiguration.SERVICE_ENDPOINT_CATEGORY)) + .addCondition(String.format("$resource/Profile/Name/text() eq '%s'", AccountingPersistenceConfiguration.SERVICE_ENDPOINT_NAME)) .addCondition(String.format("$resource/Profile/RunTime/HostedOn/text() eq '%s'", RUNNING_ON)) .setResult("$resource"); @@ -198,7 +198,7 @@ public class PersistenceConfigurationTest { } try { - PersistenceConfiguration persitenceConfiguration = PersistenceConfiguration.getPersistenceConfiguration(COUCHDB_CLASS_NAME); + AccountingPersistenceConfiguration persitenceConfiguration = AccountingPersistenceConfiguration.getPersistenceConfiguration(COUCHDB_CLASS_NAME); if(createResource){ Assert.assertTrue(persitenceConfiguration.getUri().toURL().equals(new URL(RUNNING_ON))); Assert.assertTrue(persitenceConfiguration.getUsername().compareTo(FAKE_USERNAME)==0); diff --git a/src/test/java/org/gcube/accounting/persistence/PersistenceTest.java b/src/test/java/org/gcube/accounting/persistence/PersistenceTest.java index d916325..da96d34 100644 --- a/src/test/java/org/gcube/accounting/persistence/PersistenceTest.java +++ b/src/test/java/org/gcube/accounting/persistence/PersistenceTest.java @@ -17,16 +17,16 @@ import org.junit.Test; */ public class PersistenceTest { - public static Persistence getPersistence(){ + public static AccountingPersistence getPersistence(){ ScopeProvider.instance.set(PersistenceConfigurationTest.GCUBE_DEVNEXT_SCOPE); - PersistenceFactory.setFallbackLocation(null); - return PersistenceFactory.getPersistence(); + AccountingPersistenceFactory.setFallbackLocation(null); + return AccountingPersistenceFactory.getPersistence(); } @Test public void singleTestNoScope() throws Exception { - PersistenceFactory.setFallbackLocation(null); - final Persistence persistence = PersistenceFactory.getPersistence(); + AccountingPersistenceFactory.setFallbackLocation(null); + final AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence(); Assert.assertTrue(persistence instanceof FallbackPersistence); StressTestUtility.stressTest(new TestOperation() { @Override @@ -41,7 +41,7 @@ public class PersistenceTest { @Test public void singleTest() throws Exception { - final Persistence persistence = getPersistence(); + final AccountingPersistence persistence = getPersistence(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) { @@ -55,7 +55,7 @@ public class PersistenceTest { @Test public void stressTestNoAggregation() throws Exception { - final Persistence persistence = getPersistence(); + final AccountingPersistence persistence = getPersistence(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) { @@ -67,7 +67,7 @@ public class PersistenceTest { @Test public void stressTestWithAggregation() throws Exception { - final Persistence persistence = getPersistence(); + final AccountingPersistence persistence = getPersistence(); StressTestUtility.stressTest(new TestOperation() { @Override