2015-10-12 18:03:39 +02:00
/**
 *
 */
package org.gcube.accounting.persistence ;
import java.io.File ;
2015-11-12 14:26:31 +01:00
import java.util.Calendar ;
2015-10-12 18:03:39 +02:00
import java.util.HashMap ;
import java.util.Map ;
import java.util.ServiceLoader ;
2015-10-14 10:46:13 +02:00
import java.util.concurrent.TimeUnit ;
2015-10-12 18:03:39 +02:00
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler ;
import org.gcube.common.scope.api.ScopeProvider ;
import org.gcube.common.scope.impl.ScopeBean ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
/ * *
* @author Luca Frosini ( ISTI - CNR ) http : //www.lucafrosini.com/
*
* /
public abstract class AccountingPersistenceBackendFactory {
2015-11-06 17:59:35 +01:00
2015-10-12 18:03:39 +02:00
private static final Logger logger = LoggerFactory . getLogger ( AccountingPersistenceBackendFactory . class ) ;
2015-11-06 17:59:35 +01:00
2015-10-12 18:03:39 +02:00
public final static String HOME_SYSTEM_PROPERTY = " user.home " ;
2015-11-06 17:59:35 +01:00
2015-10-12 18:03:39 +02:00
private static final String ACCOUTING_FALLBACK_FILENAME = " accountingFallback.log " ;
2015-11-06 17:59:35 +01:00
2015-10-12 18:03:39 +02:00
private static String fallbackLocation ;
2015-11-13 12:24:57 +01:00
private static Map < String , AccountingPersistenceBackend > accountingPersistenceBackends ;
2015-11-17 11:22:41 +01:00
private static Map < String , Long > fallbackLastCheck ;
2015-11-06 17:59:35 +01:00
2015-11-12 16:56:22 +01:00
public static final long FALLBACK_RETRY_TIME = 1000 * 60 * 10 ; // 10 min
2015-11-17 11:22:41 +01:00
/ * *
* @return the fallbackLastCheck
* /
protected static Long getFallbackLastCheck ( String scope ) {
return fallbackLastCheck . get ( scope ) ;
}
2015-11-12 14:26:31 +01:00
2015-10-12 18:03:39 +02:00
static {
2015-11-13 12:24:57 +01:00
accountingPersistenceBackends = new HashMap < String , AccountingPersistenceBackend > ( ) ;
2015-11-12 16:56:22 +01:00
fallbackLastCheck = new HashMap < String , Long > ( ) ;
2015-10-12 18:03:39 +02:00
}
2015-11-06 17:59:35 +01:00
2015-10-12 18:03:39 +02:00
private static File file ( File file ) throws IllegalArgumentException {
if ( ! file . isDirectory ( ) ) {
file = file . getParentFile ( ) ;
}
2015-11-12 14:26:31 +01:00
// Create folder structure if not exist
if ( ! file . exists ( ) ) {
2015-10-12 18:03:39 +02:00
file . mkdirs ( ) ;
2015-11-12 14:26:31 +01:00
}
2015-10-12 18:03:39 +02:00
return file ;
}
2015-11-06 17:59:35 +01:00
protected synchronized static void setFallbackLocation ( String path ) {
if ( fallbackLocation = = null ) {
if ( path = = null ) {
path = System . getProperty ( HOME_SYSTEM_PROPERTY ) ;
2015-10-12 18:03:39 +02:00
}
2015-11-06 17:59:35 +01:00
file ( new File ( path ) ) ;
fallbackLocation = path ;
2015-10-12 18:03:39 +02:00
}
}
2015-11-06 17:59:35 +01:00
2015-11-17 11:22:41 +01:00
protected static FallbackPersistenceBackend createFallback ( String scope ) {
2015-11-18 17:20:11 +01:00
logger . debug ( " Creating {} for scope {} " , FallbackPersistenceBackend . class . getSimpleName ( ) , scope ) ;
2015-11-17 11:22:41 +01:00
File fallbackFile = null ;
if ( scope ! = null ) {
ScopeBean bean = new ScopeBean ( scope ) ;
/* if(bean.is(Type.VRE)){ bean = bean.enclosingScope(); } */
String name = bean . name ( ) ;
fallbackFile = new File ( fallbackLocation , String . format ( " %s.%s " , name , ACCOUTING_FALLBACK_FILENAME ) ) ;
} else {
fallbackFile = new File ( fallbackLocation , ACCOUTING_FALLBACK_FILENAME ) ;
}
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend ( fallbackFile ) ;
fallbackPersistence . setAggregationScheduler ( AggregationScheduler . newInstance ( ) ) ;
return fallbackPersistence ;
}
2015-11-13 11:43:18 +01:00
protected static AccountingPersistenceBackend discoverAccountingPersistenceBackend ( String scope ) {
2015-11-18 17:20:11 +01:00
logger . debug ( " Discovering {} for scope {} " ,
AccountingPersistenceBackend . class . getSimpleName ( ) , scope ) ;
2015-11-13 11:43:18 +01:00
ServiceLoader < AccountingPersistenceBackend > serviceLoader = ServiceLoader . load ( AccountingPersistenceBackend . class ) ;
for ( AccountingPersistenceBackend foundPersistence : serviceLoader ) {
try {
String foundPersistenceClassName = foundPersistence . getClass ( ) . getSimpleName ( ) ;
logger . debug ( " Testing {} " , foundPersistenceClassName ) ;
AccountingPersistenceConfiguration configuration = new AccountingPersistenceConfiguration ( foundPersistenceClassName ) ;
foundPersistence . prepareConnection ( configuration ) ;
/ *
* Uncomment the following line of code if you want to try
* to create a test UsageRecord before setting the
* foundPersistence as default
*
* foundPersistence . accountWithFallback ( TestUsageRecord . createTestServiceUsageRecord ( ) ) ;
* /
logger . debug ( " {} will be used. " , foundPersistenceClassName ) ;
foundPersistence . setAggregationScheduler ( AggregationScheduler . newInstance ( ) ) ;
return foundPersistence ;
} catch ( Exception e ) {
logger . error ( String . format ( " %s not initialized correctly. It will not be used. Trying the next one if any. " , foundPersistence . getClass ( ) . getSimpleName ( ) ) , e ) ;
}
}
return null ;
} ;
2015-11-13 12:33:50 +01:00
protected static AccountingPersistenceBackend rediscoverAccountingPersistenceBackend ( AccountingPersistenceBackend actual , String scope ) {
2015-11-18 12:25:06 +01:00
Long now = Calendar . getInstance ( ) . getTimeInMillis ( ) ;
2015-11-13 12:33:50 +01:00
Long lastCheckTimestamp = fallbackLastCheck . get ( scope ) ;
2015-11-18 17:20:11 +01:00
logger . debug ( " Last check for scope {} was {} " , scope , lastCheckTimestamp ) ;
2015-11-18 17:42:15 +01:00
boolean myTurn = false ;
synchronized ( accountingPersistenceBackends ) {
if ( ( lastCheckTimestamp + FALLBACK_RETRY_TIME ) < = now ) {
logger . debug ( " The {} for scope {} is {}. Is time to rediscover if there is another possibility. " ,
2015-11-13 12:33:50 +01:00
AccountingPersistenceBackend . class . getSimpleName ( ) , scope , actual . getClass ( ) . getSimpleName ( ) ) ;
2015-11-18 17:42:15 +01:00
logger . trace ( " Renewing Last check Timestamp. The next one will be {} " , now ) ;
fallbackLastCheck . put ( scope , now ) ;
myTurn = true ;
logger . debug ( " I win. It is my turn to rediscover {} in scope {} " ,
AccountingPersistenceBackend . class . getSimpleName ( ) , scope ) ;
}
}
if ( myTurn ) {
2015-11-18 12:30:35 +01:00
AccountingPersistenceBackend discoveredPersistenceBackend = discoverAccountingPersistenceBackend ( scope ) ;
2015-11-13 12:33:50 +01:00
2015-11-18 12:30:35 +01:00
synchronized ( accountingPersistenceBackends ) {
2015-11-17 12:15:16 +01:00
if ( discoveredPersistenceBackend ! = null ) {
2015-11-18 12:55:14 +01:00
/ *
* Passing the aggregator to the new AccountingPersistenceBackend
* so that the buffered records will be persisted with the
* new method
*
* /
2015-11-17 12:15:16 +01:00
discoveredPersistenceBackend . setAggregationScheduler ( actual . getAggregationScheduler ( ) ) ;
2015-11-18 12:55:14 +01:00
// Removing timestamp which is no more needed
2015-11-17 12:15:16 +01:00
fallbackLastCheck . remove ( scope ) ;
accountingPersistenceBackends . put ( scope , discoveredPersistenceBackend ) ;
2015-11-18 12:55:14 +01:00
/ *
* Not needed because close has no effect . Removed to
* prevent problem in cases of future changes .
* try {
* actual . close ( ) ;
* } catch ( Exception e ) {
* logger . error ( " Error closing {} for scope {} which has been substituted with {}. " ,
* actual . getClass ( ) . getSimpleName ( ) , scope ,
* discoveredPersistenceBackend . getClass ( ) . getSimpleName ( ) , e ) ;
* }
*
* /
2015-11-17 12:15:16 +01:00
return discoveredPersistenceBackend ;
}
2015-11-13 12:33:50 +01:00
}
2015-11-17 12:15:16 +01:00
2015-11-13 12:33:50 +01:00
}
2015-11-17 11:58:16 +01:00
long nextCheck = ( lastCheckTimestamp + FALLBACK_RETRY_TIME ) - Calendar . getInstance ( ) . getTimeInMillis ( ) ;
float nextCheckInSec = nextCheck / 1000 ;
logger . debug ( " The {} for scope {} is going to be used is {}. Next retry in {} msec (about {} sec) " ,
AccountingPersistenceBackend . class . getSimpleName ( ) , scope ,
actual . getClass ( ) . getSimpleName ( ) , nextCheck , nextCheckInSec ) ;
2015-11-13 12:33:50 +01:00
return actual ;
}
2015-11-13 11:43:18 +01:00
2015-11-17 18:22:22 +01:00
protected static AccountingPersistenceBackend getPersistenceBackend ( ) {
2015-10-12 18:03:39 +02:00
String scope = ScopeProvider . instance . get ( ) ;
2015-11-12 14:26:31 +01:00
2015-10-12 18:03:39 +02:00
if ( scope = = null ) {
logger . error ( " No Scope available. FallbackPersistence will be used " ) ;
2015-11-13 11:43:18 +01:00
return createFallback ( null ) ;
2015-10-12 18:03:39 +02:00
}
2015-11-06 17:59:35 +01:00
2015-11-17 11:22:41 +01:00
AccountingPersistenceBackend persistence = null ;
2015-11-18 17:20:11 +01:00
logger . debug ( " Going to synchronized block in getPersistenceBackend " ) ;
2015-11-18 12:30:35 +01:00
synchronized ( accountingPersistenceBackends ) {
2015-11-17 11:22:41 +01:00
persistence = accountingPersistenceBackends . get ( scope ) ;
2015-11-18 17:20:11 +01:00
logger . debug ( " {} {} " , AccountingPersistenceBackend . class . getSimpleName ( ) , persistence ) ;
2015-11-13 11:43:18 +01:00
if ( persistence = = null ) {
2015-11-17 11:22:41 +01:00
/ *
* Setting FallbackPersistence and unlocking .
* This is used to avoid deadlock on IS node which try to use
* itself to query configuration .
* /
2015-11-13 11:43:18 +01:00
persistence = createFallback ( scope ) ;
2015-11-17 11:22:41 +01:00
accountingPersistenceBackends . put ( scope , persistence ) ;
long now = Calendar . getInstance ( ) . getTimeInMillis ( ) ;
/ * The AccountingPersistenceBackend is still to be discovered
* setting the last check advanced in time to force rediscover .
* /
fallbackLastCheck . put ( scope , ( ( now - FALLBACK_RETRY_TIME ) - 1 ) ) ;
2015-11-12 14:26:31 +01:00
}
2015-11-17 11:22:41 +01:00
}
2015-11-17 12:15:16 +01:00
if ( persistence instanceof FallbackPersistenceBackend ) {
persistence = rediscoverAccountingPersistenceBackend ( persistence , scope ) ;
2015-10-12 18:03:39 +02:00
}
2015-11-06 17:59:35 +01:00
2015-10-12 18:03:39 +02:00
return persistence ;
}
2015-10-14 10:46:13 +02:00
/ * *
* @param timeout
* @param timeUnit
* @throws Exception
* /
2015-11-12 14:26:31 +01:00
public static void flushAll ( long timeout , TimeUnit timeUnit ) {
2015-11-13 12:24:57 +01:00
for ( String scope : accountingPersistenceBackends . keySet ( ) ) {
AccountingPersistenceBackend apb = accountingPersistenceBackends . get ( scope ) ;
2015-10-14 10:46:13 +02:00
try {
logger . debug ( " Flushing records in scope {} " , scope ) ;
apb . flush ( timeout , timeUnit ) ;
} catch ( Exception e ) {
logger . error ( " Unable to flush records in scope {} with {} " , scope , apb ) ;
}
}
}
2015-11-06 17:59:35 +01:00
2015-10-12 18:03:39 +02:00
}