diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index d06db06..0cbe410 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -117,6 +117,58 @@ public class Aggregator { private static final String SINGLE = "Single"; private static final String SIMPLE = "Simple"; + + protected int elaborateRow(ViewRow row, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception { + try { + JsonObject content = row.document().content(); + + if(content.containsKey(USAGE_RECORD_TYPE)){ + String recordType = content.getString(USAGE_RECORD_TYPE); + content.removeKey(USAGE_RECORD_TYPE); + content.put(Record.RECORD_TYPE, recordType); + } + + Boolean aggregated = false; + if(content.containsKey(AggregatedRecord.AGGREGATED)){ + aggregated = content.getBoolean(AggregatedRecord.AGGREGATED); + } + + if(!aggregated){ + String recordType = content.getString(Record.RECORD_TYPE); + content.put(Record.RECORD_TYPE, SINGLE + recordType); + } + + + String recordType = content.getString(Record.RECORD_TYPE); + if(recordType.contains(SIMPLE)){ + recordType.replace(SIMPLE, SINGLE); + } + + String record = content.toString(); + + // Backup the Record on local file + Utility.printLine(originalRecordsbackupFile, record); + + // Aggregate the Record + aggregateRow(aggregatorBuffer, record); + + ++originalRecordsCounter; + if(originalRecordsCounter%1000==0){ + int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size(); + int diff = originalRecordsCounter - aggregatedRecordsNumber; + float percentage = (100 * diff) / originalRecordsCounter; + logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents", + aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage); + } + + return originalRecordsCounter; + }catch (Exception e) { + throw e; + } + } + + private static final int MAX_RETRY = 3; + protected void retrieveAndAggregate(ViewResult viewResult) throws Exception { AggregatorBuffer aggregatorBuffer = new AggregatorBuffer(); @@ -130,50 +182,15 @@ public class Aggregator { malformedRecordNumber = 0; int originalRecordsCounter = 0; for (ViewRow row : viewResult) { - try { - JsonObject content = row.document().content(); - - if(content.containsKey(USAGE_RECORD_TYPE)){ - String recordType = content.getString(USAGE_RECORD_TYPE); - content.removeKey(USAGE_RECORD_TYPE); - content.put(Record.RECORD_TYPE, recordType); + for(int i=1; i<=MAX_RETRY; i++){ + try { + originalRecordsCounter = elaborateRow(row, aggregatorBuffer, originalRecordsCounter); + break; + }catch (Exception e) { + if(i==2){ + logger.error("Unable to elaborate {} {}. Tryed {} times.", ViewRow.class.getSimpleName(), row, i, e); + } } - - Boolean aggregated = false; - if(content.containsKey(AggregatedRecord.AGGREGATED)){ - aggregated = content.getBoolean(AggregatedRecord.AGGREGATED); - } - - if(!aggregated){ - String recordType = content.getString(Record.RECORD_TYPE); - content.put(Record.RECORD_TYPE, SINGLE + recordType); - } - - - String recordType = content.getString(Record.RECORD_TYPE); - if(recordType.contains(SIMPLE)){ - recordType.replace(SIMPLE, SINGLE); - } - - String record = content.toString(); - - // Backup the Record on local file - Utility.printLine(originalRecordsbackupFile, record); - - // Aggregate the Record - aggregateRow(aggregatorBuffer, record); - - ++originalRecordsCounter; - if(originalRecordsCounter%1000==0){ - int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size(); - int diff = originalRecordsCounter - aggregatedRecordsNumber; - float percentage = (100 * diff) / originalRecordsCounter; - logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents", - aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage); - } - }catch (Exception e) { - logger.error("Unable to elaborate {} {}", ViewRow.class.getSimpleName(), row, e); - throw e; } }