Fixed bug on aggregation

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@122213 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2016-01-13 14:34:05 +00:00
parent 53f773791c
commit 0feaeb33e6
6 changed files with 177 additions and 7 deletions

View File

@ -75,4 +75,8 @@ public abstract class PersistenceBackendConfiguration {
public String getProperty(String key) throws Exception {
return properties.get(key);
}
public String toString() {
return properties.toString();
}
}

View File

@ -4,9 +4,12 @@
package org.gcube.documentstore.records;
import java.util.Calendar;
import java.util.Set;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
import org.gcube.documentstore.records.implementation.AggregatedField;
import org.gcube.documentstore.records.implementation.RequiredField;
/**
@ -17,11 +20,13 @@ public interface AggregatedRecord<A extends AggregatedRecord<A,R>, R extends Rec
/**
* KEY : Indicate that this {@link Record} is an aggregation
*/
@RequiredField @AggregatedField
public static final String AGGREGATED = "aggregated";
/**
* KEY : Indicate The Number of {@link AggregatedRecord}
*/
@RequiredField @AggregatedField
public static final String OPERATION_COUNT = "operationCount";
/**
@ -29,6 +34,7 @@ public interface AggregatedRecord<A extends AggregatedRecord<A,R>, R extends Rec
* {@link AggregatedRecord}. The value will be recorded in UTC milliseconds
* from the epoch.
*/
@RequiredField @AggregatedField
public static final String START_TIME = "startTime";
/**
@ -36,8 +42,14 @@ public interface AggregatedRecord<A extends AggregatedRecord<A,R>, R extends Rec
* {@link AggregatedRecord}. The value will be recorded in UTC milliseconds
* from the epoch.
*/
@RequiredField @AggregatedField
public static final String END_TIME = "endTime";
/**
* @return a Set containing the keys of required fields
*/
public Set<String> getAggregatedFields();
public int getOperationCount();
public void setOperationCount(int operationCount) throws InvalidValueException;
@ -54,6 +66,10 @@ public interface AggregatedRecord<A extends AggregatedRecord<A,R>, R extends Rec
public A aggregate(R record) throws NotAggregatableRecordsExceptions;
public boolean isAggregable(A record) throws NotAggregatableRecordsExceptions;
public boolean isAggregable(R record) throws NotAggregatableRecordsExceptions;
public Class<R> getAggregable();
}

View File

@ -9,6 +9,7 @@ import java.util.Map;
import java.util.Set;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.implementation.RequiredField;
/**
* @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
@ -20,17 +21,20 @@ public interface Record extends Comparable<Record>, Serializable {
* KEY : The unique identifier for the {@link Record}
* The ID SHOULD automatically created by the implementation class.
*/
@RequiredField
public static final String ID = "id";
/**
* KEY : The instant when the {@link Record} was created.
* The value MUST be recorded in UTC milliseconds from the epoch.
*/
@RequiredField
public static final String CREATION_TIME = "creationTime";
/**
* KEY : The Type of the represented {@link Record}
*/
@RequiredField
public static final String RECORD_TYPE = "recordType";
/**

View File

@ -79,7 +79,7 @@ public abstract class AggregationScheduler {
logger.trace("Trying to use {} for aggregation.", bufferedAggregatedRecord);
if(record instanceof AggregatedRecord){
// TODO check compatibility using getAggrergable
// TODO check compatibility using getAggregable
bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
}else{
bufferedAggregatedRecord.aggregate((Record) record);
@ -94,7 +94,11 @@ public abstract class AggregationScheduler {
}
if(!found){
records.add(record);
try {
records.add(getAggregatedRecord(record));
} catch (Exception e) {
records.add(record);
}
totalBufferedRecords++;
return;
}

View File

@ -0,0 +1,142 @@
/**
 *
 */
package org.gcube.documentstore.records.aggregation;

import java.util.Calendar;
import java.util.HashSet;
import java.util.Set;

