refs #1282: New Accounting: IS-Collector doesn't start correctly after upgrade to ghn 6.0.0
https://support.d4science.org/issues/1282 git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@120286 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
15ccd8487a
commit
c5134be2b6
|
@ -8,8 +8,8 @@ import java.util.Calendar;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
|
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
import org.gcube.common.scope.api.ScopeProvider;
|
||||||
|
@ -31,21 +31,24 @@ public abstract class AccountingPersistenceBackendFactory {
|
||||||
|
|
||||||
private static String fallbackLocation;
|
private static String fallbackLocation;
|
||||||
|
|
||||||
|
private static ReentrantLock lock = new ReentrantLock();
|
||||||
private static Map<String, AccountingPersistenceBackend> accountingPersistenceBackends;
|
private static Map<String, AccountingPersistenceBackend> accountingPersistenceBackends;
|
||||||
|
private static Map<String, Long> fallbackLastCheck;
|
||||||
|
|
||||||
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
|
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
|
||||||
|
|
||||||
static Map<String,Long> fallbackLastCheck;
|
/**
|
||||||
|
* @return the fallbackLastCheck
|
||||||
|
*/
|
||||||
|
protected static Long getFallbackLastCheck(String scope) {
|
||||||
|
return fallbackLastCheck.get(scope);
|
||||||
|
}
|
||||||
|
|
||||||
static {
|
static {
|
||||||
accountingPersistenceBackends = new HashMap<String, AccountingPersistenceBackend>();
|
accountingPersistenceBackends = new HashMap<String, AccountingPersistenceBackend>();
|
||||||
fallbackLastCheck = new HashMap<String, Long>();
|
fallbackLastCheck = new HashMap<String, Long>();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Set<String> getActiveScopes(){
|
|
||||||
return accountingPersistenceBackends.keySet();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static File file(File file) throws IllegalArgumentException {
|
private static File file(File file) throws IllegalArgumentException {
|
||||||
if(!file.isDirectory()){
|
if(!file.isDirectory()){
|
||||||
file = file.getParentFile();
|
file = file.getParentFile();
|
||||||
|
@ -67,6 +70,21 @@ public abstract class AccountingPersistenceBackendFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static FallbackPersistenceBackend createFallback(String scope){
|
||||||
|
File fallbackFile = null;
|
||||||
|
if(scope!=null){
|
||||||
|
ScopeBean bean = new ScopeBean(scope);
|
||||||
|
/* if(bean.is(Type.VRE)){ bean = bean.enclosingScope(); } */
|
||||||
|
String name = bean.name();
|
||||||
|
fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
|
||||||
|
}else{
|
||||||
|
fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
|
||||||
|
}
|
||||||
|
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
|
||||||
|
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance());
|
||||||
|
return fallbackPersistence;
|
||||||
|
}
|
||||||
|
|
||||||
protected static AccountingPersistenceBackend discoverAccountingPersistenceBackend(String scope){
|
protected static AccountingPersistenceBackend discoverAccountingPersistenceBackend(String scope){
|
||||||
ServiceLoader<AccountingPersistenceBackend> serviceLoader = ServiceLoader.load(AccountingPersistenceBackend.class);
|
ServiceLoader<AccountingPersistenceBackend> serviceLoader = ServiceLoader.load(AccountingPersistenceBackend.class);
|
||||||
for (AccountingPersistenceBackend foundPersistence : serviceLoader) {
|
for (AccountingPersistenceBackend foundPersistence : serviceLoader) {
|
||||||
|
@ -92,26 +110,11 @@ public abstract class AccountingPersistenceBackendFactory {
|
||||||
return null;
|
return null;
|
||||||
};
|
};
|
||||||
|
|
||||||
protected static FallbackPersistenceBackend createFallback(String scope){
|
|
||||||
File fallbackFile = null;
|
|
||||||
if(scope!=null){
|
|
||||||
ScopeBean bean = new ScopeBean(scope);
|
|
||||||
/* if(bean.is(Type.VRE)){ bean = bean.enclosingScope(); } */
|
|
||||||
String name = bean.name();
|
|
||||||
fallbackFile = new File(fallbackLocation, String.format("%s.%s", name, ACCOUTING_FALLBACK_FILENAME));
|
|
||||||
}else{
|
|
||||||
fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
|
|
||||||
}
|
|
||||||
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
|
|
||||||
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance());
|
|
||||||
return fallbackPersistence;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static AccountingPersistenceBackend rediscoverAccountingPersistenceBackend(AccountingPersistenceBackend actual, String scope){
|
protected static AccountingPersistenceBackend rediscoverAccountingPersistenceBackend(AccountingPersistenceBackend actual, String scope){
|
||||||
long now = Calendar.getInstance().getTimeInMillis();
|
long now = Calendar.getInstance().getTimeInMillis();
|
||||||
Long lastCheckTimestamp = fallbackLastCheck.get(scope);
|
Long lastCheckTimestamp = fallbackLastCheck.get(scope);
|
||||||
|
|
||||||
if(lastCheckTimestamp <= (now + FALLBACK_RETRY_TIME)){
|
if( (lastCheckTimestamp + FALLBACK_RETRY_TIME) <= now ){
|
||||||
logger.debug("The {} for scope {} is {}. Is time to rediscover if there is another possibility.",
|
logger.debug("The {} for scope {} is {}. Is time to rediscover if there is another possibility.",
|
||||||
AccountingPersistenceBackend.class.getSimpleName(), scope, actual.getClass().getSimpleName());
|
AccountingPersistenceBackend.class.getSimpleName(), scope, actual.getClass().getSimpleName());
|
||||||
|
|
||||||
|
@ -139,6 +142,8 @@ public abstract class AccountingPersistenceBackendFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.debug("The {} for scope {} is going to be used is {}. Next retry in {} msec",
|
||||||
|
AccountingPersistenceBackend.class.getSimpleName(), scope, actual.getClass().getSimpleName(), FALLBACK_RETRY_TIME);
|
||||||
return actual;
|
return actual;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,21 +155,36 @@ public abstract class AccountingPersistenceBackendFactory {
|
||||||
return createFallback(null);
|
return createFallback(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
AccountingPersistenceBackend persistence = accountingPersistenceBackends.get(scope);
|
lock.lock();
|
||||||
if(persistence==null){
|
AccountingPersistenceBackend persistence = null;
|
||||||
persistence = discoverAccountingPersistenceBackend(scope);
|
try {
|
||||||
|
persistence = accountingPersistenceBackends.get(scope);
|
||||||
if(persistence==null){
|
if(persistence==null){
|
||||||
logger.warn("Unable to find a usable {}. {} will be used.", AccountingPersistenceBackend.class.getSimpleName(), FallbackPersistenceBackend.class.getSimpleName());
|
/*
|
||||||
long now = Calendar.getInstance().getTimeInMillis();
|
* Setting FallbackPersistence and unlocking.
|
||||||
fallbackLastCheck.put(scope, now);
|
* This is used to avoid deadlock on IS node which try to use
|
||||||
|
* itself to query configuration.
|
||||||
|
*/
|
||||||
persistence = createFallback(scope);
|
persistence = createFallback(scope);
|
||||||
|
accountingPersistenceBackends.put(scope, persistence);
|
||||||
|
long now = Calendar.getInstance().getTimeInMillis();
|
||||||
|
/* The AccountingPersistenceBackend is still to be discovered
|
||||||
|
* setting the last check advanced in time to force rediscover.
|
||||||
|
*/
|
||||||
|
fallbackLastCheck.put(scope, ((now - FALLBACK_RETRY_TIME) - 1));
|
||||||
}
|
}
|
||||||
accountingPersistenceBackends.put(scope, persistence);
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
} else if(persistence instanceof FallbackPersistenceBackend && fallbackLastCheck.get(scope)!=null){
|
}
|
||||||
// Trying to rediscover AccountingPersistenceBackend
|
|
||||||
persistence = rediscoverAccountingPersistenceBackend(persistence, scope);
|
lock.lock();
|
||||||
|
try {
|
||||||
|
persistence = accountingPersistenceBackends.get(scope);
|
||||||
|
if(persistence instanceof FallbackPersistenceBackend){
|
||||||
|
persistence = rediscoverAccountingPersistenceBackend(persistence, scope);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
return persistence;
|
return persistence;
|
||||||
|
|
|
@ -104,7 +104,7 @@ public class AccountingPersistenceBackendTest {
|
||||||
public void testScopeRecheck() throws Exception {
|
public void testScopeRecheck() throws Exception {
|
||||||
ScopeProvider.instance.set("/fakeScope");
|
ScopeProvider.instance.set("/fakeScope");
|
||||||
AccountingPersistenceBackendFactory.getPersistenceBackend();
|
AccountingPersistenceBackendFactory.getPersistenceBackend();
|
||||||
Long firstCheck = AccountingPersistenceBackendFactory.fallbackLastCheck.get(ScopeProvider.instance.get());
|
Long firstCheck = AccountingPersistenceBackendFactory.getFallbackLastCheck(ScopeProvider.instance.get());
|
||||||
logger.debug("First Check Time {}", firstCheck);
|
logger.debug("First Check Time {}", firstCheck);
|
||||||
|
|
||||||
long startTime = Calendar.getInstance().getTimeInMillis();
|
long startTime = Calendar.getInstance().getTimeInMillis();
|
||||||
|
@ -115,7 +115,7 @@ public class AccountingPersistenceBackendTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
AccountingPersistenceBackendFactory.getPersistenceBackend();
|
AccountingPersistenceBackendFactory.getPersistenceBackend();
|
||||||
Long secondCheck = AccountingPersistenceBackendFactory.fallbackLastCheck.get(ScopeProvider.instance.get());
|
Long secondCheck = AccountingPersistenceBackendFactory.getFallbackLastCheck(ScopeProvider.instance.get());
|
||||||
logger.debug("Second Check Time {}", secondCheck);
|
logger.debug("Second Check Time {}", secondCheck);
|
||||||
|
|
||||||
Assert.assertNotEquals(firstCheck, secondCheck);
|
Assert.assertNotEquals(firstCheck, secondCheck);
|
||||||
|
|
Loading…
Reference in New Issue