From 15ccd8487ad53d0c93ef0bfe9d989f393c5bc685 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Tue, 17 Nov 2015 08:32:41 +0000 Subject: [PATCH] refs #1352: Create repetitive thread to retry to persist UsageRecords https://support.d4science.org/issues/1352 git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-lib@120285 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../datamodel/BasicUsageRecord.java | 67 +++++++++++++++- .../validators/ValidIntegerValidator.java | 2 +- .../validators/ValidLongValidator.java | 2 +- .../AccountingPersistenceBackend.java | 31 ++++--- .../AccountingPersistenceBackendFactory.java | 15 ++-- .../AccountingPersistenceBackendMonitor.java | 80 +++++++++++++++++++ ...e.java => FallbackPersistenceBackend.java} | 30 +++++-- ...countingPersistenceBackendMonitorTest.java | 73 +++++++++++++++++ .../AccountingPersistenceBackendTest.java | 2 +- ...ccountingPersistenceConfigurationTest.java | 45 +++++++++++ 10 files changed, 318 insertions(+), 29 deletions(-) create mode 100644 src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitor.java rename src/main/java/org/gcube/accounting/persistence/{FallbackPersistence.java => FallbackPersistenceBackend.java} (50%) create mode 100644 src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitorTest.java diff --git a/src/main/java/org/gcube/accounting/datamodel/BasicUsageRecord.java b/src/main/java/org/gcube/accounting/datamodel/BasicUsageRecord.java index 307ffb8..8e03192 100644 --- a/src/main/java/org/gcube/accounting/datamodel/BasicUsageRecord.java +++ b/src/main/java/org/gcube/accounting/datamodel/BasicUsageRecord.java @@ -504,17 +504,19 @@ public abstract class BasicUsageRecord implements UsageRecord, Serializable { return 1; } + private static final String AGGREGATED_PREFIX = "Aggregated"; + @SuppressWarnings("unchecked") protected static Class getClass(String usageRecordName, boolean aggregated) throws ClassNotFoundException { Class clz = null; - Class utilityClass = org.gcube.accounting.datamodel.usagerecords.JobUsageRecord.class; + Class utilityClass = org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord.class; if(aggregated){ - utilityClass = org.gcube.accounting.aggregation.AggregatedJobUsageRecord.class; + utilityClass = org.gcube.accounting.aggregation.AggregatedServiceUsageRecord.class; } String classCanonicalName = utilityClass.getCanonicalName(); - classCanonicalName = classCanonicalName.replace(utilityClass.getSimpleName(), usageRecordName); + classCanonicalName = classCanonicalName.replace(utilityClass.getSimpleName().replace(AGGREGATED_PREFIX, ""), usageRecordName); try { clz = (Class) Class.forName(classCanonicalName); @@ -540,7 +542,11 @@ public abstract class BasicUsageRecord implements UsageRecord, Serializable { boolean aggregated = false; try { aggregated = (Boolean) usageRecordMap.get(AGGREGATED); - }catch(Exception e){} + }catch(Exception e){ + try{ + aggregated = Boolean.parseBoolean((String)usageRecordMap.get(AGGREGATED)); + } catch(Exception e1){} + } Class clz = getClass(className, aggregated); logger.debug("Trying to instantiate {}", clz.getClass().getSimpleName()); @@ -557,4 +563,57 @@ public abstract class BasicUsageRecord implements UsageRecord, Serializable { return usageRecord; } + /* + * IT DOES NOT WORK + * @SuppressWarnings("unchecked") + * public static Map getMapFromString(String serializedMap) throws IOException, ClassNotFoundException { + * ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializedMap.getBytes()); + * ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); + * return ((Map) objectInputStream.readObject()); + * } + */ + + + private final static String LINE_FREFIX = "{"; + private final static String LINE_SUFFIX = "}"; + private final static String KEY_VALUE_PAIR_SEPARATOR = ","; + private final static String KEY_VALUE_LINKER = "="; + + protected static Map getMapFromString(String serializedMap){ + /* Checking line sanity */ + if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){ + return null; + } + + /* Cleaning prefix and suffix to parse line */ + serializedMap = serializedMap.replace(LINE_FREFIX, ""); + serializedMap = serializedMap.replace(LINE_SUFFIX, ""); + + Map map = new HashMap(); + + String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR); + for (int i=0;i map = getMapFromString(serializedMap); + return getUsageRecord(map); + } } diff --git a/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidIntegerValidator.java b/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidIntegerValidator.java index b4fae9b..db7b864 100644 --- a/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidIntegerValidator.java +++ b/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidIntegerValidator.java @@ -19,7 +19,7 @@ public class ValidIntegerValidator implements FieldAction { if(value instanceof Integer){ return value; } - Integer integerObj = Integer.getInteger((String) value); + Integer integerObj = Integer.valueOf((String) value); if(integerObj!=null){ return integerObj; } diff --git a/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidLongValidator.java b/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidLongValidator.java index 2201ab5..c1c689e 100644 --- a/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidLongValidator.java +++ b/src/main/java/org/gcube/accounting/datamodel/validations/validators/ValidLongValidator.java @@ -20,7 +20,7 @@ public class ValidLongValidator implements FieldAction { return value; } try { - Long longObj = Long.getLong((String) value); + Long longObj = Long.valueOf((String) value); if(longObj!=null){ return longObj; } diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java index e542a68..b4de802 100644 --- a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackend.java @@ -22,9 +22,11 @@ public abstract class AccountingPersistenceBackend { private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackend.class); - protected FallbackPersistence fallback; + protected FallbackPersistenceBackend fallbackPersistence; protected AggregationScheduler aggregationScheduler; + protected AccountingPersistenceBackendMonitor accountingPersistenceBackendMonitor; + /** * Pool for thread execution */ @@ -32,19 +34,30 @@ public abstract class AccountingPersistenceBackend { protected AccountingPersistenceBackend(){ this.pool = Executors.newCachedThreadPool(); + if(!(this instanceof FallbackPersistenceBackend)){ + this.accountingPersistenceBackendMonitor = new AccountingPersistenceBackendMonitor(this); + } } - protected AccountingPersistenceBackend(FallbackPersistence fallback, AggregationScheduler aggregationScheduler){ - this.fallback = fallback; + protected AccountingPersistenceBackend(FallbackPersistenceBackend fallback, AggregationScheduler aggregationScheduler){ + this(); + this.fallbackPersistence = fallback; this.aggregationScheduler = aggregationScheduler; - this.pool = Executors.newCachedThreadPool(); + + } + + /** + * @return the fallbackPersistence + */ + public FallbackPersistenceBackend getFallbackPersistence() { + return fallbackPersistence; } /** * @param fallback the fallback to set */ - protected void setFallback(FallbackPersistence fallback) { - this.fallback = fallback; + protected void setFallback(FallbackPersistenceBackend fallback) { + this.fallbackPersistence = fallback; } /** @@ -76,7 +89,7 @@ public abstract class AccountingPersistenceBackend { */ protected abstract void reallyAccount(UsageRecord usageRecords) throws Exception; - private void accountWithFallback(UsageRecord... usageRecords) { + protected void accountWithFallback(UsageRecord... usageRecords) { String persistenceName = this.getClass().getSimpleName(); logger.trace("Going to account {} using {} : {}", Arrays.toString(usageRecords), persistenceName, this); for(UsageRecord usageRecord : usageRecords){ @@ -85,11 +98,11 @@ public abstract class AccountingPersistenceBackend { this.reallyAccount(usageRecord); logger.debug("{} accounted succesfully from {}.", usageRecord.toString(), persistenceName); } catch (Exception e) { - String fallabackPersistenceName = fallback.getClass().getSimpleName(); + String fallabackPersistenceName = fallbackPersistence.getClass().getSimpleName(); try { logger.error("{} was not accounted succesfully from {}. Trying to use {}.", usageRecord.toString(), persistenceName, fallabackPersistenceName, e); - fallback.reallyAccount(usageRecord); + fallbackPersistence.reallyAccount(usageRecord); logger.debug("{} accounted succesfully from {}", usageRecord.toString(), fallabackPersistenceName); }catch(Exception ex){ diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java index 5e27451..1dad0be 100644 --- a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendFactory.java @@ -8,6 +8,7 @@ import java.util.Calendar; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.gcube.accounting.aggregation.scheduler.AggregationScheduler; @@ -34,13 +35,17 @@ public abstract class AccountingPersistenceBackendFactory { public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min - protected static Map fallbackLastCheck; + static Map fallbackLastCheck; static { accountingPersistenceBackends = new HashMap(); fallbackLastCheck = new HashMap(); } + protected Set getActiveScopes(){ + return accountingPersistenceBackends.keySet(); + } + private static File file(File file) throws IllegalArgumentException { if(!file.isDirectory()){ file = file.getParentFile(); @@ -87,7 +92,7 @@ public abstract class AccountingPersistenceBackendFactory { return null; }; - protected static FallbackPersistence createFallback(String scope){ + protected static FallbackPersistenceBackend createFallback(String scope){ File fallbackFile = null; if(scope!=null){ ScopeBean bean = new ScopeBean(scope); @@ -97,7 +102,7 @@ public abstract class AccountingPersistenceBackendFactory { }else{ fallbackFile = new File(fallbackLocation, ACCOUTING_FALLBACK_FILENAME); } - FallbackPersistence fallbackPersistence = new FallbackPersistence(fallbackFile); + FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile); fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance()); return fallbackPersistence; } @@ -149,14 +154,14 @@ public abstract class AccountingPersistenceBackendFactory { if(persistence==null){ persistence = discoverAccountingPersistenceBackend(scope); if(persistence==null){ - logger.warn("Unable to find a usable {}. {} will be used.", AccountingPersistenceBackend.class.getSimpleName(), FallbackPersistence.class.getSimpleName()); + logger.warn("Unable to find a usable {}. {} will be used.", AccountingPersistenceBackend.class.getSimpleName(), FallbackPersistenceBackend.class.getSimpleName()); long now = Calendar.getInstance().getTimeInMillis(); fallbackLastCheck.put(scope, now); persistence = createFallback(scope); } accountingPersistenceBackends.put(scope, persistence); - } else if(persistence instanceof FallbackPersistence && fallbackLastCheck.get(scope)!=null){ + } else if(persistence instanceof FallbackPersistenceBackend && fallbackLastCheck.get(scope)!=null){ // Trying to rediscover AccountingPersistenceBackend persistence = rediscoverAccountingPersistenceBackend(persistence, scope); diff --git a/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitor.java b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitor.java new file mode 100644 index 0000000..c4dc33d --- /dev/null +++ b/src/main/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitor.java @@ -0,0 +1,80 @@ +/** + * + */ +package org.gcube.accounting.persistence; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.gcube.accounting.datamodel.BasicUsageRecord; +import org.gcube.accounting.datamodel.UsageRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +public class AccountingPersistenceBackendMonitor implements Runnable { + + private final static Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendMonitorTest.class); + + private final static String ELABORATION_FILE_SUFFIX = ".ELABORATION"; + + protected final ScheduledExecutorService scheduler; + + protected final AccountingPersistenceBackend accountingPersistenceBackend; + + public AccountingPersistenceBackendMonitor(AccountingPersistenceBackend accountingPersistenceBackend){ + this.accountingPersistenceBackend = accountingPersistenceBackend; + this.scheduler = Executors.newScheduledThreadPool(1); + this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES); + } + + protected void elaborateFile(File elaborationFile){ + try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) { + for(String line; (line = br.readLine()) != null; ) { + try { + UsageRecord usageRecord = BasicUsageRecord.getUsageRecord(line); + accountingPersistenceBackend.accountWithFallback(usageRecord); + } catch(Exception e){ + logger.error("", e); + // TODO write line on file + } + } + } catch (FileNotFoundException e) { + logger.error("", e); + } catch (IOException e) { + logger.error("", e); + } + } + + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + logger.debug("Trying to persist {}s which failed and were persisted using fallback", UsageRecord.class.getSimpleName()); + + FallbackPersistenceBackend fallbackPersistenceBackend = accountingPersistenceBackend.getFallbackPersistence(); + File file = fallbackPersistenceBackend.getAccountingFallbackFile(); + File elaborationFile = null; + synchronized (file) { + if(file.exists()){ + elaborationFile = new File(file.getAbsolutePath()+ELABORATION_FILE_SUFFIX); + file.renameTo(elaborationFile); + } + } + + if(elaborationFile!=null){ + elaborateFile(elaborationFile); + } + + } +} diff --git a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java b/src/main/java/org/gcube/accounting/persistence/FallbackPersistenceBackend.java similarity index 50% rename from src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java rename to src/main/java/org/gcube/accounting/persistence/FallbackPersistenceBackend.java index decc9e2..448a15d 100644 --- a/src/main/java/org/gcube/accounting/persistence/FallbackPersistence.java +++ b/src/main/java/org/gcube/accounting/persistence/FallbackPersistenceBackend.java @@ -15,11 +15,18 @@ import org.gcube.accounting.datamodel.UsageRecord; /** * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ */ -public class FallbackPersistence extends AccountingPersistenceBackend { +public class FallbackPersistenceBackend extends AccountingPersistenceBackend { private File accountingFallbackFile; - protected FallbackPersistence(File accountingFallbackFile) { + /** + * @return the accountingFallbackFile + */ + protected File getAccountingFallbackFile() { + return accountingFallbackFile; + } + + protected FallbackPersistenceBackend(File accountingFallbackFile) { super(null, AggregationScheduler.newInstance()); this.accountingFallbackFile = accountingFallbackFile; } @@ -34,15 +41,22 @@ public class FallbackPersistence extends AccountingPersistenceBackend { /** * {@inheritDoc} + * This method is synchronized on {@link File} used, so any actions which + * has to modify, rename or delete the file must be synchronized on this + * file. To retrieve it use {@link #getAccountingFallbackFile()} method. + * This is intended for internal library usage only so that is protected */ @Override protected void reallyAccount(UsageRecord usageRecord) throws Exception { - try(FileWriter fw = new FileWriter(accountingFallbackFile, true); - BufferedWriter bw = new BufferedWriter(fw); - PrintWriter out = new PrintWriter(bw)){ - out.println(usageRecord); - } catch( IOException e ){ - throw e; + synchronized (accountingFallbackFile) { + try(FileWriter fw = new FileWriter(accountingFallbackFile, true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)){ + out.println(usageRecord); + out.flush(); + } catch( IOException e ){ + throw e; + } } } diff --git a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitorTest.java b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitorTest.java new file mode 100644 index 0000000..f06bef7 --- /dev/null +++ b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendMonitorTest.java @@ -0,0 +1,73 @@ +/** + * + */ +package org.gcube.accounting.persistence; + +import java.util.concurrent.TimeUnit; + +import org.gcube.accounting.datamodel.SingleUsageRecord; +import org.gcube.accounting.datamodel.basetypes.TestUsageRecord; +import org.gcube.accounting.exception.InvalidValueException; +import org.gcube.accounting.testutility.StressTestUtility; +import org.gcube.accounting.testutility.TestOperation; +import org.gcube.common.scope.api.ScopeProvider; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public class AccountingPersistenceBackendMonitorTest { + + private static final Logger logger = LoggerFactory.getLogger(AccountingPersistenceBackendMonitor.class); + + public static final long timeout = 5000; + public static final TimeUnit timeUnit = TimeUnit.MILLISECONDS; + + @Test + public void parsingTest() throws Exception { + ScopeProvider.instance.set("/gcube/devsec"); + + AccountingPersistenceBackendFactory.setFallbackLocation(null); + final AccountingPersistence persistence = AccountingPersistence.getInstance(); + + StressTestUtility.stressTest(new TestOperation() { + + @Override + public void operate(int i) { + SingleUsageRecord usageRecord = null; + switch (i%3) { + case 0: + usageRecord = TestUsageRecord.createTestServiceUsageRecordAutomaticScope(); + break; + case 1: + usageRecord = TestUsageRecord.createTestStorageUsageRecordAutomaticScope(); + break; + case 2: + usageRecord = TestUsageRecord.createTestJobUsageRecordAutomaticScope(); + break; + } + try { + persistence.account(usageRecord); + } catch (InvalidValueException e) { + throw new RuntimeException(e); + } + } + }); + + logger.debug(" START -----------------------------------------------"); + logger.debug("Flushing the buffered records"); + persistence.flush(timeout, timeUnit); + logger.debug(" END -----------------------------------------------"); + + AccountingPersistenceBackend accountingPersistenceBackend = AccountingPersistenceBackendFactory.getPersistenceBackend(); + accountingPersistenceBackend.setFallback((FallbackPersistenceBackend) accountingPersistenceBackend); + AccountingPersistenceBackendMonitor accountingPersistenceBackendMonitor = new AccountingPersistenceBackendMonitor(accountingPersistenceBackend); + + accountingPersistenceBackendMonitor.run(); + + } + +} diff --git a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendTest.java b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendTest.java index dba0290..f320ce0 100644 --- a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendTest.java +++ b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceBackendTest.java @@ -42,7 +42,7 @@ public class AccountingPersistenceBackendTest { public void singleTestNoScope() throws Exception { AccountingPersistenceBackendFactory.setFallbackLocation(null); final AccountingPersistenceBackend persistence = AccountingPersistenceBackendFactory.getPersistenceBackend(); - Assert.assertTrue(persistence instanceof FallbackPersistence); + Assert.assertTrue(persistence instanceof FallbackPersistenceBackend); StressTestUtility.stressTest(new TestOperation() { @Override public void operate(int i) { diff --git a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceConfigurationTest.java b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceConfigurationTest.java index 9b8f730..119efb1 100644 --- a/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceConfigurationTest.java +++ b/src/test/java/org/gcube/accounting/persistence/AccountingPersistenceConfigurationTest.java @@ -3,6 +3,11 @@ */ package org.gcube.accounting.persistence; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; import java.io.StringWriter; import java.net.URL; import java.util.Arrays; @@ -264,4 +269,44 @@ public class AccountingPersistenceConfigurationTest { } + /* + @Test + public void testFiles() throws IOException{ + File file = new File("./aux.txt"); + logger.debug("file : {}", file.getAbsolutePath()); + + String aux = "AUX"; + try(FileWriter fw = new FileWriter(file, true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)){ + out.println(aux); + out.flush(); + } catch( IOException e ){ + throw e; + } + + + File same = file; + logger.debug("same : {}", same.getAbsolutePath()); + boolean moved = same.renameTo(new File(file.getAbsolutePath()+".RENAMED")); + logger.debug("Moved : {}", moved); + + logger.debug("AFTER RENAME"); + logger.debug("file : {}", file.getAbsolutePath()); + logger.debug("same : {}", same.getAbsolutePath()); + + + aux = "DONE"; + try(FileWriter fw = new FileWriter(file, true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)){ + out.println(aux); + out.flush(); + } catch( IOException e ){ + throw e; + } + + } + */ + }