import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;

/**
 * Utility that merges compatible {@link AggregatedRecord}s into a single
 * accumulator record. Two records are considered compatible when they share
 * the same value for every field in {@link #aggregationFields}.
 *
 * @author Luca Frosini (ISTI - CNR) http://www.lucafrosini.com/
 */
public class AggregationUtility<T extends AggregatedRecord<T,?>> {

	/** The accumulator record; other records are folded into it by {@link #aggregate(AggregatedRecord)}. */
	protected T t;

	/** Fields whose values must match between two records for them to be aggregatable. */
	protected Set<String> aggregationFields;

	/** Fields preserved after aggregation; every other property is discarded. */
	protected Set<String> neededFields;

	/**
	 * Default aggregation fields: the record's required fields minus its
	 * aggregated fields and the bookkeeping keys (id, creation time, operation
	 * count, aggregated flag, start/end time), which legitimately differ
	 * between records being merged.
	 */
	protected void setDefaultAggregationFields(){
		this.aggregationFields = new HashSet<String>(t.getRequiredFields());
		this.aggregationFields.removeAll(t.getAggregatedFields());
		this.aggregationFields.remove(Record.ID);
		this.aggregationFields.remove(Record.CREATION_TIME);
		this.aggregationFields.remove(AggregatedRecord.OPERATION_COUNT);
		this.aggregationFields.remove(AggregatedRecord.AGGREGATED);
		this.aggregationFields.remove(AggregatedRecord.START_TIME);
		this.aggregationFields.remove(AggregatedRecord.END_TIME);
	}

	/**
	 * Default needed fields: required fields plus aggregated fields plus the
	 * aggregation bookkeeping keys, so lossless aggregation keeps everything
	 * relevant.
	 */
	protected void setDefaultNeededFields(){
		this.neededFields = new HashSet<String>(t.getRequiredFields());
		this.neededFields.addAll(t.getAggregatedFields());
		this.neededFields.add(AggregatedRecord.OPERATION_COUNT);
		this.neededFields.add(AggregatedRecord.AGGREGATED);
		this.neededFields.add(AggregatedRecord.START_TIME);
		this.neededFields.add(AggregatedRecord.END_TIME);
	}

	/**
	 * @param t the accumulator record; subsequent {@link #aggregate(AggregatedRecord)}
	 *          calls merge other records into it
	 */
	public AggregationUtility(T t){
		this.t = t;
		setDefaultAggregationFields();
		setDefaultNeededFields();
	}

	/**
	 * This function is used to set the Set of Aggregation Fields.
	 * By default this Set is composed of Required Fields for lossless
	 * aggregation. If you want to perform lossy aggregation set this Set
	 * consistently with NeededFields using {@link #setNeededFields}
	 * @param aggregationFields the fields that must match for aggregation
	 */
	public void setAggregationFields(Set<String> aggregationFields){
		this.aggregationFields = aggregationFields;
	}

	/**
	 * This function is used to set the Set of Needed Fields to keep after
	 * aggregation. All other fields are removed.
	 * By default this Set is composed of Required Fields and AggregationFields
	 * for lossless aggregation. If you want to perform lossy aggregation set
	 * this Set consistently with AggregationFields using
	 * {@link #setAggregationFields}
	 * @param neededFields the fields to retain after aggregation
	 */
	public void setNeededFields(Set<String> neededFields){
		this.neededFields = neededFields;
	}

	/**
	 * Check if the record provided as argument is aggregable with the one
	 * provided to the Constructor.
	 * This is done comparing the value of each AggregationFields
	 * @param record to check
	 * @return true if the record provided as argument is aggregable with the
	 * one provided to the Constructor. False otherwise.
	 */
	@SuppressWarnings("unchecked")
	public boolean isAggregable(T record) {
		for(String field : aggregationFields){
			// NOTE(review): if either record lacks the field this throws NPE,
			// which aggregate() converts to NotAggregatableRecordsExceptions —
			// presumably intentional; confirm against Record's contract.
			@SuppressWarnings("rawtypes")
			Comparable recordValue = record.getResourceProperty(field);
			@SuppressWarnings("rawtypes")
			Comparable thisValue = t.getResourceProperty(field);
			if(recordValue.compareTo(thisValue)!=0){
				return false;
			}
		}
		return true;
	}

