Alessandro Pieve 2017-02-21 13:17:41 +00:00
parent 27c4848eeb
commit 651a5b853c
3 changed files with 32 additions and 28 deletions

View File

@@ -136,14 +136,14 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
if (inputs.containsKey("typePersisted"))
typePersisted=(Integer)inputs.get("typePersisted");
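//map the typePersisted input to a Couchbase PersistTo level: 1 -> ONE, any other value -> MASTER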
switch(typePersisted) {
case 0:
persisted=PersistTo.MASTER;
break;
case 1:
persisted=PersistTo.ONE;
break;
default:
persisted=PersistTo.MASTER;
}
logger.debug("Launch with Type:{}, Interval:{}, startTime:{}, Scope:{}, Recovery:{}",aggType.toString(),interval,inputStartTime,scope,recoveryMode);
@@ -155,7 +155,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
if (inputs.containsKey("intervalStep")){
logger.debug("Interval is not considered, aggregate only :{} step",interval);
}
//Get configuration from the service endpoint
String url=null;
String password =null;
@@ -323,8 +323,9 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
aggregate = new Aggregation();
documentElaborate.clear();
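//reset the aggregation state and the list of elaborated documents before processing the next batch of rows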
logger.debug("Start elaborate row");
Boolean resultElaborate=false;
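//elaborate every row returned by the view; resultElaborate holds the outcome of the last elaborated row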
for (ViewRow row : viewResult)
elaborateRow(row,documentElaborate);
resultElaborate=elaborateRow(row,documentElaborate);
logger.debug("End elaborate row");
//The backup file name includes the scope and the elaborated key range
String nameFileBackup="";
@@ -334,6 +335,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
nameFileBackup=startKeyString+"-"+endKeyString;
//save into db (delete the non-aggregated records and insert the aggregated record)
reallyFlush(aggregate,documentElaborate,nameFileBackup);
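//slide the key window so that the next range ends where the current one starts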
endKeyString = startKeyString;
}
return true;
@@ -348,7 +350,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
* @return true if the row was elaborated successfully, false otherwise
* @throws Exception
*/
protected boolean elaborateRow(ViewRow row ,List<JsonDocument> documentElaborate) throws Exception{
protected Boolean elaborateRow(ViewRow row ,List<JsonDocument> documentElaborate) throws Exception{
int i=0;
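//i tracks how far the elaboration of the row got, so the catch blocks below can log the step at which it failed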
try {
//patch for field of long type
@@ -358,15 +360,15 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
i=2;//2
//prepare a document for elaboration
String identifier=(String) row.document().content().get("id");
i=5;//5
i=3;//3
JsonDocument documentJson = JsonDocument.create(identifier, row.document().content());
i=6;//6
i=4;//4
@SuppressWarnings("rawtypes")
AggregatedRecord record = (AggregatedRecord)RecordUtility.getRecord(map);
i=3;//3
i=5;//5
aggregate.aggregate(record);
i=4;//4
i=6;//6
//add the elaborated row to the list of JsonDocuments processed in memory
documentElaborate.add(documentJson);
i=7;//7
@@ -374,11 +376,11 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
}
catch(InvalidValueException ex){
logger.warn("InvalidValueException - Record is not valid. Anyway, it will be persisted i:{}",i);
return true;
return false;
}
catch(RuntimeException exr){
logger.warn("Runtime Exception -Record is not valid. Anyway, it will be persisted i:{}",i);
return true;
return false;
}
catch (Exception e) {
logger.error("record is not elaborated:"+row.toString()+" but it will be persisted");
@@ -450,15 +452,15 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
//accountingBucket.remove(doc.id(),PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
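//remove the elaborated document using the configured persistence level; failures are checked and retried below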
accountingBucket.remove(doc.id(),persisted,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}catch(Exception e){
logger.trace("doc:{} not deleted retry:{}",doc.id(),index);
logger.warn("doc:{} not deleted retry:{} for error:{}",doc.id(),index,e);
Thread.sleep(1500);
try{
if (accountingBucket.exists(doc.id()))
notDeletedTemp.add(doc);
}
catch(Exception ext){
logger.warn("doc:{} not verify for delete because timeout, retry",doc.id());
Thread.sleep(6000);
logger.warn("doc:{} not verify for delete because timeout, retry:{}",doc.id(),index);
Thread.sleep(3000);
try{
if (!accountingBucket.exists(doc.id()))
notDeletedTemp.add(doc);
@@ -475,6 +477,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
succesfulDelete=true;
}
else {
logger.trace("First pass no delete all");
index++;
notDeleted = new ArrayList<JsonDocument>(notDeletedTemp);
Thread.sleep(1000);
@@ -499,7 +502,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
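//upsert the aggregated document using the configured persistence level; failures are retried below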
JsonDocument response = accountingBucket.upsert(document,persisted,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
catch(Exception e){
logger.trace("record:{} not insert retry:{} ",document.id(),index);
logger.warn("record:{} not insert retry:{} for error:{}",document.id(),index,e);
Thread.sleep(1500);
try{
@@ -525,6 +528,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
succesfulInsert=true;
}
else {
logger.trace("First pass no insert all");
index++;
notInserted = new ArrayList<JsonDocument>(notInsertedTemp);
Thread.sleep(1000);

View File

@@ -91,11 +91,12 @@ public class RecoveryRecord {
@SuppressWarnings("null")
public static void searchFile(Cluster cluster,AggregatorPersistenceBackendQueryConfiguration configuration) throws Exception{
try{
prepareConnection(cluster,configuration);
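//recover pending operations from the backup folders: first the documents still to be deleted, then those still to be inserted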
File folderDelete = new File(Constant.PATH_DIR_BACKUP_DELETE);
if (folderDelete.exists() && folderDelete.isDirectory()) {
logger.trace("Start Recovery delete");
File[] listOfFilesDelete = folderDelete.listFiles();
for (int i = 0; i < listOfFilesDelete.length; i++) {
if (listOfFilesDelete[i].isFile()){
@@ -114,6 +115,7 @@ public class RecoveryRecord {
//search for insert file
File folderInsert= new File(Constant.PATH_DIR_BACKUP_INSERT);
if (folderInsert.exists() && folderInsert.isDirectory()) {
logger.trace("Start Recovery insert");
File[] listOfFilesInsert = folderInsert.listFiles();
for (int i = 0; i < listOfFilesInsert.length; i++) {
if (listOfFilesInsert[i].isFile()) {

View File

@@ -20,17 +20,15 @@ public class Tests {
@Before
public void beforeTest(){
}
@Test
public void testLaunch() throws Exception {
SecurityTokenProvider.instance.set("36501a0d-a205-4bf1-87ad-4c7185faa0d6-98187548");
//FOR DEBUG
String scopeDebug="/gcube/devNext";
ScopeProvider.instance.set(scopeDebug);
// END FOR DEBUG
}
@Test
public void testLaunch() throws Exception {
Map<String, Object> inputs = new HashMap<String, Object>();
//type aggregation