diff --git a/.classpath b/.classpath
index a673149..ca9d765 100644
--- a/.classpath
+++ b/.classpath
@@ -23,5 +23,15 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
index cdfe4f1..29abf99 100644
--- a/.settings/org.eclipse.core.resources.prefs
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -1,5 +1,6 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
+encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8
encoding/=UTF-8
diff --git a/.settings/org.eclipse.wst.common.component b/.settings/org.eclipse.wst.common.component
index 91b68a6..da21cbb 100644
--- a/.settings/org.eclipse.wst.common.component
+++ b/.settings/org.eclipse.wst.common.component
@@ -1,5 +1,7 @@
+
+
diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java
index c87b79b..2c26b9a 100644
--- a/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java
+++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java
@@ -3,118 +3,27 @@
*/
package org.gcube.accounting.persistence;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
import org.gcube.accounting.datamodel.SingleUsageRecord;
-import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
+ *
*/
-public abstract class AccountingPersistence {
-
- private static final Logger logger = LoggerFactory.getLogger(AccountingPersistence.class);
+public class AccountingPersistence {
- protected FallbackPersistence fallback;
- protected AggregationScheduler aggregationScheduler;
+ private static final AccountingPersistence accountingPersistence;
- /**
- * Pool for thread execution
- */
- private ExecutorService pool;
+ private AccountingPersistence(){}
- protected AccountingPersistence(){
- this.pool = Executors.newCachedThreadPool();
+ static {
+ accountingPersistence = new AccountingPersistence();
}
- protected AccountingPersistence(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){
- this.fallback = fallback;
- this.aggregationScheduler = aggregationScheduler;
- this.pool = Executors.newCachedThreadPool();
- }
-
- /**
- * @param fallback the fallback to set
- */
- protected void setFallback(FallbackPersistence fallback) {
- this.fallback = fallback;
- }
-
- /**
- * @param aggregationScheduler the aggregationScheduler to set
- */
- protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) {
- this.aggregationScheduler = aggregationScheduler;
- }
-
- /**
- * Prepare the connection to persistence.
- * This method must be used by implementation class to open
- * the connection with the persistence storage, DB, file etc.
- * @param configuration The configuration to create the connection
- * @throws Exception if fails
- */
- protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception;
-
- /**
- * This method contains the code to save the {@link #UsageRecord}
- *
- */
- protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception;
-
- private void accountWithFallback(UsageRecord... usageRecords) {
- String persistenceName = this.getClass().getSimpleName();
- for(UsageRecord usageRecord : usageRecords){
- try {
- //logger.debug("Going to account {} using {}", usageRecord, persistenceName);
- this.reallyAccount(usageRecord);
- logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName);
- } catch (Exception e) {
- String fallabackPersistenceName = fallback.getClass().getSimpleName();
- try {
- logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
- usageRecord.toString(), persistenceName, fallabackPersistenceName, e);
- fallback.reallyAccount(usageRecord);
- logger.debug("{} accounted succesfully from {}",
- usageRecord.toString(), fallabackPersistenceName);
- }catch(Exception ex){
- logger.error("{} was not accounted at all", usageRecord.toString(), e);
- }
- }
- }
- }
-
-
- protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){
- try {
- if(validate){
- usageRecord.validate();
- }
- if(aggregate){
- final AccountingPersistence persistence = this;
- aggregationScheduler.aggregate(usageRecord, new AccountingPersistenceExecutor(){
-
- @Override
- public void persist(UsageRecord... usageRecords) throws Exception {
- persistence.accountWithFallback(usageRecords);
- }
-
- });
- }else{
- this.accountWithFallback(usageRecord);
- }
-
- } catch (InvalidValueException e) {
- logger.error("Error validating UsageRecord", e);
- } catch (Exception e) {
- logger.error("Error accounting UsageRecord", e);
- }
+ protected static synchronized AccountingPersistence getInstance(){
+ return accountingPersistence;
}
/**
@@ -127,32 +36,15 @@ public abstract class AccountingPersistence {
* @throws InvalidValueException if the Record Validation Fails
*/
public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{
- Runnable runnable = new Runnable(){
- @Override
- public void run(){
- validateAccountAggregate(usageRecord, true, true);
- }
- };
- pool.execute(runnable);
-
+ AccountingPersistenceBackendFactory.getPersistenceBackend().account(usageRecord);
}
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
- pool.awaitTermination(timeout, timeUnit);
-
- final AccountingPersistence persistence = this;
- aggregationScheduler.flush(new AccountingPersistenceExecutor(){
-
- @Override
- public void persist(UsageRecord... usageRecords) throws Exception {
- persistence.accountWithFallback(usageRecords);
- }
-
- });
-
+ AccountingPersistenceBackendFactory.getPersistenceBackend().flush(timeout, timeUnit);
}
-
- public abstract void close() throws Exception;
+ public void close() throws Exception{
+ AccountingPersistenceBackendFactory.getPersistenceBackend().close();
+ }
}
diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java
new file mode 100644
index 0000000..b31a13a
--- /dev/null
+++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java
@@ -0,0 +1,158 @@
+/**
+ *
+ */
+package org.gcube.accounting.persistence;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
+import org.gcube.accounting.datamodel.SingleUsageRecord;
+import org.gcube.accounting.datamodel.UsageRecord;
+import org.gcube.accounting.exception.InvalidValueException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
+ */
+public abstract class AccountingPersistenceBackend {
+
+ private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackend.class);
+
+ protected FallbackPersistence fallback;
+ protected AggregationScheduler aggregationScheduler;
+
+ /**
+ * Pool for thread execution
+ */
+ private ExecutorService pool;
+
+ protected AccountingPersistenceBackend(){
+ this.pool = Executors.newCachedThreadPool();
+ }
+
+ protected AccountingPersistenceBackend(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){
+ this.fallback = fallback;
+ this.aggregationScheduler = aggregationScheduler;
+ this.pool = Executors.newCachedThreadPool();
+ }
+
+ /**
+ * @param fallback the fallback to set
+ */
+ protected void setFallback(FallbackPersistence fallback) {
+ this.fallback = fallback;
+ }
+
+ /**
+ * @param aggregationScheduler the aggregationScheduler to set
+ */
+ protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) {
+ this.aggregationScheduler = aggregationScheduler;
+ }
+
+ /**
+ * Prepare the connection to persistence.
+ * This method must be used by implementation class to open
+ * the connection with the persistence storage, DB, file etc.
+ * @param configuration The configuration to create the connection
+ * @throws Exception if fails
+ */
+ protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception;
+
+ /**
+ * This method contains the code to save the {@link #UsageRecord}
+ *
+ */
+ protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception;
+
+ private void accountWithFallback(UsageRecord... usageRecords) {
+ String persistenceName = this.getClass().getSimpleName();
+ for(UsageRecord usageRecord : usageRecords){
+ try {
+ //logger.debug("Going to account {} using {}", usageRecord, persistenceName);
+ this.reallyAccount(usageRecord);
+ logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName);
+ } catch (Exception e) {
+ String fallabackPersistenceName = fallback.getClass().getSimpleName();
+ try {
+ logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
+ usageRecord.toString(), persistenceName, fallabackPersistenceName, e);
+ fallback.reallyAccount(usageRecord);
+ logger.debug("{} accounted succesfully from {}",
+ usageRecord.toString(), fallabackPersistenceName);
+ }catch(Exception ex){
+ logger.error("{} was not accounted at all", usageRecord.toString(), e);
+ }
+ }
+ }
+ }
+
+
+ protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){
+ try {
+ if(validate){
+ usageRecord.validate();
+ }
+ if(aggregate){
+ final AccountingPersistenceBackend persistence = this;
+ aggregationScheduler.aggregate(usageRecord, new AccountingPersistenceExecutor(){
+
+ @Override
+ public void persist(UsageRecord... usageRecords) throws Exception {
+ persistence.accountWithFallback(usageRecords);
+ }
+
+ });
+ }else{
+ this.accountWithFallback(usageRecord);
+ }
+
+ } catch (InvalidValueException e) {
+ logger.error("Error validating UsageRecord", e);
+ } catch (Exception e) {
+ logger.error("Error accounting UsageRecord", e);
+ }
+ }
+
+ /**
+ * Persist the {@link #UsageRecord}.
+ * The Record is validated first, then accounted, in a separated thread.
+ * So that the program can continue the execution.
+ * If the persistence fails the class write that the record in a local file
+ * so that the {@link #UsageRecord} can be recorder later.
+ * @param usageRecord the {@link #UsageRecord} to persist
+ * @throws InvalidValueException if the Record Validation Fails
+ */
+ public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{
+ Runnable runnable = new Runnable(){
+ @Override
+ public void run(){
+ validateAccountAggregate(usageRecord, true, true);
+ }
+ };
+ pool.execute(runnable);
+
+ }
+
+ public void flush(long timeout, TimeUnit timeUnit) throws Exception {
+ pool.awaitTermination(timeout, timeUnit);
+
+ final AccountingPersistenceBackend persistence = this;
+ aggregationScheduler.flush(new AccountingPersistenceExecutor(){
+
+ @Override
+ public void persist(UsageRecord... usageRecords) throws Exception {
+ persistence.accountWithFallback(usageRecords);
+ }
+
+ });
+
+ }
+
+
+ public abstract void close() throws Exception;
+
+}
diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java
new file mode 100644
index 0000000..7550aed
--- /dev/null
+++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java
@@ -0,0 +1,121 @@
+/**
+ *
+ */
+package org.gcube.accounting.persistence;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
+import org.gcube.common.scope.api.ScopeProvider;
+import org.gcube.common.scope.impl.ScopeBean;
+import org.gcube.common.scope.impl.ScopeBean.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
+ *
+ */
+public abstract class AccountingPersistenceBackendFactory {
+
+ private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendFactory.class);
+
+ public final static String HOME_SYSTEM_PROPERTY = "user.home";
+
+ private static final String ACCOUTING_FALLBACK_FILENAME = "accountingFallback.log";
+
+ private static String fallbackLocation;
+
+ private static Map persistencePersistenceBackends;
+
+ static {
+ persistencePersistenceBackends = new HashMap();
+ }
+
+ private static File file(File file) throws IllegalArgumentException {
+
+ if(!file.isDirectory()){
+ file = file.getParentFile();
+ }
+
+ //create folder structure if not exist
+ if (!file.exists())
+ file.mkdirs();
+
+ return file;
+
+ }
+
+ protected synchronized static void setFallbackLocation(String path){
+ if(fallbackLocation == null){
+ if(path==null){
+ path = System.getProperty(HOME_SYSTEM_PROPERTY);
+ }
+ file(new File(path));
+ fallbackLocation = path;
+ }
+ }
+
+ protected static synchronized AccountingPersistenceBackend getPersistenceBackend() {
+ String scope = ScopeProvider.instance.get();
+ if(scope==null){
+ logger.error("No Scope available. FallbackPersistence will be used");
+ File fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
+ return new FallbackPersistence(fallbackFile);
+ }
+
+ AccountingPersistenceBackend persistence = persistencePersistenceBackends.get(scope);
+ if(persistence==null){
+
+ ScopeBean bean = new ScopeBean(scope);
+ if(bean.is(Type.VRE)){
+ bean = bean.enclosingScope();
+ }
+ String name = bean.name();
+
+ File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
+ FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile);
+ try {
+ ServiceLoader serviceLoader = ServiceLoader.load(AccountingPersistenceBackend.class);
+ for (AccountingPersistenceBackend foundPersistence : serviceLoader) {
+ if(foundPersistence.getClass().isInstance(FallbackPersistence.class)){
+ continue;
+ }
+ try {
+ String foundPersistenceClassName = foundPersistence.getClass().getSimpleName();
+ logger.debug("Testing {}", foundPersistenceClassName);
+ AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration(foundPersistenceClassName);
+ foundPersistence.prepareConnection(configuration);
+ /*
+ * Uncomment the following line of code if you want to try
+ * to create a test UsageRecord before setting the
+ * foundPersistence as default
+ *
+ * foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord());
+ */
+ persistence = foundPersistence;
+ logger.debug("{} will be used.", foundPersistenceClassName);
+ break;
+ } catch (Exception e) {
+ logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundPersistence.getClass().getSimpleName()), e);
+ }
+ } if(persistence==null){
+ persistence = fallbackPersistence;
+ }
+ } catch(Exception e){
+ logger.error("Unable to instance a Persistence Implementation. Using fallback as default",
+ e);
+ persistence = fallbackPersistence;
+ }
+ persistence.setAggregationScheduler(AggregationScheduler.getInstance());
+ persistence.setFallback(fallbackPersistence);
+ persistencePersistenceBackends.put(scope, persistence);
+ }
+
+ return persistence;
+ }
+
+}
diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java
index 50b46a8..b5375aa 100644
--- a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java
+++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java
@@ -3,120 +3,18 @@
*/
package org.gcube.accounting.persistence;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
-import org.gcube.common.scope.api.ScopeProvider;
-import org.gcube.common.scope.impl.ScopeBean;
-import org.gcube.common.scope.impl.ScopeBean.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
-public abstract class AccountingPersistenceFactory {
-
- private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceFactory.class);
-
- public final static String HOME_SYSTEM_PROPERTY = "user.home";
-
- private static final String ACCOUTING_FALLBACK_FILENAME = "accountingFallback.log";
-
- private static String fallbackLocation;
-
- private static Map persistences;
-
- static {
- persistences = new HashMap();
- }
-
- private static File file(File file) throws IllegalArgumentException {
-
- if(!file.isDirectory()){
- file = file.getParentFile();
- }
+public class AccountingPersistenceFactory {
- //create folder structure if not exist
- if (!file.exists())
- file.mkdirs();
-
- return file;
-
- }
-
- public synchronized static void setFallbackLocation(String path){
- if(fallbackLocation == null){
- if(path==null){
- path = System.getProperty(HOME_SYSTEM_PROPERTY);
- }
- file(new File(path));
- fallbackLocation = path;
- }
+ public static void setFallbackLocation(String path){
+ AccountingPersistenceBackendFactory.setFallbackLocation(path);
}
public static AccountingPersistence getPersistence() {
- String scope = ScopeProvider.instance.get();
- if(scope==null){
- logger.error("No Scope available. FallbackPersistence will be used");
- File fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
- return new FallbackPersistence(fallbackFile);
- }
-
- AccountingPersistence persistence = persistences.get(scope);
- if(persistence==null){
-
- ScopeBean bean = new ScopeBean(scope);
- if(bean.is(Type.VRE)){
- bean = bean.enclosingScope();
- }
- String name = bean.name();
-
- File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
- FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile);
- try {
- ServiceLoader serviceLoader = ServiceLoader.load(AccountingPersistence.class);
- for (AccountingPersistence foundPersistence : serviceLoader) {
- if(foundPersistence.getClass().isInstance(FallbackPersistence.class)){
- continue;
- }
- try {
- String foundPersistenceClassName = foundPersistence.getClass().getSimpleName();
- logger.debug("Testing {}", foundPersistenceClassName);
- AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration(foundPersistenceClassName);
- foundPersistence.prepareConnection(configuration);
- /*
- * Uncomment the following line of code if you want to try
- * to create a test UsageRecord before setting the
- * foundPersistence as default
- *
- * foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord());
- */
- persistence = foundPersistence;
- logger.debug("{} will be used.", foundPersistenceClassName);
- break;
- } catch (Exception e) {
- logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundPersistence.getClass().getSimpleName()), e);
- }
- }
- if(persistence==null){
- persistence = fallbackPersistence;
- }
- } catch(Exception e){
- logger.error("Unable to instance a Persistence Implementation. Using fallback as default",
- e);
- persistence = fallbackPersistence;
- }
- persistence.setAggregationScheduler(AggregationScheduler.getInstance());
- persistence.setFallback(fallbackPersistence);
- persistences.put(scope, persistence);
- }
-
- return persistence;
+ return AccountingPersistence.getInstance();
}
}
diff --git a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java b/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java
index aee68ba..48043f5 100644
--- a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java
+++ b/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java
@@ -15,7 +15,7 @@ import org.gcube.accounting.datamodel.UsageRecord;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
-public class FallbackPersistence extends AccountingPersistence {
+public class FallbackPersistence extends AccountingPersistenceBackend {
private File accountingFallbackFile;
diff --git a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java
index 9c9559e..6053083 100644
--- a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java
+++ b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java
@@ -26,16 +26,16 @@ public class AccountingPersistenceTest {
public static final long timeout = 5000;
public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
- public static AccountingPersistence getPersistence(){
+ public static AccountingPersistenceBackend getPersistence(){
ScopeProvider.instance.set(GCUBE_DEVNEXT_SCOPE);
- AccountingPersistenceFactory.setFallbackLocation(null);
- return AccountingPersistenceFactory.getPersistence();
+ AccountingPersistenceBackendFactory.setFallbackLocation(null);
+ return AccountingPersistenceBackendFactory.getPersistenceBackend();
}
@Test
public void singleTestNoScope() throws Exception {
- AccountingPersistenceFactory.setFallbackLocation(null);
- final AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence();
+ AccountingPersistenceBackendFactory.setFallbackLocation(null);
+ final AccountingPersistenceBackend persistence = AccountingPersistenceBackendFactory.getPersistenceBackend();
Assert.assertTrue(persistence instanceof FallbackPersistence);
StressTestUtility.stressTest(new TestOperation() {
@Override
@@ -50,7 +50,7 @@ public class AccountingPersistenceTest {
@Test
public void singleTest() throws Exception {
- final AccountingPersistence persistence = getPersistence();
+ final AccountingPersistenceBackend persistence = getPersistence();
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) {
@@ -64,7 +64,7 @@ public class AccountingPersistenceTest {
@Test
public void stressTestNoAggregation() throws Exception {
- final AccountingPersistence persistence = getPersistence();
+ final AccountingPersistenceBackend persistence = getPersistence();
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) {
@@ -76,7 +76,7 @@ public class AccountingPersistenceTest {
@Test
public void stressTestWithAggregation() throws Exception {
- final AccountingPersistence persistence = getPersistence();
+ final AccountingPersistenceBackend persistence = getPersistence();
StressTestUtility.stressTest(new TestOperation() {
@Override