diff --git a/distro/changelog.xml b/distro/changelog.xml index 57980d1..8bc73d0 100644 --- a/distro/changelog.xml +++ b/distro/changelog.xml @@ -1,7 +1,10 @@ - + + Added abstract method isConnectionActive() in PersistenceBackend + + Added Jackson support on Usage Record model to allow to use it for marshalling and unmarshalling Marsahlling and unmarshalling use Jackson diff --git a/pom.xml b/pom.xml index b85d067..5909547 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.gcube.data.publishing document-store-lib - 2.0.0-SNAPSHOT + 2.1.0-SNAPSHOT Document Store Lib Allow to persist data in NoSQL Document Store Databases. Discover Model dynamically. diff --git a/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java index 1100a14..0073c65 100644 --- a/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java +++ b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java @@ -93,7 +93,10 @@ public class FallbackPersistenceBackend extends PersistenceBackend { } - + @Override + public boolean isConnectionActive() throws Exception{ + return true; + } } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java index 822a9f7..03541fa 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java @@ -26,27 +26,24 @@ public abstract class PersistenceBackend { protected PersistenceBackendMonitor persistenceBackendMonitor; - public static final Integer MAX_FALLBACK = 3; //max fallback with reload a configuration + public static final Integer MAX_FALLBACK = 3; // max fallback with reload a + // configuration protected static Integer countFallback; - - //add to control a timeout execption - protected boolean timeoutFallback=false; + + // add to control a timeout execption + protected boolean timeoutFallback = false; protected long timerToFallback; - public static final long MAX_TIME_TO_FALLBACK=1000*60*60; //60 min; + public static final long MAX_TIME_TO_FALLBACK = 1000 * 60 * 60; // 60 min; - - protected boolean closed; - - protected PersistenceBackend(){ - if(!(this instanceof FallbackPersistenceBackend)){ + protected PersistenceBackend() { + if (!(this instanceof FallbackPersistenceBackend)) { this.persistenceBackendMonitor = new PersistenceBackendMonitor(this); } - countFallback=0; - closed = true; + countFallback = 0; } - protected PersistenceBackend(FallbackPersistenceBackend fallback){ - this(); + protected PersistenceBackend(FallbackPersistenceBackend fallback) { + this(); this.fallbackPersistence = fallback; this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this), "FALLBACK"); @@ -60,7 +57,8 @@ public abstract class PersistenceBackend { } /** - * @param fallback the fallback to set + * @param fallback + * the fallback to set */ protected void setFallback(FallbackPersistenceBackend fallback) { this.fallbackPersistence = fallback; @@ -74,113 +72,127 @@ public abstract class PersistenceBackend { } /** - * @param aggregationScheduler the aggregationScheduler to set + * @param aggregationScheduler + * the aggregationScheduler to set */ protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) { this.aggregationScheduler = aggregationScheduler; } /** - * Prepare the connection to persistence. - * This method must be used by implementation class to prepare - * the connection with the persistence storage, DB, file etc. - * @param configuration The configuration to create the connection - * @throws Exception if fails + * Prepare the connection to persistence. This method must be used by + * implementation class to prepare the connection with the persistence + * storage, DB, file etc. + * + * @param configuration + * The configuration to create the connection + * @throws Exception + * if fails */ protected abstract void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception; /** - * This method is used to open db connection + * This method is used to open db connection + * * @throws Exception */ - protected abstract void openConnection() throws Exception ; - + protected abstract void openConnection() throws Exception; + /** - * This method is used to open db connection + * This method is used to open db connection + * * @throws Exception */ - protected abstract void closeConnection() throws Exception ; - + protected abstract void closeConnection() throws Exception; + /** - * This method is used to close + * This method is used to close + * * @throws Exception */ public abstract void close() throws Exception; - + /** - * This method is used to close db and clean the configuration. - * It is used when there are too much TimeoutException trying to persist - * records. + * This method is used to close db and clean the configuration. It is used + * when there are too much TimeoutException trying to persist records. + * * @throws Exception */ - protected void closeAndClean() throws Exception {} - ; + protected void closeAndClean() throws Exception {}; + /** + * Check the Connection state + * @return true if the connection is active, false otherwise + * @throws Exception + */ + public abstract boolean isConnectionActive() throws Exception; + /** * This method contains the code to save the {@link Record} * */ protected abstract void reallyAccount(Record record) throws Exception; - /*** * * @param records - * @throws Exception + * @throws Exception */ protected void accountWithFallback(Record... records) throws Exception { String persistenceName = this.getClass().getSimpleName(); - + this.openConnection(); - - for(Record record : records){ + + for (Record record : records) { try { - //old code - //this.reallyAccount(record); + // old code + // this.reallyAccount(record); long now = Calendar.getInstance().getTimeInMillis(); - if((timeoutFallback)){ + if ((timeoutFallback)) { fallbackPersistence.reallyAccount(record); - logger.trace("accountWithFallback for timeout, now:{} and timerToFallback:{}", now,timerToFallback ); - if ((now - timerToFallback) > MAX_TIME_TO_FALLBACK){ + logger.trace("accountWithFallback for timeout, now:{} and timerToFallback:{}", now, + timerToFallback); + if ((now - timerToFallback) > MAX_TIME_TO_FALLBACK) { logger.debug("accountWithFallback MAX_TIME_TO_FALLBACK is conclused"); - timeoutFallback=false; - timerToFallback=0; + timeoutFallback = false; + timerToFallback = 0; } - } - else{ + } else { this.reallyAccount(record); - logger.trace("accountWithFallback {} accounted succesfully from {}.", record.toString(), persistenceName); - timeoutFallback=false; - timerToFallback=0; + logger.trace("accountWithFallback {} accounted succesfully from {}.", record.toString(), + persistenceName); + timeoutFallback = false; + timerToFallback = 0; } } catch (Exception e) { // TODO Insert Renew HERE - if((! (this instanceof FallbackPersistenceBackend)) && e.getCause()!=null && e.getCause() instanceof TimeoutException){ + if ((!(this instanceof FallbackPersistenceBackend)) && e.getCause() != null + && e.getCause() instanceof TimeoutException) { logger.warn("accountWithFallback TimeoutException number:{} to {}.", countFallback, MAX_FALLBACK); - countFallback++; - if (countFallback.equals(MAX_FALLBACK)){ - timeoutFallback=true; + countFallback++; + if (countFallback.equals(MAX_FALLBACK)) { + timeoutFallback = true; timerToFallback = Calendar.getInstance().getTimeInMillis(); - logger.trace("accountWithFallback Going to account {} in fallback for too many timeout", record); - //old code - //PersistenceBackendFactory.renew(this); - countFallback=0; - //disconnect + logger.trace("accountWithFallback Going to account {} in fallback for too many timeout", + record); + // old code + // PersistenceBackendFactory.renew(this); + countFallback = 0; + // disconnect this.closeAndClean(); } - } - else{ - logger.trace("accountWithFallback Fallback is same instance:{}"+this.getClass().getSimpleName()); + } else { + logger.trace("accountWithFallback Fallback is same instance:{}" + this.getClass().getSimpleName()); } try { String fallabackPersistenceName = FallbackPersistenceBackend.class.getSimpleName(); - logger.error("accountWithFallback {} was not accounted succesfully from {}. Trying to use {}.", + logger.error("accountWithFallback {} was not accounted succesfully from {}. Trying to use {}.", record.toString(), persistenceName, fallabackPersistenceName, e); fallbackPersistence.reallyAccount(record); - logger.trace("accountWithFallback {} accounted succesfully from {}", - record.toString(), fallabackPersistenceName); - }catch(Exception ex){ + logger.trace("accountWithFallback {} accounted succesfully from {}", record.toString(), + fallabackPersistenceName); + } catch (Exception ex) { logger.error("accountWithFallback {} was not accounted at all", record.toString(), e); } } @@ -194,66 +206,55 @@ public abstract class PersistenceBackend { * @param validate * @param aggregate */ - protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate){ + protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate) { try { logger.trace("Received {} to account : {}", record.getClass().getSimpleName(), record); - if(validate){ + if (validate) { record.validate(); - //logger.trace("{} {} valid", record.getClass().getSimpleName(), record); + // logger.trace("{} {} valid", + // record.getClass().getSimpleName(), record); } - if(aggregate){ + if (aggregate) { try { aggregationScheduler.aggregate(record, new DefaultPersitenceExecutor(this)); - } catch(Exception e){ + } catch (Exception e) { this.accountWithFallback(record); } - }else{ - this.accountWithFallback(record); + } else { + this.accountWithFallback(record); } } catch (InvalidValueException e) { logger.error("Error validating {}", record.getClass().getSimpleName(), e); } catch (Exception e) { - + logger.error("Error recording {}", record.getClass().getSimpleName(), e); - } + } } /** - * Persist the {@link #UsageRecord}. - * The Record is validated first, then accounted, in a separated thread. - * So that the program can continue the execution. - * If the persistence fails the class write that the record in a local file - * so that the {@link #UsageRecord} can be recorder later. - * @param usageRecord the {@link #UsageRecord} to persist - * @throws InvalidValueException if the Record Validation Fails + * Persist the {@link #UsageRecord}. The Record is validated first, then + * accounted, in a separated thread. So that the program can continue the + * execution. If the persistence fails the class write that the record in a + * local file so that the {@link #UsageRecord} can be recorder later. + * + * @param usageRecord + * the {@link #UsageRecord} to persist + * @throws InvalidValueException + * if the Record Validation Fails */ - public void account(final Record record) throws InvalidValueException{ - Runnable runnable = new Runnable(){ + public void account(final Record record) throws InvalidValueException { + Runnable runnable = new Runnable() { @Override - public void run(){ + public void run() { accountValidateAggregate(record, true, true); } - }; + }; ExecutorUtils.threadPool.execute(runnable); } - public void flush(long timeout, TimeUnit timeUnit) throws Exception { + public void flush(long timeout, TimeUnit timeUnit) throws Exception { aggregationScheduler.flush(new DefaultPersitenceExecutor(this)); } - public boolean isOpen(){ - return !closed; - } - - protected void setOpen(){ - this.closed = false; - } - - - - - - - } diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java index 24e96b9..f1800fe 100644 --- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java @@ -149,7 +149,6 @@ public abstract class PersistenceBackendFactory { } found.prepareConnection(configuration); - found.setOpen(); logger.trace("{} will be used.", foundClassName);