Add Jackson for model serialization

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@148473 82a268e6-3cf1-43bd-a215-b396298e98cf
Alessandro Pieve 2017-05-11 14:44:04 +00:00
parent 2eb5b28866
commit 1a71796218
17 changed files with 520 additions and 70 deletions

pom.xml
View File

@ -9,7 +9,7 @@
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib</artifactId>
<version>1.5.1-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<name>Document Store Lib</name>
<description>Allows persisting data in NoSQL Document Store databases.
Discovers the model dynamically.
@ -38,14 +38,14 @@
<version>1.7.5</version>
<scope>provided</scope>
</dependency>
<!--
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>
-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.0</version>
</dependency>
<!-- Test Dependency -->
<dependency>
<groupId>junit</groupId>
@ -61,9 +61,6 @@
</dependency>
</dependencies>
<build>

View File

@ -9,13 +9,18 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class FallbackPersistenceBackend extends PersistenceBackend {
private static final Logger logger = LoggerFactory.getLogger(FallbackPersistenceBackend.class);
private File fallbackFile;
/**
@ -62,7 +67,11 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
*/
@Override
protected void reallyAccount(Record record) throws Exception {
printLine(String.valueOf(record));
String marshalled = DSMapper.marshal(record);
logger.debug("reallyAccount:{}",marshalled);
printLine(marshalled);
}
public void printLine(String line) throws Exception {

View File

@ -214,6 +214,7 @@ public abstract class PersistenceBackend {
} catch (InvalidValueException e) {
logger.error("Error validating {}", record.getClass().getSimpleName(), e);
} catch (Exception e) {
logger.error("Error recording {}", record.getClass().getSimpleName(), e);
}
}
@ -230,7 +231,7 @@ public abstract class PersistenceBackend {
public void account(final Record record) throws InvalidValueException{
Runnable runnable = new Runnable(){
@Override
public void run(){
public void run(){
accountValidateAggregate(record, true, true);
}
};

View File

@ -126,9 +126,19 @@ public abstract class PersistenceBackendFactory {
context = sanitizeContext(context);
logger.debug("Discovering {} for scope {}",
PersistenceBackend.class.getSimpleName(), context);
ServiceLoader<PersistenceBackend> serviceLoader = ServiceLoader.load(PersistenceBackend.class);
ServiceLoader<PersistenceBackend> serviceLoader = ServiceLoader.load(PersistenceBackend.class);
logger.trace("discoverPersistenceBackend Found a service loader {}", serviceLoader.toString());
logger.trace("discoverPersistenceBackend Found a service loader with {}", PersistenceBackend.class.toString());
for (PersistenceBackend found : serviceLoader) {
logger.trace("for PersistenceBackend");
logger.trace("Testing before cast {}", found.toString());
Class<? extends PersistenceBackend> foundClass = found.getClass();
try {
String foundClassName = foundClass.getSimpleName();
logger.trace("Testing {}", foundClassName);
@ -154,6 +164,7 @@ public abstract class PersistenceBackendFactory {
logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e);
}
}
logger.trace("Not Found any service loader");
return null;
};

View File

@ -53,7 +53,7 @@ public class PersistenceBackendMonitor implements Runnable {
try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) {
for(String line; (line = br.readLine()) != null; ) {
try {
Record record = RecordUtility.getRecord(line);
Record record = RecordUtility.getRecord( line);
persistenceBackend.accountWithFallback(record);
} catch(Exception e){
logger.error("Was not possible parse line {} to obtain a valid Record. Going to writing back this line as string fallback file.", line, e);

View File

@ -15,12 +15,13 @@ import org.gcube.documentstore.records.implementation.RequiredField;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public interface AggregatedRecord<A extends AggregatedRecord<A,R>, R extends Record> extends Record {
/**
* KEY : Indicate that this {@link Record} is an aggregation
*/
@RequiredField @AggregatedField
@RequiredField @AggregatedField
public static final String AGGREGATED = "aggregated";
/**

View File

@ -0,0 +1,88 @@
package org.gcube.documentstore.records;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
public class CustomMapDeserializer extends StdDeserializer<Map<String, Serializable>> {

	private static final long serialVersionUID = 1L;

	protected CustomMapDeserializer() {
		super(Map.class);
	}

	@Override
	public Map<String, Serializable> deserialize(JsonParser jp, DeserializationContext ctxt)
			throws IOException, JsonProcessingException {
		JsonToken currentToken = null;
		Map<String, Serializable> toReturnMap = new HashMap<String, Serializable>();
		// Each START_OBJECT pushes a new map so that nested JSON objects become nested maps
		Stack<Map<String, Serializable>> mapsStack = new Stack<Map<String, Serializable>>();
		while ((currentToken = jp.nextValue()) != null) {
			switch (currentToken) {
				case START_OBJECT:
					mapsStack.push(new HashMap<String, Serializable>());
					break;
				case FIELD_NAME:
				case VALUE_STRING:
				case VALUE_NUMBER_FLOAT:
				case VALUE_NUMBER_INT:
					// Strings and numbers are stored by their textual representation
					if (mapsStack.isEmpty()) {
						toReturnMap.put(jp.getCurrentName(), jp.getText());
					} else {
						mapsStack.peek().put(jp.getCurrentName(), jp.getText());
					}
					break;
				case VALUE_TRUE:
				case VALUE_FALSE:
					// Booleans keep their type
					Boolean boolValue = (currentToken == JsonToken.VALUE_TRUE);
					if (mapsStack.isEmpty()) {
						toReturnMap.put(jp.getCurrentName(), boolValue);
					} else {
						mapsStack.peek().put(jp.getCurrentName(), boolValue);
					}
					break;
				case END_OBJECT:
					if (mapsStack.isEmpty()) {
						break;
					}
					if (mapsStack.size() == 1) {
						// Outermost nested object completed: attach it to the result map
						toReturnMap.put(jp.getCurrentName(), (Serializable) mapsStack.pop());
					} else {
						// Inner object completed: attach it to its parent map
						Map<String, Serializable> tmpMap = mapsStack.pop();
						mapsStack.peek().put(jp.getCurrentName(), (Serializable) tmpMap);
					}
					break;
				default:
					// START_ARRAY, END_ARRAY and other tokens are ignored
					break;
			}
		}
		return toReturnMap;
	}
}
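For reference, a minimal stand-alone sketch of how this deserializer behaves. The ObjectMapper wiring, class name and JSON keys below are illustrative only (in the library the deserializer is wired through @JsonDeserialize on AbstractRecord.resourceProperties instead), and the sketch sits in the same package because the constructor is protected:

package org.gcube.documentstore.records;

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class CustomMapDeserializerSketch {

	public static void main(String[] args) throws Exception {
		ObjectMapper mapper = new ObjectMapper();
		SimpleModule module = new SimpleModule();
		// Register the deserializer for plain Map targets
		module.addDeserializer(Map.class, new CustomMapDeserializer());
		mapper.registerModule(module);

		// Nested JSON objects become nested maps; booleans keep their type,
		// strings and numbers are stored as their textual representation
		String json = "{\"scope\":\"/gcube\",\"nested\":{\"calls\":\"3\",\"success\":true}}";
		Map<?, ?> map = mapper.readValue(json, Map.class);
		System.out.println(map);
	}
}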

View File

@ -0,0 +1,180 @@
package org.gcube.documentstore.records;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import org.gcube.documentstore.records.implementation.AbstractRecord;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class DSMapper {
protected static final ObjectMapper mapper;
private DSMapper(){}
/**
* @return the ObjectMapper
*/
public static ObjectMapper getObjectMapper() {
return mapper;
}
static {
mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.registerModule(new IdentifiableDeserializableModule());
mapper.registerSubtypes(Record.class);
mapper.registerSubtypes(AggregatedRecord.class);
mapper.registerSubtypes(AbstractRecord.class);
}
/**
* Write the serialization of a given resource to a given
* {@link OutputStream} .
*
* @param object the Record to serialize
* @param stream the stream to write to
* @throws IOException
* @throws JsonMappingException
* @throws JsonGenerationException
*/
public static <T extends OutputStream, R extends Record> T marshal(R object, T stream)
throws JsonGenerationException, JsonMappingException, IOException {
mapper.writeValue(stream, object);
return stream;
}
/**
* Write the serialization of a given resource to a given {@link Writer} .
* @param object the Record to serialize
* @param writer the writer to write to
* @throws IOException
* @throws JsonMappingException
* @throws JsonGenerationException
*/
public static <T extends Writer, R extends Record> T marshal(R object, T writer)
throws JsonGenerationException, JsonMappingException, IOException {
mapper.writeValue(writer, object);
return writer;
}
/**
* Return the String serialization of a given resource
* @param object the resource
* @return the String serialization of a given resource
* @throws JsonProcessingException
*/
public static <R extends Record> String marshal(R object) throws JsonProcessingException {
return mapper.writeValueAsString(object);
}
/**
* Return the String serialization of a given array of resources
* @param object the resource array
* @return the String serialization of the given array
* @throws JsonProcessingException
*/
public static <R extends Record> String marshalR(R[] object) throws JsonProcessingException {
return mapper.writeValueAsString(object);
}
/**
* Creates a resource of given class from its serialization in a given
* {@link Reader}.
* @param clz the class of the resource
* @param reader the reader
* @return the resource
* @throws JsonParseException
* @throws JsonMappingException
* @throws IOException
*/
public static <R extends Record> R unmarshal(Class<R> clz, Reader reader)
throws JsonParseException, JsonMappingException, IOException {
return mapper.readValue(reader, clz);
}
/**
* Creates a resource of given class from its serialization in a given
* {@link InputStream}.
* @param clz the class of the resource
* @param stream the stream
* @return the resource
* @throws IOException
* @throws JsonMappingException
* @throws JsonParseException
*/
public static <R extends Record> R unmarshal(Class<R> clz, InputStream stream)
throws JsonParseException, JsonMappingException, IOException {
return mapper.readValue(stream, clz);
}
/**
* Creates a resource of given class from its serialization in a given String
* @param clz the class of the resource
* @param string the String serialization of the resource
* @return the resource
* @throws JsonParseException
* @throws JsonMappingException
* @throws IOException
*/
public static <R extends Record> R unmarshal(Class<R> clz, String string)
throws JsonParseException, JsonMappingException, IOException {
return mapper.readValue(string, clz);
}
/** Creates a List of resources of the given class from its String serialization */
public static <R extends Record> List<R> unmarshalList(Class<R> clz, String string)
		throws JsonParseException, JsonMappingException, IOException {
	JavaType type = mapper.getTypeFactory().constructCollectionType(ArrayList.class, clz);
	return mapper.readValue(string, type);
}
/** Creates a List of {@link Record} from its String serialization */
public static <R extends Record> List<R> unmarshalList(String string)
		throws JsonParseException, JsonMappingException, IOException {
	JavaType type = mapper.getTypeFactory().constructCollectionType(ArrayList.class, Record.class);
	return mapper.readValue(string, type);
}
@SuppressWarnings("unchecked")
public static <R extends Record> void registerSubtypes(Class<R>... classes) {
mapper.registerSubtypes(classes);
}
public static boolean isJSONValid(String jsonInString ) {
try {
final ObjectMapper mapperJson = new ObjectMapper();
mapperJson.readTree(jsonInString);
return true;
} catch (IOException e) {
return false;
}
}
}
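A sketch of the intended round trip through this class; the record argument is assumed to be any concrete Record implementation whose type has already been registered as a Jackson subtype (RecordUtility does this while discovering record packages), and the class name is illustrative:

import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;

public class DSMapperSketch {

	// record: any already-registered concrete Record implementation (assumption)
	public static void roundTrip(Record record) throws Exception {
		String json = DSMapper.marshal(record);               // Record -> JSON String
		if (DSMapper.isJSONValid(json)) {
			// The "recordType" property written by @JsonTypeInfo selects the subtype
			Record back = DSMapper.unmarshal(Record.class, json);
			System.out.println(back.getRecordType() + " " + back.getId());
		}
	}
}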

View File

@ -0,0 +1,17 @@
package org.gcube.documentstore.records;
import java.io.Serializable;
import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
public class IdentifiableDeserializableModule extends SimpleModule {
/**
*
*/
private static final long serialVersionUID = -6210999408282132552L;
public IdentifiableDeserializableModule() {
addDeserializer(Serializable.class, new StringDeserializer());
}
}
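In isolation the module simply makes values declared as java.io.Serializable come back as plain Strings; a minimal sketch of that effect (the class name and the JSON literal are illustrative):

import java.io.Serializable;

import org.gcube.documentstore.records.IdentifiableDeserializableModule;

import com.fasterxml.jackson.databind.ObjectMapper;

public class IdentifiableDeserializableModuleSketch {

	public static void main(String[] args) throws Exception {
		ObjectMapper mapper = new ObjectMapper();
		mapper.registerModule(new IdentifiableDeserializableModule());
		// A Serializable-declared target is deserialized with StringDeserializer
		Serializable value = mapper.readValue("\"1494513844000\"", Serializable.class);
		System.out.println(value.getClass() + ": " + value);
	}
}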

View File

@ -12,12 +12,16 @@ import java.util.SortedSet;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.implementation.RequiredField;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = Record.RECORD_TYPE)
public interface Record extends Comparable<Record>, Serializable {
/**
* KEY : The unique identifier for the {@link Record}
* The ID SHOULD be automatically created by the implementation class.
@ -28,14 +32,16 @@ public interface Record extends Comparable<Record>, Serializable {
/**
* KEY : The instant when the {@link Record} was created.
* The value MUST be recorded in UTC milliseconds from the epoch.
*/
*/
@RequiredField
public static final String CREATION_TIME = "creationTime";
/**
* KEY : The Type of the represented {@link Record}
*/
@RequiredField
@JsonIgnore
public static final String RECORD_TYPE = "recordType";
/**
@ -43,6 +49,7 @@ public interface Record extends Comparable<Record>, Serializable {
* The returned Set MUST be a copy of the internal representation.
* Any modification to the returned Set MUST not affect the object
*/
@JsonIgnore
public Set<String> getRequiredFields();
/**
@ -50,6 +57,7 @@ public interface Record extends Comparable<Record>, Serializable {
* The returned Set MUST be a copy of the internal representation.
* Any modification to the returned Set MUST not affect the object
*/
@JsonIgnore
public Set<String> getComputedFields();
/**
@ -58,18 +66,22 @@ public interface Record extends Comparable<Record>, Serializable {
* @return
* @throws Exception
*/
@JsonIgnore
public SortedSet<String> getQuerableKeys() throws Exception;
/**
* Return the {@link Record} Type
* @return {@link Record} Type
*/
@JsonIgnore
public String getRecordType();
/**
* Return the unique id for this {@link Record}
* @return {@link Record} Unique ID
*/
@JsonIgnore
public String getId();
/**
@ -79,12 +91,14 @@ public interface Record extends Comparable<Record>, Serializable {
* @param id Unique ID
* @throws InvalidValueException
*/
@JsonIgnore
public void setId(String id) throws InvalidValueException;
/**
* Return the instant when this {@link Record} was created.
* @return the creation time for this {@link Record}
*/
@JsonIgnore
public Calendar getCreationTime();
/**
@ -94,6 +108,7 @@ public interface Record extends Comparable<Record>, Serializable {
* @param creationTime Creation Time
* @throws InvalidValueException
*/
@JsonIgnore
public void setCreationTime(Calendar creationTime) throws InvalidValueException;
/**
@ -102,11 +117,13 @@ public interface Record extends Comparable<Record>, Serializable {
* not affect the object
* @return a Map containing the properties
*/
public Map<String, Serializable> getResourceProperties();
/**
* Set all resource-specific properties, replacing existing ones
*/
public void setResourceProperties(Map<String, ? extends Serializable> resourceSpecificProperties) throws InvalidValueException;
/**
@ -114,6 +131,7 @@ public interface Record extends Comparable<Record>, Serializable {
* @param key the key of the requested property
* @return the value of the given resource property
*/
public Serializable getResourceProperty(String key);
/**
@ -123,6 +141,7 @@ public interface Record extends Comparable<Record>, Serializable {
* @param key the key of the requested property
* @param value the value of the given resource property
*/
public void setResourceProperty(String key, Serializable value) throws InvalidValueException;
/**
@ -131,6 +150,7 @@ public interface Record extends Comparable<Record>, Serializable {
* value.
* @throws InvalidValueException
*/
@JsonIgnore
public void validate() throws InvalidValueException;
}

View File

@ -46,13 +46,21 @@ public class RecordUtility {
try {
List<Class<?>> classes = ReflectionUtility.getClassesForPackage(packageObject);
for(Class<?> clz : classes){
logger.trace("found a class:{}",clz.getSimpleName());
if(Record.class.isAssignableFrom(clz)){
addRecordClass((Class<? extends Record>) clz);
//register the discovered subtype with Jackson
DSMapper.registerSubtypes((Class<? extends Record>)clz);
}
if(AggregatedRecord.class.isAssignableFrom(clz)){
logger.trace("addAggregatedRecordClass ({}) ", clz.getName());
addAggregatedRecordClass((Class<? extends AggregatedRecord<?,?>>) clz);
DSMapper.registerSubtypes((Class<? extends AggregatedRecord>)clz);
}
}
} catch (ClassNotFoundException e) {
@ -71,10 +79,9 @@ public class RecordUtility {
if(record instanceof AggregatedRecord){
return;
}
discoveredRecordType = record.getRecordType();
discoveredRecordType = record.getRecordType();
if(!recordClassesFound.containsKey(discoveredRecordType)){
logger.trace("Not containsKey discoveredRecordType:{}, cls:{}",discoveredRecordType.toString(),cls.toString());
//logger.trace("Not containsKey discoveredRecordType:{}, cls:{}",discoveredRecordType.toString(),cls.toString());
recordClassesFound.put(discoveredRecordType, cls);
}
@ -86,17 +93,18 @@ public class RecordUtility {
}
protected static void addAggregatedRecordClass(Class<? extends AggregatedRecord<?,?>> cls){
if(Modifier.isAbstract(cls.getModifiers())){
logger.trace("is abstract cls:{}",cls);
if(Modifier.isAbstract(cls.getModifiers())){
return;
}
String discoveredRecordType;
try {
AggregatedRecord<?,?> instance = cls.newInstance();
//logger.trace("-------------instance:{}",instance.toString());
discoveredRecordType = instance.getRecordType();
//logger.trace("-------------discoveredRecordType:{}",discoveredRecordType.toString());
if(!aggregatedRecordClassesFound.containsKey(discoveredRecordType)){
logger.trace("discoveredRecordType not found"+discoveredRecordType+" with cls:"+cls.getName());
logger.trace("discoveredRecordType not found:"+discoveredRecordType+" with cls:"+cls.getName());
aggregatedRecordClassesFound.put(discoveredRecordType, cls);
Class<? extends Record> recordClass = instance.getAggregable();
@ -131,29 +139,13 @@ public class RecordUtility {
aggregatedRecordClassesFound = new HashMap<>();
recordAggregationMapping = new HashMap<>();
/* Old code using Reflections
Reflections recordClassesReflections = new Reflections();
Set<Class<? extends Record>> recordClasses = recordClassesReflections.getSubTypesOf(Record.class);
for(Class<? extends Record> cls : recordClasses){
addRecordClass(cls)
}
aggregatedRecordClassesFound = new HashMap<>();
Reflections aggregatedRecordReflections = new Reflections();
Set<Class<? extends AggregatedRecord>> aggregatedRecordClasses = aggregatedRecordReflections.getSubTypesOf(AggregatedRecord.class);
for(Class<? extends AggregatedRecord> cls : aggregatedRecordClasses){
addAggregatedRecordClass(cls);
}
*/
}
public static Class<? extends AggregatedRecord<?,?>> getAggregatedRecordClass(String recordType) throws ClassNotFoundException {
if(getAggregatedRecordClassesFound().containsKey(recordType)){
logger.trace("record type {},getAggregatedRecordClassesFound {}",recordType,getAggregatedRecordClassesFound(),getAggregatedRecordClassesFound().get(recordType));
//logger.trace("record type {},getAggregatedRecordClassesFound {}",recordType,getAggregatedRecordClassesFound(),getAggregatedRecordClassesFound().get(recordType));
return getAggregatedRecordClassesFound().get(recordType);
}
logger.error("Unable to find {} class for {}.",
@ -161,7 +153,7 @@ public class RecordUtility {
//logger.trace("getAggregatedRecordClass getAggregatedRecordClassesFound:"+getAggregatedRecordClassesFound());
logger.debug("getAggregatedRecordClass getAggregatedRecordClassesFound:"+getAggregatedRecordClassesFound());
throw new ClassNotFoundException();
}
@ -227,16 +219,30 @@ public class RecordUtility {
* @return the Record
* @throws Exception if deserialization fails
*/
public static Record getRecord(String serializedMap) throws Exception {
Map<String,? extends Serializable> map = getMapFromString(serializedMap);
Record record = getRecord(map);
try {
record.validate();
}catch(InvalidValueException e){
record.setResourceProperty(INVALID, true);
logger.error("Recovered record is not valid. Anyway, it will be persisted", e);
public static <R extends Record> R getRecord(String serializedMap) throws Exception {
//check whether serializedMap is JSON (new Jackson serialization) or the legacy map format
if (DSMapper.isJSONValid(serializedMap)){
logger.debug("Unmarshalling record with Jackson");
return (R) DSMapper.unmarshal(Record.class, serializedMap);
}
else{
//legacy format
logger.debug("Unmarshalling record with the legacy custom parser");
Map<String,? extends Serializable> map = getMapFromString(serializedMap);
if (map!=null){
Record record = getRecord(map);
try {
record.validate();
}catch(InvalidValueException e){
record.setResourceProperty(INVALID, true);
logger.error("Recovered record is not valid. Anyway, it will be persisted", e);
}
return (R)record;
}
else
return null;
}
return record;
}
/**
@ -286,4 +292,7 @@ public class RecordUtility {
}
}
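A small usage sketch mirroring what PersistenceBackendMonitor does with each line read back from the fallback file; no particular line format is assumed, since getRecord now dispatches between the two formats itself (the class and method names are illustrative):

import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;

public class GetRecordSketch {

	// line: one line previously written by FallbackPersistenceBackend
	public static Record recover(String line) throws Exception {
		// JSON lines (new Jackson serialization) go through DSMapper.unmarshal,
		// anything else is handled by the legacy map-string parser
		return RecordUtility.getRecord(line);
	}
}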

View File

@ -139,6 +139,7 @@ public class ReflectionUtility {
*/
@SuppressWarnings("restriction")
public static List<Class<?>> getClassesForPackage(String pckgname)
throws ClassNotFoundException {
final List<Class<?>> classes = new ArrayList<Class<?>>();

View File

@ -0,0 +1,27 @@
package org.gcube.documentstore.records;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement
//@XmlAccessorType(XmlAccessType.NONE)
public class SerializableList {
// Simple JAXB wrapper around a list of String values
@XmlElement
private List<String> valuesList = new ArrayList<String>();
protected SerializableList(){}
public SerializableList(List<String> valuesList) {
super();
this.valuesList = valuesList;
}
public List<String> getValuesList() {
return valuesList;
}
}

View File

@ -84,6 +84,8 @@ public abstract class AggregationScheduler implements Runnable {
}
protected AggregationScheduler(PersistenceExecutor persistenceExecutor, AggregationConfig config, String name) {
logger.trace("+++++ AggregationScheduler");
this.config = config;
this.name = name;
@ -95,17 +97,47 @@ public abstract class AggregationScheduler implements Runnable {
}
private void schedule() {
//logger.trace("TestingThread AggregationScheduler schedule");
logger.trace("+++++ schedule");
/*
logger.trace("[{}] reloadConfiguration",agScheduler.name );
PersistenceBackendConfiguration configuration=getConfiguration();
try {
AggregationConfig agConf = CheckConfiguration(configuration);
if (!agScheduler.config.equals(agConf)) {
logger.trace("[{}] reloadConfiguration changeConfiguration "
+ "old config:{} newconfig:{}",agScheduler.name,agScheduler.config.toString(),agConf.toString());
agScheduler.setConfig(agConf);
agScheduler.run();
agScheduler.schedule();
}
else{
logger.trace("[{}] reloadConfiguration no changeConfiguration",agScheduler.name );
}
} catch (IOException e) {
logger.warn("error retrieving configuration",e);
}
*/
if (futureFlush!=null)
futureFlush.cancel(false);
if ((config.getInitialDelaySet() == 0) || (config.getDelaySet() == 0)) {
logger.trace("+++++ getInitialDelaySet");
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this, 1,1, TIME_UNIT);
}
else{
Random random = new Random();
Integer randStart= Math.abs(random.nextInt(RANDOM_INIT_START));
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this,config.getInitialDelaySet()+randStart, config.getDelaySet(), TIME_UNIT);
logger.trace("+++++ else "+config.getInitialDelaySet());
futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this,randStart, config.getDelaySet(), TIME_UNIT);
//futureFlush = ExecutorUtils.scheduler.scheduleAtFixedRate(this,config.getInitialDelaySet()+randStart, config.getDelaySet(), TIME_UNIT);
}
logger.trace("[{}] AggregationScheduler- Thread scheduler created in {} ",name, this.toString());
logger.trace("[{}] AggregationScheduler- Load configuration every {}",name, TIME_RELOAD_CONFIGURATION);
@ -194,7 +226,7 @@ public abstract class AggregationScheduler implements Runnable {
}
public void flush(PersistenceExecutor persistenceExecutor) throws Exception {
public void flush(PersistenceExecutor persistenceExecutor) throws Exception {
aggregate(null, persistenceExecutor, true);
}
@ -209,12 +241,11 @@ public abstract class AggregationScheduler implements Runnable {
protected synchronized void aggregate(Record record,
PersistenceExecutor persistenceExecutor, boolean forceFlush)
throws Exception {
if (record != null) {
madeAggregation(record);
}
if (isTimeToPersist(this.config.getMaxRecordsNumberSet(), this.config.getOldRecordMaxTimeElapsedSet())|| forceFlush) {
//logger.trace("TestingThread AggregationScheduler aggregate PersistenceExecutor:{}",persistenceExecutor.getClass().getName());
reallyFlush(persistenceExecutor);
}
@ -223,6 +254,7 @@ public abstract class AggregationScheduler implements Runnable {
protected void reallyFlush(PersistenceExecutor persistenceExecutor)
throws Exception {
if (totalBufferedRecords == 0) {
return;
}

View File

@ -41,7 +41,6 @@ public class BufferAggregationScheduler extends AggregationScheduler {
@Override
protected boolean isTimeToPersist(int maxRecordNumber, long oldRecordMaxTime) {
long now = Calendar.getInstance().getTimeInMillis();
if(firstOfBuffer){
firstOfBuffer = false;

View File

@ -21,6 +21,7 @@ import java.util.UUID;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.CustomMapDeserializer;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.implementation.validations.annotations.NotEmpty;
import org.gcube.documentstore.records.implementation.validations.annotations.ValidLong;
@ -28,11 +29,18 @@ import org.gcube.documentstore.records.implementation.validations.validators.Val
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
/**
* @author Luca Frosini (ISTI - CNR)
*
*/
public abstract class AbstractRecord implements Record {
@JsonTypeName(value="Record")
public class AbstractRecord implements Record {
/**
* Generated Serial Version UID
@ -51,13 +59,19 @@ public abstract class AbstractRecord implements Record {
protected static final String RECORD_TYPE = Record.RECORD_TYPE;
/** resource-specific properties */
@JsonDeserialize(using = CustomMapDeserializer.class)
@JsonIgnoreProperties(ignoreUnknown = true)
protected Map<String, Serializable> resourceProperties;
protected Map<String, List<FieldAction>> validation;
protected Map<String, List<FieldAction>> computation;
@JsonIgnore
protected Set<String> requiredFields;
@JsonIgnore
protected Set<String> computedFields;
@JsonIgnore
protected Set<String> aggregatedFields;
protected static Set<Field> getAllFields(Class<?> type) {
@ -180,6 +194,8 @@ public abstract class AbstractRecord implements Record {
initializeValidation();
}
@JsonIgnoreProperties(ignoreUnknown = true)
public AbstractRecord(){
init();
this.resourceProperties.put(ID, UUID.randomUUID().toString());
@ -188,6 +204,8 @@ public abstract class AbstractRecord implements Record {
this.resourceProperties.put(CREATION_TIME, calendar.getTimeInMillis());
}
@JsonIgnoreProperties(ignoreUnknown = true)
public AbstractRecord(Map<String, ? extends Serializable> properties) throws InvalidValueException {
init();
setResourceProperties(properties);
@ -220,14 +238,20 @@ public abstract class AbstractRecord implements Record {
return new HashSet<String>(aggregatedFields);
}
@Override
@JsonIgnore
@Override
public String getRecordType() {
return (String) this.resourceProperties.get(RECORD_TYPE);
}
protected abstract String giveMeRecordType();
protected String giveMeRecordType(){
return null;
};
@JsonIgnore
protected void setRecordType(){
//this.resourceProperties.put(RECORD_TYPE, this.getClass().getSimpleName());
this.resourceProperties.put(RECORD_TYPE, this.giveMeRecordType());
}
@ -278,6 +302,8 @@ public abstract class AbstractRecord implements Record {
/**
* {@inheritDoc}
*/
//possible place for @JsonAnyGetter (automatic property discovery)
//@JsonAnyGetter
@Override
public Map<String, Serializable> getResourceProperties() {
return new HashMap<String, Serializable>(this.resourceProperties);
@ -286,15 +312,19 @@ public abstract class AbstractRecord implements Record {
/**
* {@inheritDoc}
*/
@JsonAnySetter
@Override
public void setResourceProperties(Map<String, ? extends Serializable> properties) throws InvalidValueException {
Map<String, ? extends Serializable> validated = validateProperties(properties);
public void setResourceProperties(Map<String, ? extends Serializable> properties) throws InvalidValueException {
Map<String, ? extends Serializable> validated = validateProperties(properties);
this.resourceProperties = new HashMap<String, Serializable>(validated);
}
/**
* {@inheritDoc}
*/
@JsonAnyGetter
@Override
public Serializable getResourceProperty(String key) {
return this.resourceProperties.get(key);
@ -303,7 +333,8 @@ public abstract class AbstractRecord implements Record {
/**
* {@inheritDoc}
*/
@Override
@JsonAnySetter
@Override
public void setResourceProperty(String key, Serializable value) throws InvalidValueException {
Serializable checkedValue = validateField(key, value);
if(checkedValue == null){
@ -313,6 +344,9 @@ public abstract class AbstractRecord implements Record {
}
}
// AGGREGATION
/* --------------------------------------- */
@ -367,6 +401,25 @@ public abstract class AbstractRecord implements Record {
return (Long) this.resourceProperties.get(AggregatedRecord.END_TIME);
}
//Introduced to support Jackson (de)serialization of the aggregated flag
/**
* Set whether this Record is an aggregation
* @param aggregate true if this Record is an aggregation
* @throws InvalidValueException
*/
protected void setAggregate(Boolean aggregate) throws InvalidValueException {
setResourceProperty(AggregatedRecord.AGGREGATED, aggregate);
}
/**
* Return whether this Record is an aggregation
* @return true if this Record is an aggregation
*/
protected Boolean getAggregate() {
return (Boolean) this.resourceProperties.get(AggregatedRecord.AGGREGATED);
}
//End of Jackson (de)serialization support for the aggregated flag
/**
* Return the right end of the time interval covered by this Record
* @return End Time
@ -375,7 +428,8 @@ public abstract class AbstractRecord implements Record {
long millis = getEndTimeInMillis();
return timestampToCalendar(millis);
}
protected Serializable validateField(String key, Serializable value) throws InvalidValueException {
if(key == null){
throw new InvalidValueException("The key of property to set cannot be null");
@ -431,15 +485,17 @@ public abstract class AbstractRecord implements Record {
Serializable value = properties.get(key);
/* TODO Test Patch */
Serializable checkedValue = validateField(key, value);
if(checkedValue == null){
validated.remove(key);
}else{
validated.put(key, checkedValue);
}
/* Restore if test patch is not good
validated.put(key, validateField(key, value));
*/
// Restore if test patch is not good
//validated.put(key, validateField(key, value));
}
return validated;
}
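For completeness, a sketch of what a concrete record could look like on top of the now-concrete AbstractRecord; the class name and type name are hypothetical and only illustrate the pieces a subtype provides for the Jackson setup. The subtype still has to be registered, either through RecordUtility package discovery or by hand with DSMapper.registerSubtypes:

package org.gcube.documentstore.records.implementation;

import com.fasterxml.jackson.annotation.JsonTypeName;

// Hypothetical subtype: the @JsonTypeName value is what @JsonTypeInfo on Record
// writes into the "recordType" JSON property, and giveMeRecordType() keeps the
// internal recordType field consistent with it.
@JsonTypeName(value = "HypotheticalRecord")
public class HypotheticalRecord extends AbstractRecord {

	private static final long serialVersionUID = 1L;

	@Override
	protected String giveMeRecordType() {
		return "HypotheticalRecord";
	}
}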

View File

@ -1,5 +1,6 @@
package org.gcube.documentstore.records.implementation;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
@ -15,6 +16,8 @@ public class ConfigurationGetPropertyValues {
String propFileName = "./config/accounting.properties";
logger.trace("property file search"+propFileName);
logger.trace("find a properties in :"+new File(".").getAbsolutePath());
try (FileInputStream inputStream= new FileInputStream(propFileName)){
if (inputStream != null) {
prop=new Properties();
@ -23,7 +26,6 @@ public class ConfigurationGetPropertyValues {
}catch (Exception e) {
logger.trace("ConfigurationGetPropertyValues -property file error on input stream"+e.getLocalizedMessage());
}
return prop;
}
}