refs #3121: New Accounting: IS-Collector doesn't start correctly after upgrade to ghn 6.0.0

https://support.d4science.org/issues/3121

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@128070 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2016-04-13 16:16:47 +00:00
parent 2cb47cd34c
commit 1e1dad281e
3 changed files with 105 additions and 98 deletions

View File

@ -4,8 +4,6 @@
package org.gcube.documentstore.persistence;
import java.io.File;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
@ -32,22 +30,9 @@ public abstract class PersistenceBackendFactory {
private static String fallbackLocation;
private static Map<String, PersistenceBackend> persistenceBackends;
private static Map<String, Long> fallbackLastCheck;
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
/**
* @return the fallbackLastCheck
*/
protected static Long getFallbackLastCheck(String scope) {
return fallbackLastCheck.get(scope);
}
static {
persistenceBackends = new HashMap<String, PersistenceBackend>();
fallbackLastCheck = new HashMap<String, Long>();
}
public static void addRecordPackage(Package packageObject) {
RecordUtility.addRecordPackage(packageObject);
}
@ -140,67 +125,6 @@ public abstract class PersistenceBackendFactory {
return null;
};
protected static PersistenceBackend rediscoverPersistenceBackend(PersistenceBackend actual, String context){
context = sanitizeContext(context);
Long now = Calendar.getInstance().getTimeInMillis();
Long lastCheckTimestamp = fallbackLastCheck.get(context);
logger.debug("Last check for context {} was {}", context, lastCheckTimestamp);
boolean myTurn = false;
synchronized (persistenceBackends) {
if( (lastCheckTimestamp + FALLBACK_RETRY_TIME) <= now ){
logger.debug("The {} for context {} is {}. Is time to rediscover if there is another possibility.",
PersistenceBackend.class.getSimpleName(), context, actual.getClass().getSimpleName());
logger.trace("Renewing Last check Timestamp. The next one will be {}", now);
fallbackLastCheck.put(context, now);
myTurn=true;
logger.debug("I win. It is my turn to rediscover {} in context {}",
PersistenceBackend.class.getSimpleName(), context);
}
}
if(myTurn){
PersistenceBackend discoveredPersistenceBackend = discoverPersistenceBackend(context);
synchronized (persistenceBackends) {
if(discoveredPersistenceBackend!=null){
/*
* Passing the aggregator to the new PersistenceBackend
* so that the buffered records will be persisted with the
* new method
*
*/
discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
// Removing timestamp which is no more needed
fallbackLastCheck.remove(context);
persistenceBackends.put(context, discoveredPersistenceBackend);
/*
* Not needed because close has no effect. Removed to
* prevent problem in cases of future changes.
* try {
* actual.close();
* } catch (Exception e) {
* logger.error("Error closing {} for scope {} which has been substituted with {}.",
* actual.getClass().getSimpleName(), scope,
* discoveredPersistenceBackend.getClass().getSimpleName(), e);
* }
*
*/
return discoveredPersistenceBackend;
}
}
}
long nextCheck = (lastCheckTimestamp + FALLBACK_RETRY_TIME) - Calendar.getInstance().getTimeInMillis();
float nextCheckInSec = nextCheck/1000;
logger.debug("The {} for context {} is going to be used is {}. Next retry in {} msec (about {} sec)",
PersistenceBackend.class.getSimpleName(), context,
actual.getClass().getSimpleName(), nextCheck, nextCheckInSec);
return actual;
}
public static PersistenceBackend getPersistenceBackend(String context) {
context = sanitizeContext(context);
@ -213,39 +137,73 @@ public abstract class PersistenceBackendFactory {
if(persistence==null){
/*
* Setting FallbackPersistence and unlocking.
* This is used to avoid deadlock on IS node which try to use
* itself to query configuration.
* There will be another thread which will try to discover the
* real persistence.
*/
persistence = createFallback(context);
persistenceBackends.put(context, persistence);
long now = Calendar.getInstance().getTimeInMillis();
try {
Class.forName("org.gcube.informationsystem.collector.impl.utils.Identifier");
// We are on IC so that the rediscovery must be delayed
fallbackLastCheck.put(context, now);
} catch (ClassNotFoundException e) {
/* We are not on IC
* The PersistenceBackend is still to be discovered
* setting the last check advanced in time to force rediscover.
*/
fallbackLastCheck.put(context, ((now - FALLBACK_RETRY_TIME) - 1));
}
new PersistenceBackendRediscover(context,
(FallbackPersistenceBackend) persistence, 100,
FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS);
}
}
if(persistence instanceof FallbackPersistenceBackend){
persistence = rediscoverPersistenceBackend(persistence, context);
}
return persistence;
}
protected static PersistenceBackend rediscoverPersistenceBackend(PersistenceBackend actual, String context){
context = sanitizeContext(context);
logger.debug("The {} for context {} is {}. "
+ "Is time to rediscover if there is another possibility.",
PersistenceBackend.class.getSimpleName(), context,
actual.getClass().getSimpleName());
PersistenceBackend discoveredPersistenceBackend =
PersistenceBackendFactory.discoverPersistenceBackend(context);
if(discoveredPersistenceBackend!=null){
synchronized (persistenceBackends) {
/*
* Passing the aggregator to the new PersistenceBackend
* so that the buffered records will be persisted with the
* new method
*
*/
discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
persistenceBackends.put(context, discoveredPersistenceBackend);
/*
* Not needed because close has no effect. Removed to
* prevent problem in cases of future changes.
* try {
* actual.close();
* } catch (Exception e) {
* logger.error("Error closing {} for scope {} which has been substituted with {}.",
* actual.getClass().getSimpleName(), scope,
* discoveredPersistenceBackend.getClass().getSimpleName(), e);
* }
*
*/
return discoveredPersistenceBackend;
}
}
return actual;
}
public static void flush(String context, long timeout, TimeUnit timeUnit){
context = sanitizeContext(context);
PersistenceBackend apb = persistenceBackends.get(context);
PersistenceBackend apb;
synchronized (persistenceBackends) {
apb = persistenceBackends.get(context);
}
try {
logger.debug("Flushing records in context {}", context);
apb.flush(timeout, timeUnit);

View File

@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
*
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class PersistenceBackendMonitor implements Runnable {
class PersistenceBackendMonitor implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(PersistenceBackendMonitor.class);

View File

@ -0,0 +1,49 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
class PersistenceBackendRediscover implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(PersistenceBackendRediscover.class);
protected final ScheduledExecutorService scheduler;
protected final String context;
protected final FallbackPersistenceBackend fallbackPersistenceBackend;
public PersistenceBackendRediscover(String context,
FallbackPersistenceBackend fallbackPersistenceBackend,
long initialDelay, long delay, TimeUnit timeUnit){
this.context = context;
this.fallbackPersistenceBackend = fallbackPersistenceBackend;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
}
@Override
public void run() {
PersistenceBackend rediscovered = PersistenceBackendFactory.
rediscoverPersistenceBackend(fallbackPersistenceBackend, context);
if(rediscovered!=fallbackPersistenceBackend){
logger.debug("Another {} was found : {}. "
+ "Shutting down {} Thread for context {}",
PersistenceBackend.class.getSimpleName(),
rediscovered.getClass().getSimpleName(),
PersistenceBackendRediscover.class.getSimpleName(),
context);
scheduler.shutdown();
}
}
}