Patch for old records
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@153168 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
db2cb04b0f
commit
a6d58dc724
|
@ -12,10 +12,11 @@ import org.gcube.accounting.aggregator.utility.Constant;
|
|||
import org.gcube.accounting.aggregator.utility.Utility;
|
||||
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
|
||||
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
|
||||
import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord;
|
||||
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
|
||||
import org.gcube.documentstore.exception.InvalidValueException;
|
||||
import org.gcube.documentstore.records.AggregatedRecord;
|
||||
import org.gcube.documentstore.records.DSMapper;
|
||||
import org.gcube.documentstore.records.Record;
|
||||
import org.gcube.documentstore.records.RecordUtility;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -41,6 +42,8 @@ public class Aggregator {
|
|||
protected final Bucket bucket;
|
||||
protected final File originalRecordsbackupFile;
|
||||
protected final File aggregateRecordsBackupFile;
|
||||
protected final File malformedRecordsFile;
|
||||
protected int malformedRecordNumber;
|
||||
|
||||
protected Calendar startTime;
|
||||
|
||||
|
@ -50,6 +53,7 @@ public class Aggregator {
|
|||
this.bucket = bucket;
|
||||
this.originalRecordsbackupFile = originalRecordsbackupFile;
|
||||
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
|
||||
this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile);
|
||||
}
|
||||
|
||||
public void aggregate() throws Exception {
|
||||
|
@ -107,7 +111,9 @@ public class Aggregator {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
protected void retrieveAndAggregate(ViewResult viewResult) throws Exception {
|
||||
AggregatorBuffer aggregatorBuffer = new AggregatorBuffer();
|
||||
|
||||
|
@ -116,14 +122,32 @@ public class Aggregator {
|
|||
|
||||
originalRecordsbackupFile.delete();
|
||||
aggregateRecordsBackupFile.delete();
|
||||
malformedRecordsFile.delete();
|
||||
|
||||
malformedRecordNumber = 0;
|
||||
int originalRecordsCounter = 0;
|
||||
for (ViewRow row : viewResult) {
|
||||
try {
|
||||
JsonObject content = row.document().content();
|
||||
|
||||
if(content.containsKey("usageRecordType")){
|
||||
String recordType = content.getString("usageRecordType");
|
||||
content.removeKey("usageRecordType");
|
||||
content.put(Record.RECORD_TYPE, recordType);
|
||||
}
|
||||
|
||||
Boolean aggregated = false;
|
||||
if(content.containsKey(AggregatedRecord.AGGREGATED)){
|
||||
aggregated = content.getBoolean(AggregatedRecord.AGGREGATED);
|
||||
}
|
||||
|
||||
if(!aggregated){
|
||||
String recordType = content.getString(Record.RECORD_TYPE);
|
||||
content.put(Record.RECORD_TYPE, "Simple" + recordType);
|
||||
}
|
||||
|
||||
|
||||
String record = content.toString();
|
||||
record = record.replace("usageRecordType", "recordType");
|
||||
|
||||
// Backup the Record on local file
|
||||
Utility.printLine(originalRecordsbackupFile, record);
|
||||
|
@ -159,41 +183,11 @@ public class Aggregator {
|
|||
logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(),
|
||||
aggregateRecordsBackupFile);
|
||||
|
||||
File malformedRecords = Utility.getMalformatedFile(aggregateRecordsBackupFile);
|
||||
malformedRecords.delete();
|
||||
int malformedRecordNumber = 0;
|
||||
|
||||
|
||||
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
|
||||
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
|
||||
String marshalled = null;
|
||||
try{
|
||||
marshalled = DSMapper.marshal(aggregatedRecord);
|
||||
aggregatedRecord.setId(UUID.randomUUID().toString());
|
||||
marshalled = DSMapper.marshal(aggregatedRecord);
|
||||
}catch (Exception e) {
|
||||
logger.error("{}", aggregatedRecord, e.getMessage());
|
||||
|
||||
++malformedRecordNumber;
|
||||
Utility.printLine(malformedRecords, aggregatedRecord.toString());
|
||||
|
||||
if(aggregatedRecord instanceof AbstractServiceUsageRecord){
|
||||
if(aggregatedRecord.getResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME)==null){
|
||||
aggregatedRecord.setResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME, aggregatedRecord.getResourceProperty(ServiceUsageRecord.DURATION));
|
||||
}
|
||||
if(aggregatedRecord.getResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME)==null) {
|
||||
aggregatedRecord.setResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME, aggregatedRecord.getResourceProperty(ServiceUsageRecord.DURATION));
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
marshalled = DSMapper.marshal(aggregatedRecord);
|
||||
aggregatedRecord.setId(UUID.randomUUID().toString());
|
||||
marshalled = DSMapper.marshal(aggregatedRecord);
|
||||
}catch (Exception ex) {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
String marshalled = DSMapper.marshal(aggregatedRecord);
|
||||
JsonObject jsonObject = JsonObject.fromJson(marshalled);
|
||||
Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString());
|
||||
}
|
||||
|
@ -204,11 +198,31 @@ public class Aggregator {
|
|||
aggregationStatus.setState(AggregationState.AGGREGATED, startTime, true);
|
||||
}
|
||||
|
||||
|
||||
protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception {
|
||||
Record record = RecordUtility.getRecord(json);
|
||||
|
||||
try {
|
||||
record.validate();
|
||||
}catch (InvalidValueException e) {
|
||||
++malformedRecordNumber;
|
||||
Utility.printLine(malformedRecordsFile, json);
|
||||
|
||||
if(record instanceof AggregatedServiceUsageRecord){
|
||||
if(record.getResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME)==null){
|
||||
record.setResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME, record.getResourceProperty(ServiceUsageRecord.DURATION));
|
||||
}
|
||||
if(record.getResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME)==null) {
|
||||
record.setResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME, record.getResourceProperty(ServiceUsageRecord.DURATION));
|
||||
}
|
||||
}
|
||||
record.validate();
|
||||
}
|
||||
record.setId(UUID.randomUUID().toString());
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
AggregatedRecord record = (AggregatedRecord) RecordUtility.getRecord(json);
|
||||
//record.setId(UUID.randomUUID().toString());
|
||||
aggregatorBuffer.aggregate(record);
|
||||
AggregatedRecord aggregatedRecord = AggregatorBuffer.getAggregatedRecord(record);
|
||||
aggregatorBuffer.aggregate(aggregatedRecord);
|
||||
}
|
||||
|
||||
protected JsonDocument getJsonDocument(ViewRow row) {
|
||||
|
|
|
@ -129,6 +129,8 @@ public class Elaborator {
|
|||
aggregateRecordsBackupFile);
|
||||
aggregator.aggregate();
|
||||
|
||||
|
||||
|
||||
Calendar now = Utility.getUTCCalendarInstance();
|
||||
/*
|
||||
* now is passed as argument to isTimeElapsed function to avoid situation
|
||||
|
|
Loading…
Reference in New Issue