Added isConnectionActive() abstract method in PersistenceBackend

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@150548 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-06-26 15:46:49 +00:00
parent c29985f35b
commit 51720268b2
5 changed files with 112 additions and 106 deletions

View File

@ -1,7 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE xml>
<ReleaseNotes>
<Changeset component="org.gcube.data-publishing.document-store-lib.2-0-0" date="${buildDate}">
<Changeset component="org.gcube.data-publishing.document-store-lib.2-1-0" date="${buildDate}">
<Change>Added abstract method isConnectionActive() in PersistenceBackend</Change>
</Changeset>
<Changeset component="org.gcube.data-publishing.document-store-lib.2-0-0" date="2017-06-07">
<Change>Added Jackson support on Usage Record model to allow to use it for marshalling and unmarshalling</Change>
<Change>Marsahlling and unmarshalling use Jackson</Change>
</Changeset>

View File

@ -9,7 +9,7 @@
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
<name>Document Store Lib</name>
<description>Allow to persist data in NoSQL Document Store Databases.
Discover Model dynamically.

View File

@ -93,7 +93,10 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
}
@Override
public boolean isConnectionActive() throws Exception{
return true;
}
}

View File

@ -26,27 +26,24 @@ public abstract class PersistenceBackend {
protected PersistenceBackendMonitor persistenceBackendMonitor;
public static final Integer MAX_FALLBACK = 3; //max fallback with reload a configuration
public static final Integer MAX_FALLBACK = 3; // max fallback with reload a
// configuration
protected static Integer countFallback;
//add to control a timeout execption
protected boolean timeoutFallback=false;
// add to control a timeout execption
protected boolean timeoutFallback = false;
protected long timerToFallback;
public static final long MAX_TIME_TO_FALLBACK=1000*60*60; //60 min;
public static final long MAX_TIME_TO_FALLBACK = 1000 * 60 * 60; // 60 min;
protected boolean closed;
protected PersistenceBackend(){
if(!(this instanceof FallbackPersistenceBackend)){
protected PersistenceBackend() {
if (!(this instanceof FallbackPersistenceBackend)) {
this.persistenceBackendMonitor = new PersistenceBackendMonitor(this);
}
countFallback=0;
closed = true;
countFallback = 0;
}
protected PersistenceBackend(FallbackPersistenceBackend fallback){
this();
protected PersistenceBackend(FallbackPersistenceBackend fallback) {
this();
this.fallbackPersistence = fallback;
this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this), "FALLBACK");
@ -60,7 +57,8 @@ public abstract class PersistenceBackend {
}
/**
* @param fallback the fallback to set
* @param fallback
* the fallback to set
*/
protected void setFallback(FallbackPersistenceBackend fallback) {
this.fallbackPersistence = fallback;
@ -74,113 +72,127 @@ public abstract class PersistenceBackend {
}
/**
* @param aggregationScheduler the aggregationScheduler to set
* @param aggregationScheduler
* the aggregationScheduler to set
*/
protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) {
this.aggregationScheduler = aggregationScheduler;
}
/**
* Prepare the connection to persistence.
* This method must be used by implementation class to prepare
* the connection with the persistence storage, DB, file etc.
* @param configuration The configuration to create the connection
* @throws Exception if fails
* Prepare the connection to persistence. This method must be used by
* implementation class to prepare the connection with the persistence
* storage, DB, file etc.
*
* @param configuration
* The configuration to create the connection
* @throws Exception
* if fails
*/
protected abstract void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception;
/**
* This method is used to open db connection
* This method is used to open db connection
*
* @throws Exception
*/
protected abstract void openConnection() throws Exception ;
protected abstract void openConnection() throws Exception;
/**
* This method is used to open db connection
* This method is used to open db connection
*
* @throws Exception
*/
protected abstract void closeConnection() throws Exception ;
protected abstract void closeConnection() throws Exception;
/**
* This method is used to close
* This method is used to close
*
* @throws Exception
*/
public abstract void close() throws Exception;
/**
* This method is used to close db and clean the configuration.
* It is used when there are too much TimeoutException trying to persist
* records.
* This method is used to close db and clean the configuration. It is used
* when there are too much TimeoutException trying to persist records.
*
* @throws Exception
*/
protected void closeAndClean() throws Exception {}
;
protected void closeAndClean() throws Exception {};
/**
* Check the Connection state
* @return true if the connection is active, false otherwise
* @throws Exception
*/
public abstract boolean isConnectionActive() throws Exception;
/**
* This method contains the code to save the {@link Record}
*
*/
protected abstract void reallyAccount(Record record) throws Exception;
/***
*
* @param records
* @throws Exception
* @throws Exception
*/
protected void accountWithFallback(Record... records) throws Exception {
String persistenceName = this.getClass().getSimpleName();
this.openConnection();
for(Record record : records){
for (Record record : records) {
try {
//old code
//this.reallyAccount(record);
// old code
// this.reallyAccount(record);
long now = Calendar.getInstance().getTimeInMillis();
if((timeoutFallback)){
if ((timeoutFallback)) {
fallbackPersistence.reallyAccount(record);
logger.trace("accountWithFallback for timeout, now:{} and timerToFallback:{}", now,timerToFallback );
if ((now - timerToFallback) > MAX_TIME_TO_FALLBACK){
logger.trace("accountWithFallback for timeout, now:{} and timerToFallback:{}", now,
timerToFallback);
if ((now - timerToFallback) > MAX_TIME_TO_FALLBACK) {
logger.debug("accountWithFallback MAX_TIME_TO_FALLBACK is conclused");
timeoutFallback=false;
timerToFallback=0;
timeoutFallback = false;
timerToFallback = 0;
}
}
else{
} else {
this.reallyAccount(record);
logger.trace("accountWithFallback {} accounted succesfully from {}.", record.toString(), persistenceName);
timeoutFallback=false;
timerToFallback=0;
logger.trace("accountWithFallback {} accounted succesfully from {}.", record.toString(),
persistenceName);
timeoutFallback = false;
timerToFallback = 0;
}
} catch (Exception e) {
// TODO Insert Renew HERE
if((! (this instanceof FallbackPersistenceBackend)) && e.getCause()!=null && e.getCause() instanceof TimeoutException){
if ((!(this instanceof FallbackPersistenceBackend)) && e.getCause() != null
&& e.getCause() instanceof TimeoutException) {
logger.warn("accountWithFallback TimeoutException number:{} to {}.", countFallback, MAX_FALLBACK);
countFallback++;
if (countFallback.equals(MAX_FALLBACK)){
timeoutFallback=true;
countFallback++;
if (countFallback.equals(MAX_FALLBACK)) {
timeoutFallback = true;
timerToFallback = Calendar.getInstance().getTimeInMillis();
logger.trace("accountWithFallback Going to account {} in fallback for too many timeout", record);
//old code
//PersistenceBackendFactory.renew(this);
countFallback=0;
//disconnect
logger.trace("accountWithFallback Going to account {} in fallback for too many timeout",
record);
// old code
// PersistenceBackendFactory.renew(this);
countFallback = 0;
// disconnect
this.closeAndClean();
}
}
else{
logger.trace("accountWithFallback Fallback is same instance:{}"+this.getClass().getSimpleName());
} else {
logger.trace("accountWithFallback Fallback is same instance:{}" + this.getClass().getSimpleName());
}
try {
String fallabackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
logger.error("accountWithFallback {} was not accounted succesfully from {}. Trying to use {}.",
logger.error("accountWithFallback {} was not accounted succesfully from {}. Trying to use {}.",
record.toString(), persistenceName, fallabackPersistenceName, e);
fallbackPersistence.reallyAccount(record);
logger.trace("accountWithFallback {} accounted succesfully from {}",
record.toString(), fallabackPersistenceName);
}catch(Exception ex){
logger.trace("accountWithFallback {} accounted succesfully from {}", record.toString(),
fallabackPersistenceName);
} catch (Exception ex) {
logger.error("accountWithFallback {} was not accounted at all", record.toString(), e);
}
}
@ -194,66 +206,55 @@ public abstract class PersistenceBackend {
* @param validate
* @param aggregate
*/
protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate){
protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate) {
try {
logger.trace("Received {} to account : {}", record.getClass().getSimpleName(), record);
if(validate){
if (validate) {
record.validate();
//logger.trace("{} {} valid", record.getClass().getSimpleName(), record);
// logger.trace("{} {} valid",
// record.getClass().getSimpleName(), record);
}
if(aggregate){
if (aggregate) {
try {
aggregationScheduler.aggregate(record, new DefaultPersitenceExecutor(this));
} catch(Exception e){
} catch (Exception e) {
this.accountWithFallback(record);
}
}else{
this.accountWithFallback(record);
} else {
this.accountWithFallback(record);
}
} catch (InvalidValueException e) {
logger.error("Error validating {}", record.getClass().getSimpleName(), e);
} catch (Exception e) {
logger.error("Error recording {}", record.getClass().getSimpleName(), e);
}
}
}
/**
* Persist the {@link #UsageRecord}.
* The Record is validated first, then accounted, in a separated thread.
* So that the program can continue the execution.
* If the persistence fails the class write that the record in a local file
* so that the {@link #UsageRecord} can be recorder later.
* @param usageRecord the {@link #UsageRecord} to persist
* @throws InvalidValueException if the Record Validation Fails
* Persist the {@link #UsageRecord}. The Record is validated first, then
* accounted, in a separated thread. So that the program can continue the
* execution. If the persistence fails the class write that the record in a
* local file so that the {@link #UsageRecord} can be recorder later.
*
* @param usageRecord
* the {@link #UsageRecord} to persist
* @throws InvalidValueException
* if the Record Validation Fails
*/
public void account(final Record record) throws InvalidValueException{
Runnable runnable = new Runnable(){
public void account(final Record record) throws InvalidValueException {
Runnable runnable = new Runnable() {
@Override
public void run(){
public void run() {
accountValidateAggregate(record, true, true);
}
};
};
ExecutorUtils.threadPool.execute(runnable);
}
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
aggregationScheduler.flush(new DefaultPersitenceExecutor(this));
}
public boolean isOpen(){
return !closed;
}
protected void setOpen(){
this.closed = false;
}
}

View File

@ -149,7 +149,6 @@ public abstract class PersistenceBackendFactory {
}
found.prepareConnection(configuration);
found.setOpen();
logger.trace("{} will be used.", foundClassName);