Added malformed records management

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@153108 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-09-15 08:40:03 +00:00
parent 7c45f425dc
commit 4466e2bbac
5 changed files with 67 additions and 4 deletions

View File

@ -11,6 +11,9 @@ import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.RecordUtility; import org.gcube.documentstore.records.RecordUtility;
@ -156,9 +159,41 @@ public class Aggregator {
logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(), logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(),
aggregateRecordsBackupFile); aggregateRecordsBackupFile);
File malformedRecords = Utility.getMalformatedFile(aggregateRecordsBackupFile);
malformedRecords.delete();
int malformedRecordNumber = 0;
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) { for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
String marshalled = DSMapper.marshal(aggregatedRecord); String marshalled = null;
try{
marshalled = DSMapper.marshal(aggregatedRecord);
aggregatedRecord.setId(UUID.randomUUID().toString());
marshalled = DSMapper.marshal(aggregatedRecord);
}catch (Exception e) {
logger.error("{}", aggregatedRecord, e.getMessage());
++malformedRecordNumber;
Utility.printLine(malformedRecords, aggregatedRecord.toString());
if(aggregatedRecord instanceof AbstractServiceUsageRecord){
if(aggregatedRecord.getResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME)==null){
aggregatedRecord.setResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME, aggregatedRecord.getResourceProperty(ServiceUsageRecord.DURATION));
}
if(aggregatedRecord.getResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME)==null) {
aggregatedRecord.setResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME, aggregatedRecord.getResourceProperty(ServiceUsageRecord.DURATION));
}
}
try {
marshalled = DSMapper.marshal(aggregatedRecord);
aggregatedRecord.setId(UUID.randomUUID().toString());
marshalled = DSMapper.marshal(aggregatedRecord);
}catch (Exception ex) {
throw ex;
}
}
JsonObject jsonObject = JsonObject.fromJson(marshalled); JsonObject jsonObject = JsonObject.fromJson(marshalled);
Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString()); Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString());
} }
@ -166,13 +201,14 @@ public class Aggregator {
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size()); aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size());
aggregationStatus.setMalformedRecordNumber(malformedRecordNumber);
aggregationStatus.setState(AggregationState.AGGREGATED, startTime, true); aggregationStatus.setState(AggregationState.AGGREGATED, startTime, true);
} }
protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception { protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
AggregatedRecord record = (AggregatedRecord) RecordUtility.getRecord(json); AggregatedRecord record = (AggregatedRecord) RecordUtility.getRecord(json);
record.setId(UUID.randomUUID().toString()); //record.setId(UUID.randomUUID().toString());
aggregatorBuffer.aggregate(record); aggregatorBuffer.aggregate(record);
} }

View File

@ -46,6 +46,10 @@ public class Persist {
private void setAggregationStateToCompleted(Calendar now) throws Exception { private void setAggregationStateToCompleted(Calendar now) throws Exception {
originalRecordsbackupFile.delete(); originalRecordsbackupFile.delete();
aggregateRecordsBackupFile.delete(); aggregateRecordsBackupFile.delete();
File malformedRecords = Utility.getMalformatedFile(aggregateRecordsBackupFile);
if(malformedRecords.exists()){
malformedRecords.delete();
}
aggregationStatus.setState(AggregationState.COMPLETED, now, true); aggregationStatus.setState(AggregationState.COMPLETED, now, true);
} }
@ -87,8 +91,14 @@ public class Persist {
WorkSpaceDirectoryStructure workspaceDirectoryStructure = new WorkSpaceDirectoryStructure(); WorkSpaceDirectoryStructure workspaceDirectoryStructure = new WorkSpaceDirectoryStructure();
String targetFolder = workspaceDirectoryStructure.getTargetFolder(aggregationStatus.getAggregationInfo().getAggregationType(), aggregationStatus.getAggregationInfo().getAggregationStartDate()); String targetFolder = workspaceDirectoryStructure.getTargetFolder(aggregationStatus.getAggregationInfo().getAggregationType(), aggregationStatus.getAggregationInfo().getAggregationStartDate());
File malformedRecords = Utility.getMalformatedFile(aggregateRecordsBackupFile);
if(malformedRecords.exists()){
WorkSpaceManagement.zipAndBackupFiles(targetFolder,
originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, "-with-malformed"), originalRecordsbackupFile, aggregateRecordsBackupFile, malformedRecords);
}else{
WorkSpaceManagement.zipAndBackupFiles(targetFolder, WorkSpaceManagement.zipAndBackupFiles(targetFolder,
originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, ""), originalRecordsbackupFile, aggregateRecordsBackupFile); originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, ""), originalRecordsbackupFile, aggregateRecordsBackupFile);
}
setAggregationStateToCompleted(now); setAggregationStateToCompleted(now);
} }

View File

@ -147,7 +147,7 @@ public class CouchBaseConnector {
Bucket bucket = cluster.openBucket(bucketName, configuration.getProperty(PASSWORD_PROPERTY_KEY)); Bucket bucket = cluster.openBucket(bucketName, configuration.getProperty(PASSWORD_PROPERTY_KEY));
connectionMap.put(bucketKey, bucket); connectionMap.put(bucketKey, bucket);
}catch (Exception e) { }catch (Exception e) {
logger.info("Unable to open Bucket for type {}. This normally mean that is configured.", recordClass); logger.info("Unable to open Bucket for type {}. This normally mean that is not configured.", recordClass);
} }
} }
} }

View File

@ -38,6 +38,9 @@ public class AggregationStatus {
@JsonProperty @JsonProperty
protected int recoveredRecordNumber; protected int recoveredRecordNumber;
@JsonProperty
protected int malformedRecordNumber;
@JsonProperty @JsonProperty
protected float percentage; protected float percentage;
@ -77,6 +80,7 @@ public class AggregationStatus {
this.aggregationInfo = aggregationInfo; this.aggregationInfo = aggregationInfo;
this.aggregationStateEvents = new ArrayList<>(); this.aggregationStateEvents = new ArrayList<>();
this.uuid = UUID.randomUUID(); this.uuid = UUID.randomUUID();
this.malformedRecordNumber = 0;
} }
public AggregationInfo getAggregationInfo() { public AggregationInfo getAggregationInfo() {
@ -145,4 +149,12 @@ public class AggregationStatus {
this.context = context; this.context = context;
} }
public int getMalformedRecordNumber() {
return malformedRecordNumber;
}
public void setMalformedRecordNumber(int malformedRecordNumber) {
this.malformedRecordNumber = malformedRecordNumber;
}
} }

View File

@ -165,4 +165,9 @@ public class Utility {
throw new Exception("Unable to retrieve user.", e); throw new Exception("Unable to retrieve user.", e);
} }
} }
public static File getMalformatedFile(File aggregateRecordsBackupFile){
return new File(aggregateRecordsBackupFile.getParent(), aggregateRecordsBackupFile.getName().replaceAll("aggregated", "malformed"));
}
} }