refs #1352: Create repetitive thread to retry to persist UsageRecords

https://support.d4science.org/issues/1352

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@120285 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-11-17 08:32:41 +00:00
parent 901f0ce0c6
commit 15ccd8487a
10 changed files with 318 additions and 29 deletions

View File

@ -504,17 +504,19 @@ public abstract class BasicUsageRecord implements UsageRecord, Serializable {
return 1;
}
private static final String AGGREGATED_PREFIX = "Aggregated";
@SuppressWarnings("unchecked")
protected static Class<? extends UsageRecord> getClass(String usageRecordName, boolean aggregated) throws ClassNotFoundException {
Class<? extends UsageRecord> clz = null;
Class<? extends UsageRecord> utilityClass = org.gcube.accounting.datamodel.usagerecords.JobUsageRecord.class;
Class<? extends UsageRecord> utilityClass = org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord.class;
if(aggregated){
utilityClass = org.gcube.accounting.aggregation.AggregatedJobUsageRecord.class;
utilityClass = org.gcube.accounting.aggregation.AggregatedServiceUsageRecord.class;
}
String classCanonicalName = utilityClass.getCanonicalName();
classCanonicalName = classCanonicalName.replace(utilityClass.getSimpleName(), usageRecordName);
classCanonicalName = classCanonicalName.replace(utilityClass.getSimpleName().replace(AGGREGATED_PREFIX, ""), usageRecordName);
try {
clz = (Class<? extends UsageRecord>) Class.forName(classCanonicalName);
@ -540,7 +542,11 @@ public abstract class BasicUsageRecord implements UsageRecord, Serializable {
boolean aggregated = false;
try {
aggregated = (Boolean) usageRecordMap.get(AGGREGATED);
}catch(Exception e){}
}catch(Exception e){
try{
aggregated = Boolean.parseBoolean((String)usageRecordMap.get(AGGREGATED));
} catch(Exception e1){}
}
Class<? extends UsageRecord> clz = getClass(className, aggregated);
logger.debug("Trying to instantiate {}", clz.getClass().getSimpleName());
@ -557,4 +563,57 @@ public abstract class BasicUsageRecord implements UsageRecord, Serializable {
return usageRecord;
}
/*
* IT DOES NOT WORK
* @SuppressWarnings("unchecked")
* public static Map<String,Serializable> getMapFromString(String serializedMap) throws IOException, ClassNotFoundException {
* ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializedMap.getBytes());
* ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
* return ((Map<String,Serializable>) objectInputStream.readObject());
* }
*/
private final static String LINE_FREFIX = "{";
private final static String LINE_SUFFIX = "}";
private final static String KEY_VALUE_PAIR_SEPARATOR = ",";
private final static String KEY_VALUE_LINKER = "=";
protected static Map<String, Serializable> getMapFromString(String serializedMap){
/* Checking line sanity */
if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){
return null;
}
/* Cleaning prefix and suffix to parse line */
serializedMap = serializedMap.replace(LINE_FREFIX, "");
serializedMap = serializedMap.replace(LINE_SUFFIX, "");
Map<String, Serializable> map = new HashMap<String, Serializable>();
String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR);
for (int i=0;i<pairs.length;i++) {
String pair = pairs[i];
pair.trim();
String[] keyValue = pair.split(KEY_VALUE_LINKER);
String key = keyValue[0].trim();
Serializable value = keyValue[1].trim();
map.put(key, value);
}
return map;
}
/**
*
* @param serializedMap
* @return
* @throws Exception
*/
public static UsageRecord getUsageRecord(String serializedMap) throws Exception {
Map<String,Serializable> map = getMapFromString(serializedMap);
return getUsageRecord(map);
}
}

View File

