From 4466e2bbac38b59b4bef4004f4b31519c1c54f19 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Fri, 15 Sep 2017 08:40:03 +0000 Subject: [PATCH] Added malformed records management git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@153108 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../aggregator/aggregation/Aggregator.java | 40 ++++++++++++++++++- .../aggregator/persist/Persist.java | 12 +++++- .../persistence/CouchBaseConnector.java | 2 +- .../aggregator/status/AggregationStatus.java | 12 ++++++ .../aggregator/utility/Utility.java | 5 +++ 5 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index 0828c42..83d91b6 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -11,6 +11,9 @@ import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.AggregatedUsageRecord; +import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; +import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord; +import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.RecordUtility; @@ -156,9 +159,41 @@ public class Aggregator { logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(), aggregateRecordsBackupFile); + File malformedRecords = Utility.getMalformatedFile(aggregateRecordsBackupFile); + malformedRecords.delete(); + int malformedRecordNumber = 0; + List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); for (AggregatedRecord aggregatedRecord : aggregatedRecords) { - String marshalled = DSMapper.marshal(aggregatedRecord); + String marshalled = null; + try{ + marshalled = DSMapper.marshal(aggregatedRecord); + aggregatedRecord.setId(UUID.randomUUID().toString()); + marshalled = DSMapper.marshal(aggregatedRecord); + }catch (Exception e) { + logger.error("{}", aggregatedRecord, e.getMessage()); + + ++malformedRecordNumber; + Utility.printLine(malformedRecords, aggregatedRecord.toString()); + + if(aggregatedRecord instanceof AbstractServiceUsageRecord){ + if(aggregatedRecord.getResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME)==null){ + aggregatedRecord.setResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME, aggregatedRecord.getResourceProperty(ServiceUsageRecord.DURATION)); + } + if(aggregatedRecord.getResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME)==null) { + aggregatedRecord.setResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME, aggregatedRecord.getResourceProperty(ServiceUsageRecord.DURATION)); + } + } + + try { + marshalled = DSMapper.marshal(aggregatedRecord); + aggregatedRecord.setId(UUID.randomUUID().toString()); + marshalled = DSMapper.marshal(aggregatedRecord); + }catch (Exception ex) { + throw ex; + } + } + JsonObject jsonObject = JsonObject.fromJson(marshalled); Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString()); } @@ -166,13 +201,14 @@ public class Aggregator { aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size()); + aggregationStatus.setMalformedRecordNumber(malformedRecordNumber); aggregationStatus.setState(AggregationState.AGGREGATED, startTime, true); } protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception { @SuppressWarnings("rawtypes") AggregatedRecord record = (AggregatedRecord) RecordUtility.getRecord(json); - record.setId(UUID.randomUUID().toString()); + //record.setId(UUID.randomUUID().toString()); aggregatorBuffer.aggregate(record); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java index e811c75..8456a1a 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java @@ -46,6 +46,10 @@ public class Persist { private void setAggregationStateToCompleted(Calendar now) throws Exception { originalRecordsbackupFile.delete(); aggregateRecordsBackupFile.delete(); + File malformedRecords = Utility.getMalformatedFile(aggregateRecordsBackupFile); + if(malformedRecords.exists()){ + malformedRecords.delete(); + } aggregationStatus.setState(AggregationState.COMPLETED, now, true); } @@ -87,8 +91,14 @@ public class Persist { WorkSpaceDirectoryStructure workspaceDirectoryStructure = new WorkSpaceDirectoryStructure(); String targetFolder = workspaceDirectoryStructure.getTargetFolder(aggregationStatus.getAggregationInfo().getAggregationType(), aggregationStatus.getAggregationInfo().getAggregationStartDate()); - WorkSpaceManagement.zipAndBackupFiles(targetFolder, + File malformedRecords = Utility.getMalformatedFile(aggregateRecordsBackupFile); + if(malformedRecords.exists()){ + WorkSpaceManagement.zipAndBackupFiles(targetFolder, + originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, "-with-malformed"), originalRecordsbackupFile, aggregateRecordsBackupFile, malformedRecords); + }else{ + WorkSpaceManagement.zipAndBackupFiles(targetFolder, originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, ""), originalRecordsbackupFile, aggregateRecordsBackupFile); + } setAggregationStateToCompleted(now); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java index 79dcf74..6aacaa6 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java @@ -147,7 +147,7 @@ public class CouchBaseConnector { Bucket bucket = cluster.openBucket(bucketName, configuration.getProperty(PASSWORD_PROPERTY_KEY)); connectionMap.put(bucketKey, bucket); }catch (Exception e) { - logger.info("Unable to open Bucket for type {}. This normally mean that is configured.", recordClass); + logger.info("Unable to open Bucket for type {}. This normally mean that is not configured.", recordClass); } } } diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java index c98de89..7a3ac5a 100644 --- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java @@ -38,6 +38,9 @@ public class AggregationStatus { @JsonProperty protected int recoveredRecordNumber; + @JsonProperty + protected int malformedRecordNumber; + @JsonProperty protected float percentage; @@ -77,6 +80,7 @@ public class AggregationStatus { this.aggregationInfo = aggregationInfo; this.aggregationStateEvents = new ArrayList<>(); this.uuid = UUID.randomUUID(); + this.malformedRecordNumber = 0; } public AggregationInfo getAggregationInfo() { @@ -145,4 +149,12 @@ public class AggregationStatus { this.context = context; } + public int getMalformedRecordNumber() { + return malformedRecordNumber; + } + + public void setMalformedRecordNumber(int malformedRecordNumber) { + this.malformedRecordNumber = malformedRecordNumber; + } + } diff --git a/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java b/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java index 13588c4..f20ab41 100644 --- a/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java +++ b/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java @@ -165,4 +165,9 @@ public class Utility { throw new Exception("Unable to retrieve user.", e); } } + + + public static File getMalformatedFile(File aggregateRecordsBackupFile){ + return new File(aggregateRecordsBackupFile.getParent(), aggregateRecordsBackupFile.getName().replaceAll("aggregated", "malformed")); + } }