Fixing problem on scopes
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@119630 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
78dc833d14
commit
623f6955e3
10
.classpath
10
.classpath
|
@ -23,5 +23,15 @@
|
||||||
<attribute name="org.eclipse.jst.component.nondependency" value=""/>
|
<attribute name="org.eclipse.jst.component.nondependency" value=""/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
|
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
|
||||||
|
<attributes>
|
||||||
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
|
</attributes>
|
||||||
|
</classpathentry>
|
||||||
|
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
|
||||||
|
<attributes>
|
||||||
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
|
</attributes>
|
||||||
|
</classpathentry>
|
||||||
<classpathentry kind="output" path="target/classes"/>
|
<classpathentry kind="output" path="target/classes"/>
|
||||||
</classpath>
|
</classpath>
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
eclipse.preferences.version=1
|
eclipse.preferences.version=1
|
||||||
encoding//src/main/java=UTF-8
|
encoding//src/main/java=UTF-8
|
||||||
|
encoding//src/main/resources=UTF-8
|
||||||
encoding//src/test/java=UTF-8
|
encoding//src/test/java=UTF-8
|
||||||
encoding//src/test/resources=UTF-8
|
encoding//src/test/resources=UTF-8
|
||||||
encoding/<project>=UTF-8
|
encoding/<project>=UTF-8
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?><project-modules id="moduleCoreId" project-version="1.5.0">
|
<?xml version="1.0" encoding="UTF-8"?><project-modules id="moduleCoreId" project-version="1.5.0">
|
||||||
<wb-module deploy-name="accounting-lib">
|
<wb-module deploy-name="accounting-lib">
|
||||||
<wb-resource deploy-path="/" source-path="/src/main/java"/>
|
<wb-resource deploy-path="/" source-path="/src/main/java"/>
|
||||||
|
<wb-resource deploy-path="/" source-path="/src/main/resources"/>
|
||||||
|
<wb-resource deploy-path="/" source-path="/src/test/resources"/>
|
||||||
</wb-module>
|
</wb-module>
|
||||||
</project-modules>
|
</project-modules>
|
||||||
|
|
|
@ -3,118 +3,27 @@
|
||||||
*/
|
*/
|
||||||
package org.gcube.accounting.persistence;
|
package org.gcube.accounting.persistence;
|
||||||
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
|
|
||||||
import org.gcube.accounting.datamodel.SingleUsageRecord;
|
import org.gcube.accounting.datamodel.SingleUsageRecord;
|
||||||
import org.gcube.accounting.datamodel.UsageRecord;
|
|
||||||
import org.gcube.accounting.exception.InvalidValueException;
|
import org.gcube.accounting.exception.InvalidValueException;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
*/
|
|
||||||
public abstract class AccountingPersistence {
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(AccountingPersistence.class);
|
|
||||||
|
|
||||||
protected FallbackPersistence fallback;
|
|
||||||
protected AggregationScheduler aggregationScheduler;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pool for thread execution
|
|
||||||
*/
|
|
||||||
private ExecutorService pool;
|
|
||||||
|
|
||||||
protected AccountingPersistence(){
|
|
||||||
this.pool = Executors.newCachedThreadPool();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected AccountingPersistence(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){
|
|
||||||
this.fallback = fallback;
|
|
||||||
this.aggregationScheduler = aggregationScheduler;
|
|
||||||
this.pool = Executors.newCachedThreadPool();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param fallback the fallback to set
|
|
||||||
*/
|
|
||||||
protected void setFallback(FallbackPersistence fallback) {
|
|
||||||
this.fallback = fallback;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param aggregationScheduler the aggregationScheduler to set
|
|
||||||
*/
|
|
||||||
protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) {
|
|
||||||
this.aggregationScheduler = aggregationScheduler;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Prepare the connection to persistence.
|
|
||||||
* This method must be used by implementation class to open
|
|
||||||
* the connection with the persistence storage, DB, file etc.
|
|
||||||
* @param configuration The configuration to create the connection
|
|
||||||
* @throws Exception if fails
|
|
||||||
*/
|
|
||||||
protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This method contains the code to save the {@link #UsageRecord}
|
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception;
|
public class AccountingPersistence {
|
||||||
|
|
||||||
private void accountWithFallback(UsageRecord... usageRecords) {
|
private static final AccountingPersistence accountingPersistence;
|
||||||
String persistenceName = this.getClass().getSimpleName();
|
|
||||||
for(UsageRecord usageRecord : usageRecords){
|
private AccountingPersistence(){}
|
||||||
try {
|
|
||||||
//logger.debug("Going to account {} using {}", usageRecord, persistenceName);
|
static {
|
||||||
this.reallyAccount(usageRecord);
|
accountingPersistence = new AccountingPersistence();
|
||||||
logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName);
|
|
||||||
} catch (Exception e) {
|
|
||||||
String fallabackPersistenceName = fallback.getClass().getSimpleName();
|
|
||||||
try {
|
|
||||||
logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
|
|
||||||
usageRecord.toString(), persistenceName, fallabackPersistenceName, e);
|
|
||||||
fallback.reallyAccount(usageRecord);
|
|
||||||
logger.debug("{} accounted succesfully from {}",
|
|
||||||
usageRecord.toString(), fallabackPersistenceName);
|
|
||||||
}catch(Exception ex){
|
|
||||||
logger.error("{} was not accounted at all", usageRecord.toString(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static synchronized AccountingPersistence getInstance(){
|
||||||
protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){
|
return accountingPersistence;
|
||||||
try {
|
|
||||||
if(validate){
|
|
||||||
usageRecord.validate();
|
|
||||||
}
|
|
||||||
if(aggregate){
|
|
||||||
final AccountingPersistence persistence = this;
|
|
||||||
aggregationScheduler.aggregate(usageRecord, new AccountingPersistenceExecutor(){
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void persist(UsageRecord... usageRecords) throws Exception {
|
|
||||||
persistence.accountWithFallback(usageRecords);
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
}else{
|
|
||||||
this.accountWithFallback(usageRecord);
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (InvalidValueException e) {
|
|
||||||
logger.error("Error validating UsageRecord", e);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Error accounting UsageRecord", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -127,32 +36,15 @@ public abstract class AccountingPersistence {
|
||||||
* @throws InvalidValueException if the Record Validation Fails
|
* @throws InvalidValueException if the Record Validation Fails
|
||||||
*/
|
*/
|
||||||
public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{
|
public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{
|
||||||
Runnable runnable = new Runnable(){
|
AccountingPersistenceBackendFactory.getPersistenceBackend().account(usageRecord);
|
||||||
@Override
|
|
||||||
public void run(){
|
|
||||||
validateAccountAggregate(usageRecord, true, true);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
pool.execute(runnable);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
|
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
|
||||||
pool.awaitTermination(timeout, timeUnit);
|
AccountingPersistenceBackendFactory.getPersistenceBackend().flush(timeout, timeUnit);
|
||||||
|
|
||||||
final AccountingPersistence persistence = this;
|
|
||||||
aggregationScheduler.flush(new AccountingPersistenceExecutor(){
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void persist(UsageRecord... usageRecords) throws Exception {
|
|
||||||
persistence.accountWithFallback(usageRecords);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
public void close() throws Exception{
|
||||||
|
AccountingPersistenceBackendFactory.getPersistenceBackend().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public abstract void close() throws Exception;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,158 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.gcube.accounting.persistence;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
|
||||||
|
import org.gcube.accounting.datamodel.SingleUsageRecord;
|
||||||
|
import org.gcube.accounting.datamodel.UsageRecord;
|
||||||
|
import org.gcube.accounting.exception.InvalidValueException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
|
*/
|
||||||
|
public abstract class AccountingPersistenceBackend {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackend.class);
|
||||||
|
|
||||||
|
protected FallbackPersistence fallback;
|
||||||
|
protected AggregationScheduler aggregationScheduler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pool for thread execution
|
||||||
|
*/
|
||||||
|
private ExecutorService pool;
|
||||||
|
|
||||||
|
protected AccountingPersistenceBackend(){
|
||||||
|
this.pool = Executors.newCachedThreadPool();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected AccountingPersistenceBackend(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){
|
||||||
|
this.fallback = fallback;
|
||||||
|
this.aggregationScheduler = aggregationScheduler;
|
||||||
|
this.pool = Executors.newCachedThreadPool();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param fallback the fallback to set
|
||||||
|
*/
|
||||||
|
protected void setFallback(FallbackPersistence fallback) {
|
||||||
|
this.fallback = fallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param aggregationScheduler the aggregationScheduler to set
|
||||||
|
*/
|
||||||
|
protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) {
|
||||||
|
this.aggregationScheduler = aggregationScheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare the connection to persistence.
|
||||||
|
* This method must be used by implementation class to open
|
||||||
|
* the connection with the persistence storage, DB, file etc.
|
||||||
|
* @param configuration The configuration to create the connection
|
||||||
|
* @throws Exception if fails
|
||||||
|
*/
|
||||||
|
protected abstract void prepareConnection(AccountingPersistenceConfiguration configuration) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method contains the code to save the {@link #UsageRecord}
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception;
|
||||||
|
|
||||||
|
private void accountWithFallback(UsageRecord... usageRecords) {
|
||||||
|
String persistenceName = this.getClass().getSimpleName();
|
||||||
|
for(UsageRecord usageRecord : usageRecords){
|
||||||
|
try {
|
||||||
|
//logger.debug("Going to account {} using {}", usageRecord, persistenceName);
|
||||||
|
this.reallyAccount(usageRecord);
|
||||||
|
logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName);
|
||||||
|
} catch (Exception e) {
|
||||||
|
String fallabackPersistenceName = fallback.getClass().getSimpleName();
|
||||||
|
try {
|
||||||
|
logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
|
||||||
|
usageRecord.toString(), persistenceName, fallabackPersistenceName, e);
|
||||||
|
fallback.reallyAccount(usageRecord);
|
||||||
|
logger.debug("{} accounted succesfully from {}",
|
||||||
|
usageRecord.toString(), fallabackPersistenceName);
|
||||||
|
}catch(Exception ex){
|
||||||
|
logger.error("{} was not accounted at all", usageRecord.toString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void validateAccountAggregate(final SingleUsageRecord usageRecord, boolean validate, boolean aggregate){
|
||||||
|
try {
|
||||||
|
if(validate){
|
||||||
|
usageRecord.validate();
|
||||||
|
}
|
||||||
|
if(aggregate){
|
||||||
|
final AccountingPersistenceBackend persistence = this;
|
||||||
|
aggregationScheduler.aggregate(usageRecord, new AccountingPersistenceExecutor(){
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void persist(UsageRecord... usageRecords) throws Exception {
|
||||||
|
persistence.accountWithFallback(usageRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
}else{
|
||||||
|
this.accountWithFallback(usageRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (InvalidValueException e) {
|
||||||
|
logger.error("Error validating UsageRecord", e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Error accounting UsageRecord", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Persist the {@link #UsageRecord}.
|
||||||
|
* The Record is validated first, then accounted, in a separated thread.
|
||||||
|
* So that the program can continue the execution.
|
||||||
|
* If the persistence fails the class write that the record in a local file
|
||||||
|
* so that the {@link #UsageRecord} can be recorder later.
|
||||||
|
* @param usageRecord the {@link #UsageRecord} to persist
|
||||||
|
* @throws InvalidValueException if the Record Validation Fails
|
||||||
|
*/
|
||||||
|
public void account(final SingleUsageRecord usageRecord) throws InvalidValueException{
|
||||||
|
Runnable runnable = new Runnable(){
|
||||||
|
@Override
|
||||||
|
public void run(){
|
||||||
|
validateAccountAggregate(usageRecord, true, true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
pool.execute(runnable);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
|
||||||
|
pool.awaitTermination(timeout, timeUnit);
|
||||||
|
|
||||||
|
final AccountingPersistenceBackend persistence = this;
|
||||||
|
aggregationScheduler.flush(new AccountingPersistenceExecutor(){
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void persist(UsageRecord... usageRecords) throws Exception {
|
||||||
|
persistence.accountWithFallback(usageRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public abstract void close() throws Exception;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,121 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.gcube.accounting.persistence;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.ServiceLoader;
|
||||||
|
|
||||||
|
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
|
||||||
|
import org.gcube.common.scope.api.ScopeProvider;
|
||||||
|
import org.gcube.common.scope.impl.ScopeBean;
|
||||||
|
import org.gcube.common.scope.impl.ScopeBean.Type;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class AccountingPersistenceBackendFactory {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendFactory.class);
|
||||||
|
|
||||||
|
public final static String HOME_SYSTEM_PROPERTY = "user.home";
|
||||||
|
|
||||||
|
private static final String ACCOUTING_FALLBACK_FILENAME = "accountingFallback.log";
|
||||||
|
|
||||||
|
private static String fallbackLocation;
|
||||||
|
|
||||||
|
private static Map<String, AccountingPersistenceBackend> persistencePersistenceBackends;
|
||||||
|
|
||||||
|
static {
|
||||||
|
persistencePersistenceBackends = new HashMap<String, AccountingPersistenceBackend>();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static File file(File file) throws IllegalArgumentException {
|
||||||
|
|
||||||
|
if(!file.isDirectory()){
|
||||||
|
file = file.getParentFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
//create folder structure if not exist
|
||||||
|
if (!file.exists())
|
||||||
|
file.mkdirs();
|
||||||
|
|
||||||
|
return file;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized static void setFallbackLocation(String path){
|
||||||
|
if(fallbackLocation == null){
|
||||||
|
if(path==null){
|
||||||
|
path = System.getProperty(HOME_SYSTEM_PROPERTY);
|
||||||
|
}
|
||||||
|
file(new File(path));
|
||||||
|
fallbackLocation = path;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static synchronized AccountingPersistenceBackend getPersistenceBackend() {
|
||||||
|
String scope = ScopeProvider.instance.get();
|
||||||
|
if(scope==null){
|
||||||
|
logger.error("No Scope available. FallbackPersistence will be used");
|
||||||
|
File fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
|
||||||
|
return new FallbackPersistence(fallbackFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
AccountingPersistenceBackend persistence = persistencePersistenceBackends.get(scope);
|
||||||
|
if(persistence==null){
|
||||||
|
|
||||||
|
ScopeBean bean = new ScopeBean(scope);
|
||||||
|
if(bean.is(Type.VRE)){
|
||||||
|
bean = bean.enclosingScope();
|
||||||
|
}
|
||||||
|
String name = bean.name();
|
||||||
|
|
||||||
|
File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
|
||||||
|
FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile);
|
||||||
|
try {
|
||||||
|
ServiceLoader<AccountingPersistenceBackend> serviceLoader = ServiceLoader.load(AccountingPersistenceBackend.class);
|
||||||
|
for (AccountingPersistenceBackend foundPersistence : serviceLoader) {
|
||||||
|
if(foundPersistence.getClass().isInstance(FallbackPersistence.class)){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
String foundPersistenceClassName = foundPersistence.getClass().getSimpleName();
|
||||||
|
logger.debug("Testing {}", foundPersistenceClassName);
|
||||||
|
AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration(foundPersistenceClassName);
|
||||||
|
foundPersistence.prepareConnection(configuration);
|
||||||
|
/*
|
||||||
|
* Uncomment the following line of code if you want to try
|
||||||
|
* to create a test UsageRecord before setting the
|
||||||
|
* foundPersistence as default
|
||||||
|
*
|
||||||
|
* foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord());
|
||||||
|
*/
|
||||||
|
persistence = foundPersistence;
|
||||||
|
logger.debug("{} will be used.", foundPersistenceClassName);
|
||||||
|
break;
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundPersistence.getClass().getSimpleName()), e);
|
||||||
|
}
|
||||||
|
} if(persistence==null){
|
||||||
|
persistence = fallbackPersistence;
|
||||||
|
}
|
||||||
|
} catch(Exception e){
|
||||||
|
logger.error("Unable to instance a Persistence Implementation. Using fallback as default",
|
||||||
|
e);
|
||||||
|
persistence = fallbackPersistence;
|
||||||
|
}
|
||||||
|
persistence.setAggregationScheduler(AggregationScheduler.getInstance());
|
||||||
|
persistence.setFallback(fallbackPersistence);
|
||||||
|
persistencePersistenceBackends.put(scope, persistence);
|
||||||
|
}
|
||||||
|
|
||||||
|
return persistence;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -3,120 +3,18 @@
|
||||||
*/
|
*/
|
||||||
package org.gcube.accounting.persistence;
|
package org.gcube.accounting.persistence;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.ServiceLoader;
|
|
||||||
|
|
||||||
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
|
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
|
||||||
import org.gcube.common.scope.impl.ScopeBean;
|
|
||||||
import org.gcube.common.scope.impl.ScopeBean.Type;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract class AccountingPersistenceFactory {
|
public class AccountingPersistenceFactory {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceFactory.class);
|
public static void setFallbackLocation(String path){
|
||||||
|
AccountingPersistenceBackendFactory.setFallbackLocation(path);
|
||||||
public final static String HOME_SYSTEM_PROPERTY = "user.home";
|
|
||||||
|
|
||||||
private static final String ACCOUTING_FALLBACK_FILENAME = "accountingFallback.log";
|
|
||||||
|
|
||||||
private static String fallbackLocation;
|
|
||||||
|
|
||||||
private static Map<String, AccountingPersistence> persistences;
|
|
||||||
|
|
||||||
static {
|
|
||||||
persistences = new HashMap<String, AccountingPersistence>();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static File file(File file) throws IllegalArgumentException {
|
|
||||||
|
|
||||||
if(!file.isDirectory()){
|
|
||||||
file = file.getParentFile();
|
|
||||||
}
|
|
||||||
|
|
||||||
//create folder structure if not exist
|
|
||||||
if (!file.exists())
|
|
||||||
file.mkdirs();
|
|
||||||
|
|
||||||
return file;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized static void setFallbackLocation(String path){
|
|
||||||
if(fallbackLocation == null){
|
|
||||||
if(path==null){
|
|
||||||
path = System.getProperty(HOME_SYSTEM_PROPERTY);
|
|
||||||
}
|
|
||||||
file(new File(path));
|
|
||||||
fallbackLocation = path;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AccountingPersistence getPersistence() {
|
public static AccountingPersistence getPersistence() {
|
||||||
String scope = ScopeProvider.instance.get();
|
return AccountingPersistence.getInstance();
|
||||||
if(scope==null){
|
|
||||||
logger.error("No Scope available. FallbackPersistence will be used");
|
|
||||||
File fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
|
|
||||||
return new FallbackPersistence(fallbackFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
AccountingPersistence persistence = persistences.get(scope);
|
|
||||||
if(persistence==null){
|
|
||||||
|
|
||||||
ScopeBean bean = new ScopeBean(scope);
|
|
||||||
if(bean.is(Type.VRE)){
|
|
||||||
bean = bean.enclosingScope();
|
|
||||||
}
|
|
||||||
String name = bean.name();
|
|
||||||
|
|
||||||
File fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
|
|
||||||
FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile);
|
|
||||||
try {
|
|
||||||
ServiceLoader<AccountingPersistence> serviceLoader = ServiceLoader.load(AccountingPersistence.class);
|
|
||||||
for (AccountingPersistence foundPersistence : serviceLoader) {
|
|
||||||
if(foundPersistence.getClass().isInstance(FallbackPersistence.class)){
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
String foundPersistenceClassName = foundPersistence.getClass().getSimpleName();
|
|
||||||
logger.debug("Testing {}", foundPersistenceClassName);
|
|
||||||
AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration(foundPersistenceClassName);
|
|
||||||
foundPersistence.prepareConnection(configuration);
|
|
||||||
/*
|
|
||||||
* Uncomment the following line of code if you want to try
|
|
||||||
* to create a test UsageRecord before setting the
|
|
||||||
* foundPersistence as default
|
|
||||||
*
|
|
||||||
* foundPersistence.accountWithFallback(TestUsageRecord.createTestServiceUsageRecord());
|
|
||||||
*/
|
|
||||||
persistence = foundPersistence;
|
|
||||||
logger.debug("{} will be used.", foundPersistenceClassName);
|
|
||||||
break;
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundPersistence.getClass().getSimpleName()), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(persistence==null){
|
|
||||||
persistence = fallbackPersistence;
|
|
||||||
}
|
|
||||||
} catch(Exception e){
|
|
||||||
logger.error("Unable to instance a Persistence Implementation. Using fallback as default",
|
|
||||||
e);
|
|
||||||
persistence = fallbackPersistence;
|
|
||||||
}
|
|
||||||
persistence.setAggregationScheduler(AggregationScheduler.getInstance());
|
|
||||||
persistence.setFallback(fallbackPersistence);
|
|
||||||
persistences.put(scope, persistence);
|
|
||||||
}
|
|
||||||
|
|
||||||
return persistence;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ import org.gcube.accounting.datamodel.UsageRecord;
|
||||||
/**
|
/**
|
||||||
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
|
||||||
*/
|
*/
|
||||||
public class FallbackPersistence extends AccountingPersistence {
|
public class FallbackPersistence extends AccountingPersistenceBackend {
|
||||||
|
|
||||||
private File accountingFallbackFile;
|
private File accountingFallbackFile;
|
||||||
|
|
||||||
|
|
|
@ -26,16 +26,16 @@ public class AccountingPersistenceTest {
|
||||||
public static final long timeout = 5000;
|
public static final long timeout = 5000;
|
||||||
public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
|
public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
public static AccountingPersistence getPersistence(){
|
public static AccountingPersistenceBackend getPersistence(){
|
||||||
ScopeProvider.instance.set(GCUBE_DEVNEXT_SCOPE);
|
ScopeProvider.instance.set(GCUBE_DEVNEXT_SCOPE);
|
||||||
AccountingPersistenceFactory.setFallbackLocation(null);
|
AccountingPersistenceBackendFactory.setFallbackLocation(null);
|
||||||
return AccountingPersistenceFactory.getPersistence();
|
return AccountingPersistenceBackendFactory.getPersistenceBackend();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void singleTestNoScope() throws Exception {
|
public void singleTestNoScope() throws Exception {
|
||||||
AccountingPersistenceFactory.setFallbackLocation(null);
|
AccountingPersistenceBackendFactory.setFallbackLocation(null);
|
||||||
final AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence();
|
final AccountingPersistenceBackend persistence = AccountingPersistenceBackendFactory.getPersistenceBackend();
|
||||||
Assert.assertTrue(persistence instanceof FallbackPersistence);
|
Assert.assertTrue(persistence instanceof FallbackPersistence);
|
||||||
StressTestUtility.stressTest(new TestOperation() {
|
StressTestUtility.stressTest(new TestOperation() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -50,7 +50,7 @@ public class AccountingPersistenceTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void singleTest() throws Exception {
|
public void singleTest() throws Exception {
|
||||||
final AccountingPersistence persistence = getPersistence();
|
final AccountingPersistenceBackend persistence = getPersistence();
|
||||||
StressTestUtility.stressTest(new TestOperation() {
|
StressTestUtility.stressTest(new TestOperation() {
|
||||||
@Override
|
@Override
|
||||||
public void operate(int i) {
|
public void operate(int i) {
|
||||||
|
@ -64,7 +64,7 @@ public class AccountingPersistenceTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void stressTestNoAggregation() throws Exception {
|
public void stressTestNoAggregation() throws Exception {
|
||||||
final AccountingPersistence persistence = getPersistence();
|
final AccountingPersistenceBackend persistence = getPersistence();
|
||||||
StressTestUtility.stressTest(new TestOperation() {
|
StressTestUtility.stressTest(new TestOperation() {
|
||||||
@Override
|
@Override
|
||||||
public void operate(int i) {
|
public void operate(int i) {
|
||||||
|
@ -76,7 +76,7 @@ public class AccountingPersistenceTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void stressTestWithAggregation() throws Exception {
|
public void stressTestWithAggregation() throws Exception {
|
||||||
final AccountingPersistence persistence = getPersistence();
|
final AccountingPersistenceBackend persistence = getPersistence();
|
||||||
|
|
||||||
StressTestUtility.stressTest(new TestOperation() {
|
StressTestUtility.stressTest(new TestOperation() {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue