commit aadec044b3b8ff97f1c8253804dd0de8a5eadad5 Author: Luca Frosini Date: Fri Dec 18 16:01:55 2015 +0000 refs #1746: Separate Accounting Model and generalize solution https://support.d4science.org/issues/1746 git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@121986 82a268e6-3cf1-43bd-a215-b396298e98cf diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..e43402f --- /dev/null +++ b/.classpath @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.project b/.project new file mode 100644 index 0000000..56f8b28 --- /dev/null +++ b/.project @@ -0,0 +1,23 @@ + + + document-store-lib + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..29abf99 --- /dev/null +++ b/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,6 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 +encoding//src/test/java=UTF-8 +encoding//src/test/resources=UTF-8 +encoding/=UTF-8 diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..ec4300d --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7 +org.eclipse.jdt.core.compiler.compliance=1.7 +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.source=1.7 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 0000000..f897a7f --- /dev/null +++ b/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 
+resolveWorkspaceProjects=true +version=1 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..06a0bc5 --- /dev/null +++ b/pom.xml @@ -0,0 +1,61 @@ + + 4.0.0 + + org.gcube.tools + maven-parent + 1.0.0 + + + org.gcube.data.publishing + document-store-lib + 1.0.0-SNAPSHOT + Document Store Lib + + Allow to persist data in NoSQL Document Store Databases. + Discover Model dynamically. + Discover Database Backend connector dynamically. + Discover Configuration implementation dynamically. + Provide aggregation and fallback facilities. + + jar + + + UTF-8 + ${project.basedir}/distro + + + + scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId} + scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId} + https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId} + + + + + org.slf4j + slf4j-api + 1.7.5 + + + org.reflections + reflections + 0.9.10 + + + + + junit + junit + 4.11 + test + + + ch.qos.logback + logback-classic + 1.0.13 + test + + + + \ No newline at end of file diff --git a/src/main/java/org/gcube/documentstore/exception/InvalidValueException.java b/src/main/java/org/gcube/documentstore/exception/InvalidValueException.java new file mode 100644 index 0000000..5aabaa5 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/exception/InvalidValueException.java @@ -0,0 +1,26 @@ +package org.gcube.documentstore.exception; + +public class InvalidValueException extends Exception { + + /** + * Generated serial Version UID + */ + private static final long serialVersionUID = 4403699127526286772L; + + public InvalidValueException() { + super(); + } + + public InvalidValueException(String message) { + super(message); + } + + public InvalidValueException(Throwable cause) { + super(cause); + } + + public InvalidValueException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git 
a/src/main/java/org/gcube/documentstore/exception/NotAggregatableRecordsExceptions.java b/src/main/java/org/gcube/documentstore/exception/NotAggregatableRecordsExceptions.java new file mode 100644 index 0000000..8689373 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/exception/NotAggregatableRecordsExceptions.java @@ -0,0 +1,32 @@ +/** + * + */ +package org.gcube.documentstore.exception; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public class NotAggregatableRecordsExceptions extends Exception { + + /** + * Generated serial Version UID + */ + private static final long serialVersionUID = -1477792189431118048L; + + public NotAggregatableRecordsExceptions() { + super(); + } + + public NotAggregatableRecordsExceptions(String message) { + super(message); + } + + public NotAggregatableRecordsExceptions(Throwable cause) { + super(cause); + } + + public NotAggregatableRecordsExceptions(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java new file mode 100644 index 0000000..68898ba --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java @@ -0,0 +1,75 @@ +/** + * + */ +package org.gcube.documentstore.persistence; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +import org.gcube.documentstore.records.Record; +import org.gcube.documentstore.records.aggregation.AggregationScheduler; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +public class FallbackPersistenceBackend extends PersistenceBackend { + + private File fallbackFile; + + /** + * @return the fallbackFile + */ + protected File getFallbackFile() { + return fallbackFile; + } + + protected 
FallbackPersistenceBackend(File fallbackFile) { + super(null, AggregationScheduler.newInstance()); + this.fallbackFile = fallbackFile; + } + + /** + * {@inheritDoc} + */ + @Override + public void prepareConnection(PersistenceBackendConfiguration configuration) { + // Nothing to do + } + + /** + * {@inheritDoc} + * This method is synchronized on the {@link File} used, so any action + * which modifies, renames or deletes the file must be synchronized on + * this file. To retrieve it use the {@link #getFallbackFile()} method. + * This is intended for internal library usage only, so it is protected. + */ + @Override + protected void reallyAccount(Record record) throws Exception { + printLine(String.valueOf(record)); + } + + public void printLine(String line) throws Exception { + synchronized (fallbackFile) { + try(FileWriter fw = new FileWriter(fallbackFile, true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)){ + out.println(line); + out.flush(); + } catch( IOException e ){ + throw e; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws Exception { + // Nothing to do + } + +} diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java new file mode 100644 index 0000000..8e18fc8 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java @@ -0,0 +1,180 @@ +/** + * + */ +package org.gcube.documentstore.persistence; + +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.gcube.documentstore.exception.InvalidValueException; +import org.gcube.documentstore.records.Record; +import org.gcube.documentstore.records.aggregation.AggregationScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) 
http://www.lucafrosini.com/ + */ +public abstract class PersistenceBackend { + + private static final Logger logger = LoggerFactory.getLogger(PersistenceBackend.class); + + protected FallbackPersistenceBackend fallbackPersistence; + protected AggregationScheduler aggregationScheduler; + + protected PersistenceBackendMonitor persistenceBackendMonitor; + + /** + * Pool for thread execution + */ + private ExecutorService pool; + + protected PersistenceBackend(){ + this.pool = Executors.newCachedThreadPool(); + if(!(this instanceof FallbackPersistenceBackend)){ + this.persistenceBackendMonitor = new PersistenceBackendMonitor(this); + } + } + + protected PersistenceBackend(FallbackPersistenceBackend fallback, AggregationScheduler aggregationScheduler){ + this(); + this.fallbackPersistence = fallback; + this.aggregationScheduler = aggregationScheduler; + + } + + /** + * @return the fallbackPersistence + */ + public FallbackPersistenceBackend getFallbackPersistence() { + return fallbackPersistence; + } + + /** + * @param fallback the fallback to set + */ + protected void setFallback(FallbackPersistenceBackend fallback) { + this.fallbackPersistence = fallback; + } + + /** + * @return the aggregationScheduler + */ + public AggregationScheduler getAggregationScheduler() { + return aggregationScheduler; + } + + /** + * @param aggregationScheduler the aggregationScheduler to set + */ + protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) { + this.aggregationScheduler = aggregationScheduler; + } + + /** + * Prepare the connection to persistence. + * This method must be used by implementation class to open + * the connection with the persistence storage, DB, file etc. 
+ * @param configuration The configuration to create the connection + * @throws Exception if the connection cannot be prepared + */ + protected abstract void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception; + + /** + * This method contains the code to save the {@link Record} + * + */ + protected abstract void reallyAccount(Record record) throws Exception; + + protected void accountWithFallback(Record... records) { + String persistenceName = this.getClass().getSimpleName(); + logger.trace("Going to account {} using {} : {}", Arrays.toString(records), persistenceName, this); + for(Record record : records){ + try { + logger.trace("Going to account {} using {} : {}", record, persistenceName, this); + this.reallyAccount(record); + logger.debug("{} accounted successfully by {}.", record.toString(), persistenceName); + } catch (Exception e) { + try { + String fallbackPersistenceName = FallbackPersistenceBackend.class.getSimpleName(); + logger.error("{} was not accounted successfully by {}. Trying to use {}.", + record.toString(), persistenceName, fallbackPersistenceName, e); + fallbackPersistence.reallyAccount(record); + logger.debug("{} accounted successfully by {}", + record.toString(), fallbackPersistenceName); + }catch(Exception ex){ + logger.error("{} was not accounted at all", record.toString(), ex); + } + } + } + } + + + protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate){ + try { + logger.debug("Received {} to account : {}", record.getClass().getSimpleName(), record); + if(validate){ + record.validate(); + logger.trace("{} {} valid", record.getClass().getSimpleName(), record); + } + if(aggregate){ + final PersistenceBackend persistence = this; + aggregationScheduler.aggregate(record, new PersistenceExecutor(){ + + @Override + public void persist(Record... 
records) throws Exception { + persistence.accountWithFallback(records); + } + + }); + }else{ + this.accountWithFallback(record); + } + + } catch (InvalidValueException e) { + logger.error("Error validating {}", record.getClass().getSimpleName(), e); + } catch (Exception e) { + logger.error("Error recording {}", record.getClass().getSimpleName(), e); + } + } + + /** + * Persist the {@link Record}. + * The record is validated first, then accounted, in a separate thread, + * so that the caller can continue its execution. + * If the persistence fails, the record is written to a local file so + * that it can be recorded later. + * @param record the {@link Record} to persist + * @throws InvalidValueException if the record validation fails + */ + public void account(final Record record) throws InvalidValueException{ + Runnable runnable = new Runnable(){ + @Override + public void run(){ + accountValidateAggregate(record, true, true); + } + }; + pool.execute(runnable); + + } + + public void flush(long timeout, TimeUnit timeUnit) throws Exception { + pool.awaitTermination(timeout, timeUnit); + + final PersistenceBackend persistence = this; + aggregationScheduler.flush(new PersistenceExecutor(){ + + @Override + public void persist(Record... 
records) throws Exception { + persistence.accountWithFallback(records); + } + + }); + + } + + public abstract void close() throws Exception; + +} diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java new file mode 100644 index 0000000..2ea33f1 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java @@ -0,0 +1,78 @@ +/** + * + */ +package org.gcube.documentstore.persistence; + +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +public abstract class PersistenceBackendConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendConfiguration.class); + + protected Map properties; + + /** + * Used only for testing purpose + * @return + */ + protected static PersistenceBackendConfiguration getUnconfiguredInstance(){ + ServiceLoader serviceLoader = ServiceLoader.load(PersistenceBackendConfiguration.class); + for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) { + Class configClass = foundConfiguration.getClass(); + String foundConfigurationClassName = configClass.getSimpleName(); + logger.debug("{} will be used.", foundConfigurationClassName); + + return foundConfiguration; + } + return null; + } + + public static PersistenceBackendConfiguration getInstance(Class clz){ + ServiceLoader serviceLoader = ServiceLoader.load(PersistenceBackendConfiguration.class); + for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) { + try { + Class configClass = foundConfiguration.getClass(); + String foundConfigurationClassName = configClass.getSimpleName(); + logger.debug("Testing {}", foundConfigurationClassName); + + 
@SuppressWarnings("rawtypes") + Class[] configArgTypes = { Class.class }; + Constructor configurationConstructor = configClass.getDeclaredConstructor(configArgTypes); + Object[] configArguments = {clz}; + PersistenceBackendConfiguration configuration = configurationConstructor.newInstance(configArguments); + + logger.debug("{} will be used.", foundConfigurationClassName); + + return configuration; + } catch (Exception e) { + logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundConfiguration.getClass().getSimpleName()), e); + } + } + return null; + } + + protected PersistenceBackendConfiguration(){ + properties = new HashMap(); + } + + public PersistenceBackendConfiguration(Class clz){ + this(); + } + + public void addProperty(String key, String value) { + properties.put(key, value); + } + + public String getProperty(String key) throws Exception { + return properties.get(key); + } +} diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java new file mode 100644 index 0000000..454cd1b --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java @@ -0,0 +1,232 @@ +/** + * + */ +package org.gcube.documentstore.persistence; + +import java.io.File; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.TimeUnit; + +import org.gcube.documentstore.records.aggregation.AggregationScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public abstract class PersistenceBackendFactory { + + private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendFactory.class); + + public final static String HOME_SYSTEM_PROPERTY = "user.home"; + + private static final String 
FALLBACK_FILENAME = "fallback.log"; + + private static String fallbackLocation; + + private static Map persistenceBackends; + private static Map fallbackLastCheck; + + public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min + + /** + * @return the fallbackLastCheck + */ + protected static Long getFallbackLastCheck(String scope) { + return fallbackLastCheck.get(scope); + } + + static { + persistenceBackends = new HashMap(); + fallbackLastCheck = new HashMap(); + } + + private static File file(File file) throws IllegalArgumentException { + if(!file.isDirectory()){ + file = file.getParentFile(); + } + // Create folder structure if not exist + if (!file.exists()) { + file.mkdirs(); + } + return file; + } + + public synchronized static void setFallbackLocation(String path){ + if(fallbackLocation == null){ + if(path==null){ + path = System.getProperty(HOME_SYSTEM_PROPERTY); + } + file(new File(path)); + fallbackLocation = path; + } + } + + protected static String sanitizeContext(String context){ + return context.replace("/", "_"); + } + + protected static FallbackPersistenceBackend createFallback(String context){ + logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context); + File fallbackFile = null; + if(context!=null){ + String sanitized = sanitizeContext(context); + fallbackFile = new File(fallbackLocation, String.format("%s.%s", sanitized, FALLBACK_FILENAME)); + }else{ + fallbackFile = new File(fallbackLocation, FALLBACK_FILENAME); + } + FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile); + fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance()); + return fallbackPersistence; + } + + protected static PersistenceBackend discoverPersistenceBackend(String context){ + logger.debug("Discovering {} for scope {}", + PersistenceBackend.class.getSimpleName(), context); + ServiceLoader serviceLoader = ServiceLoader.load(PersistenceBackend.class); + for 
(PersistenceBackend found : serviceLoader) { + Class foundClass = found.getClass(); + try { + String foundClassName = foundClass.getSimpleName(); + logger.debug("Testing {}", foundClassName); + + PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass); + if(configuration==null){ + continue; + } + found.prepareConnection(configuration); + + logger.debug("{} will be used.", foundClassName); + found.setAggregationScheduler(AggregationScheduler.newInstance()); + found.setFallback(createFallback(context)); + return found; + } catch (Exception e) { + logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e); + } + } + return null; + }; + + protected static PersistenceBackend rediscoverPersistenceBackend(PersistenceBackend actual, String context){ + Long now = Calendar.getInstance().getTimeInMillis(); + Long lastCheckTimestamp = fallbackLastCheck.get(context); + logger.debug("Last check for context {} was {}", context, lastCheckTimestamp); + boolean myTurn = false; + synchronized (persistenceBackends) { + if( (lastCheckTimestamp + FALLBACK_RETRY_TIME) <= now ){ + logger.debug("The {} for context {} is {}. Is time to rediscover if there is another possibility.", + PersistenceBackend.class.getSimpleName(), context, actual.getClass().getSimpleName()); + logger.trace("Renewing Last check Timestamp. The next one will be {}", now); + fallbackLastCheck.put(context, now); + myTurn=true; + logger.debug("I win. 
It is my turn to rediscover {} in context {}", + PersistenceBackend.class.getSimpleName(), context); + } + } + + if(myTurn){ + PersistenceBackend discoveredPersistenceBackend = discoverPersistenceBackend(context); + + synchronized (persistenceBackends) { + if(discoveredPersistenceBackend!=null){ + /* + * Passing the aggregator to the new PersistenceBackend + * so that the buffered records will be persisted with the + * new method + * + */ + discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler()); + + // Removing the timestamp, which is no longer needed + fallbackLastCheck.remove(context); + persistenceBackends.put(context, discoveredPersistenceBackend); + + /* + * Not needed because close has no effect. Removed to + * prevent problems in case of future changes. + * try { + * actual.close(); + * } catch (Exception e) { + * logger.error("Error closing {} for scope {} which has been substituted with {}.", + * actual.getClass().getSimpleName(), scope, + * discoveredPersistenceBackend.getClass().getSimpleName(), e); + * } + * + */ + return discoveredPersistenceBackend; + } + } + + } + + long nextCheck = (lastCheckTimestamp + FALLBACK_RETRY_TIME) - Calendar.getInstance().getTimeInMillis(); + float nextCheckInSec = nextCheck/1000; + logger.debug("The {} in use for context {} is {}. Next retry in {} msec (about {} sec)", + PersistenceBackend.class.getSimpleName(), context, + actual.getClass().getSimpleName(), nextCheck, nextCheckInSec); + + return actual; + } + + public static PersistenceBackend getPersistenceBackend(String context) { + + if(context==null){ + logger.error("No Context available. 
FallbackPersistence will be used"); + return createFallback(null); + } + + PersistenceBackend persistence = null; + logger.debug("Going to synchronized block in getPersistenceBackend"); + synchronized (persistenceBackends) { + persistence = persistenceBackends.get(context); + logger.debug("{} {}", PersistenceBackend.class.getSimpleName(), persistence); + if(persistence==null){ + /* + * Setting FallbackPersistence and unlocking. + * This is used to avoid deadlock on an IS node which tries to + * use itself to query the configuration. + */ + persistence = createFallback(context); + persistenceBackends.put(context, persistence); + long now = Calendar.getInstance().getTimeInMillis(); + /* The PersistenceBackend is still to be discovered, so the + * last check is set back in time to force rediscovery. + */ + fallbackLastCheck.put(context, ((now - FALLBACK_RETRY_TIME) - 1)); + } + } + + if(persistence instanceof FallbackPersistenceBackend){ + persistence = rediscoverPersistenceBackend(persistence, context); + } + + return persistence; + } + + public static void flush(String context, long timeout, TimeUnit timeUnit){ + PersistenceBackend apb = persistenceBackends.get(context); + try { + logger.debug("Flushing records in context {}", context); + apb.flush(timeout, timeUnit); + }catch(Exception e){ + logger.error("Unable to flush records in context {} with {}", context, apb, e); + } + } + + /** + * @param timeout + * @param timeUnit + */ + public static void flushAll(long timeout, TimeUnit timeUnit) { + for(String context : persistenceBackends.keySet()){ + flush(context, timeout, timeUnit); + } + } + + +} diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java new file mode 100644 index 0000000..02d097a --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java @@ -0,0 +1,95 @@ +/** + * + */ +package 
org.gcube.documentstore.persistence; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.gcube.documentstore.records.Record; +import org.gcube.documentstore.records.RecordUtility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +public class PersistenceBackendMonitor implements Runnable { + + private final static Logger logger = LoggerFactory.getLogger(PersistenceBackendMonitor.class); + + private final static String ELABORATION_FILE_SUFFIX = ".ELABORATION"; + private final static String ELABORATION_FILE_NOT_DELETED_SUFFIX = ".ELABORATION.NOT-DELETED"; + + protected final ScheduledExecutorService scheduler; + + protected final PersistenceBackend persistenceBackend; + + public PersistenceBackendMonitor(PersistenceBackend persistenceBackend){ + this.persistenceBackend = persistenceBackend; + this.scheduler = Executors.newScheduledThreadPool(1); + this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES); + } + + protected void elaborateFile(File elaborationFile){ + try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) { + for(String line; (line = br.readLine()) != null; ) { + try { + Record record = RecordUtility.getRecord(line); + persistenceBackend.accountWithFallback(record); + } catch(Exception e){ + logger.error("Unable to parse line {} into a valid Record. 
Going to write this line back to the fallback file as a string.", line, e); + FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence(); + try { + fallbackPersistenceBackend.printLine(line); + } catch (Exception e1) { + logger.error("Unable to write back line {}. It will be lost", line, e1); + } + } + } + } catch (FileNotFoundException e) { + logger.error("Fallback elaboration file not found", e); + } catch (IOException e) { + logger.error("Error reading fallback elaboration file", e); + } + } + + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + logger.debug("Trying to persist {}s which failed and were persisted using fallback", Record.class.getSimpleName()); + + FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence(); + File file = fallbackPersistenceBackend.getFallbackFile(); + File elaborationFile = null; + synchronized (file) { + if(file.exists()){ + elaborationFile = new File(file.getAbsolutePath()+ELABORATION_FILE_SUFFIX); + file.renameTo(elaborationFile); + } + } + + if(elaborationFile!=null){ + synchronized (elaborationFile) { + elaborateFile(elaborationFile); + boolean deleted = elaborationFile.delete(); + if(!deleted){ + logger.debug("Failed to delete file {}", elaborationFile.getAbsolutePath()); + File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX); + elaborationFile.renameTo(elaborationFileNotDeleted); + } + } + + } + + } +} diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceExecutor.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceExecutor.java new file mode 100644 index 0000000..e78261e --- /dev/null +++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceExecutor.java @@ -0,0 +1,16 @@ +/** + * + */ +package org.gcube.documentstore.persistence; + +import org.gcube.documentstore.records.Record; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public interface 
PersistenceExecutor { + + public void persist(Record... records)throws Exception; + +} diff --git a/src/main/java/org/gcube/documentstore/records/AggregatedRecord.java b/src/main/java/org/gcube/documentstore/records/AggregatedRecord.java new file mode 100644 index 0000000..9b3bbf6 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/records/AggregatedRecord.java @@ -0,0 +1,60 @@ +/** + * + */ +package org.gcube.documentstore.records; + +import java.util.Calendar; + +import org.gcube.documentstore.exception.InvalidValueException; +import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions; + + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +@SuppressWarnings("rawtypes") +public interface AggregatedRecord extends Record { + + /** + * KEY : Indicate that this {@link Record} is an aggregation + */ + public static final String AGGREGATED = "aggregated"; + + /** + * KEY : Indicate The Number of {@link AggregatedRecord} + */ + public static final String OPERATION_COUNT = "operationCount"; + + /** + * KEY : Represent the left end of the time interval covered by this + * {@link AggregatedRecord}. The value will be recorded in UTC milliseconds + * from the epoch. + */ + public static final String START_TIME = "startTime"; + + /** + * KEY : Represent the right end of the time interval covered by this + * {@link AggregatedRecord}. The value will be recorded in UTC milliseconds + * from the epoch. 
+ */ + public static final String END_TIME = "endTime"; + + public int getOperationCount(); + + public void setOperationCount(int operationCount) throws InvalidValueException; + + public Calendar getStartTime(); + + public void setStartTime(Calendar startTime) throws InvalidValueException; + + public Calendar getEndTime(); + + public void setEndTime(Calendar endTime) throws InvalidValueException; + + public A aggregate(A record) throws NotAggregatableRecordsExceptions; + + public A aggregate(R record) throws NotAggregatableRecordsExceptions; + + public Class getAggregable(); + +} diff --git a/src/main/java/org/gcube/documentstore/records/Record.java b/src/main/java/org/gcube/documentstore/records/Record.java new file mode 100644 index 0000000..e6335a7 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/records/Record.java @@ -0,0 +1,115 @@ +/** + * + */ +package org.gcube.documentstore.records; + +import java.io.Serializable; +import java.util.Calendar; +import java.util.Map; +import java.util.Set; + +import org.gcube.documentstore.exception.InvalidValueException; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + */ +public interface Record extends Comparable, Serializable { + + /** + * KEY : The unique identifier for the {@link Record} + * The ID SHOULD be automatically created by the implementation class. + */ + public static final String ID = "id"; + + /** + * KEY : The instant when the {@link Record} was created. + * The value MUST be recorded in UTC milliseconds from the epoch. 
+ */ + public static final String CREATION_TIME = "creationTime"; + + /** + * KEY : The Type of the represented {@link Record} + */ + public static final String RECORD_TYPE = "recordType"; + + /** + * @return a Set containing the keys of required fields + */ + public Set getRequiredFields(); + + /** + * Return the {@link Record} Type + * @return {@link Record} Type + */ + public String getRecordType(); + + /** + * Return the unique id for this {@link Record} + * @return {@link Record} Unique ID + */ + public String getId(); + + /** + * The ID SHOULD be automatically created by the implementation Class. + * Set the ID only if you really know what you are going to do. + * Set the unique id for this {@link Record} + * @param id Unique ID + * @throws InvalidValueException + */ + public void setId(String id) throws InvalidValueException; + + /** + * Return the instant when this {@link Record} was created. + * @return the creation time for this {@link Record} + */ + public Calendar getCreationTime(); + + /** + * The CreationTime is automatically created by the implementation Class. + * Set the CreationTime only if you really know what you are going to do. + * Set the instant when this {@link Record} was created. + * @param creationTime Creation Time + * @throws InvalidValueException + */ + public void setCreationTime(Calendar creationTime) throws InvalidValueException; + + /** + * Return all resource-specific properties. The returned Map is a copy of + * the internal representation. Any modification to the returned Map MUST + * not affect the object + * @return a Map containing the properties + */ + public Map> getResourceProperties(); + + /** + * Set all resource-specific properties, replacing existing ones + */ + public void setResourceProperties(Map> resourceSpecificProperties) throws InvalidValueException; + + /** + * Return the value of the given resource property. 
+ * @param key the key of the requested property + * @return the value of the given resource property + */ + public Comparable getResourceProperty(String key); + + /** + * Set the value of the given resource property. + * If the key is one of the predefined properties, the value + * is validated. + * @param key the key of the property to set + * @param value the value of the given resource property + */ + public void setResourceProperty(String key, Comparable value) throws InvalidValueException; + + + /** + * Validate the Resource Record. + * The validation checks that all the Required Fields are set and have + * valid values. + * @throws InvalidValueException + */ + public void validate() throws InvalidValueException; + +} diff --git a/src/main/java/org/gcube/documentstore/records/RecordUtility.java b/src/main/java/org/gcube/documentstore/records/RecordUtility.java new file mode 100644 index 0000000..2d2777c --- /dev/null +++ b/src/main/java/org/gcube/documentstore/records/RecordUtility.java @@ -0,0 +1,205 @@ +/** + * + */ +package org.gcube.documentstore.records; + +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.reflections.Reflections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + */ +@SuppressWarnings({ "rawtypes" }) +public abstract class RecordUtility { + + private static Logger logger = LoggerFactory.getLogger(RecordUtility.class); + + private final static String LINE_FREFIX = "{"; + private final static String LINE_SUFFIX = "}"; + private final static String KEY_VALUE_PAIR_SEPARATOR = ","; + private final static String KEY_VALUE_LINKER = "="; + + protected static Map> recordClassesFound; + + protected static Map> aggregatedRecordClassesFound; + + /** + * @return the recordClassesFound + */ + public static Map> 
getRecordClassesFound() { + return recordClassesFound; + } + + /** + * @return the aggregatedRecordClassesFound + */ + public static Map> getAggregatedRecordClassesFound() { + return aggregatedRecordClassesFound; + } + + static { + recordClassesFound = new HashMap<>(); + aggregatedRecordClassesFound = new HashMap<>(); + + Reflections recordClassesReflections = new Reflections(); + Set> recordClasses = recordClassesReflections.getSubTypesOf(Record.class); + for(Class cls : recordClasses){ + if(Modifier.isAbstract(cls.getModifiers())){ + continue; + } + + String discoveredRecordType; + try { + Record record = cls.newInstance(); + if(record instanceof AggregatedRecord){ + continue; + } + discoveredRecordType = record.getRecordType(); + + if(!recordClassesFound.containsKey(discoveredRecordType)){ + recordClassesFound.put(discoveredRecordType, cls); + } + + } catch (InstantiationException | IllegalAccessException e) { + continue; + } + } + + aggregatedRecordClassesFound = new HashMap<>(); + Reflections aggregatedRecordReflections = new Reflections(); + Set> aggregatedRecordClasses = aggregatedRecordReflections.getSubTypesOf(AggregatedRecord.class); + for(Class cls : aggregatedRecordClasses){ + if(Modifier.isAbstract(cls.getModifiers())){ + continue; + } + + String discoveredRecordType; + try { + discoveredRecordType = cls.newInstance().getRecordType(); + if(!aggregatedRecordClassesFound.containsKey(discoveredRecordType)){ + aggregatedRecordClassesFound.put(discoveredRecordType, cls); + } + } catch (InstantiationException | IllegalAccessException e) { + logger.error("Unable to instantiate found {} class ({})", + AggregatedRecord.class.getSimpleName(), cls.getSimpleName(), e); + continue; + } + } + + } + + public static Class getAggregatedRecordClass(String recordType) throws ClassNotFoundException { + if(getAggregatedRecordClassesFound().containsKey(recordType)){ + return getAggregatedRecordClassesFound().get(recordType); + } + logger.error("Unable to find {} class for 
{}.", + AggregatedRecord.class.getSimpleName(), recordType); + throw new ClassNotFoundException(); + } + + public static Class getRecordClass(String recordType) throws ClassNotFoundException { + if(recordClassesFound.containsKey(recordType)){ + return recordClassesFound.get(recordType); + } + logger.error("Unable to find {} class for {}.", + Record.class.getSimpleName(), recordType); + throw new ClassNotFoundException(); + } + + protected static Class getClass(String recordType, boolean aggregated) throws ClassNotFoundException { + if(aggregated){ + return RecordUtility.getAggregatedRecordClass(recordType); + } + return getRecordClass(recordType); + } + + /* + * IT DOES NOT WORK + * @SuppressWarnings("unchecked") + * public static Map getMapFromString(String serializedMap) throws IOException, ClassNotFoundException { + * ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializedMap.getBytes()); + * ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); + * return ((Map) objectInputStream.readObject()); + * } + */ + + protected static Map> getMapFromString(String serializedMap){ + /* Checking line sanity */ + if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){ + return null; + } + + /* Cleaning prefix and suffix to parse line */ + serializedMap = serializedMap.replace(LINE_FREFIX, ""); + serializedMap = serializedMap.replace(LINE_SUFFIX, ""); + + Map> map = new HashMap>(); + + String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR); + for (int i=0;i value = keyValue[1].trim(); + map.put(key, value); + + } + + return map; + } + + /** + * Create a Record from a Map serialized using toString() + * @param serializedMap the String representation of Map + * @return the Record + * @throws Exception if deserialization fails + */ + public static Record getRecord(String serializedMap) throws Exception { + Map> map = getMapFromString(serializedMap); + return getRecord(map); + } + + /** + * 
Create a Record from a Map + * @param recordMap the Map + * @return the Record + * @throws Exception if deserialization fails + */ + public static Record getRecord(Map> recordMap) throws Exception { + + String className = (String) recordMap.get(Record.RECORD_TYPE); + boolean aggregated = false; + try { + aggregated = (Boolean) recordMap.get(AggregatedRecord.AGGREGATED); + }catch(Exception e){ + try{ + aggregated = Boolean.parseBoolean((String)recordMap.get(AggregatedRecord.AGGREGATED)); + } catch(Exception e1){} + } + + Class clz = getClass(className, aggregated); + logger.debug("Trying to instantiate {}", clz); + + Class[] usageRecordArgTypes = { Map.class }; + Constructor usageRecordConstructor = clz.getDeclaredConstructor(usageRecordArgTypes); + Object[] usageRecordArguments = {recordMap}; + + Record record = usageRecordConstructor.newInstance(usageRecordArguments); + + logger.debug("Created {} : {}", Record.class.getSimpleName(), record); + + return record; + } + + +} diff --git a/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java b/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java new file mode 100644 index 0000000..dd5f6bc --- /dev/null +++ b/src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java @@ -0,0 +1,169 @@ +package org.gcube.documentstore.records.aggregation; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions; +import org.gcube.documentstore.persistence.PersistenceExecutor; +import org.gcube.documentstore.records.AggregatedRecord; +import org.gcube.documentstore.records.Record; +import org.gcube.documentstore.records.RecordUtility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) 
http://www.lucafrosini.com/ + * + */ +public abstract class AggregationScheduler { + + public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class); + + public static AggregationScheduler newInstance(){ + return new BufferAggregationScheduler(); + } + + protected int totalBufferedRecords; + protected Map> bufferedRecords; + + protected AggregationScheduler(){ + this.bufferedRecords = new HashMap>(); + this.totalBufferedRecords = 0; + } + + @SuppressWarnings("rawtypes") + protected static AggregatedRecord instantiateAggregatedRecord(Record record) throws Exception{ + String recordType = record.getRecordType(); + Class clz = RecordUtility.getAggregatedRecordClass(recordType); + Class[] argTypes = { record.getClass() }; + Constructor constructor = clz.getDeclaredConstructor(argTypes); + Object[] arguments = {record}; + return constructor.newInstance(arguments); + } + + @SuppressWarnings("rawtypes") + public static AggregatedRecord getAggregatedRecord(Record record) throws Exception { + AggregatedRecord aggregatedRecord; + if(record instanceof AggregatedRecord){ + // the record is already an aggregated version + aggregatedRecord = (AggregatedRecord) record; + }else{ + aggregatedRecord = instantiateAggregatedRecord(record); + } + + return aggregatedRecord; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected void madeAggregation(Record record){ + String recordType = record.getRecordType(); + + List records; + + if(this.bufferedRecords.containsKey(recordType)){ + records = this.bufferedRecords.get(recordType); + boolean found = false; + + for(Record bufferedRecord : records){ + if(!(bufferedRecord instanceof AggregatedRecord)){ + continue; + } + + try { + AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord; + logger.trace("Trying to use {} for aggregation.", bufferedAggregatedRecord); + + if(record instanceof AggregatedRecord){ + // TODO check compatibility using getAggrergable + 
+ bufferedAggregatedRecord.aggregate((AggregatedRecord) record); + }else{ + bufferedAggregatedRecord.aggregate((Record) record); + } + + logger.trace("Aggregated Record is {}", bufferedAggregatedRecord); + found = true; + break; + } catch(NotAggregatableRecordsExceptions e) { + logger.trace("{} is not usable for aggregation", bufferedRecord); + } + } + + if(!found){ + records.add(record); + totalBufferedRecords++; + return; + } + + + }else{ + records = new ArrayList(); + try { + records.add(getAggregatedRecord(record)); + } catch (Exception e) { + records.add(record); + } + totalBufferedRecords++; + this.bufferedRecords.put(recordType, records); + } + + } + + public void flush(PersistenceExecutor persistenceExecutor) throws Exception{ + aggregate(null, persistenceExecutor, true); + } + + protected abstract void schedulerSpecificClear(); + + protected void clear(){ + totalBufferedRecords=0; + bufferedRecords.clear(); + schedulerSpecificClear(); + } + + protected synchronized void aggregate(Record record, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception { + if(record!=null){ + logger.trace("Trying to aggregate {}", record); + madeAggregation(record); + } + + if(isTimeToPersist() || forceFlush){ + Record[] recordToPersist = new Record[totalBufferedRecords]; + int i = 0; + Collection> values = bufferedRecords.values(); + for(List records : values){ + for(Record thisRecord: records){ + recordToPersist[i] = thisRecord; + i++; + } + } + + logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist)); + persistenceExecutor.persist(recordToPersist); + + clear(); + } + } + + + + /** + * Get a usage record and try to aggregate it with the other buffered + * Usage Records. 
+ * @param record the Usage Record to buffer + * @param persistenceExecutor the executor used to persist the buffered Usage Records + * @throws Exception if the aggregation or the persistence fails + */ + public void aggregate(Record record, PersistenceExecutor persistenceExecutor) throws Exception { + logger.trace("Going to aggregate {}", record); + aggregate(record, persistenceExecutor, false); + } + + + protected abstract boolean isTimeToPersist(); + +} diff --git a/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java b/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java new file mode 100644 index 0000000..98aa7e2 --- /dev/null +++ b/src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java @@ -0,0 +1,67 @@ +/** + * + */ +package org.gcube.documentstore.records.aggregation; + +import java.util.Calendar; + +/** + * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/ + * + * This class implements a Simple Buffer with timeout strategy. + * It buffers a predefined number of Records before invoking persistence. + */ +public class BufferAggregationScheduler extends AggregationScheduler { + + /** + * Define the MAX number of Records to buffer. 
+ * TODO Get from configuration + */ + protected final static int MAX_RECORDS_NUMBER = 15; + + /** + * The max amount of time elapsed since the first record was buffered + * after which the buffered records are persisted, even if + * MAX_RECORDS_NUMBER has not been reached. + * TODO Get from configuration + */ + protected final static long OLD_RECORD_MAX_TIME_ELAPSED = 1000*60*5; // 5 min + + protected boolean firstOfBuffer; + protected long firstBufferedTime; + + protected BufferAggregationScheduler(){ + super(); + this.firstOfBuffer = true; + } + + @Override + protected void schedulerSpecificClear(){ + firstOfBuffer = true; + } + + + /** + * {@inheritDoc} + */ + @Override + public boolean isTimeToPersist(){ + long now = Calendar.getInstance().getTimeInMillis(); + + if(firstOfBuffer){ + firstOfBuffer = false; + firstBufferedTime = now; + } + + if(totalBufferedRecords >= MAX_RECORDS_NUMBER){ + return true; + } + + if((now - firstBufferedTime) >= OLD_RECORD_MAX_TIME_ELAPSED){ + return true; + } + + return false; + } + + +}
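The count-or-timeout flush policy that BufferAggregationScheduler implements on top of AggregationScheduler can be illustrated in isolation: records are buffered until either MAX_RECORDS_NUMBER (15) are held or OLD_RECORD_MAX_TIME_ELAPSED (5 minutes) has passed since the first record entered the buffer, and a flush resets both conditions. The sketch below is a minimal standalone approximation of that policy only; the class and method names are invented for illustration, and the real code buffers Record instances and delegates persistence to a PersistenceExecutor rather than returning a boolean.

```java
// Standalone sketch (hypothetical names) of the flush policy used by
// BufferAggregationScheduler: persist when either a record-count threshold
// or a time threshold since the first buffered record is exceeded.
public class FlushPolicySketch {

    static final int MAX_RECORDS_NUMBER = 15;                        // count threshold
    static final long OLD_RECORD_MAX_TIME_ELAPSED = 1000L * 60 * 5;  // 5 minutes

    private boolean firstOfBuffer = true;
    private long firstBufferedTime;
    private int bufferedRecords;

    /** Buffer one record; return true when the buffer should be persisted. */
    public boolean addAndCheck(long nowMillis) {
        if (firstOfBuffer) {
            // Remember when the buffer started filling, as isTimeToPersist() does.
            firstOfBuffer = false;
            firstBufferedTime = nowMillis;
        }
        bufferedRecords++;
        return bufferedRecords >= MAX_RECORDS_NUMBER
                || (nowMillis - firstBufferedTime) >= OLD_RECORD_MAX_TIME_ELAPSED;
    }

    /** Mirror of clear()/schedulerSpecificClear(): reset the buffer state. */
    public void clear() {
        bufferedRecords = 0;
        firstOfBuffer = true;
    }

    public static void main(String[] args) {
        FlushPolicySketch policy = new FlushPolicySketch();
        // 14 records within the time window: not yet time to persist.
        boolean flush = false;
        for (int i = 0; i < 14; i++) {
            flush = policy.addAndCheck(i);
        }
        System.out.println(flush);                        // false
        // The 15th record trips the count threshold.
        System.out.println(policy.addAndCheck(14L));      // true
        policy.clear();
        // After a flush, a lone record older than 5 minutes trips the time threshold.
        policy.addAndCheck(0L);
        System.out.println(policy.addAndCheck(OLD_RECORD_MAX_TIME_ELAPSED)); // true
    }
}
```

Passing the clock in as a parameter (instead of calling Calendar.getInstance() as the real class does) just makes the two thresholds easy to exercise deterministically.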