git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@144203 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
63c4e12b57
commit
d3fcb61715
|
@ -113,7 +113,7 @@ public class Aggregation {
|
|||
//logger.debug("else -- add getAggregatedRecord");
|
||||
records.add(getAggregatedRecord(record));
|
||||
} catch (Exception e) {
|
||||
|
||||
logger.debug("pre Exception but records");
|
||||
records.add(record);
|
||||
logger.debug("Exception but records Add e:{}",e);
|
||||
}
|
||||
|
|
|
@ -147,7 +147,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
|
|||
persisted=PersistTo.MASTER;
|
||||
}
|
||||
|
||||
logger.debug("Launch with Type:{}, Interval:{}, startTime:{}, Scope:{}, Recovery:{}",aggType.toString(),interval,inputStartTime,scope,recoveryMode);
|
||||
logger.debug("-Launch with Type:{}, Interval:{}, startTime:{}, Scope:{}, Recovery:{}",aggType.toString(),interval,inputStartTime,scope,recoveryMode);
|
||||
logger.debug("persist:{} backup:{}",persisted.toString(),backup);
|
||||
if(!backup){
|
||||
logger.warn("Attention backup disabled");
|
||||
|
@ -378,16 +378,18 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
|
|||
}
|
||||
catch(InvalidValueException ex){
|
||||
logger.warn("InvalidValueException - Record is not valid. Anyway, it will be persisted i:{}",i);
|
||||
logger.warn("Runtime Exception exr",ex);
|
||||
if ((i==5)&&(documentJson!=null)){
|
||||
documentElaborate.add(documentJson);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
catch(RuntimeException exr){
|
||||
logger.warn("Runtime Exception exr",exr);
|
||||
logger.warn("Runtime Exception -Record is not valid. Anyway, it will be persisted i:{}",i);
|
||||
logger.warn("Runtime Exception exr",exr);
|
||||
if ((i==5)&&(documentJson!=null)){
|
||||
documentElaborate.add(documentJson);
|
||||
logger.debug("Record is elaborate");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -397,6 +399,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
|
|||
logger.error("i:{}",i);
|
||||
if ((i==5)&&(documentJson!=null)){
|
||||
documentElaborate.add(documentJson);
|
||||
logger.debug("Record is elaborate");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -454,14 +457,15 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
|
|||
nameFile =nameFile+"-"+UUID.randomUUID();
|
||||
ManagementFileBackup.getInstance().onCreateStringToFile(notDeleted,Constant.FILE_RECORD_NO_AGGREGATE+"_"+nameFile,false);
|
||||
ManagementFileBackup.getInstance().onCreateStringToFile(notInserted,Constant.FILE_RECORD_AGGREGATE+"_"+nameFile,true);
|
||||
|
||||
List<JsonDocument> notDeletedTemp = null;
|
||||
while ((index < Constant.NUM_RETRY) && !succesfulDelete){
|
||||
List<JsonDocument> notDeletedTemp = new ArrayList<JsonDocument>();
|
||||
notDeletedTemp = new ArrayList<JsonDocument>();
|
||||
for (JsonDocument doc: notDeleted){
|
||||
if (index>0){
|
||||
logger.trace("delete Start {} pass",index);
|
||||
}
|
||||
countDelete ++;
|
||||
try{
|
||||
|
||||
//accountingBucket.remove(doc.id(),PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
|
||||
accountingBucket.remove(doc.id(),persisted,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
|
||||
}catch(Exception e){
|
||||
logger.warn("doc:{} not deleted retry:{} for error:{}",doc.id(),index,e);
|
||||
|
@ -489,15 +493,17 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
|
|||
succesfulDelete=true;
|
||||
}
|
||||
else {
|
||||
logger.trace("First pass no delete all");
|
||||
index++;
|
||||
notDeleted = new ArrayList<JsonDocument>(notDeletedTemp);
|
||||
Thread.sleep(1000);
|
||||
logger.trace("First pass no delete all succesfulDelete:{} index:{}",succesfulDelete,index);
|
||||
}
|
||||
}
|
||||
if (!succesfulDelete){
|
||||
logger.error("Error Delete record");
|
||||
}
|
||||
|
||||
logger.debug("notDeletedTemp size:{} notDeleted:{}",notDeletedTemp.size(),notDeleted.size());
|
||||
logger.debug("Delete complete:{}, Start a insert aggregated document:{}",countDelete,notInserted.size());
|
||||
// delete all record and ready for insert a new aggregated record
|
||||
if (succesfulDelete){
|
||||
|
@ -508,6 +514,9 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
|
|||
while ((index < Constant.NUM_RETRY) && !succesfulInsert){
|
||||
List<JsonDocument> notInsertedTemp = new ArrayList<JsonDocument>();
|
||||
for (JsonDocument document: notInserted){
|
||||
if (index>0){
|
||||
logger.trace("insert Start {} pass for document:{}",index,document.toString());
|
||||
}
|
||||
countInsert ++;
|
||||
try{
|
||||
//JsonDocument response = accountingBucket.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
|
||||
|
@ -540,10 +549,11 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
|
|||
succesfulInsert=true;
|
||||
}
|
||||
else {
|
||||
logger.trace("First pass no insert all");
|
||||
|
||||
index++;
|
||||
notInserted = new ArrayList<JsonDocument>(notInsertedTemp);
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(1000);
|
||||
logger.trace("First pass no insert all succesfulInsert:{} index:{}",succesfulInsert,index);
|
||||
}
|
||||
}
|
||||
if (!succesfulInsert){
|
||||
|
|
Loading…
Reference in New Issue