Added 3 retry while elaborating row
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@154566 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
81ee077a94
commit
d7deb6e907
|
@ -117,6 +117,58 @@ public class Aggregator {
|
|||
private static final String SINGLE = "Single";
|
||||
private static final String SIMPLE = "Simple";
|
||||
|
||||
|
||||
protected int elaborateRow(ViewRow row, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception {
|
||||
try {
|
||||
JsonObject content = row.document().content();
|
||||
|
||||
if(content.containsKey(USAGE_RECORD_TYPE)){
|
||||
String recordType = content.getString(USAGE_RECORD_TYPE);
|
||||
content.removeKey(USAGE_RECORD_TYPE);
|
||||
content.put(Record.RECORD_TYPE, recordType);
|
||||
}
|
||||
|
||||
Boolean aggregated = false;
|
||||
if(content.containsKey(AggregatedRecord.AGGREGATED)){
|
||||
aggregated = content.getBoolean(AggregatedRecord.AGGREGATED);
|
||||
}
|
||||
|
||||
if(!aggregated){
|
||||
String recordType = content.getString(Record.RECORD_TYPE);
|
||||
content.put(Record.RECORD_TYPE, SINGLE + recordType);
|
||||
}
|
||||
|
||||
|
||||
String recordType = content.getString(Record.RECORD_TYPE);
|
||||
if(recordType.contains(SIMPLE)){
|
||||
recordType.replace(SIMPLE, SINGLE);
|
||||
}
|
||||
|
||||
String record = content.toString();
|
||||
|
||||
// Backup the Record on local file
|
||||
Utility.printLine(originalRecordsbackupFile, record);
|
||||
|
||||
// Aggregate the Record
|
||||
aggregateRow(aggregatorBuffer, record);
|
||||
|
||||
++originalRecordsCounter;
|
||||
if(originalRecordsCounter%1000==0){
|
||||
int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size();
|
||||
int diff = originalRecordsCounter - aggregatedRecordsNumber;
|
||||
float percentage = (100 * diff) / originalRecordsCounter;
|
||||
logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents",
|
||||
aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage);
|
||||
}
|
||||
|
||||
return originalRecordsCounter;
|
||||
}catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private static final int MAX_RETRY = 3;
|
||||
|
||||
protected void retrieveAndAggregate(ViewResult viewResult) throws Exception {
|
||||
AggregatorBuffer aggregatorBuffer = new AggregatorBuffer();
|
||||
|
||||
|
@ -130,50 +182,15 @@ public class Aggregator {
|
|||
malformedRecordNumber = 0;
|
||||
int originalRecordsCounter = 0;
|
||||
for (ViewRow row : viewResult) {
|
||||
try {
|
||||
JsonObject content = row.document().content();
|
||||
|
||||
if(content.containsKey(USAGE_RECORD_TYPE)){
|
||||
String recordType = content.getString(USAGE_RECORD_TYPE);
|
||||
content.removeKey(USAGE_RECORD_TYPE);
|
||||
content.put(Record.RECORD_TYPE, recordType);
|
||||
for(int i=1; i<=MAX_RETRY; i++){
|
||||
try {
|
||||
originalRecordsCounter = elaborateRow(row, aggregatorBuffer, originalRecordsCounter);
|
||||
break;
|
||||
}catch (Exception e) {
|
||||
if(i==2){
|
||||
logger.error("Unable to elaborate {} {}. Tryed {} times.", ViewRow.class.getSimpleName(), row, i, e);
|
||||
}
|
||||
}
|
||||
|
||||
Boolean aggregated = false;
|
||||
if(content.containsKey(AggregatedRecord.AGGREGATED)){
|
||||
aggregated = content.getBoolean(AggregatedRecord.AGGREGATED);
|
||||
}
|
||||
|
||||
if(!aggregated){
|
||||
String recordType = content.getString(Record.RECORD_TYPE);
|
||||
content.put(Record.RECORD_TYPE, SINGLE + recordType);
|
||||
}
|
||||
|
||||
|
||||
String recordType = content.getString(Record.RECORD_TYPE);
|
||||
if(recordType.contains(SIMPLE)){
|
||||
recordType.replace(SIMPLE, SINGLE);
|
||||
}
|
||||
|
||||
String record = content.toString();
|
||||
|
||||
// Backup the Record on local file
|
||||
Utility.printLine(originalRecordsbackupFile, record);
|
||||
|
||||
// Aggregate the Record
|
||||
aggregateRow(aggregatorBuffer, record);
|
||||
|
||||
++originalRecordsCounter;
|
||||
if(originalRecordsCounter%1000==0){
|
||||
int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size();
|
||||
int diff = originalRecordsCounter - aggregatedRecordsNumber;
|
||||
float percentage = (100 * diff) / originalRecordsCounter;
|
||||
logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents",
|
||||
aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage);
|
||||
}
|
||||
}catch (Exception e) {
|
||||
logger.error("Unable to elaborate {} {}", ViewRow.class.getSimpleName(), row, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue