Merged from private branch. Refactored document-store-lib to use jackson for marshalling/unmarshalling. Fixes #9035

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@152679 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-09-05 14:58:47 +00:00
parent ed60687189
commit c9a24f51ce
20 changed files with 517 additions and 577 deletions

View File

@ -7,26 +7,32 @@ import java.util.concurrent.ThreadFactory;
public class ExecutorUtils {
public static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingScheduledThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
public static final ScheduledExecutorService scheduler;
public static ExecutorService threadPool = Executors.newFixedThreadPool(100, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingAggregationThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
public static final ExecutorService threadPool;
static {
scheduler = Executors.newScheduledThreadPool(50, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingScheduledThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
threadPool = Executors.newFixedThreadPool(100, new ThreadFactory() {
private int counter = 0;
private static final String prefix = "AccountingAggregationThread";
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
});
}
}

View File

@ -17,32 +17,29 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* This class is used as scheduled thread to check if some records
* were persisted in fallback file to retry
* to persist them with the discovered PersistenceBackend
* @author Luca Frosini (ISTI - CNR)
*/
public class PersistenceBackendMonitor implements Runnable {
public class FallbackMonitor implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(PersistenceBackendMonitor.class);
private final static Logger logger = LoggerFactory.getLogger(FallbackMonitor.class);
private final static String ELABORATION_FILE_SUFFIX = ".ELABORATION";
private final static String ELABORATION_FILE_NOT_DELETED_SUFFIX = ".ELABORATION.NOT-DELETED";
//protected final ScheduledExecutorService scheduler;
protected final PersistenceBackend persistenceBackend;
public final static int INITIAL_DELAY = 1;
public final static int DELAY = 10;
public final static TimeUnit TIME_UNIT = TimeUnit.MINUTES;
public PersistenceBackendMonitor(PersistenceBackend persistenceBackend){
this.persistenceBackend = persistenceBackend;
//this.scheduler = Executors.newSingleThreadScheduledExecutor();
//this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
public FallbackMonitor(PersistenceBackend persistenceBackend){
this(persistenceBackend,true);
}
public PersistenceBackendMonitor(PersistenceBackend persistenceBackend, boolean schedule){
public FallbackMonitor(PersistenceBackend persistenceBackend, boolean schedule){
this.persistenceBackend = persistenceBackend;
if(schedule){
ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
@ -53,7 +50,7 @@ public class PersistenceBackendMonitor implements Runnable {
try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) {
for(String line; (line = br.readLine()) != null; ) {
try {
Record record = RecordUtility.getRecord( line);
Record record = RecordUtility.getRecord(line);
persistenceBackend.accountWithFallback(record);
} catch(Exception e){
logger.error("Was not possible parse line {} to obtain a valid Record. Going to writing back this line as string fallback file.", line, e);

View File

@ -67,8 +67,6 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
*/
@Override
protected void reallyAccount(Record record) throws Exception {
String marshalled = DSMapper.marshal(record);
logger.debug("reallyAccount:{}",marshalled);
printLine(marshalled);
@ -97,6 +95,11 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
public boolean isConnectionActive() throws Exception{
return true;
}
@Override
protected void clean() throws Exception {
// Nothing TO DO
}
}

View File

@ -5,7 +5,6 @@ package org.gcube.documentstore.persistence;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.Record;
@ -21,32 +20,37 @@ public abstract class PersistenceBackend {
private static final Logger logger = LoggerFactory.getLogger(PersistenceBackend.class);
/**
* Max time period during which the usage of FallbackPersistenceBackend is forced
*/
public static final long MAX_TIME_TO_FALLBACK = 1000 * 60 * 30; // 30 min;
/**
* Max Times of Retry before forcing the usage of FallbackPersistenceBackend
*/
public static final int MAX_FALLBACK_RETRY = 3;
protected boolean forceFallbackUse;
protected int fallbackUseCounter;
protected long fallbackUseStartTime;
protected FallbackPersistenceBackend fallbackPersistence;
protected AggregationScheduler aggregationScheduler;
protected PersistenceBackendMonitor persistenceBackendMonitor;
public static final Integer MAX_FALLBACK = 3; // max fallback with reload a
// configuration
protected static Integer countFallback;
// add to control a timeout execption
protected boolean timeoutFallback = false;
protected long timerToFallback;
public static final long MAX_TIME_TO_FALLBACK = 1000 * 60 * 60; // 60 min;
protected FallbackMonitor persistenceBackendMonitor;
protected PersistenceBackend() {
if (!(this instanceof FallbackPersistenceBackend)) {
this.persistenceBackendMonitor = new PersistenceBackendMonitor(this);
this.persistenceBackendMonitor = new FallbackMonitor(this);
}
countFallback = 0;
forceFallbackUse = false;
fallbackUseCounter = 0;
fallbackUseStartTime = 0;
}
protected PersistenceBackend(FallbackPersistenceBackend fallback) {
this();
this.fallbackPersistence = fallback;
this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this), "FALLBACK");
this.aggregationScheduler = AggregationScheduler.newInstance(new DefaultPersitenceExecutor(this));
}
/**
@ -99,7 +103,7 @@ public abstract class PersistenceBackend {
protected abstract void openConnection() throws Exception;
/**
* This method is used to open db connection
* This method is used to close db connection
*
* @throws Exception
*/
@ -110,18 +114,22 @@ public abstract class PersistenceBackend {
*
* @throws Exception
*/
public abstract void close() throws Exception;
public void close() throws Exception {
flush();
closeConnection();
}
/**
* This method is used to close db and clean the configuration. It is used
* when there are too much TimeoutException trying to persist records.
*
* This method is used to allow PersistenceBackend implementations
* to clean a global status if any (or to renew it) when too many
* exceptions occur trying to persist Records.
* @throws Exception
*/
protected void closeAndClean() throws Exception {};
protected abstract void clean() throws Exception;
/**
* Check the Connection state
*
* @return true if the connection is active, false otherwise
* @throws Exception
*/
@ -140,63 +148,74 @@ public abstract class PersistenceBackend {
*/
protected void accountWithFallback(Record... records) throws Exception {
String persistenceName = this.getClass().getSimpleName();
String fallbackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
this.openConnection();
for (Record record : records) {
String recordString = null;
try {
// old code
// this.reallyAccount(record);
long now = Calendar.getInstance().getTimeInMillis();
if ((timeoutFallback)) {
recordString = record.toString();
if (forceFallbackUse) {
logger.trace("Forcing the use of {} to account {}", fallbackPersistenceName, recordString);
fallbackPersistence.reallyAccount(record);
logger.trace("accountWithFallback for timeout, now:{} and timerToFallback:{}", now,
timerToFallback);
if ((now - timerToFallback) > MAX_TIME_TO_FALLBACK) {
logger.debug("accountWithFallback MAX_TIME_TO_FALLBACK is conclused");
timeoutFallback = false;
timerToFallback = 0;
long now = Calendar.getInstance().getTimeInMillis();
long diff = now - fallbackUseStartTime;
logger.trace("{} forced use started at {}. {} seconds were elapsed",
fallbackPersistenceName, fallbackUseStartTime, (long) diff/1000);
if (diff > MAX_TIME_TO_FALLBACK) {
logger.debug("The time to force the usage of {} is terminated. Trying to restore the use of {}",
fallbackPersistenceName, persistenceName);
forceFallbackUse = false;
fallbackUseCounter = 0;
fallbackUseStartTime = 0;
}
} else {
this.reallyAccount(record);
logger.trace("accountWithFallback {} accounted succesfully from {}.", record.toString(),
persistenceName);
timeoutFallback = false;
timerToFallback = 0;
logger.trace("{} accounted succesfully from {}.", record.toString(), persistenceName);
}
} catch (Exception e) {
// TODO Insert Renew HERE
if ((!(this instanceof FallbackPersistenceBackend)) && e.getCause() != null
&& e.getCause() instanceof TimeoutException) {
logger.warn("accountWithFallback TimeoutException number:{} to {}.", countFallback, MAX_FALLBACK);
countFallback++;
if (countFallback.equals(MAX_FALLBACK)) {
timeoutFallback = true;
timerToFallback = Calendar.getInstance().getTimeInMillis();
logger.trace("accountWithFallback Going to account {} in fallback for too many timeout",
record);
// old code
// PersistenceBackendFactory.renew(this);
countFallback = 0;
// disconnect
this.closeAndClean();
}
} else {
logger.trace("accountWithFallback Fallback is same instance:{}" + this.getClass().getSimpleName());
}
} catch (Throwable t) {
try {
String fallabackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
logger.error("accountWithFallback {} was not accounted succesfully from {}. Trying to use {}.",
record.toString(), persistenceName, fallabackPersistenceName, e);
logger.warn("{} was not accounted succesfully using {}. Trying to use {}.", recordString,
persistenceName, fallbackPersistenceName, t);
fallbackPersistence.reallyAccount(record);
logger.trace("accountWithFallback {} accounted succesfully from {}", record.toString(),
fallabackPersistenceName);
} catch (Exception ex) {
logger.error("accountWithFallback {} was not accounted at all", record.toString(), e);
logger.trace("{} accounted succesfully from {}", recordString, fallbackPersistenceName);
} catch (Throwable th) {
logger.error("{} was not accounted at all", recordString, t);
}
if (!(this instanceof FallbackPersistenceBackend) ){
// && (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
fallbackUseCounter++;
logger.warn("Exception number is {}. Max Retry number is {}. After that the use of {} will be forced",
fallbackUseCounter, MAX_FALLBACK_RETRY, fallbackPersistenceName);
if (fallbackUseCounter == MAX_FALLBACK_RETRY) {
forceFallbackUse = true;
fallbackUseStartTime = Calendar.getInstance().getTimeInMillis();
logger.trace("Going to force {} for too many Exceptions",
fallbackPersistenceName);
aggregationScheduler.flush(new DefaultPersitenceExecutor(fallbackPersistence));
this.close();
this.clean();
}
}
}
}
this.closeConnection();
}
@ -211,8 +230,6 @@ public abstract class PersistenceBackend {
logger.trace("Received {} to account : {}", record.getClass().getSimpleName(), record);
if (validate) {
record.validate();
// logger.trace("{} {} valid",
// record.getClass().getSimpleName(), record);
}
if (aggregate) {
try {
@ -253,7 +270,18 @@ public abstract class PersistenceBackend {
ExecutorUtils.threadPool.execute(runnable);
}
/**
* Use {@link PersistenceBackend#flush()} instead
* @param timeout
* @param timeUnit
* @throws Exception
*/
@Deprecated
public void flush(long timeout, TimeUnit timeUnit) throws Exception {
flush();
}
public void flush() throws Exception {
aggregationScheduler.flush(new DefaultPersitenceExecutor(this));
}

View File

@ -21,22 +21,6 @@ public abstract class PersistenceBackendConfiguration {
protected Map<String,String> properties;
/**
* Used only for testing purpose
* @return
*/
protected static PersistenceBackendConfiguration getUnconfiguredInstance(){
ServiceLoader<? extends PersistenceBackendConfiguration> serviceLoader = ServiceLoader.load(PersistenceBackendConfiguration.class);
for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) {
Class<? extends PersistenceBackendConfiguration> configClass = foundConfiguration.getClass();
String foundConfigurationClassName = configClass.getSimpleName();
logger.trace("{} getUnconfiguredInstance will be used.", foundConfigurationClassName);
return foundConfiguration;
}
return null;
}
public static PersistenceBackendConfiguration getInstance(Class<? extends PersistenceBackend> clz){
ServiceLoader<? extends PersistenceBackendConfiguration> serviceLoader = ServiceLoader.load(PersistenceBackendConfiguration.class);
for (PersistenceBackendConfiguration foundConfiguration : serviceLoader) {

View File

@ -6,9 +6,7 @@ package org.gcube.documentstore.persistence;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.records.RecordUtility;
@ -117,8 +115,6 @@ public abstract class PersistenceBackendFactory {
File fallbackFile = getFallbackFile(context);
logger.trace("{} for context {} is {}", FallbackPersistenceBackend.class.getSimpleName(), context, fallbackFile.getAbsolutePath());
FallbackPersistenceBackend fallbackPersistence = new FallbackPersistenceBackend(fallbackFile);
//TODO: VERIFY (Aggregation creation for fallback is already done on FallbackPersistenceBackend constructor)
//fallbackPersistence.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(fallbackPersistence)));
return fallbackPersistence;
}
@ -152,11 +148,14 @@ public abstract class PersistenceBackendFactory {
logger.trace("{} will be used.", foundClassName);
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found),configuration,"NON FALLBACK"));
if (fallback!=null)
found.setAggregationScheduler(AggregationScheduler.newInstance(new DefaultPersitenceExecutor(found)));
if(fallback!=null) {
found.setFallback(fallback);
else
} else {
found.setFallback(createFallback(context));
}
return found;
}
catch (Exception e) {
@ -180,6 +179,7 @@ public abstract class PersistenceBackendFactory {
//logger.trace("[getPersistenceBackend]{} {} in context {}", PersistenceBackend.class.getSimpleName(), persistence,context);
if(persistence==null){
logger.trace("[getPersistenceBackend]{} {} in context {}", PersistenceBackend.class.getSimpleName(), persistence,context);
/*
* Setting FallbackPersistence and unlocking.
* There will be another thread which will try to discover the
@ -206,7 +206,53 @@ public abstract class PersistenceBackendFactory {
return persistence;
}
private static PersistenceBackend switchPersistenceBackend(PersistenceBackend actual, PersistenceBackend target, String context){
synchronized (persistenceBackends) {
/*
* Passing the aggregator to the new PersistenceBackend
* so that the buffered records will be persisted with the
* new method
*
*/
// target.setAggregationScheduler(actual.getAggregationScheduler());
persistenceBackends.put(context, target);
try {
actual.close();
} catch (Exception e) {
logger.error("Error closing {} for context {} which has been substituted reset to {}.",
actual.getClass().getSimpleName(), context,
target.getClass().getSimpleName(), e);
}
return target;
}
}
protected static PersistenceBackend resetToFallbackPersistenceBackend(PersistenceBackend actual, String context){
context = sanitizeContext(context);
logger.debug("The {} for context {} is {}. "
+ "It will be switched to {}.",
PersistenceBackend.class.getSimpleName(), context,
actual.getClass().getSimpleName(),
FallbackPersistenceBackend.class.getSimpleName());
if(actual!=null && !(actual instanceof FallbackPersistenceBackend)){
FallbackPersistenceBackend fallbackPersistenceBackend = actual.getFallbackPersistence();
switchPersistenceBackend(actual, fallbackPersistenceBackend, context);
}
return actual;
}
protected static PersistenceBackend rediscoverPersistenceBackend(FallbackPersistenceBackend actual, String context){
context = sanitizeContext(context);
logger.debug("The {} for context {} is {}. "
@ -218,32 +264,9 @@ public abstract class PersistenceBackendFactory {
PersistenceBackendFactory.discoverPersistenceBackend(context, actual);
if(discoveredPersistenceBackend!=null){
synchronized (persistenceBackends) {
/*
* Passing the aggregator to the new PersistenceBackend
* so that the buffered records will be persisted with the
* new method
*
*/
//discoveredPersistenceBackend.setAggregationScheduler(actual.getAggregationScheduler());
persistenceBackends.put(context, discoveredPersistenceBackend);
/*
* Not needed because close has no effect. Removed to
* prevent problem in cases of future changes.
* try {
* actual.close();
* } catch (Exception e) {
* logger.error("Error closing {} for scope {} which has been substituted with {}.",
* actual.getClass().getSimpleName(), scope,
* discoveredPersistenceBackend.getClass().getSimpleName(), e);
* }
*
*/
return discoveredPersistenceBackend;
}
switchPersistenceBackend(actual, discoveredPersistenceBackend, context);
}
return actual;
@ -251,38 +274,17 @@ public abstract class PersistenceBackendFactory {
/**
* Not used
* @param persistenceBackend
* Use {@link PersistenceBackendFactory#flush()} instead
* @param context
* @param timeout
* @param timeUnit
*/
protected synchronized static void renew(PersistenceBackend persistenceBackend){
String context = null;
logger.trace("Renew a configuration : {}", PersistenceBackend.class.getSimpleName());
Set<Entry<String, PersistenceBackend>> entrySet = persistenceBackends.entrySet();
for(Entry<String, PersistenceBackend> entry : entrySet){
if(entry.getValue() == persistenceBackend){
context = entry.getKey();
}
return;
}
FallbackPersistenceBackend fallbackPersistenceBackend =
persistenceBackend.getFallbackPersistence();
try {
persistenceBackend.close();
} catch (Exception e) {
logger.error("Error while closing {} : {}",
PersistenceBackend.class.getSimpleName(),
persistenceBackend, e);
}
persistenceBackends.put(context, fallbackPersistenceBackend);
new PersistenceBackendRediscover(context,
fallbackPersistenceBackend, INITIAL_DELAY,
FALLBACK_RETRY_TIME, TimeUnit.MILLISECONDS);
}
@Deprecated
public static void flush(String context, long timeout, TimeUnit timeUnit){
flush(context);
}
public static void flush(String context){
context = sanitizeContext(context);
PersistenceBackend apb;
@ -292,28 +294,44 @@ public abstract class PersistenceBackendFactory {
try {
logger.debug("Flushing records in context {}", context);
apb.flush(timeout, timeUnit);
apb.flush();
}catch(Exception e){
logger.error("Unable to flush records in context {} with {}", context, apb, e);
}
}
/**
* Use {@link PersistenceBackendFactory#flushAll()} instead
* @param timeout
* @param timeUnit
* @throws Exception
*/
@Deprecated
public static void flushAll(long timeout, TimeUnit timeUnit) {
flushAll();
}
public static void flushAll() {
for(String context : persistenceBackends.keySet()){
flush(context, timeout, timeUnit);
flush(context);
}
}
/**
*
*/
public static void shutdown() {
//disconnect the persistence and clean
for(String context : persistenceBackends.keySet()){
context = sanitizeContext(context);
PersistenceBackend apb;
synchronized (persistenceBackends) {
apb = persistenceBackends.get(context);
}
try {
logger.debug("Flushing records in context {}", context);
apb.close();
}catch(Exception e){
logger.error("Unable to flush records in context {} with {}", context, apb, e);
}
}
//shutdown the scheduler
ExecutorUtils.scheduler.shutdown();
try {
@ -329,21 +347,7 @@ public abstract class PersistenceBackendFactory {
} catch (InterruptedException e) {
logger.error("Unable to shutdown the threadPool", e);
}
//disconnect the persistence and clean
for(String context : persistenceBackends.keySet()){
context = sanitizeContext(context);
PersistenceBackend apb;
synchronized (persistenceBackends) {
apb = persistenceBackends.get(context);
}
try {
logger.debug("Flushing records in context {}", context);
apb.closeAndClean();
}catch(Exception e){
logger.error("Unable to flush records in context {} with {}", context, apb, e);
}
}
}
}

View File

@ -27,11 +27,8 @@ class PersistenceBackendRediscover implements Runnable {
public PersistenceBackendRediscover(String context,
FallbackPersistenceBackend fallbackPersistenceBackend,
long initialDelay, long delay, TimeUnit timeUnit){
this.context = context;
this.fallbackPersistenceBackend = fallbackPersistenceBackend;
//this.scheduler = Executors.newSingleThreadScheduledExecutor();
//this.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
scheduledThread = ExecutorUtils.scheduler.scheduleAtFixedRate(this, initialDelay, delay, timeUnit);
}
@ -49,9 +46,8 @@ class PersistenceBackendRediscover implements Runnable {
context);
//scheduler.shutdown();
scheduledThread.cancel(true);
}else{
logger.trace("{} for contaxt {} is still a {}. We will see if next time we will be more lucky.",
logger.trace("{} for context {} is still a {}. We will see if next time we will be more lucky.",
PersistenceBackend.class.getSimpleName(),
context,
FallbackPersistenceBackend.class.getSimpleName());

View File

@ -15,7 +15,6 @@ import org.gcube.documentstore.records.implementation.RequiredField;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public interface AggregatedRecord<A extends AggregatedRecord<A,R>, R extends Record> extends Record {
/**

View File

@ -14,8 +14,6 @@ import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
public class CustomMapDeserializer extends StdDeserializer<Map<String, Serializable>>{
private static final long serialVersionUID = 1L;
protected CustomMapDeserializer() {

View File

@ -12,6 +12,7 @@ public class IdentifiableDeserializableModule extends SimpleModule {
private static final long serialVersionUID = -6210999408282132552L;
public IdentifiableDeserializableModule() {
super();
addDeserializer(Serializable.class, new StringDeserializer());
}
}

View File

@ -12,6 +12,8 @@ import java.util.SortedSet;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.implementation.RequiredField;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@ -22,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = Record.RECORD_TYPE)
public interface Record extends Comparable<Record>, Serializable {
public static final String RECORD_TYPE = "recordType";
/**
* KEY : The unique identifier for the {@link Record}
* The ID SHOULD automatically created by the implementation class.
@ -35,15 +39,7 @@ public interface Record extends Comparable<Record>, Serializable {
*/
@RequiredField
public static final String CREATION_TIME = "creationTime";
/**
* KEY : The Type of the represented {@link Record}
*/
@RequiredField
@JsonIgnore
public static final String RECORD_TYPE = "recordType";
/**
* @return a Set containing the keys of required fields
* The returned Set MUST be a copy of the internal representation.
@ -117,13 +113,12 @@ public interface Record extends Comparable<Record>, Serializable {
* not affect the object
* @return a Map containing the properties
*/
@JsonAnyGetter
public Map<String, Serializable> getResourceProperties();
/**
* Set all resource-specific properties, replacing existing ones
*/
public void setResourceProperties(Map<String, ? extends Serializable> resourceSpecificProperties) throws InvalidValueException;
/**
@ -131,7 +126,6 @@ public interface Record extends Comparable<Record>, Serializable {
* @param key the key of the requested property
* @return the value of the given resource property
*/
public Serializable getResourceProperty(String key);
/**
@ -141,9 +135,16 @@ public interface Record extends Comparable<Record>, Serializable {
* @param key the key of the requested property
* @param value the value of the given resource property
*/
@JsonAnySetter
public void setResourceProperty(String key, Serializable value) throws InvalidValueException;
/**
* Remove a property from Record.
* This API is intended for intern use only.
* @param key the key of the requested property to remove
*/
public void removeResourceProperty(String key);
/**
* Validate the Resource Record.
* The validation check if all the Required Field are set and has valid
@ -152,5 +153,5 @@ public interface Record extends Comparable<Record>, Serializable {
*/
@JsonIgnore
public void validate() throws InvalidValueException;
}

View File

@ -3,8 +3,6 @@
*/
package org.gcube.documentstore.records;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.HashSet;
@ -12,7 +10,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.gcube.documentstore.exception.InvalidValueException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,10 +20,12 @@ public class RecordUtility {
private static Logger logger = LoggerFactory.getLogger(RecordUtility.class);
/*
private final static String LINE_FREFIX = "{";
private final static String LINE_SUFFIX = "}";
private final static String KEY_VALUE_PAIR_SEPARATOR = ",";
private final static String KEY_VALUE_LINKER = "=";
*/
protected static Set<Package> recordPackages;
protected static Map<String, Class<? extends Record>> recordClassesFound;
@ -184,13 +183,14 @@ public class RecordUtility {
* }
*/
/*
protected static Map<String, ? extends Serializable> getMapFromString(String serializedMap){
/* Checking line sanity */
/* Checking line sanity * /
if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){
return null;
}
/* Cleaning prefix and suffix to parse line */
/* Cleaning prefix and suffix to parse line * /
serializedMap = serializedMap.replace(LINE_FREFIX, "");
serializedMap = serializedMap.replace(LINE_SUFFIX, "");
@ -210,6 +210,7 @@ public class RecordUtility {
return map;
}
*/
protected static final String INVALID = "invalid";
@ -220,12 +221,13 @@ public class RecordUtility {
* @throws Exception if deserialization fails
*/
@SuppressWarnings("unchecked")
public static <R extends Record> R getRecord(String serializedMapOrValidJSON) throws Exception {
//verify if serializedMap is a json (new serializable or old method)
public static <R extends Record> R getRecord(String json) throws Exception {
return (R) DSMapper.unmarshal(Record.class, json);
/* verify if serializedMap is a json (new serializable or old method)
if (DSMapper.isJSONValid(serializedMapOrValidJSON)){
logger.debug("Unmarshal record with jackson");
return (R) DSMapper.unmarshal(Record.class, serializedMapOrValidJSON);
}
else{
//old method
@ -244,15 +246,18 @@ public class RecordUtility {
else
return null;
}
*/
}
/**
/* *
* Create a Record from a Map
* @param recordMap the Map
* @return the Record
* @throws Exception if deserialization fails
*/
* /
@SuppressWarnings("unchecked")
/*
* @SuppressWarnings("unchecked")
public static Record getRecord(Map<String, ? extends Serializable> recordMap) throws Exception {
String className = (String) recordMap.get(Record.RECORD_TYPE);
@ -262,12 +267,12 @@ public class RecordUtility {
* with usageRecordType instead recordType property.
*
* TODO Remove when all old fallback files has been elaborated
*/
* /
if(className == null){
className = (String) recordMap.get("usageRecordType");
((Map<String, Serializable>) recordMap).put(Record.RECORD_TYPE, className);
}
/* END of Patch */
/* END of Patch * /
boolean aggregated = false;
try {
@ -291,7 +296,7 @@ public class RecordUtility {
return record;
}
*/

View File

@ -1,27 +0,0 @@
package org.gcube.documentstore.records;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
 * JAXB-serializable wrapper around a list of values.
 * <p>
 * NOTE: the original declaration was {@code class SerializableList<String>},
 * which declared a type parameter NAMED "String" that shadowed
 * {@link java.lang.String} — the class was silently generic over any type,
 * not a list of strings. The parameter is renamed to {@code T} to remove the
 * shadowing; type erasure makes this fully backward-compatible for callers.
 */
@XmlRootElement
//@XmlAccessorType(XmlAccessType.NONE)
public class SerializableList<T> {

	// Wrapped values; serialized as repeated XML elements by JAXB.
	@XmlElement
	private List<T> valuesList = new ArrayList<T>();

	/** No-arg constructor required by JAXB unmarshalling. */
	protected SerializableList() {}

	/**
	 * @param valuesList the values to wrap (stored by reference, not copied)
	 */
	public SerializableList(List<T> valuesList) {
		super();
		this.valuesList = valuesList;
	}

	/**
	 * @return the wrapped list (the internal reference, not a copy)
	 */
	public List<T> getValuesList() {
		return valuesList;
	}
}

View File

@ -1,106 +0,0 @@
package org.gcube.documentstore.records.aggregation;
/**
* @author Alessandro Pieve (ISTI - CNR)
*/
public class AggregationConfig {
/**
* Define the MAX number of Record to buffer. TODO Get from configuration
*/
protected static final int MAX_RECORDS_NUMBER = 1000;
/**
* The Max amount of time elapsed form last record before after that the
* buffered record are persisted even if TODO Get from configuration
*/
protected static final long OLD_RECORD_MAX_TIME_ELAPSED = 1000 * 60 * 30; // 30 min
public static final int INITIAL_DELAY = 30;
public static final int DELAY = 30;
public static AggregationConfig getDefaultConfiguration(){
return new AggregationConfig(INITIAL_DELAY, DELAY, MAX_RECORDS_NUMBER, OLD_RECORD_MAX_TIME_ELAPSED);
}
private int initialDelaySet;
private int delaySet;
private int maxRecordsNumberSet;
private long oldRecordMaxTimeElapsedSet;
public AggregationConfig(Integer initialDelaySet, Integer delaySet,
int maxRecordsNumberSet, long oldRecordMaxTimeElapsedSet) {
super();
this.initialDelaySet = initialDelaySet;
this.delaySet = delaySet;
this.maxRecordsNumberSet = maxRecordsNumberSet;
this.oldRecordMaxTimeElapsedSet = oldRecordMaxTimeElapsedSet;
}
public Integer getInitialDelaySet() {
return initialDelaySet;
}
public void setInitialDelaySet(int initialDelaySet) {
this.initialDelaySet = initialDelaySet;
}
public Integer getDelaySet() {
return delaySet;
}
public void setDelaySet(int delaySet) {
this.delaySet = delaySet;
}
public int getMaxRecordsNumberSet() {
return maxRecordsNumberSet;
}
public void setMaxRecordsNumberSet(int maxRecordsNumberSet) {
this.maxRecordsNumberSet = maxRecordsNumberSet;
}
public long getOldRecordMaxTimeElapsedSet() {
return oldRecordMaxTimeElapsedSet;
}
public void setOldRecordMaxTimeElapsedSet(long oldRecordMaxTimeElapsedSet) {
this.oldRecordMaxTimeElapsedSet = oldRecordMaxTimeElapsedSet;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + delaySet;
result = prime * result + initialDelaySet;
result = prime * result + maxRecordsNumberSet;
result = prime
* result
+ (int) (oldRecordMaxTimeElapsedSet ^ (oldRecordMaxTimeElapsedSet >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AggregationConfig other = (AggregationConfig) obj;
if (delaySet != other.delaySet)
return false;
if (initialDelaySet != other.initialDelaySet)
return false;
if (maxRecordsNumberSet != other.maxRecordsNumberSet)
return false;
if (oldRecordMaxTimeElapsedSet != other.oldRecordMaxTimeElapsedSet)
return false;
return true;
}
@Override
public String toString() {
return "AggregationConfig [initialDelaySet=" + initialDelaySet
+ ", delaySet=" + delaySet + ", maxRecordsNumberSet="
+ maxRecordsNumberSet + ", oldRecordMaxTimeElapsedSet="
+ oldRecordMaxTimeElapsedSet + "]";
}
}

View File

@ -0,0 +1,131 @@
package org.gcube.documentstore.records.aggregation;
import java.util.concurrent.TimeUnit;
/**
 * Holds the tuning parameters of the record aggregation scheduler: how many
 * records may be buffered, for how long, and the scheduling delays used to
 * trigger periodic flushes.
 *
 * @author Alessandro Pieve (ISTI - CNR)
 * @author Luca Frosini (ISTI - CNR)
 */
public class AggregationConfiguration {

	/**
	 * Define the default MAX number of Records to buffer before a flush is forced.
	 */
	public static final int DEFAULT_MAX_RECORDS_NUMBER = 1000;

	/**
	 * Define the default Max amount of time (in milliseconds) elapsed from the
	 * time the first record was buffered before a flush is forced.
	 */
	public static final long DEFAULT_MAX_TIME_ELAPSED = 1000 * 60 * 30; // 30 minutes in milliseconds

	/** Default initial scheduling delay, expressed in {@link #TIME_UNIT}. */
	public static final int DEFAULT_INITIAL_DELAY = 30;

	/** Default scheduling period, expressed in {@link #TIME_UNIT}. */
	public static final int DEFAULT_DELAY = 30;

	/** Unit used to interpret {@link #getInitialDelay()} and {@link #getDelay()}. */
	public static final TimeUnit TIME_UNIT = TimeUnit.MINUTES;

	/**
	 * @return a configuration initialized with all the default values
	 */
	public static AggregationConfiguration getDefaultConfiguration() {
		return new AggregationConfiguration(DEFAULT_INITIAL_DELAY, DEFAULT_DELAY, DEFAULT_MAX_RECORDS_NUMBER,
				DEFAULT_MAX_TIME_ELAPSED);
	}

	protected int initialDelay;
	protected int delay;
	protected int maxRecordsNumber;
	protected long maxTimeElapsed;

	/**
	 * @param initialDelay initial scheduling delay, in {@link #TIME_UNIT}
	 * @param delay scheduling period, in {@link #TIME_UNIT}
	 * @param maxRecordsNumber max number of buffered records before a flush
	 * @param maxTimeElapsed max buffering time in milliseconds before a flush
	 */
	public AggregationConfiguration(int initialDelay, int delay, int maxRecordsNumber, long maxTimeElapsed) {
		super();
		this.initialDelay = initialDelay;
		this.delay = delay;
		this.maxRecordsNumber = maxRecordsNumber;
		this.maxTimeElapsed = maxTimeElapsed;
	}

	/**
	 * @return the configured initial delay, or {@link #DEFAULT_INITIAL_DELAY}
	 *         when the configured value is not strictly positive
	 */
	public int getInitialDelay() {
		return initialDelay > 0 ? initialDelay : DEFAULT_INITIAL_DELAY;
	}

	public void setInitialDelay(int initialDelay) {
		this.initialDelay = initialDelay;
	}

	/**
	 * @return the configured delay, or {@link #DEFAULT_DELAY} when the
	 *         configured value is not strictly positive
	 */
	public int getDelay() {
		return delay > 0 ? delay : DEFAULT_DELAY;
	}

	public void setDelay(int delay) {
		this.delay = delay;
	}

	/**
	 * @return the configured max records number, or
	 *         {@link #DEFAULT_MAX_RECORDS_NUMBER} when the configured value is
	 *         not strictly positive
	 */
	public int getMaxRecordsNumber() {
		return maxRecordsNumber > 0 ? maxRecordsNumber : DEFAULT_MAX_RECORDS_NUMBER;
	}

	public void setMaxRecordsNumber(int maxRecordsNumber) {
		this.maxRecordsNumber = maxRecordsNumber;
	}

	/**
	 * @return the configured max elapsed time (milliseconds), or
	 *         {@link #DEFAULT_MAX_TIME_ELAPSED} when the configured value is
	 *         not strictly positive
	 */
	public long getMaxTimeElapsed() {
		return maxTimeElapsed > 0 ? maxTimeElapsed : DEFAULT_MAX_TIME_ELAPSED;
	}

	public void setMaxTimeElapsed(long maxTimeElapsed) {
		this.maxTimeElapsed = maxTimeElapsed;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + delay;
		result = prime * result + initialDelay;
		result = prime * result + maxRecordsNumber;
		result = prime * result + (int) (maxTimeElapsed ^ (maxTimeElapsed >>> 32));
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null || getClass() != obj.getClass()) {
			return false;
		}
		AggregationConfiguration other = (AggregationConfiguration) obj;
		return delay == other.delay && initialDelay == other.initialDelay
				&& maxRecordsNumber == other.maxRecordsNumber && maxTimeElapsed == other.maxTimeElapsed;
	}

	@Override
	public String toString() {
		// Fix: previously reported the wrong class name ("AggregationConfig"),
		// a leftover of the class this one replaced.
		return "AggregationConfiguration [initialDelay=" + initialDelay + ", delay=" + delay + ", maxRecordsNumber="
				+ maxRecordsNumber + ", maxTimeElapsed=" + maxTimeElapsed + "]";
	}
}

View File

@ -1,5 +1,7 @@
package org.gcube.documentstore.records.aggregation;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
@ -22,7 +24,6 @@ import org.gcube.documentstore.persistence.PersistenceExecutor;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.documentstore.records.implementation.ConfigurationGetPropertyValues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,123 +33,81 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AggregationScheduler implements Runnable {
public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
protected int totalBufferedRecords;
protected Map<String, List<Record>> bufferedRecords;
protected final PersistenceExecutor persistenceExecutor;
public final static TimeUnit TIME_UNIT = TimeUnit.MINUTES;
public static final Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);
public static final String AGGREGATION_SCHEDULER_TIME = "AggregationSchedulerTime";
public static final String BUFFER_RECORD_TIME = "BufferRecordTime";
public static final String BUFFER_RECORD_NUMBER = "BufferRecordNumber";
public static final Integer RANDOM_INIT_START=5;
public static final int RANDOM_INIT_START = 5;
/**
* The Max amount of time for reload a configuration Get from
* configuration
* The Max amount of time for reload a configuration Get from configuration
*/
public static long TIME_RELOAD_CONFIGURATION = 720 ; //720 minutes (12 Hours)
public static final long TIME_RELOAD_CONFIGURATION = 720; // 720 minutes (12 Hours)
public static final String CONFIG_DIRECTORY_NAME = "config";
public static final String FILE_NAME = "accounting.properties";
public static final File AGGREGATION_PROPERTIES_FILE;
static {
File file = new File(".");
file = new File(file, CONFIG_DIRECTORY_NAME);
AGGREGATION_PROPERTIES_FILE = new File(file, FILE_NAME);
}
protected int totalBufferedRecords;
protected Map<String, List<Record>> bufferedRecords;
protected final PersistenceExecutor persistenceExecutor;
public boolean changeConfiguration = false;
private String name;
private AggregationConfiguration config;
private AggregationConfig config;
// Schedule for flush and reload configuration
protected ScheduledFuture<?> futureFlush;
protected ScheduledFuture<?> futureReload;
//Schedule for flush and reload configuration
protected ScheduledFuture<?> futureFlush = null;
protected ScheduledFuture<?> futureReload = null;
public static AggregationScheduler newInstance(
PersistenceExecutor persistenceExecutor, String name) {
return new BufferAggregationScheduler(persistenceExecutor, name);
public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor) {
return new BufferAggregationScheduler(persistenceExecutor);
}
public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor,
PersistenceBackendConfiguration configuration, String name)throws NumberFormatException, Exception {
AggregationConfig config = CheckConfiguration(configuration);
BufferAggregationScheduler bas = new BufferAggregationScheduler(persistenceExecutor, config, name );
return bas;
PersistenceBackendConfiguration configuration) throws NumberFormatException, Exception {
AggregationConfiguration config = CheckConfiguration(configuration);
return new BufferAggregationScheduler(persistenceExecutor, config);
}
protected AggregationScheduler(PersistenceExecutor persistenceExecutor, String name) {
this(persistenceExecutor, AggregationConfig.getDefaultConfiguration(), name);
protected AggregationScheduler(PersistenceExecutor persistenceExecutor) {
this(persistenceExecutor, AggregationConfiguration.getDefaultConfiguration());
}
protected AggregationScheduler(PersistenceExecutor persistenceExecutor, AggregationConfig config, String name) {
logger.trace("+++++ AggregationScheduler");
protected AggregationScheduler(PersistenceExecutor persistenceExecutor, AggregationConfiguration config) {
this.config = config;
this.name = name;
this.bufferedRecords = new HashMap<String, List<Record>>();
this.totalBufferedRecords = 0;
this.persistenceExecutor = persistenceExecutor;
schedule();
reloadConfiguration();
reloadConfiguration();
}
private void schedule() {
logger.trace("+++++ schedule");
/*
logger.trace("[{}] reloadConfiguration",agScheduler.name );
PersistenceBackendConfiguration configuration=getConfiguration();
try {
AggregationConfig agConf = CheckConfiguration(configuration);
if (!agScheduler.config.equals(agConf)) {
logger.trace("[{}] reloadConfiguration changeConfiguration "
+ "old config:{} newconfig:{}",agScheduler.name,agScheduler.config.toString(),agConf.toString());
agScheduler.setConfig(agConf);
agScheduler.run();
agScheduler.schedule();
}
else{
logger.trace("[{}] reloadConfiguration no changeConfiguration",agScheduler.name );
}
} catch (IOException e) {
logger.warn("error retrieving configuration",e);
}
*/
if (futureFlush!=null)
if (futureFlush != null) {
futureFlush.cancel(false);
if ((config.getInitialDelaySet() == 0) || (config.getDelaySet() == 0)) {
logger.trace("+++++ getInitialDelaySet");
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this, 1,1, TIME_UNIT);
}
else{
Random random = new Random();
Integer randStart= Math.abs(random.nextInt(RANDOM_INIT_START));
logger.trace("+++++ else "+config.getInitialDelaySet());
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this,randStart, config.getDelaySet(), TIME_UNIT);
//futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this,config.getInitialDelaySet()+randStart, config.getDelaySet(), TIME_UNIT);
}
logger.trace("[{}] AggregationScheduler- Thread scheduler created in {} ",name, this.toString());
logger.trace("[{}] AggregationScheduler- Load configuration every {}",name, TIME_RELOAD_CONFIGURATION);
logger.trace("[{}] AggregationScheduler- Aggregated for max record {}", name, config.getMaxRecordsNumberSet());
logger.trace("[{}] AggregationScheduler- Aggregated for max time {}", name, config.getOldRecordMaxTimeElapsedSet());
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this, config.getInitialDelay(), config.getDelay(), AggregationConfiguration.TIME_UNIT);
}
@SuppressWarnings("rawtypes")
protected static AggregatedRecord instantiateAggregatedRecord(Record record)
throws Exception {
protected static AggregatedRecord instantiateAggregatedRecord(Record record) throws Exception {
String recordType = record.getRecordType();
Class<? extends AggregatedRecord> clz = RecordUtility.getAggregatedRecordClass(recordType);
Class[] argTypes = { record.getClass() };
@ -158,8 +117,7 @@ public abstract class AggregationScheduler implements Runnable {
}
@SuppressWarnings("rawtypes")
public static AggregatedRecord getAggregatedRecord(Record record)
throws Exception {
public static AggregatedRecord getAggregatedRecord(Record record) throws Exception {
AggregatedRecord aggregatedRecord;
if (record instanceof AggregatedRecord) {
@ -195,11 +153,11 @@ public abstract class AggregationScheduler implements Runnable {
} else {
bufferedAggregatedRecord.aggregate((Record) record);
}
logger.trace("Aggregated Record is {}",bufferedAggregatedRecord);
logger.trace("Aggregated Record is {}", bufferedAggregatedRecord);
found = true;
break;
} catch (NotAggregatableRecordsExceptions e) {
logger.trace("{} is not usable for aggregation",bufferedRecord);
logger.trace("{} is not usable for aggregation", bufferedRecord);
}
}
@ -226,7 +184,7 @@ public abstract class AggregationScheduler implements Runnable {
}
public void flush(PersistenceExecutor persistenceExecutor) throws Exception {
public void flush(PersistenceExecutor persistenceExecutor) throws Exception {
aggregate(null, persistenceExecutor, true);
}
@ -238,23 +196,22 @@ public abstract class AggregationScheduler implements Runnable {
schedulerSpecificClear();
}
protected synchronized void aggregate(Record record,
PersistenceExecutor persistenceExecutor, boolean forceFlush)
throws Exception {
protected synchronized void aggregate(Record record, PersistenceExecutor persistenceExecutor, boolean forceFlush)
throws Exception {
if (record != null) {
madeAggregation(record);
}
if (isTimeToPersist(this.config.getMaxRecordsNumberSet(), this.config.getOldRecordMaxTimeElapsedSet())|| forceFlush) {
if (isTimeToPersist(config.getMaxRecordsNumber(), config.getMaxTimeElapsed())
|| forceFlush) {
reallyFlush(persistenceExecutor);
}
}
protected void reallyFlush(PersistenceExecutor persistenceExecutor) throws Exception {
protected void reallyFlush(PersistenceExecutor persistenceExecutor)
throws Exception {
if (totalBufferedRecords == 0) {
return;
}
@ -267,7 +224,7 @@ public abstract class AggregationScheduler implements Runnable {
i++;
}
}
logger.trace("[{}]reallyFlush It is time to persist buffered records {}",name, Arrays.toString(recordToPersist));
logger.trace("It is time to persist buffered records {}", Arrays.toString(recordToPersist));
persistenceExecutor.persist(recordToPersist);
clear();
}
@ -282,35 +239,34 @@ public abstract class AggregationScheduler implements Runnable {
* @throws Exception
* if fails
*/
public void aggregate(Record record, PersistenceExecutor persistenceExecutor)
throws Exception {
public void aggregate(Record record, PersistenceExecutor persistenceExecutor) throws Exception {
aggregate(record, persistenceExecutor, false);
}
protected abstract boolean isTimeToPersist(int maxRecordNumber,
long oldRecordMaxTime);
protected abstract boolean isTimeToPersist(int maxRecordNumber, long oldRecordMaxTime);
/**
* reloadConfiguration
*
* @throws Exception
*/
protected void reloadConfiguration() {
Random random = new Random();
Integer randStart= Math.abs(random.nextInt(RANDOM_INIT_START));
futureReload = ExecutorUtils.scheduler.scheduleAtFixedRate(new ReloaderThread(this),TIME_RELOAD_CONFIGURATION+randStart, TIME_RELOAD_CONFIGURATION, TIME_UNIT);
Integer randStart = Math.abs(random.nextInt(RANDOM_INIT_START));
futureReload = ExecutorUtils.scheduler.scheduleAtFixedRate(new ReloaderThread(this),
TIME_RELOAD_CONFIGURATION + randStart, TIME_RELOAD_CONFIGURATION, TimeUnit.MINUTES);
}
/**
* Get Configuration (used from reload configuration)
*
* @return
*/
protected PersistenceBackendConfiguration getConfiguration(){
ServiceLoader<PersistenceBackend> serviceLoader = ServiceLoader
.load(PersistenceBackend.class);
protected PersistenceBackendConfiguration getConfiguration() {
ServiceLoader<PersistenceBackend> serviceLoader = ServiceLoader.load(PersistenceBackend.class);
PersistenceBackendConfiguration configuration = null;
for (PersistenceBackend found : serviceLoader) {
Class<? extends PersistenceBackend> foundClass = found
.getClass();
Class<? extends PersistenceBackend> foundClass = found.getClass();
try {
String foundClassName = foundClass.getSimpleName();
logger.trace("getConfiguration - foundClassName {}", foundClassName);
@ -320,36 +276,57 @@ public abstract class AggregationScheduler implements Runnable {
}
logger.debug("{} will be used.", foundClassName);
} catch (Exception e) {
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.",foundClass.getSimpleName()), e);
logger.error(
String.format("%s not initialized correctly. It will not be used. Trying the next one if any.",
foundClass.getSimpleName()),
e);
}
}
return configuration;
}
public static Properties getPropertiesFromFile() throws IOException {
Properties properties = null;
protected static AggregationConfig CheckConfiguration(PersistenceBackendConfiguration configuration) throws IOException{
logger.trace("Looking for properties in file " + AGGREGATION_PROPERTIES_FILE.getAbsolutePath());
try (FileInputStream inputStream = new FileInputStream(AGGREGATION_PROPERTIES_FILE)) {
if (inputStream != null) {
properties = new Properties();
properties.load(inputStream);
}
} catch (Exception e) {
logger.trace(
"ConfigurationGetPropertyValues -property file error on input stream" + e.getLocalizedMessage());
}
return properties;
}
protected static AggregationConfiguration CheckConfiguration(PersistenceBackendConfiguration configuration)
throws IOException {
Integer delay = null;
Integer maxRecordNumber = null;
Integer maxRecordTime = null;
try {
ConfigurationGetPropertyValues properties = new ConfigurationGetPropertyValues();
Properties prop = properties.getPropValues();
Properties properties = AggregationScheduler.getPropertiesFromFile();
if (prop != null) {
if (properties != null) {
// get value from properties file
logger.trace("Configuration from properties file");
try {
delay = Integer.parseInt(prop.getProperty("delay"));
delay = Integer.parseInt(properties.getProperty("delay"));
} catch (Exception e) {
logger.trace("Configuration from properties file, not found a delay value");
}
try {
maxRecordNumber = Integer.parseInt(prop.getProperty("maxrecordnumber"));
maxRecordNumber = Integer.parseInt(properties.getProperty("maxrecordnumber"));
} catch (Exception e) {
logger.trace("Configuration from properties file, not found a maxRecordNumber value");
}
try {
maxRecordTime = Integer.parseInt(prop.getProperty("maxtimenumber")) * 1000 * 60;
maxRecordTime = Integer.parseInt(properties.getProperty("maxtimenumber")) * 1000 * 60;
} catch (Exception e) {
logger.trace("Configuration from properties file, not found a maxRecordTime value");
}
@ -376,41 +353,37 @@ public abstract class AggregationScheduler implements Runnable {
}
}
} catch (Exception e) {
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.",e.getLocalizedMessage()), e);
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.",
e.getLocalizedMessage()), e);
}
AggregationConfig config = AggregationConfig.getDefaultConfiguration();
AggregationConfiguration config = AggregationConfiguration.getDefaultConfiguration();
if (delay != null) {
config.setDelaySet(delay);
config.setInitialDelaySet(delay);
}
config.setDelay(delay);
config.setInitialDelay(delay);
}
if (maxRecordNumber != null) {
config.setMaxRecordsNumberSet(maxRecordNumber);
}
config.setMaxRecordsNumber(maxRecordNumber);
}
if (maxRecordTime != null) {
config.setOldRecordMaxTimeElapsedSet(maxRecordTime);
}
config.setMaxTimeElapsed(maxRecordTime);
}
return config;
}
public AggregationConfig getConfig() {
public AggregationConfiguration getConfig() {
return config;
}
public void setConfig(AggregationConfig newConfig) {
public void setConfig(AggregationConfiguration newConfig) {
this.config = newConfig;
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
this.flush(persistenceExecutor);
try {
this.flush(persistenceExecutor);
} catch (Exception e) {
logger.error("Error flushing Buffered Records", e);
}
@ -426,29 +399,27 @@ public abstract class AggregationScheduler implements Runnable {
this.agScheduler = agScheduler;
}
public void run() {
logger.trace("[{}] reloadConfiguration",agScheduler.name );
PersistenceBackendConfiguration configuration=getConfiguration();
public void run() {
PersistenceBackendConfiguration configuration = getConfiguration();
try {
AggregationConfig agConf = CheckConfiguration(configuration);
AggregationConfiguration agConf = CheckConfiguration(configuration);
if (!agScheduler.config.equals(agConf)) {
logger.trace("[{}] reloadConfiguration changeConfiguration "
+ "old config:{} newconfig:{}",agScheduler.name,agScheduler.config.toString(),agConf.toString());
logger.trace("reloadConfiguration changeConfiguration " + "old config:{} newconfig:{}",
agScheduler.config.toString(), agConf.toString());
agScheduler.setConfig(agConf);
agScheduler.run();
agScheduler.schedule();
}
else{
logger.trace("[{}] reloadConfiguration no changeConfiguration",agScheduler.name );
} else {
logger.trace("reloadConfiguration no changeConfiguration");
}
} catch (IOException e) {
logger.warn("error retrieving configuration",e);
logger.warn("error retrieving configuration", e);
}
}
}
public void shutdown(){
public void shutdown() {
futureReload.cancel(false);
this.run();
futureFlush.cancel(true);

View File

@ -95,7 +95,7 @@ public class AggregationUtility<T extends AggregatedRecord<T,?>> {
@SuppressWarnings("rawtypes")
Comparable thisValueComparable = (Comparable) thisValue;
if(recordValueComparable.compareTo(thisValueComparable)!=0){
logger.trace("{} != {}", recordValueComparable, thisValueComparable);
logger.trace("{} : {} != {}", field, recordValueComparable, thisValueComparable);
return false;
}
}else{
@ -116,7 +116,7 @@ public class AggregationUtility<T extends AggregatedRecord<T,?>> {
Set<String> propertyKeys = t.getResourceProperties().keySet();
for(String propertyName : propertyKeys){
if(!neededFields.contains(propertyName)){
t.getResourceProperties().remove(propertyName);
t.removeResourceProperty(propertyName);
}
}
}

View File

@ -19,13 +19,13 @@ public class BufferAggregationScheduler extends AggregationScheduler {
protected long firstBufferedTime;
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor, String name){
super(persistenceExecutor, name);
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor){
super(persistenceExecutor);
this.firstOfBuffer = true;
}
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor, AggregationConfig config, String name){
super(persistenceExecutor, config, name);
public BufferAggregationScheduler(PersistenceExecutor persistenceExecutor, AggregationConfiguration config){
super(persistenceExecutor, config);
this.firstOfBuffer = true;
}

View File

@ -33,14 +33,12 @@ import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
/**
* @author Luca Frosini (ISTI - CNR)
*/
@JsonTypeName(value="Record")
public class AbstractRecord implements Record {
public abstract class AbstractRecord implements Record {
/**
* Generated Serial Version UID
@ -55,9 +53,6 @@ public class AbstractRecord implements Record {
@ValidLong
protected static final String CREATION_TIME = Record.CREATION_TIME;
@NotEmpty
protected static final String RECORD_TYPE = Record.RECORD_TYPE;
/** resource-specific properties */
@JsonDeserialize(using = CustomMapDeserializer.class)
@ -158,12 +153,10 @@ public class AbstractRecord implements Record {
properties.removeAll(this.getComputedFields());
properties.remove(Record.ID);
properties.remove(Record.CREATION_TIME);
properties.remove(Record.RECORD_TYPE);
return properties;
}
protected void cleanExtraFields(){
Set<String> neededFields = this.requiredFields;
neededFields.addAll(this.aggregatedFields);
@ -199,7 +192,6 @@ public class AbstractRecord implements Record {
public AbstractRecord(){
init();
this.resourceProperties.put(ID, UUID.randomUUID().toString());
this.setRecordType();
Calendar calendar = Calendar.getInstance();
this.resourceProperties.put(CREATION_TIME, calendar.getTimeInMillis());
}
@ -238,23 +230,6 @@ public class AbstractRecord implements Record {
return new HashSet<String>(aggregatedFields);
}
@JsonIgnore
@Override
public String getRecordType() {
return (String) this.resourceProperties.get(RECORD_TYPE);
}
protected String giveMeRecordType(){
return null;
};
@JsonIgnore
protected void setRecordType(){
//this.resourceProperties.put(RECORD_TYPE, this.getClass().getSimpleName());
this.resourceProperties.put(RECORD_TYPE, this.giveMeRecordType());
}
/**
* {@inheritDoc}
*/
@ -302,7 +277,7 @@ public class AbstractRecord implements Record {
/**
* {@inheritDoc}
*/
//insert here for discovery auto
//insert here for discovery auto
//@JsonAnyGetter
@Override
public Map<String, Serializable> getResourceProperties() {
@ -330,6 +305,11 @@ public class AbstractRecord implements Record {
return this.resourceProperties.get(key);
}
@Override
public void removeResourceProperty(String key) {
this.resourceProperties.remove(key);
}
/**
* {@inheritDoc}
*/

View File

@ -1,31 +0,0 @@
package org.gcube.documentstore.records.implementation;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads the aggregation tuning properties from the local
 * ./config/accounting.properties file, when present.
 */
public class ConfigurationGetPropertyValues {

	private static final Logger logger = LoggerFactory.getLogger(ConfigurationGetPropertyValues.class);

	/** Relative path of the properties file holding the aggregation settings. */
	private static final String PROPERTY_FILE_NAME = "./config/accounting.properties";

	/**
	 * Loads the aggregation properties file.
	 *
	 * @return the loaded {@link Properties}, or {@code null} when the file is
	 *         missing or cannot be read (callers fall back to defaults)
	 * @throws IOException declared for API compatibility with existing callers
	 */
	public Properties getPropValues() throws IOException {
		logger.trace("Looking for property file {} (working directory: {})", PROPERTY_FILE_NAME,
				new File(".").getAbsolutePath());
		try (FileInputStream inputStream = new FileInputStream(PROPERTY_FILE_NAME)) {
			// The FileInputStream constructor throws when the file is missing,
			// so inputStream can never be null here (old null check was dead code).
			Properties prop = new Properties();
			prop.load(inputStream);
			return prop;
		} catch (Exception e) {
			// Best-effort lookup: a missing/unreadable file is not an error,
			// defaults will be applied by the caller.
			logger.trace("Property file {} not loaded: {}", PROPERTY_FILE_NAME, e.getLocalizedMessage());
			return null;
		}
	}
}