	/**
	 * Remove all fields which are not in AggregationFields nor in
	 * AggregatedFields Sets
	 */
	protected void cleanExtraFields(){
		// Snapshot the keys: keySet() is a live view of the backing map, and
		// removing from the map while iterating it throws
		// ConcurrentModificationException.
		Set<String> propertyKeys = new HashSet<String>(t.getResourceProperties().keySet());
		for(String propertyName : propertyKeys){
			if(!neededFields.contains(propertyName)){
				t.getResourceProperties().remove(propertyName);
			}
		}
	}

	/**
	 * Merge {@code record} into the accumulator: widens the [startTime, endTime]
	 * interval, refreshes the creation time, sums the operation counts and
	 * strips fields not listed in NeededFields.
	 * @param record the record to fold into the accumulator
	 * @return the updated accumulator record
	 * @throws NotAggregatableRecordsExceptions if the records differ on any
	 *         aggregation field, or if any step of the merge fails
	 */
	public synchronized T aggregate(T record) throws NotAggregatableRecordsExceptions {
		try{
			if(!isAggregable(record)){
				throw new NotAggregatableRecordsExceptions("The Record provided as argument has different values for fields which must be common to be aggregatable");
			}

			// Keep the earliest start time of the two records.
			Calendar recordStartTime = record.getStartTime();
			Calendar actualStartTime = t.getStartTime();
			if(recordStartTime.before(actualStartTime)){
				t.setStartTime(recordStartTime);
			}

			// Keep the latest end time of the two records.
			Calendar recordEndTime = record.getEndTime();
			Calendar actualEndTime = t.getEndTime();
			if(recordEndTime.after(actualEndTime)){
				t.setEndTime(recordEndTime);
			}

			Calendar newCreationTime = Calendar.getInstance();
			t.setCreationTime(newCreationTime);

			t.setOperationCount(t.getOperationCount() + record.getOperationCount());

			cleanExtraFields();
			return t;
		}catch(NotAggregatableRecordsExceptions e){
			throw e;
		}catch(Exception ex){
			// Wrap the exception itself, not ex.getCause(): the cause is often
			// null (e.g. a direct NPE), which would discard all diagnostics.
			throw new NotAggregatableRecordsExceptions(ex);
		}
	}

}

View File

@ -190,7 +190,7 @@ public abstract class AbstractRecord implements Record {
setResourceProperty(ID, id);
}
protected Calendar timestampStringToCalendar(long millis){
public static Calendar timestampToCalendar(long millis){
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(millis);
return calendar;
@ -202,7 +202,7 @@ public abstract class AbstractRecord implements Record {
@Override
public Calendar getCreationTime() {
long millis = (Long) this.resourceProperties.get(CREATION_TIME);
return timestampStringToCalendar(millis);
return timestampToCalendar(millis);
}
/**
@ -285,7 +285,7 @@ public abstract class AbstractRecord implements Record {
*/
protected Calendar getStartTimeAsCalendar() {
long millis = getStartTimeInMillis();
return timestampStringToCalendar(millis);
return timestampToCalendar(millis);
}
/**
@ -311,7 +311,7 @@ public abstract class AbstractRecord implements Record {
*/
protected Calendar getEndTimeAsCalendar() {
long millis = getEndTimeInMillis();
return timestampStringToCalendar(millis);
return timestampToCalendar(millis);
}
protected Comparable<? extends Serializable> validateField(String key, Comparable<? extends Serializable> value) throws InvalidValueException {