Alessandro Pieve 2016-09-30 16:39:37 +00:00
parent f3446308e9
commit 890fee74d0
5 changed files with 48 additions and 37 deletions

View File

@ -122,7 +122,7 @@
<dependency>
<groupId>org.gcube.accounting</groupId>
<artifactId>accounting-lib</artifactId>
<version>[2.2.0-SNAPSHOT,2.3.0-SNAPSHOT)</version>
<version>[2.2.0-SNAPSHOT,3.0.0-SNAPSHOT)</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@ -14,6 +14,7 @@ import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.documentstore.records.aggregation.AggregationUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,7 +31,7 @@ public class Aggregation {
//count buffer records
protected int totalBufferedRecords;
//list Aggregate record
protected Map<String, List<AggregatedRecord<?,?>>> bufferedRecords = new HashMap<String, List<AggregatedRecord<?,?>>>();
@ -65,18 +66,12 @@ public class Aggregation {
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void madeAggregation(AggregatedRecord<?,?> record) throws InvalidValueException{
String recordType = record.getRecordType();
List<AggregatedRecord<?,?>> records;
if(this.bufferedRecords.containsKey(recordType)){
records = this.bufferedRecords.get(recordType);
//logger.debug("value endtime{}, type endtime{}",record.getEndTime());
boolean found = false;
for(AggregatedRecord bufferedRecord : records){
if(!(bufferedRecord instanceof AggregatedRecord)){
continue;
@ -88,23 +83,20 @@ public class Aggregation {
//logger.debug("record: {}",record.toString());
if (util.isAggregable(record)){
try {
AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord;
// TODO check compatibility using getAggregable
bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
//patch for not changed a creation time
//bufferedAggregatedRecord.setCreationTime(bufferedAggregatedRecord.getStartTime());
bufferedAggregatedRecord.setCreationTime(record.getCreationTime());
found = true;
break;
} catch(NotAggregatableRecordsExceptions e) {
logger.trace("{} is not usable for aggregation", bufferedRecord);
}
}
}
if(!found){
@ -115,6 +107,7 @@ public class Aggregation {
}
}else{
//logger.debug("else if record contains "+recordType);
records = new ArrayList<AggregatedRecord<?,?>>();
try {
@ -126,6 +119,8 @@ public class Aggregation {
this.bufferedRecords.put(recordType, records);
}
}
@ -155,16 +150,16 @@ public class Aggregation {
String id=thisRecord.getId();
JsonObject accounting = JsonObject.empty();
for (String key : thisRecord.getResourceProperties().keySet()){
Object value=thisRecord.getResourceProperty(key);
if (!Utility.checkType(value))
value=(String)value.toString();
accounting.put(key, value);
}
JsonDocument document = JsonDocument.create(id, accounting);
listDocumentToPersist.add(document);
for (String key : thisRecord.getResourceProperties().keySet()){
Object value=thisRecord.getResourceProperty(key);
if (!Utility.checkType(value))
value=(String)value.toString();
accounting.put(key, value);
}
JsonDocument document = JsonDocument.create(id, accounting);
listDocumentToPersist.add(document);
}
}
clear();

View File

@ -86,10 +86,13 @@ public class AggregationUtility<T extends AggregatedRecord<T,?>> {
* one provided to the Constructor. False otherwise.
*/
@SuppressWarnings("unchecked")
public boolean isAggregable(T record) {
public boolean isAggregable(T record) {
for(String field : aggregationFields){
Serializable recordValue = record.getResourceProperty(field);
Serializable thisValue = t.getResourceProperty(field);
//logger.error("isAggregable-field:{} ,recordValue:{}, thisValue:{}",field,recordValue,thisValue);
if(recordValue instanceof Comparable && thisValue instanceof Comparable){
@SuppressWarnings("rawtypes")
Comparable recordValueComparable = (Comparable) recordValue;
@ -100,10 +103,17 @@ public class AggregationUtility<T extends AggregatedRecord<T,?>> {
return false;
}
}else{
if(recordValue.hashCode()!=this.hashCode()){
logger.trace("{} != {}", recordValue, thisValue);
/*
if (recordValue==null){
//logger.trace("{} != {}", recordValue, thisValue);
return false;
}
*/
if(recordValue.hashCode()!=this.hashCode()){
//logger.trace("{} != {}", recordValue, thisValue);
return false;
}
}
}
return true;

View File

@ -140,11 +140,9 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
}
}
catch (Exception e) {
logger.error(e.getLocalizedMessage());
logger.error("launch",e.getLocalizedMessage());
throw e;
}
Cluster cluster = CouchbaseCluster.create(ENV, url);
//Define a type for aggregate
@ -317,16 +315,22 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
try {
//patch for field of long type
String document=row.value().toString().replace("\":", "=").replace("\"", "");
Map<String,? extends Serializable> map = getMapFromString(document);
@SuppressWarnings("rawtypes")
@SuppressWarnings("rawtypes")
AggregatedRecord record = (AggregatedRecord)RecordUtility.getRecord(map);
aggregate.aggregate(record);
//insert an elaborate row into list JsonDocument for memory document elaborate
//insert an elaborate row into list JsonDocument for memory document elaborate
String identifier=(String) row.document().content().get("id");
JsonDocument documentJson = JsonDocument.create(identifier, row.document().content());
documentElaborate.add(documentJson);
return true;
}
catch(InvalidValueException ex){
@ -334,7 +338,8 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
return true;
}
catch (Exception e) {
logger.error(e.getLocalizedMessage());
logger.error("Error elaborateRow", e,e.getLocalizedMessage());
//throw e;
return false;
}

View File

@ -20,12 +20,13 @@ public class Tests {
@Before
public void beforeTest(){
}
@Test
public void testLaunch() throws Exception {
SecurityTokenProvider.instance.set("36501a0d-a205-4bf1-87ad-4c7185faa0d6-98187548");
//FOR DEBUG
String scopeDebug="/gcube/devNext";
ScopeProvider.instance.set(scopeDebug);
@ -38,13 +39,13 @@ public class Tests {
inputs.put("interval",1 );
/* OPTIONAL INPUT */
//change to time
inputs.put("startTime", 20);
inputs.put("startTime", 6);
//specify bucket
inputs.put("bucket","accounting_service");
//current scope
inputs.put("currentScope",true);
inputs.put("currentScope",false);
//specify user for save to workspace
//specify a recovery 0 default recovery and aggregate, 1 only aggregate, 2 only recovery