From 623f6955e352e0f25ec041d748f7944575d23fdc Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Mon, 12 Oct 2015 16:03:39 +0000 Subject: [PATCH] Fixing problem on scopes git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@119630 82a268e6-3cf1-43bd-a215-b396298e98cf --- .classpath | 10 ++ .settings/org.eclipse.core.resources.prefs | 1 + .settings/org.eclipse.wst.common.component | 2 + .../persistence/AccountingPersistence.java | 134 ++------------- .../AccountingPersistenceBackend.java | 158 ++++++++++++++++++ .../AccountingPersistenceBackendFactory.java | 121 ++++++++++++++ .../AccountingPersistenceFactory.java | 110 +----------- .../persistence/FallbackPersistence.java | 2 +- .../AccountingPersistenceTest.java | 16 +- 9 files changed, 318 insertions(+), 236 deletions(-) create mode 100644 src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java create mode 100644 src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java diff --git a/.classpath b/.classpath index a673149..ca9d765 100644 --- a/.classpath +++ b/.classpath @@ -23,5 +23,15 @@ + + + + + + + + + + diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs index cdfe4f1..29abf99 100644 --- a/.settings/org.eclipse.core.resources.prefs +++ b/.settings/org.eclipse.core.resources.prefs @@ -1,5 +1,6 @@ eclipse.preferences.version=1 encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 encoding//src/test/java=UTF-8 encoding//src/test/resources=UTF-8 encoding/=UTF-8 diff --git a/.settings/org.eclipse.wst.common.component b/.settings/org.eclipse.wst.common.component index 91b68a6..da21cbb 100644 --- a/.settings/org.eclipse.wst.common.component +++ b/.settings/org.eclipse.wst.common.component @@ -1,5 +1,7 @@ + + diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java index c87b79b..2c26b9a 100644 --- a/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistence.java @@ -3,118 +3,27 @@ */ package org.gcube.accounting.persistence; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.gcube.accounting.aggregation.scheduler.AggregationScheduler; import org.gcube.accounting.datamodel.SingleUsageRecord; -import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.exception.InvalidValueException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * */ -public abstract class AccountingPersistence { - - private static final Logger logger = LoggerFactory.getLogger(AccountingPersistence.class); +public class AccountingPersistence { - protected FallbackPersistence fallback; - protected AggregationScheduler aggregationScheduler; + private static final AccountingPersistence accountingPersistence; - /** - * Pool for thread execution - */ - private ExecutorService pool; + private AccountingPersistence(){} - protected AccountingPersistence(){ - this.pool = Executors.newCachedThreadPool(); + static { + accountingPersistence = new AccountingPersistence(); } - protected AccountingPersistence(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){ - this.fallback = fallback; - this.aggregationScheduler = aggregationScheduler; - this.pool = Executors.newCachedThreadPool(); - } - - /** - * @param fallback the fallback to set - */ - protected void setFallback(FallbackPersistence fallback) { - this.fallback = fallback; - } - - /** - * @param aggregationScheduler the aggregationScheduler to set - */ - protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) { - this.aggregationScheduler = aggregationScheduler; - } - - /** - * Prepare the connection to persistence. - * This method must be used by implementation class to open - * the connection with the persistence storage, DB, file etc. - * @param configuration The configuration to create the connection - * @throws Exception if fails - */ - protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception; - - /** - * This method contains the code to save the {@link #UsageRecord} - * - */ - protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception; - - private void accountWithFallback(UsageRecord... usageRecords) { - String persistenceName = this.getClass().getSimpleName(); - for(UsageRecord usageRecord : usageRecords){ - try { - //logger.debug("Going to account {} using {}", usageRecord, persistenceName); - this.reallyAccount(usageRecord); - logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName); - } catch (Exception e) { - String fallabackPersistenceName = fallback.getClass().getSimpleName(); - try { - logger.error("{} was not accounted succesfully from {}. Trying to use {}.", - usageRecord.toString(), persistenceName, fallabackPersistenceName, e); - fallback.reallyAccount(usageRecord); - logger.debug("{} accounted succesfully from {}", - usageRecord.toString(), fallabackPersistenceName); - }catch(Exception ex){ - logger.error("{} was not accounted at all", usageRecord.toString(), e); - } - } - } - } - - - protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){ - try { - if(validate){ - usageRecord.validate(); - } - if(aggregate){ - final AccountingPersistence persistence = this; - aggregationScheduler.aggregate(usageRecord, new AccountingPersistenceExecutor(){ - - @Override - public void persist(UsageRecord... usageRecords) throws Exception { - persistence.accountWithFallback(usageRecords); - } - - }); - }else{ - this.accountWithFallback(usageRecord); - } - - } catch (InvalidValueException e) { - logger.error("Error validating UsageRecord", e); - } catch (Exception e) { - logger.error("Error accounting UsageRecord", e); - } + protected static synchronized AccountingPersistence getInstance(){ + return accountingPersistence; } /** @@ -127,32 +36,15 @@ public abstract class AccountingPersistence { * @throws InvalidValueException if the Record Validation Fails */ public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{ - Runnable runnable = new Runnable(){ - @Override - public void run(){ - validateAccountAggregate(usageRecord, true, true); - } - }; - pool.execute(runnable); - + AccountingPersistenceBackendFactory.getPersistenceBackend().account(usageRecord); } public void flush(long timeout, TimeUnit timeUnit) throws Exception { - pool.awaitTermination(timeout, timeUnit); - - final AccountingPersistence persistence = this; - aggregationScheduler.flush(new AccountingPersistenceExecutor(){ - - @Override - public void persist(UsageRecord... usageRecords) throws Exception { - persistence.accountWithFallback(usageRecords); - } - - }); - + AccountingPersistenceBackendFactory.getPersistenceBackend().flush(timeout, timeUnit); } - - public abstract void close() throws Exception; + public void close() throws Exception{ + AccountingPersistenceBackendFactory.getPersistenceBackend().close(); + } } diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java new file mode 100644 index 0000000..b31a13a --- /dev/null +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java @@ -0,0 +1,158 @@ +/** + * + */ +package org.gcube.accounting.persistence; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.gcube.accounting.aggregation.scheduler.AggregationScheduler; +import org.gcube.accounting.datamodel.SingleUsageRecord; +import org.gcube.accounting.datamodel.UsageRecord; +import org.gcube.accounting.exception.InvalidValueException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +public abstract class AccountingPersistenceBackend { + + private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackend.class); + + protected FallbackPersistence fallback; + protected AggregationScheduler aggregationScheduler; + + /** + * Pool for thread execution + */ + private ExecutorService pool; + + protected AccountingPersistenceBackend(){ + this.pool = Executors.newCachedThreadPool(); + } + + protected AccountingPersistenceBackend(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){ + this.fallback = fallback; + this.aggregationScheduler = aggregationScheduler; + this.pool = Executors.newCachedThreadPool(); + } + + /** + * @param fallback the fallback to set + */ + protected void setFallback(FallbackPersistence fallback) { + this.fallback = fallback; + } + + /** + * @param aggregationScheduler the aggregationScheduler to set + */ + protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) { + this.aggregationScheduler = aggregationScheduler; + } + + /** + * Prepare the connection to persistence. + * This method must be used by implementation class to open + * the connection with the persistence storage, DB, file etc. + * @param configuration The configuration to create the connection + * @throws Exception if fails + */ + protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception; + + /** + * This method contains the code to save the {@link #UsageRecord} + * + */ + protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception; + + private void accountWithFallback(UsageRecord... usageRecords) { + String persistenceName = this.getClass().getSimpleName(); + for(UsageRecord usageRecord : usageRecords){ + try { + //logger.debug("Going to account {} using {}", usageRecord, persistenceName); + this.reallyAccount(usageRecord); + logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName); + } catch (Exception e) { + String fallabackPersistenceName = fallback.getClass().getSimpleName(); + try { + logger.error("{} was not accounted succesfully from {}. Trying to use {}.", + usageRecord.toString(), persistenceName, fallabackPersistenceName, e); + fallback.reallyAccount(usageRecord); + logger.debug("{} accounted succesfully from {}", + usageRecord.toString(), fallabackPersistenceName); + }catch(Exception ex){ + logger.error("{} was not accounted at all", usageRecord.toString(), e); + } + } + } + } + + + protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){ + try { + if(validate){ + usageRecord.validate(); + } + if(aggregate){ + final AccountingPersistenceBackend persistence = this; + aggregationScheduler.aggregate(usageRecord, new AccountingPersistenceExecutor(){ + + @Override + public void persist(UsageRecord... usageRecords) throws Exception { + persistence.accountWithFallback(usageRecords); + } + + }); + }else{ + this.accountWithFallback(usageRecord); + } + + } catch (InvalidValueException e) { + logger.error("Error validating UsageRecord", e); + } catch (Exception e) { + logger.error("Error accounting UsageRecord", e); + } + } + + /** + * Persist the {@link #UsageRecord}. + * The Record is validated first, then accounted, in a separated thread. + * So that the program can continue the execution. + * If the persistence fails the class write that the record in a local file + * so that the {@link #UsageRecord} can be recorder later. + * @param usageRecord the {@link #UsageRecord} to persist + * @throws InvalidValueException if the Record Validation Fails + */ + public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{ + Runnable runnable = new Runnable(){ + @Override + public void run(){ + validateAccountAggregate(usageRecord, true, true); + } + }; + pool.execute(runnable); + + } + + public void flush(long timeout, TimeUnit timeUnit) throws Exception { + pool.awaitTermination(timeout, timeUnit); + + final AccountingPersistenceBackend persistence = this; + aggregationScheduler.flush(new AccountingPersistenceExecutor(){ + + @Override + public void persist(UsageRecord... usageRecords) throws Exception { + persistence.accountWithFallback(usageRecords); + } + + }); + + } + + + public abstract void close() throws Exception; + +} diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java new file mode 100644 index 0000000..7550aed --- /dev/null +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java @@ -0,0 +1,121 @@ +/** + * + */ +package org.gcube.accounting.persistence; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +import org.gcube.accounting.aggregation.scheduler.AggregationScheduler; +import org.gcube.common.scope.api.ScopeProvider; +import org.gcube.common.scope.impl.ScopeBean; +import org.gcube.common.scope.impl.ScopeBean.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public abstract class AccountingPersistenceBackendFactory { + + private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendFactory.class); + + public final static String HOME_SYSTEM_PROPERTY = "user.home"; + + private static final String ACCOUTING_FALLBACK_FILENAME = "accountingFallback.log"; + + private static String fallbackLocation; + + private static Map persistencePersistenceBackends; + + static { + persistencePersistenceBackends = new HashMap(); + } + + private static File file(File file) throws IllegalArgumentException { + + if(!file.isDirectory()){ + file = file.getParentFile(); + } + + //create folder structure if not exist + if (!file.exists()) + file.mkdirs(); + + return file; + + } + + protected synchronized static void setFallbackLocation(String path){ + if(fallbackLocation == null){ + if(path==null){ + path = System.getProperty(HOME_SYSTEM_PROPERTY); + } + file(new File(path)); + fallbackLocation = path; + } + } + + protected static synchronized AccountingPersistenceBackend getPersistenceBackend() { + String scope = ScopeProvider.instance.get(); + if(scope==null){ + logger.error("No Scope available. FallbackPersistence will be used"); + File fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME); + return new FallbackPersistence(fallbackFile); + } + + AccountingPersistenceBackend persistence = persistencePersistenceBackends.get(scope); + if(persistence==null){ + + ScopeBean bean = new ScopeBean(scope); + if(bean.is(Type.VRE)){ + bean = bean.enclosingScope(); + } + String name = bean.name(); + + File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME)); + FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile); + try { + ServiceLoader serviceLoader = ServiceLoader.load(AccountingPersistenceBackend.class); + for (AccountingPersistenceBackend foundPersistence : serviceLoader) { + if(foundPersistence.getClass().isInstance(FallbackPersistence.class)){ + continue; + } + try { + String foundPersistenceClassName = foundPersistence.getClass().getSimpleName(); + logger.debug("Testing {}", foundPersistenceClassName); + AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration(foundPersistenceClassName); + foundPersistence.prepareConnection(configuration); + /* + * Uncomment the following line of code if you want to try + * to create a test UsageRecord before setting the + * foundPersistence as default + * + * foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord()); + */ + persistence = foundPersistence; + logger.debug("{} will be used.", foundPersistenceClassName); + break; + } catch (Exception e) { + logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundPersistence.getClass().getSimpleName()), e); + } + } if(persistence==null){ + persistence = fallbackPersistence; + } + } catch(Exception e){ + logger.error("Unable to instance a Persistence Implementation. Using fallback as default", + e); + persistence = fallbackPersistence; + } + persistence.setAggregationScheduler(AggregationScheduler.getInstance()); + persistence.setFallback(fallbackPersistence); + persistencePersistenceBackends.put(scope, persistence); + } + + return persistence; + } + +} diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java index 50b46a8..b5375aa 100644 --- a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceFactory.java @@ -3,120 +3,18 @@ */ package org.gcube.accounting.persistence; -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.ServiceLoader; - -import org.gcube.accounting.aggregation.scheduler.AggregationScheduler; -import org.gcube.common.scope.api.ScopeProvider; -import org.gcube.common.scope.impl.ScopeBean; -import org.gcube.common.scope.impl.ScopeBean.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ * */ -public abstract class AccountingPersistenceFactory { - - private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceFactory.class); - - public final static String HOME_SYSTEM_PROPERTY = "user.home"; - - private static final String ACCOUTING_FALLBACK_FILENAME = "accountingFallback.log"; - - private static String fallbackLocation; - - private static Map persistences; - - static { - persistences = new HashMap(); - } - - private static File file(File file) throws IllegalArgumentException { - - if(!file.isDirectory()){ - file = file.getParentFile(); - } +public class AccountingPersistenceFactory { - //create folder structure if not exist - if (!file.exists()) - file.mkdirs(); - - return file; - - } - - public synchronized static void setFallbackLocation(String path){ - if(fallbackLocation == null){ - if(path==null){ - path = System.getProperty(HOME_SYSTEM_PROPERTY); - } - file(new File(path)); - fallbackLocation = path; - } + public static void setFallbackLocation(String path){ + AccountingPersistenceBackendFactory.setFallbackLocation(path); } public static AccountingPersistence getPersistence() { - String scope = ScopeProvider.instance.get(); - if(scope==null){ - logger.error("No Scope available. FallbackPersistence will be used"); - File fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME); - return new FallbackPersistence(fallbackFile); - } - - AccountingPersistence persistence = persistences.get(scope); - if(persistence==null){ - - ScopeBean bean = new ScopeBean(scope); - if(bean.is(Type.VRE)){ - bean = bean.enclosingScope(); - } - String name = bean.name(); - - File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME)); - FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile); - try { - ServiceLoader serviceLoader = ServiceLoader.load(AccountingPersistence.class); - for (AccountingPersistence foundPersistence : serviceLoader) { - if(foundPersistence.getClass().isInstance(FallbackPersistence.class)){ - continue; - } - try { - String foundPersistenceClassName = foundPersistence.getClass().getSimpleName(); - logger.debug("Testing {}", foundPersistenceClassName); - AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration(foundPersistenceClassName); - foundPersistence.prepareConnection(configuration); - /* - * Uncomment the following line of code if you want to try - * to create a test UsageRecord before setting the - * foundPersistence as default - * - * foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord()); - */ - persistence = foundPersistence; - logger.debug("{} will be used.", foundPersistenceClassName); - break; - } catch (Exception e) { - logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundPersistence.getClass().getSimpleName()), e); - } - } - if(persistence==null){ - persistence = fallbackPersistence; - } - } catch(Exception e){ - logger.error("Unable to instance a Persistence Implementation. Using fallback as default", - e); - persistence = fallbackPersistence; - } - persistence.setAggregationScheduler(AggregationScheduler.getInstance()); - persistence.setFallback(fallbackPersistence); - persistences.put(scope, persistence); - } - - return persistence; + return AccountingPersistence.getInstance(); } } diff --git a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java b/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java index aee68ba..48043f5 100644 --- a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java +++ b/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java @@ -15,7 +15,7 @@ import org.gcube.accounting.datamodel.UsageRecord; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ */ -public class FallbackPersistence extends AccountingPersistence { +public class FallbackPersistence extends AccountingPersistenceBackend { private File accountingFallbackFile; diff --git a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java index 9c9559e..6053083 100644 --- a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java +++ b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceTest.java @@ -26,16 +26,16 @@ public class AccountingPersistenceTest { public static final long timeout = 5000; public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS; - public static AccountingPersistence getPersistence(){ + public static AccountingPersistenceBackend getPersistence(){ ScopeProvider.instance.set(GCUBE_DEVNEXT_SCOPE); - AccountingPersistenceFactory.setFallbackLocation(null); - return AccountingPersistenceFactory.getPersistence(); + AccountingPersistenceBackendFactory.setFallbackLocation(null); + return AccountingPersistenceBackendFactory.getPersistenceBackend(); } @Test public void singleTestNoScope() throws Exception { - AccountingPersistenceFactory.setFallbackLocation(null); - final AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence(); + AccountingPersistenceBackendFactory.setFallbackLocation(null); + final AccountingPersistenceBackend persistence = AccountingPersistenceBackendFactory.getPersistenceBackend(); Assert.assertTrue(persistence instanceof FallbackPersistence); StressTestUtility.stressTest(new TestOperation() { @Override @@ -50,7 +50,7 @@ public class AccountingPersistenceTest { @Test public void singleTest() throws Exception { - final AccountingPersistence persistence = getPersistence(); + final AccountingPersistenceBackend persistence = getPersistence(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) { @@ -64,7 +64,7 @@ public class AccountingPersistenceTest { @Test public void stressTestNoAggregation() throws Exception { - final AccountingPersistence persistence = getPersistence(); + final AccountingPersistenceBackend persistence = getPersistence(); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) { @@ -76,7 +76,7 @@ public class AccountingPersistenceTest { @Test public void stressTestWithAggregation() throws Exception { - final AccountingPersistence persistence = getPersistence(); + final AccountingPersistenceBackend persistence = getPersistence(); StressTestUtility.stressTest(new TestOperation() { @Override