refs #1746: Separate Accounting Model and generalize solution

https://support.d4science.org/issues/1746

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@121986 82a268e6-3cf1-43bd-a215-b396298e98cf
Luca Frosini 2015-12-18 16:01:55 +00:00
commit aadec044b3
19 changed files with 1485 additions and 0 deletions

36
.classpath Normal file

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

23
.project Normal file

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>document-store-lib</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

6
.settings/org.eclipse.core.resources.prefs Normal file

@@ -0,0 +1,6 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8
encoding/<project>=UTF-8

5
.settings/org.eclipse.jdt.core.prefs Normal file

@@ -0,0 +1,5 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.7

4
.settings/org.eclipse.m2e.core.prefs Normal file

@@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

61
pom.xml Normal file

@@ -0,0 +1,61 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.gcube.tools</groupId>
<artifactId>maven-parent</artifactId>
<version>1.0.0</version>
</parent>
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>Document Store Lib</name>
<description>
Allows data to be persisted in NoSQL Document Store databases.
Discovers the Model dynamically.
Discovers the Database Backend connector dynamically.
Discovers the Configuration implementation dynamically.
Provides aggregation and fallback facilities.
</description>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<distroDirectory>${project.basedir}/distro</distroDirectory>
</properties>
<scm>
<connection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId}</connection>
<developerConnection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId}</developerConnection>
<url>https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId}</url>
</scm>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>
<!-- Test Dependency -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
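
For orientation, here is a minimal usage sketch of the library introduced by this commit, assuming a concrete Record implementation and a PersistenceBackend connector are available on the classpath (both are discovered dynamically). MyRecord and the context string are hypothetical, used only for illustration.

import java.util.concurrent.TimeUnit;

import org.gcube.documentstore.persistence.PersistenceBackend;
import org.gcube.documentstore.persistence.PersistenceBackendFactory;

public class UsageSketch {
    public static void main(String[] args) throws Exception {
        // null selects the default fallback location (user.home)
        PersistenceBackendFactory.setFallbackLocation(null);
        // Discover the PersistenceBackend for the context; a
        // FallbackPersistenceBackend is returned until discovery succeeds
        PersistenceBackend backend = PersistenceBackendFactory.getPersistenceBackend("/gcube/devsec");
        // Validate, aggregate and persist the record in a separate thread;
        // MyRecord is a hypothetical Record implementation
        backend.account(new MyRecord());
        // Persist any still-buffered record before shutdown
        PersistenceBackendFactory.flushAll(30, TimeUnit.SECONDS);
    }
}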

26
src/main/java/org/gcube/documentstore/exception/InvalidValueException.java Normal file

@@ -0,0 +1,26 @@
package org.gcube.documentstore.exception;
public class InvalidValueException extends Exception {
/**
* Generated serial Version UID
*/
private static final long serialVersionUID = 4403699127526286772L;
public InvalidValueException() {
super();
}
public InvalidValueException(String message) {
super(message);
}
public InvalidValueException(Throwable cause) {
super(cause);
}
public InvalidValueException(String message, Throwable cause) {
super(message, cause);
}
}

32
src/main/java/org/gcube/documentstore/exception/NotAggregatableRecordsExceptions.java Normal file

@@ -0,0 +1,32 @@
/**
*
*/
package org.gcube.documentstore.exception;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public class NotAggregatableRecordsExceptions extends Exception {
/**
* Generated serial Version UID
*/
private static final long serialVersionUID = -1477792189431118048L;
public NotAggregatableRecordsExceptions() {
super();
}
public NotAggregatableRecordsExceptions(String message) {
super(message);
}
public NotAggregatableRecordsExceptions(Throwable cause) {
super(cause);
}
public NotAggregatableRecordsExceptions(String message, Throwable cause) {
super(message, cause);
}
}

75
src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java Normal file

@@ -0,0 +1,75 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class FallbackPersistenceBackend extends PersistenceBackend {
private File fallbackFile;
/**
* @return the fallbackFile
*/
protected File getFallbackFile() {
return fallbackFile;
}
protected FallbackPersistenceBackend(File fallbackFile) {
super(null, AggregationScheduler.newInstance());
this.fallbackFile = fallbackFile;
}
/**
* {@inheritDoc}
*/
@Override
public void prepareConnection(PersistenceBackendConfiguration configuration) {
// Nothing TO DO
}
/**
* {@inheritDoc}
* This method is synchronized on the {@link File} used, so any action which
* has to modify, rename or delete the file must be synchronized on this
* file. To retrieve it use the {@link #getFallbackFile()} method.
* This method is intended for internal library usage only; for this reason it is protected.
*/
@Override
protected void reallyAccount(Record record) throws Exception {
printLine(String.valueOf(record));
}
public void printLine(String line) throws Exception {
synchronized (fallbackFile) {
try(FileWriter fw = new FileWriter(fallbackFile, true);
BufferedWriter bw = new BufferedWriter(fw);
PrintWriter out = new PrintWriter(bw)){
out.println(line);
out.flush();
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws Exception {
// Nothing TO DO
}
}

180
src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java Normal file

@@ -0,0 +1,180 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public abstract class PersistenceBackend {
private static final Logger logger = LoggerFactory.getLogger(PersistenceBackend.class);
protected FallbackPersistenceBackend fallbackPersistence;
protected AggregationScheduler aggregationScheduler;
protected PersistenceBackendMonitor persistenceBackendMonitor;
/**
* Pool for thread execution
*/
private ExecutorService pool;
protected PersistenceBackend(){
this.pool = Executors.newCachedThreadPool();
if(!(this instanceof FallbackPersistenceBackend)){
this.persistenceBackendMonitor = new PersistenceBackendMonitor(this);
}
}
protected PersistenceBackend(FallbackPersistenceBackend fallback, AggregationScheduler aggregationScheduler){
this();
this.fallbackPersistence = fallback;
this.aggregationScheduler = aggregationScheduler;
}
/**
* @return the fallbackPersistence
*/
public FallbackPersistenceBackend getFallbackPersistence() {
return fallbackPersistence;
}
/**
* @param fallback the fallback to set
*/
protected void setFallback(FallbackPersistenceBackend fallback) {
this.fallbackPersistence = fallback;
}
/**
* @return the aggregationScheduler
*/
public AggregationScheduler getAggregationScheduler() {
return aggregationScheduler;
}
/**
* @param aggregationScheduler the aggregationScheduler to set
*/
protected void setAggregationScheduler(AggregationScheduler aggregationScheduler) {
this.aggregationScheduler = aggregationScheduler;
}
/**
* Prepare the connection to persistence.
* This method must be used by implementation class to open
* the connection with the persistence storage, DB, file etc.
* @param configuration The configuration to create the connection
* @throws Exception if fails
*/
protected abstract void prepareConnection(PersistenceBackendConfiguration configuration) throws Exception;
/**
* This method contains the code to save the {@link Record}
*
*/
protected abstract void reallyAccount(Record record) throws Exception;
protected void accountWithFallback(Record... records) {
String persistenceName = this.getClass().getSimpleName();
logger.trace("Going to account {} using {} : {}", Arrays.toString(records), persistenceName, this);
for(Record record : records){
try {
logger.trace("Going to account {} using {} : {}", record, persistenceName, this);
this.reallyAccount(record);
logger.debug("{} accounted succesfully from {}.", record.toString(), persistenceName);
} catch (Exception e) {
try {
String fallabackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
logger.error("{} was not accounted succesfully from {}. Trying to use {}.",
record.toString(), persistenceName, fallabackPersistenceName, e);
fallbackPersistence.reallyAccount(record);
logger.debug("{} accounted succesfully from {}",
record.toString(), fallabackPersistenceName);
}catch(Exception ex){
logger.error("{} was not accounted at all", record.toString(), e);
}
}
}
}
protected void accountValidateAggregate(final Record record, boolean validate, boolean aggregate){
try {
logger.debug("Received {} to account : {}", record.getClass().getSimpleName(), record);
if(validate){
record.validate();
logger.trace("{} {} valid", record.getClass().getSimpleName(), record);
}
if(aggregate){
final PersistenceBackend persistence = this;
aggregationScheduler.aggregate(record, new PersistenceExecutor(){
@Override
public void persist(Record... records) throws Exception {
persistence.accountWithFallback(records);
}
});
}else{
this.accountWithFallback(record);
}
} catch (InvalidValueException e) {
logger.error("Error validating {}", record.getClass().getSimpleName(), e);
} catch (Exception e) {
logger.error("Error recording {}", record.getClass().getSimpleName(), e);
}
}
/**
* Persist the {@link Record}.
* The record is validated first, then accounted, in a separate thread,
* so that the caller can continue its execution.
* If the persistence fails, the record is written to a local file so
* that it can be accounted later.
* @param record the {@link Record} to persist
* @throws InvalidValueException if the record validation fails
*/
public void account(final Record record) throws InvalidValueException{
Runnable runnable = new Runnable(){
@Override
public void run(){
accountValidateAggregate(record, true, true);
}
};
pool.execute(runnable);
}
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
pool.awaitTermination(timeout, timeUnit);
final PersistenceBackend persistence = this;
aggregationScheduler.flush(new PersistenceExecutor(){
@Override
public void persist(Record... records) throws Exception {
persistence.accountWithFallback(records);
}
});
}
public abstract void close() throws Exception;
}

78
src/main/java/org/gcube/documentstore/persistence/PersistenceBackendConfiguration.java Normal file

@@ -0,0 +1,78 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public abstract class PersistenceBackendConfiguration {
private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendConfiguration.class);
protected Map<String,String> properties;
/**
* Used only for testing purposes.
* @return the first discovered {@link PersistenceBackendConfiguration}, without any configuration
*/
protected static PersistenceBackendConfiguration getUnconfiguredInstance(){
ServiceLoader<? extends PersistenceBackendConfiguration> serviceLoader = ServiceLoader.load(PersistenceBackendConfiguration.class);
for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) {
Class<? extends PersistenceBackendConfiguration> configClass = foundConfiguration.getClass();
String foundConfigurationClassName = configClass.getSimpleName();
logger.debug("{} will be used.", foundConfigurationClassName);
return foundConfiguration;
}
return null;
}
public static PersistenceBackendConfiguration getInstance(Class<? extends PersistenceBackend> clz){
ServiceLoader<? extends PersistenceBackendConfiguration> serviceLoader = ServiceLoader.load(PersistenceBackendConfiguration.class);
for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) {
try {
Class<? extends PersistenceBackendConfiguration> configClass = foundConfiguration.getClass();
String foundConfigurationClassName = configClass.getSimpleName();
logger.debug("Testing {}", foundConfigurationClassName);
@SuppressWarnings("rawtypes")
Class[] configArgTypes = { Class.class };
Constructor<? extends PersistenceBackendConfiguration> configurationConstructor = configClass.getDeclaredConstructor(configArgTypes);
Object[] configArguments = {clz};
PersistenceBackendConfiguration configuration = configurationConstructor.newInstance(configArguments);
logger.debug("{} will be used.", foundConfigurationClassName);
return configuration;
} catch (Exception e) {
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundConfiguration.getClass().getSimpleName()), e);
}
}
return null;
}
protected PersistenceBackendConfiguration(){
properties = new HashMap<String, String>();
}
public PersistenceBackendConfiguration(Class<? extends PersistenceBackend> clz){
this();
}
public void addProperty(String key, String value) {
properties.put(key, value);
}
public String getProperty(String key) throws Exception {
return properties.get(key);
}
}
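
Configurations are discovered via java.util.ServiceLoader, so a connector jar must ship a provider-configuration file named META-INF/services/org.gcube.documentstore.persistence.PersistenceBackendConfiguration containing the fully qualified name of the implementation class. A minimal sketch of a hypothetical implementation (class name and property values are illustrative):

package my.impl;

import org.gcube.documentstore.persistence.PersistenceBackend;
import org.gcube.documentstore.persistence.PersistenceBackendConfiguration;

public class MyConfiguration extends PersistenceBackendConfiguration {

    // No-argument constructor required by ServiceLoader
    public MyConfiguration() {
        super();
    }

    // Constructor invoked reflectively by getInstance(Class)
    public MyConfiguration(Class<? extends PersistenceBackend> clz) {
        super(clz);
        // Illustrative values; a real implementation would obtain them
        // from the infrastructure
        addProperty("URL", "http://localhost:5984");
        addProperty("username", "user");
    }
}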

232
src/main/java/org/gcube/documentstore/persistence/PersistenceBackendFactory.java Normal file

@@ -0,0 +1,232 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.io.File;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.records.aggregation.AggregationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public abstract class PersistenceBackendFactory {
private static final Logger logger = LoggerFactory.getLogger(PersistenceBackendFactory.class);
public final static String HOME_SYSTEM_PROPERTY = "user.home";
private static final String FALLBACK_FILENAME = "fallback.log";
private static String fallbackLocation;
private static Map<String, PersistenceBackend> persistenceBackends;
private static Map<String, Long> fallbackLastCheck;
public static final long FALLBACK_RETRY_TIME = 1000*60*10; // 10 min
/**
* @return the fallbackLastCheck
*/
protected static Long getFallbackLastCheck(String scope) {
return fallbackLastCheck.get(scope);
}
static {
persistenceBackends = new HashMap<String, PersistenceBackend>();
fallbackLastCheck = new HashMap<String, Long>();
}
private static File file(File file) throws IllegalArgumentException {
if(!file.isDirectory()){
file = file.getParentFile();
}
// Create the folder structure if it does not exist
if (!file.exists()) {
file.mkdirs();
}
return file;
}
public synchronized static void setFallbackLocation(String path){
if(fallbackLocation == null){
if(path==null){
path = System.getProperty(HOME_SYSTEM_PROPERTY);
}
file(new File(path));
fallbackLocation = path;
}
}
protected static String sanitizeContext(String context){
return context.replace("/", "_");
}
protected static FallbackPersistenceBackend createFallback(String context){
logger.debug("Creating {} for context {}", FallbackPersistenceBackend.class.getSimpleName(), context);
File fallbackFile = null;
if(context!=null){
String sanitized = sanitizeContext(context);
fallbackFile = new File(fallbackLocation, String.format("%s.%s", sanitized, FALLBACK_FILENAME));
}else{
fallbackFile = new File(fallbackLocation, FALLBACK_FILENAME);
}
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance());
return fallbackPersistence;
}
protected static PersistenceBackend discoverPersistenceBackend(String context){
logger.debug("Discovering {} for scope {}",
PersistenceBackend.class.getSimpleName(), context);
ServiceLoader<PersistenceBackend> serviceLoader = ServiceLoader.load(PersistenceBackend.class);
for (PersistenceBackend found : serviceLoader) {
Class<? extends PersistenceBackend> foundClass = found.getClass();
try {
String foundClassName = foundClass.getSimpleName();
logger.debug("Testing {}", foundClassName);
PersistenceBackendConfiguration configuration = PersistenceBackendConfiguration.getInstance(foundClass);
if(configuration==null){
continue;
}
found.prepareConnection(configuration);
logger.debug("{} will be used.", foundClassName);
found.setAggregationScheduler(AggregationScheduler.newInstance());
found.setFallback(createFallback(context));
return found;
} catch (Exception e) {
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e);
}
}
return null;
}
protected static PersistenceBackend rediscoverPersistenceBackend(PersistenceBackend actual, String context){
Long now = Calendar.getInstance().getTimeInMillis();
Long lastCheckTimestamp = fallbackLastCheck.get(context);
logger.debug("Last check for context {} was {}", context, lastCheckTimestamp);
boolean myTurn = false;
synchronized (persistenceBackends) {
if( (lastCheckTimestamp + FALLBACK_RETRY_TIME) <= now ){
logger.debug("The {} for context {} is {}. Is time to rediscover if there is another possibility.",
PersistenceBackend.class.getSimpleName(), context, actual.getClass().getSimpleName());
logger.trace("Renewing Last check Timestamp. The next one will be {}", now);
fallbackLastCheck.put(context, now);
myTurn=true;
logger.debug("I win. It is my turn to rediscover {} in context {}",
PersistenceBackend.class.getSimpleName(), context);
}
}
if(myTurn){
PersistenceBackend discoveredPersistenceBackend = discoverPersistenceBackend(context);
synchronized (persistenceBackends) {
if(discoveredPersistenceBackend!=null){
/*
* Passing the aggregator to the new PersistenceBackend
* so that the buffered records will be persisted with the
* new method
*
*/
discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
// Removing timestamp which is no more needed
fallbackLastCheck.remove(context);
persistenceBackends.put(context, discoveredPersistenceBackend);
/*
* Not needed because close has no effect. Removed to
* prevent problem in cases of future changes.
* try {
* actual.close();
* } catch (Exception e) {
* logger.error("Error closing {} for scope {} which has been substituted with {}.",
* actual.getClass().getSimpleName(), scope,
* discoveredPersistenceBackend.getClass().getSimpleName(), e);
* }
*
*/
return discoveredPersistenceBackend;
}
}
}
long nextCheck = (lastCheckTimestamp + FALLBACK_RETRY_TIME) - Calendar.getInstance().getTimeInMillis();
float nextCheckInSec = nextCheck / 1000f;
logger.debug("The {} which is going to be used for context {} is {}. Next retry in {} msec (about {} sec)",
PersistenceBackend.class.getSimpleName(), context,
actual.getClass().getSimpleName(), nextCheck, nextCheckInSec);
return actual;
}
public static PersistenceBackend getPersistenceBackend(String context) {
if(context==null){
logger.error("No Context available. FallbackPersistence will be used");
return createFallback(null);
}
PersistenceBackend persistence = null;
logger.debug("Going to synchronized block in getPersistenceBackend");
synchronized (persistenceBackends) {
persistence = persistenceBackends.get(context);
logger.debug("{} {}", PersistenceBackend.class.getSimpleName(), persistence);
if(persistence==null){
/*
* Setting FallbackPersistence and unlocking.
* This is done to avoid a deadlock on IS nodes which try to use
* themselves to query the configuration.
*/
persistence = createFallback(context);
persistenceBackends.put(context, persistence);
long now = Calendar.getInstance().getTimeInMillis();
/* The PersistenceBackend is still to be discovered: backdate
* the last check so that the rediscovery is attempted immediately.
*/
fallbackLastCheck.put(context, ((now - FALLBACK_RETRY_TIME) - 1));
}
}
if(persistence instanceof FallbackPersistenceBackend){
persistence = rediscoverPersistenceBackend(persistence, context);
}
return persistence;
}
public static void flush(String context, long timeout, TimeUnit timeUnit){
PersistenceBackend apb = persistenceBackends.get(context);
try {
logger.debug("Flushing records in context {}", context);
apb.flush(timeout, timeUnit);
}catch(Exception e){
logger.error("Unable to flush records in context {} with {}", context, apb);
}
}
/**
* Flush the records of all the known {@link PersistenceBackend}s.
* @param timeout the timeout for each flush
* @param timeUnit the {@link TimeUnit} of the timeout
*/
public static void flushAll(long timeout, TimeUnit timeUnit) {
for(String context : persistenceBackends.keySet()){
flush(context, timeout, timeUnit);
}
}
}

95
src/main/java/org/gcube/documentstore/persistence/PersistenceBackendMonitor.java Normal file

@@ -0,0 +1,95 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
public class PersistenceBackendMonitor implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(PersistenceBackendMonitor.class);
private final static String ELABORATION_FILE_SUFFIX = ".ELABORATION";
private final static String ELABORATION_FILE_NOT_DELETED_SUFFIX = ".ELABORATION.NOT-DELETED";
protected final ScheduledExecutorService scheduler;
protected final PersistenceBackend persistenceBackend;
public PersistenceBackendMonitor(PersistenceBackend persistenceBackend){
this.persistenceBackend = persistenceBackend;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES);
}
protected void elaborateFile(File elaborationFile){
try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) {
for(String line; (line = br.readLine()) != null; ) {
try {
Record record = RecordUtility.getRecord(line);
persistenceBackend.accountWithFallback(record);
} catch(Exception e){
logger.error("Was not possible parse line {} to obtain a valid Record. Going to writing back this line as string fallback file.", line, e);
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
try {
fallbackPersistenceBackend.printLine(line);
} catch (Exception e1) {
logger.error("Was not possible Line {} will be lost", line, e1);
}
}
}
} catch (FileNotFoundException e) {
logger.error("Unable to open file {}", elaborationFile.getAbsolutePath(), e);
} catch (IOException e) {
logger.error("Error reading file {}", elaborationFile.getAbsolutePath(), e);
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
logger.debug("Trying to persist {}s which failed and were persisted using fallback", Record.class.getSimpleName());
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
File file = fallbackPersistenceBackend.getFallbackFile();
File elaborationFile = null;
synchronized (file) {
if(file.exists()){
elaborationFile = new File(file.getAbsolutePath()+ELABORATION_FILE_SUFFIX);
file.renameTo(elaborationFile);
}
}
if(elaborationFile!=null){
synchronized (elaborationFile) {
elaborateFile(elaborationFile);
boolean deleted = elaborationFile.delete();
if(!deleted){
logger.debug("Failed to delete file {}", elaborationFile.getAbsolutePath());
File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
elaborationFile.renameTo(elaborationFileNotDeleted);
}
}
}
}
}

16
src/main/java/org/gcube/documentstore/persistence/PersistenceExecutor.java Normal file

@@ -0,0 +1,16 @@
/**
*
*/
package org.gcube.documentstore.persistence;
import org.gcube.documentstore.records.Record;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public interface PersistenceExecutor {
public void persist(Record... records) throws Exception;
}

60
src/main/java/org/gcube/documentstore/records/AggregatedRecord.java Normal file

@@ -0,0 +1,60 @@
/**
*
*/
package org.gcube.documentstore.records;
import java.util.Calendar;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
@SuppressWarnings("rawtypes")
public interface AggregatedRecord<A extends AggregatedRecord, R extends Record> extends Record {
/**
* KEY : Indicates that this {@link Record} is an aggregation
*/
public static final String AGGREGATED = "aggregated";
/**
* KEY : Indicates the number of {@link Record}s aggregated in this {@link AggregatedRecord}
*/
public static final String OPERATION_COUNT = "operationCount";
/**
* KEY : Represents the left end of the time interval covered by this
* {@link AggregatedRecord}. The value will be recorded in UTC milliseconds
* from the epoch.
*/
public static final String START_TIME = "startTime";
/**
* KEY : Represents the right end of the time interval covered by this
* {@link AggregatedRecord}. The value will be recorded in UTC milliseconds
* from the epoch.
*/
public static final String END_TIME = "endTime";
public int getOperationCount();
public void setOperationCount(int operationCount) throws InvalidValueException;
public Calendar getStartTime();
public void setStartTime(Calendar startTime) throws InvalidValueException;
public Calendar getEndTime();
public void setEndTime(Calendar endTime) throws InvalidValueException;
public A aggregate(A record) throws NotAggregatableRecordsExceptions;
public A aggregate(R record) throws NotAggregatableRecordsExceptions;
public Class<R> getAggregable();
}
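
To make the aggregation contract concrete, here is a minimal sketch of the logic a hypothetical implementation might use: operation counts are summed and the covered time interval is widened, while incompatible records are rejected with NotAggregatableRecordsExceptions. Only the relevant method is shown; MyAggregatedRecord and MyRecord are illustrative names.

// Inside a hypothetical MyAggregatedRecord implementing
// AggregatedRecord<MyAggregatedRecord, MyRecord>
@Override
public MyAggregatedRecord aggregate(MyAggregatedRecord record) throws NotAggregatableRecordsExceptions {
    if(!getRecordType().equals(record.getRecordType())){
        throw new NotAggregatableRecordsExceptions("Record types differ");
    }
    try {
        // Sum the operation counts and widen the covered time interval
        setOperationCount(getOperationCount() + record.getOperationCount());
        if(record.getStartTime().before(getStartTime())){
            setStartTime(record.getStartTime());
        }
        if(record.getEndTime().after(getEndTime())){
            setEndTime(record.getEndTime());
        }
    } catch (InvalidValueException e) {
        throw new NotAggregatableRecordsExceptions(e);
    }
    return this;
}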

115
src/main/java/org/gcube/documentstore/records/Record.java Normal file

@@ -0,0 +1,115 @@
/**
*
*/
package org.gcube.documentstore.records;
import java.io.Serializable;
import java.util.Calendar;
import java.util.Map;
import java.util.Set;
import org.gcube.documentstore.exception.InvalidValueException;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public interface Record extends Comparable<Record>, Serializable {
/**
* KEY : The unique identifier for the {@link Record}
* The ID SHOULD be automatically created by the implementation class.
*/
public static final String ID = "id";
/**
* KEY : The instant when the {@link Record} was created.
* The value MUST be recorded in UTC milliseconds from the epoch.
*/
public static final String CREATION_TIME = "creationTime";
/**
* KEY : The Type of the represented {@link Record}
*/
public static final String RECORD_TYPE = "recordType";
/**
* @return a Set containing the keys of required fields
*/
public Set<String> getRequiredFields();
/**
* Return the {@link Record} Type
* @return {@link Record} Type
*/
public String getRecordType();
/**
* Return the unique id for this {@link Record}
* @return {@link Record} Unique ID
*/
public String getId();
/**
* The ID SHOULD be automatically created by the implementation Class.
* Set the ID only if you really know what you are going to do.
* Set the unique id for this {@link Record}
* @param id Unique ID
* @throws InvalidValueException
*/
public void setId(String id) throws InvalidValueException;
/**
* Return the instant when this {@link Record} was created.
* @return the creation time for this {@link Record}
*/
public Calendar getCreationTime();
/**
* The CreationTime is automatically created by the implementation Class.
* Set the CreationTime only if you really know what you are going to do.
* Set the instant when this {@link Record} was created.
* @param creationTime Creation Time
* @throws InvalidValueException
*/
public void setCreationTime(Calendar creationTime) throws InvalidValueException;
/**
* Return all resource-specific properties. The returned Map is a copy of
* the internal representation. Any modification to the returned Map MUST
* not affect the object
* @return a Map containing the properties
*/
public Map<String, Comparable<? extends Serializable>> getResourceProperties();
/**
* Set all resource-specific properties, replacing existing ones
*/
public void setResourceProperties(Map<String, Comparable<? extends Serializable>> resourceSpecificProperties) throws InvalidValueException;
/**
* Return the value of the given resource property.
* @param key the key of the requested property
* @return the value of the given resource property
*/
public Comparable<? extends Serializable> getResourceProperty(String key);
/**
* Set the value of the given resource property.
* If the key matches one of the predefined properties, the value
* is validated.
* @param key the key of the requested property
* @param value the value of the given resource property
*/
public void setResourceProperty(String key, Comparable<? extends Serializable> value) throws InvalidValueException;
/**
* Validate the {@link Record}.
* The validation checks that all the required fields are set and have
* valid values.
* @throws InvalidValueException
*/
public void validate() throws InvalidValueException;
}

205
src/main/java/org/gcube/documentstore/records/RecordUtility.java Normal file

@@ -0,0 +1,205 @@
/**
*
*/
package org.gcube.documentstore.records;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*/
@SuppressWarnings({ "rawtypes" })
public abstract class RecordUtility {
private static Logger logger = LoggerFactory.getLogger(RecordUtility.class);
private final static String LINE_PREFIX = "{";
private final static String LINE_SUFFIX = "}";
private final static String KEY_VALUE_PAIR_SEPARATOR = ",";
private final static String KEY_VALUE_LINKER = "=";
protected static Map<String, Class<? extends Record>> recordClassesFound;
protected static Map<String, Class<? extends AggregatedRecord>> aggregatedRecordClassesFound;
/**
* @return the recordClassesFound
*/
public static Map<String, Class<? extends Record>> getRecordClassesFound() {
return recordClassesFound;
}
/**
* @return the aggregatedRecordClassesFound
*/
public static Map<String, Class<? extends AggregatedRecord>> getAggregatedRecordClassesFound() {
return aggregatedRecordClassesFound;
}
static {
recordClassesFound = new HashMap<>();
aggregatedRecordClassesFound = new HashMap<>();
Reflections recordClassesReflections = new Reflections();
Set<Class<? extends Record>> recordClasses = recordClassesReflections.getSubTypesOf(Record.class);
for(Class<? extends Record> cls : recordClasses){
if(Modifier.isAbstract(cls.getModifiers())){
continue;
}
String discoveredRecordType;
try {
Record record = cls.newInstance();
if(record instanceof AggregatedRecord){
continue;
}
discoveredRecordType = record.getRecordType();
if(!recordClassesFound.containsKey(discoveredRecordType)){
recordClassesFound.put(discoveredRecordType, cls);
}
} catch (InstantiationException | IllegalAccessException e) {
continue;
}
}
Reflections aggregatedRecordReflections = new Reflections();
Set<Class<? extends AggregatedRecord>> aggregatedRecordClasses = aggregatedRecordReflections.getSubTypesOf(AggregatedRecord.class);
for(Class<? extends AggregatedRecord> cls : aggregatedRecordClasses){
if(Modifier.isAbstract(cls.getModifiers())){
continue;
}
String discoveredRecordType;
try {
discoveredRecordType = cls.newInstance().getRecordType();
if(!aggregatedRecordClassesFound.containsKey(discoveredRecordType)){
aggregatedRecordClassesFound.put(discoveredRecordType, cls);
}
} catch (InstantiationException | IllegalAccessException e) {
logger.error("Unable to instantiate found {} class ({})",
AggregatedRecord.class.getSimpleName(), cls.getSimpleName(), e);
continue;
}
}
}
public static Class<? extends AggregatedRecord> getAggregatedRecordClass(String recordType) throws ClassNotFoundException {
if(getAggregatedRecordClassesFound().containsKey(recordType)){
return getAggregatedRecordClassesFound().get(recordType);
}
logger.error("Unable to find {} class for {}.",
AggregatedRecord.class.getSimpleName(), recordType);
throw new ClassNotFoundException();
}
public static Class<? extends Record> getRecordClass(String recordType) throws ClassNotFoundException {
if(recordClassesFound.containsKey(recordType)){
return recordClassesFound.get(recordType);
}
logger.error("Unable to find {} class for {}.",
Record.class.getSimpleName(), recordType);
throw new ClassNotFoundException();
}
protected static Class<? extends Record> getClass(String recordType, boolean aggregated) throws ClassNotFoundException {
if(aggregated){
return RecordUtility.getAggregatedRecordClass(recordType);
}
return getRecordClass(recordType);
}
/*
* IT DOES NOT WORK
* @SuppressWarnings("unchecked")
* public static Map<String,Serializable> getMapFromString(String serializedMap) throws IOException, ClassNotFoundException {
* ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializedMap.getBytes());
* ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
* return ((Map<String,Serializable>) objectInputStream.readObject());
* }
*/
protected static Map<String, Comparable<? extends Serializable>> getMapFromString(String serializedMap){
/* Checking line sanity */
if(!serializedMap.startsWith(LINE_PREFIX) || !serializedMap.endsWith(LINE_SUFFIX)){
return null;
}
/* Removing prefix and suffix to parse the line */
serializedMap = serializedMap.substring(LINE_PREFIX.length(), serializedMap.length() - LINE_SUFFIX.length());
Map<String, Comparable<? extends Serializable>> map = new HashMap<String, Comparable<? extends Serializable>>();
String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR);
for(String pair : pairs){
String[] keyValue = pair.trim().split(KEY_VALUE_LINKER);
String key = keyValue[0].trim();
Comparable<? extends Serializable> value = keyValue[1].trim();
map.put(key, value);
}
return map;
}
/**
* Create a Record from a Map serialized using toString()
* @param serializedMap the String representation of Map
* @return the Record
* @throws Exception if deserialization fails
*/
public static Record getRecord(String serializedMap) throws Exception {
Map<String,Comparable<? extends Serializable>> map = getMapFromString(serializedMap);
return getRecord(map);
}
/**
* Create a Record from a Map
* @param recordMap the Map
* @return the Record
* @throws Exception if deserialization fails
*/
public static Record getRecord(Map<String, Comparable<? extends Serializable>> recordMap) throws Exception {
String className = (String) recordMap.get(Record.RECORD_TYPE);
boolean aggregated = false;
try {
aggregated = (Boolean) recordMap.get(AggregatedRecord.AGGREGATED);
}catch(Exception e){
try{
aggregated = Boolean.parseBoolean((String)recordMap.get(AggregatedRecord.AGGREGATED));
} catch(Exception e1){}
}
Class<? extends Record> clz = getClass(className, aggregated);
logger.debug("Trying to instantiate {}", clz);
Class[] usageRecordArgTypes = { Map.class };
Constructor<? extends Record> usageRecordConstructor = clz.getDeclaredConstructor(usageRecordArgTypes);
Object[] usageRecordArguments = {recordMap};
Record record = usageRecordConstructor.newInstance(usageRecordArguments);
logger.debug("Created {} : {}", Record.class.getSimpleName(), record);
return record;
}
}
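
getMapFromString expects the format produced by the standard Map toString(), i.e. {key1=value1, key2=value2}, which is what FallbackPersistenceBackend writes via String.valueOf(record) when the Record implementation prints its internal map. A round-trip sketch (record type and values are illustrative; the named type must be discoverable on the classpath and expose a Map-taking constructor):

// A line as it could appear in the fallback file
String line = "{recordType=MyRecord, id=16cdf446, creationTime=1450452115000}";
// Rebuilds the Record through the Map-taking constructor of the
// class registered for the "MyRecord" record type
Record record = RecordUtility.getRecord(line);

Note that the simple split-based parser does not support values containing ',' or '='.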

169
src/main/java/org/gcube/documentstore/records/aggregation/AggregationScheduler.java Normal file

@@ -0,0 +1,169 @@
package org.gcube.documentstore.records.aggregation;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
import org.gcube.documentstore.persistence.PersistenceExecutor;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
*/
public abstract class AggregationScheduler {
public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
public static AggregationScheduler newInstance(){
return new BufferAggregationScheduler();
}
protected int totalBufferedRecords;
protected Map<String, List<Record>> bufferedRecords;
protected AggregationScheduler(){
this.bufferedRecords = new HashMap<String, List<Record>>();
this.totalBufferedRecords = 0;
}
@SuppressWarnings("rawtypes")
protected static AggregatedRecord instantiateAggregatedRecord(Record record) throws Exception{
String recordType = record.getRecordType();
Class<? extends AggregatedRecord> clz = RecordUtility.getAggregatedRecordClass(recordType);
Class[] argTypes = { record.getClass() };
Constructor<? extends AggregatedRecord> constructor = clz.getDeclaredConstructor(argTypes);
Object[] arguments = {record};
return constructor.newInstance(arguments);
}
@SuppressWarnings("rawtypes")
public static AggregatedRecord getAggregatedRecord(Record record) throws Exception {
AggregatedRecord aggregatedRecord;
if(record instanceof AggregatedRecord){
// the record is already an aggregated version
aggregatedRecord = (AggregatedRecord) record;
}else{
aggregatedRecord = instantiateAggregatedRecord(record);
}
return aggregatedRecord;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void madeAggregation(Record record){
String recordType = record.getRecordType();
List<Record> records;
if(this.bufferedRecords.containsKey(recordType)){
records = this.bufferedRecords.get(recordType);
boolean found = false;
for(Record bufferedRecord : records){
if(!(bufferedRecord instanceof AggregatedRecord)){
continue;
}
try {
AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord;
logger.trace("Trying to use {} for aggregation.", bufferedAggregatedRecord);
if(record instanceof AggregatedRecord){
// TODO check compatibility using getAggregable()
bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
}else{
bufferedAggregatedRecord.aggregate((Record) record);
}
logger.trace("Aggregated Record is {}", bufferedAggregatedRecord);
found = true;
break;
} catch(NotAggregatableRecordsExceptions e) {
logger.trace("{} is not usable for aggregation", bufferedRecord);
}
}
if(!found){
records.add(record);
totalBufferedRecords++;
return;
}
}else{
records = new ArrayList<Record>();
try {
records.add(getAggregatedRecord(record));
} catch (Exception e) {
records.add(record);
}
totalBufferedRecords++;
this.bufferedRecords.put(recordType, records);
}
}
public void flush(PersistenceExecutor persistenceExecutor) throws Exception{
aggregate(null, persistenceExecutor, true);
}
protected abstract void schedulerSpecificClear();
protected void clear(){
totalBufferedRecords=0;
bufferedRecords.clear();
schedulerSpecificClear();
}
protected synchronized void aggregate(Record record, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
if(record!=null){
logger.trace("Trying to aggregate {}", record);
madeAggregation(record);
}
if(isTimeToPersist() || forceFlush){
Record[] recordToPersist = new Record[totalBufferedRecords];
int i = 0;
Collection<List<Record>> values = bufferedRecords.values();
for(List<Record> records : values){
for(Record thisRecord: records){
recordToPersist[i] = thisRecord;
i++;
}
}
logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist));
persistenceExecutor.persist(recordToPersist);
clear();
}
}
/**
* Get a {@link Record} and try to aggregate it with the other buffered
* records. The buffer is persisted when the scheduling policy decides
* it is time to do so.
* @param record the {@link Record} to buffer
* @param persistenceExecutor the executor used to persist the buffered records
* @throws Exception if the persistence fails
*/
public void aggregate(Record record, PersistenceExecutor persistenceExecutor) throws Exception {
logger.trace("Going to aggregate {}", record);
aggregate(record, persistenceExecutor, false);
}
protected abstract boolean isTimeToPersist();
}
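
Concrete schedulers only decide when the buffered records must be persisted, through the two abstract hooks above. A minimal sketch of a hypothetical count-only scheduler (BufferAggregationScheduler, below, is the actual default returned by newInstance()):

package org.gcube.documentstore.records.aggregation;

// Hypothetical scheduler persisting after a fixed number of buffered records
public class CountOnlyAggregationScheduler extends AggregationScheduler {

    private static final int MAX_BUFFERED = 10;

    @Override
    protected void schedulerSpecificClear() {
        // No scheduler-specific state to reset
    }

    @Override
    protected boolean isTimeToPersist() {
        return totalBufferedRecords >= MAX_BUFFERED;
    }
}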

67
src/main/java/org/gcube/documentstore/records/aggregation/BufferAggregationScheduler.java Normal file

@@ -0,0 +1,67 @@
/**
*
*/
package org.gcube.documentstore.records.aggregation;
import java.util.Calendar;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
*
* This class implements a simple buffer with a timeout strategy.
* It buffers a predefined number of Records before invoking persistence.
*/
public class BufferAggregationScheduler extends AggregationScheduler {
/**
* Define the MAX number of Record to buffer.
* TODO Get from configuration
*/
protected final static int MAX_RECORDS_NUMBER = 15;
/**
* The max amount of time elapsed since the first buffered record after
* which the buffered records are persisted, even if MAX_RECORDS_NUMBER
* has not been reached.
* TODO Get from configuration
*/
protected final static long OLD_RECORD_MAX_TIME_ELAPSED = 1000*60*5; // 5 min
protected boolean firstOfBuffer;
protected long firstBufferedTime;
protected BufferAggregationScheduler(){
super();
this.firstOfBuffer = true;
}
@Override
protected void schedulerSpecificClear(){
firstOfBuffer = true;
}
/**
* {@inheritDoc}
*/
@Override
public boolean isTimeToPersist(){
long now = Calendar.getInstance().getTimeInMillis();
if(firstOfBuffer){
firstOfBuffer = false;
firstBufferedTime = now;
}
if(totalBufferedRecords >= MAX_RECORDS_NUMBER){
return true;
}
if((now - firstBufferedTime) >= OLD_RECORD_MAX_TIME_ELAPSED){
return true;
}
return false;
}
}