@ -19,7 +19,7 @@ public class ValidIntegerValidator implements FieldAction {
if(value instanceof Integer){
return value;
}
Integer integerObj = Integer.getInteger((String) value);
Integer integerObj = Integer.valueOf((String) value);
if(integerObj!=null){
return integerObj;
}

View File

@ -20,7 +20,7 @@ public class ValidLongValidator implements FieldAction {
return value;
}
try {
Long longObj = Long.getLong((String) value);
Long longObj = Long.valueOf((String) value);
if(longObj!=null){
return longObj;
}

View File

@ -22,9 +22,11 @@ public abstract class AccountingPersistenceBackend {
private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackend.class);
protected FallbackPersistence fallback;
protected FallbackPersistenceBackend fallbackPersistence;
protected AggregationScheduler aggregationScheduler;
protected AccountingPersistenceBackendMonitor accountingPersistenceBackendMonitor;
/**
* Pool for thread execution
*/
@ -32,19 +34,30 @@ public abstract class AccountingPersistenceBackend {
protected AccountingPersistenceBackend(){
this.pool = Executors.newCachedThreadPool();
if(!(this instanceof FallbackPersistenceBackend)){
this.accountingPersistenceBackendMonitor = new AccountingPersistenceBackendMonitor(this);
}
}
protected AccountingPersistenceBackend(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){
this.fallback = fallback;
protected AccountingPersistenceBackend(FallbackPersistenceBackend fallback, AggregationScheduler aggregationScheduler){
this();
this.fallbackPersistence = fallback;
this.aggregationScheduler = aggregationScheduler;
this.pool = Executors.newCachedThreadPool();
}
/**
* @return the fallbackPersistence
*/
public FallbackPersistenceBackend getFallbackPersistence() {
return fallbackPersistence;
}
/**
* @param fallback the fallback to set
*/
protected void setFallback(FallbackPersistence fallback) {
this.fallback = fallback;
protected void setFallback(FallbackPersistenceBackend fallback) {
this.fallbackPersistence = fallback;
}
/**
@ -76,7 +89,7 @@ public abstract class AccountingPersistenceBackend {
*/
protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception;
private void accountWithFallback(UsageRecord... usageRecords) {
protected void accountWithFallback(UsageRecord... usageRecords) {
String persistenceName = this.getClass().getSimpleName();
logger.trace("Going to account {} using {} : {}", Arrays.toString(usageRecords), persistenceName, this);
for(UsageRecord usageRecord : usageRecords){
@ -85,11 +98,11 @@ public abstract class AccountingPersistenceBackend {
this.reallyAccount(usageRecord);
logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName);
} catch (Exception e) {
String fallabackPersistenceName = fallback.getClass().getSimpleName();
String fallabackPersistenceName = fallbackPersistence.getClass().getSimpleName();
try {
logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
usageRecord.toString(), persistenceName, fallabackPersistenceName, e);
fallback.reallyAccount(usageRecord);
fallbackPersistence.reallyAccount(usageRecord);
logger.debug("{} accounted succesfully from {}",
usageRecord.toString(), fallabackPersistenceName);
}catch(Exception ex){

View File

@ -8,6 +8,7 @@ import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregation.scheduler.AggregationScheduler;
@ -34,13 +35,17 @@ public abstract class AccountingPersistenceBackendFactory {
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
protected static Map<String,Long> fallbackLastCheck;
static Map<String,Long> fallbackLastCheck;
static {
accountingPersistenceBackends = new HashMap<String, AccountingPersistenceBackend>();
fallbackLastCheck = new HashMap<String, Long>();
}
protected Set<String> getActiveScopes(){
return accountingPersistenceBackends.keySet();
}
private static File file(File file) throws IllegalArgumentException {
if(!file.isDirectory()){
file = file.getParentFile();
@ -87,7 +92,7 @@ public abstract class AccountingPersistenceBackendFactory {
return null;
};
protected static FallbackPersistence createFallback(String scope){
protected static FallbackPersistenceBackend createFallback(String scope){
File fallbackFile = null;
if(scope!=null){
ScopeBean bean = new ScopeBean(scope);
@ -97,7 +102,7 @@ public abstract class AccountingPersistenceBackendFactory {
}else{
fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME);
}
FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile);
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance());
return fallbackPersistence;
}
@ -149,14 +154,14 @@ public abstract class AccountingPersistenceBackendFactory {
if(persistence==null){
persistence = discoverAccountingPersistenceBackend(scope);
if(persistence==null){
logger.warn("Unable to find a usable {}. {} will be used.", AccountingPersistenceBackend.class.getSimpleName(), FallbackPersistence.class.getSimpleName());
logger.warn("Unable to find a usable {}. {} will be used.", AccountingPersistenceBackend.class.getSimpleName(), FallbackPersistenceBackend.class.getSimpleName());
long now = Calendar.getInstance().getTimeInMillis();
fallbackLastCheck.put(scope, now);
persistence = createFallback(scope);
}
accountingPersistenceBackends.put(scope, persistence);
} else if(persistence instanceof FallbackPersistence && fallbackLastCheck.get(scope)!=null){
} else if(persistence instanceof FallbackPersistenceBackend && fallbackLastCheck.get(scope)!=null){
// Trying to rediscover AccountingPersistenceBackend
persistence = rediscoverAccountingPersistenceBackend(persistence, scope);

View File

@ -0,0 +1,80 @@
/**
*
*/
package org.gcube.accounting.persistence;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.datamodel.BasicUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class AccountingPersistenceBackendMonitor implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendMonitorTest.class);
private final static String ELABORATION_FILE_SUFFIX = ".ELABORATION";
protected final ScheduledExecutorService scheduler;
protected final AccountingPersistenceBackend accountingPersistenceBackend;
public AccountingPersistenceBackendMonitor(AccountingPersistenceBackend accountingPersistenceBackend){
this.accountingPersistenceBackend = accountingPersistenceBackend;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES);
}
protected void elaborateFile(File elaborationFile){
try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) {
for(String line; (line = br.readLine()) != null; ) {
try {
UsageRecord usageRecord = BasicUsageRecord.getUsageRecord(line);
accountingPersistenceBackend.accountWithFallback(usageRecord);
} catch(Exception e){
logger.error("", e);
// TODO write line on file
}
}
} catch (FileNotFoundException e) {
logger.error("", e);
} catch (IOException e) {
logger.error("", e);
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
logger.debug("Trying to persist {}s which failed and were persisted using fallback", UsageRecord.class.getSimpleName());
FallbackPersistenceBackend fallbackPersistenceBackend = accountingPersistenceBackend.getFallbackPersistence();
File file = fallbackPersistenceBackend.getAccountingFallbackFile();
File elaborationFile = null;
synchronized (file) {
if(file.exists()){
elaborationFile = new File(file.getAbsolutePath()+ELABORATION_FILE_SUFFIX);
file.renameTo(elaborationFile);
}
}
if(elaborationFile!=null){
elaborateFile(elaborationFile);
}
}
}

View File

@ -15,11 +15,18 @@ import org.gcube.accounting.datamodel.UsageRecord;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class FallbackPersistence extends AccountingPersistenceBackend {
public class FallbackPersistenceBackend extends AccountingPersistenceBackend {
private File accountingFallbackFile;
protected FallbackPersistence(File accountingFallbackFile) {
/**
* @return the accountingFallbackFile
*/
protected File getAccountingFallbackFile() {
return accountingFallbackFile;
}
protected FallbackPersistenceBackend(File accountingFallbackFile) {
super(null, AggregationScheduler.newInstance());
this.accountingFallbackFile = accountingFallbackFile;
}
@ -34,17 +41,24 @@ public class FallbackPersistence extends AccountingPersistenceBackend {
/**
* {@inheritDoc}
* This method is synchronized on {@link File} used, so any actions which
* has to modify, rename or delete the file must be synchronized on this
* file. To retrieve it use {@link #getAccountingFallbackFile()} method.
* This is intended for internal library usage only so that is protected
*/
@Override
protected void reallyAccount(UsageRecord usageRecord) throws Exception {
synchronized (accountingFallbackFile) {
try(FileWriter fw = new FileWriter(accountingFallbackFile, true);
BufferedWriter bw = new BufferedWriter(fw);
PrintWriter out = new PrintWriter(bw)){
out.println(usageRecord);
out.flush();
} catch( IOException e ){
throw e;
}
}
}
/**
* {@inheritDoc}

View File

@ -0,0 +1,73 @@
/**
*
*/
package org.gcube.accounting.persistence;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.datamodel.SingleUsageRecord;
import org.gcube.accounting.datamodel.basetypes.TestUsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
import org.gcube.accounting.testutility.StressTestUtility;
import org.gcube.accounting.testutility.TestOperation;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class AccountingPersistenceBackendMonitorTest {
private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendMonitor.class);
public static final long timeout = 5000;
public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@Test
public void parsingTest() throws Exception {
ScopeProvider.instance.set("/gcube/devsec");
AccountingPersistenceBackendFactory.setFallbackLocation(null);
final AccountingPersistence persistence = AccountingPersistence.getInstance();
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) {
SingleUsageRecord usageRecord = null;
switch (i%3) {
case 0:
usageRecord = TestUsageRecord.createTestServiceUsageRecordAutomaticScope();
break;
case 1:
usageRecord = TestUsageRecord.createTestStorageUsageRecordAutomaticScope();
break;
case 2:
usageRecord = TestUsageRecord.createTestJobUsageRecordAutomaticScope();
break;
}
try {
persistence.account(usageRecord);
} catch (InvalidValueException e) {
throw new RuntimeException(e);
}
}
});
logger.debug(" START -----------------------------------------------");
logger.debug("Flushing the buffered records");
persistence.flush(timeout, timeUnit);
logger.debug(" END -----------------------------------------------");
AccountingPersistenceBackend accountingPersistenceBackend = AccountingPersistenceBackendFactory.getPersistenceBackend();
accountingPersistenceBackend.setFallback((FallbackPersistenceBackend) accountingPersistenceBackend);
AccountingPersistenceBackendMonitor accountingPersistenceBackendMonitor = new AccountingPersistenceBackendMonitor(accountingPersistenceBackend);
accountingPersistenceBackendMonitor.run();
}
}

View File

@ -42,7 +42,7 @@ public class AccountingPersistenceBackendTest {
public void singleTestNoScope() throws Exception {
AccountingPersistenceBackendFactory.setFallbackLocation(null);
final AccountingPersistenceBackend persistence = AccountingPersistenceBackendFactory.getPersistenceBackend();
Assert.assertTrue(persistence instanceof FallbackPersistence);
Assert.assertTrue(persistence instanceof FallbackPersistenceBackend);
StressTestUtility.stressTest(new TestOperation() {
@Override
public void operate(int i) {

View File

@ -3,6 +3,11 @@
*/
package org.gcube.accounting.persistence;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL;
import java.util.Arrays;
@ -264,4 +269,44 @@ public class AccountingPersistenceConfigurationTest {
}
/*
@Test
public void testFiles() throws IOException{
File file = new File("./aux.txt");
logger.debug("file : {}", file.getAbsolutePath());
String aux = "AUX";
try(FileWriter fw = new FileWriter(file, true);
BufferedWriter bw = new BufferedWriter(fw);
PrintWriter out = new PrintWriter(bw)){
out.println(aux);
out.flush();
} catch( IOException e ){
throw e;
}
File same = file;
logger.debug("same : {}", same.getAbsolutePath());
boolean moved = same.renameTo(new File(file.getAbsolutePath()+".RENAMED"));
logger.debug("Moved : {}", moved);
logger.debug("AFTER RENAME");
logger.debug("file : {}", file.getAbsolutePath());
logger.debug("same : {}", same.getAbsolutePath());
aux = "DONE";
try(FileWriter fw = new FileWriter(file, true);
BufferedWriter bw = new BufferedWriter(fw);
PrintWriter out = new PrintWriter(bw)){
out.println(aux);
out.flush();
} catch( IOException e ){
throw e;
}
}
*/
}