Fix warning and retry

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@141773 82a268e6-3cf1-43bd-a215-b396298e98cf
Alessandro Pieve 2017-01-25 14:06:16 +00:00
parent 6d851f0db6
commit ec7773313a
5 changed files with 198 additions and 114 deletions

pom.xml

@@ -8,7 +8,7 @@
</parent>
<groupId>org.gcube.accounting</groupId>
<artifactId>accounting-aggregator-se-plugin</artifactId>
-<version>1.0.0-SNAPSHOT</version>
+<version>1.0.1-SNAPSHOT</version>
<name>Accounting Aggregator</name>
<description>Accounting Aggregator Smart Executor Plugin</description>

Constant.java

@@ -25,7 +25,9 @@ public class Constant {
-public static final Integer CONNECTION_TIMEOUT=10;
-public static final Integer NUM_RETRY=5;
-public static final Integer CONNECTION_TIMEOUT_BUCKET=10;
+public static final Integer CONNECTION_TIMEOUT=15;
+public static final Integer NUM_RETRY=6;
+public static final Integer CONNECTION_TIMEOUT_BUCKET=15;
+public static final Integer VIEW_TIMEOUT_BUCKET=120;
+public static final Integer MAX_REQUEST_LIFE_TIME=120;
}

Aggregation.java

@@ -32,9 +32,13 @@ public class Aggregation {
//count buffer records
protected int totalBufferedRecords;
-//list Aggregate record
-protected Map<String, List<AggregatedRecord<?,?>>> bufferedRecords = new HashMap<String, List<AggregatedRecord<?,?>>>();
+//list Aggregate record
+//TODO put back to protected
+public Map<String, List<AggregatedRecord<?,?>>> bufferedRecords = new HashMap<String, List<AggregatedRecord<?,?>>>();
public Aggregation() {
super();
}
@@ -70,57 +74,52 @@ public class Aggregation {
List<AggregatedRecord<?,?>> records;
if(this.bufferedRecords.containsKey(recordType)){
records = this.bufferedRecords.get(recordType);
//logger.debug("value endtime{}, type endtime{}",record.getEndTime());
boolean found = false;
for(AggregatedRecord bufferedRecord : records){
if(!(bufferedRecord instanceof AggregatedRecord)){
continue;
}
AggregationUtility util = new AggregationUtility(bufferedRecord);
-//verify a record is aggregable
+//verify a record is aggregable
//logger.debug("record: {}",record.toString());
if (util.isAggregable(record)){
try {
AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord;
// TODO check compatibility using getAggregable
-bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
+logger.debug("if -- madeAggregation aggregate");
+bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
+//patch: preserve the record's creation time
+//bufferedAggregatedRecord.setCreationTime(bufferedAggregatedRecord.getStartTime());
+bufferedAggregatedRecord.setCreationTime(record.getCreationTime());
found = true;
break;
} catch(NotAggregatableRecordsExceptions e) {
logger.trace("{} is not usable for aggregation", bufferedRecord);
logger.debug("{} is not usable for aggregation", bufferedRecord);
}
}
}
if(!found){
//logger.debug("Aggregated Record not found with execption");
logger.debug("if -- madeAggregation not found with execption add");
records.add(record);
totalBufferedRecords++;
return;
}
}else{
//logger.debug("else if record contains "+recordType);
records = new ArrayList<AggregatedRecord<?,?>>();
try {
-records.add(getAggregatedRecord(record));
+logger.debug("else -- add getAggregatedRecord");
+records.add(getAggregatedRecord(record));
} catch (Exception e) {
logger.debug("else -- add Exception");
records.add(record);
}
totalBufferedRecords++;
this.bufferedRecords.put(recordType, records);
}
}
}
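Aside: madeAggregation follows a buffer-and-merge pattern: keep one list of partial aggregates per record type, fold an incoming record into the first compatible aggregate, and buffer it as a new entry only when none accepts it. A minimal sketch of the pattern follows; the Buffer type and its compatible/merge functions are hypothetical stand-ins, not the accounting-lib API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;

// Sketch of madeAggregation's buffer-and-merge pattern with hypothetical types.
class Buffer<R> {
    private final Map<String, List<R>> buffered = new HashMap<>();
    private final BiPredicate<R, R> compatible; // plays the role of AggregationUtility.isAggregable
    private final BinaryOperator<R> merge;      // plays the role of AggregatedRecord.aggregate

    Buffer(BiPredicate<R, R> compatible, BinaryOperator<R> merge) {
        this.compatible = compatible;
        this.merge = merge;
    }

    void add(String type, R record) {
        List<R> records = buffered.computeIfAbsent(type, k -> new ArrayList<>());
        for (ListIterator<R> it = records.listIterator(); it.hasNext(); ) {
            R existing = it.next();
            if (compatible.test(existing, record)) {
                it.set(merge.apply(existing, record)); // fold into an existing aggregate
                return;
            }
        }
        records.add(record); // no compatible aggregate yet: buffer as a new one
    }
}
```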

AccountingAggregatorPlugin.java

@@ -61,11 +61,16 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
private final static String LINE_SUFFIX = "}";
private final static String KEY_VALUE_PAIR_SEPARATOR = ",";
private final static String KEY_VALUE_LINKER = "=";
public static Integer countInsert=0;
public static Integer countDelete=0;
-public static Integer RecoveryMode=0;
+public static Integer recoveryMode=0;
+public Boolean backup=true;
+//value: 0 = PersistTo.MASTER, 1 = PersistTo.ONE
+public static Integer typePersisted=0;
+protected PersistTo persisted;
/**
* @param runningPluginEvolution
*/
@@ -77,8 +82,11 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
protected static final CouchbaseEnvironment ENV =
DefaultCouchbaseEnvironment.builder()
.connectTimeout(Constant.CONNECTION_TIMEOUT * 1000)
-.queryTimeout(Constant.CONNECTION_TIMEOUT * 1000)
-.keepAliveInterval(3600 * 1000) // 3600 Seconds in milliseconds
+.maxRequestLifetime(Constant.MAX_REQUEST_LIFE_TIME * 1000)
+.queryTimeout(Constant.CONNECTION_TIMEOUT * 1000) //15 Seconds in milliseconds
+.viewTimeout(Constant.VIEW_TIMEOUT_BUCKET * 1000)//120 Seconds in milliseconds
+.keepAliveInterval(3600 * 1000) // 3600 Seconds in milliseconds
+.kvTimeout(5000) //in ms
.build();
/**{@inheritDoc}*/
@@ -98,6 +106,10 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
AggregationType aggType =AggregationType.valueOf((String)inputs.get("type"));
Integer interval=(Integer)inputs.get("interval")* aggType.getMultiplierFactor();
+//new feature: elaborate a set of small intervals instead of the full range
+if (inputs.containsKey("intervalStep"))
+interval=(Integer)inputs.get("intervalStep");
Integer inputStartTime=null;
if (inputs.containsKey("startTime"))
inputStartTime=(Integer)inputs.get("startTime");
@@ -106,19 +118,43 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
String scope=null;
if (inputs.containsKey("currentScope"))
currentScope=(Boolean)inputs.get("currentScope");
if (currentScope)
scope=ScopeProvider.instance.get();
if (inputs.containsKey("user"))
Constant.user=(String)inputs.get("user");
else
Constant.user="service.aggregatorAccounting";
if (inputs.containsKey("recovery"))
-RecoveryMode=(Integer)inputs.get("recovery");
+recoveryMode=(Integer)inputs.get("recovery");
-logger.debug("Launch with Type:{}, Interval:{}, startTime:{}, Scope:{}, Recovery:{}",aggType.toString(),interval,inputStartTime,scope,RecoveryMode);
+if (inputs.containsKey("backup"))
+backup=(Boolean)inputs.get("backup");
+if (inputs.containsKey("typePersisted"))
+typePersisted=(Integer)inputs.get("typePersisted");
+switch(typePersisted) {
+case 0:
+persisted=PersistTo.MASTER;
+break;
+case 1:
+persisted=PersistTo.ONE;
+break;
+default:
+persisted=PersistTo.MASTER;
+}
+logger.debug("Launch with Type:{}, Interval:{}, startTime:{}, Scope:{}, Recovery:{}",aggType.toString(),interval,inputStartTime,scope,recoveryMode);
+logger.debug("persist:{} backup:{}",persisted.toString(),backup);
+if(!backup){
+logger.warn("Attention backup disabled");
+Thread.sleep(20000);
+}
+if (inputs.containsKey("intervalStep")){
+logger.debug("Interval is not considered, aggregate only :{} step",interval);
+}
//Get Configuration from service end point
String url=null;
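For context on the new typePersisted switch: in the Couchbase Java SDK 2.x, PersistTo.MASTER waits until the mutation is persisted on the active node for the key, while PersistTo.ONE is satisfied by persistence on any single node. The switch simply makes that durability level configurable, defaulting to MASTER; a hypothetical helper with the same behavior:

```java
import com.couchbase.client.java.PersistTo;

final class DurabilitySketch {
    // Hypothetical helper equivalent to the switch on typePersisted above.
    static PersistTo durabilityFor(int typePersisted) {
        switch (typePersisted) {
            case 1:  return PersistTo.ONE;    // persisted on at least one node
            case 0:
            default: return PersistTo.MASTER; // persisted on the active node
        }
    }
}
```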
@@ -144,20 +180,20 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
throw e;
}
Cluster cluster = CouchbaseCluster.create(ENV, url);
//Define a type for aggregate
RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage());
initFolder();
-if ((RecoveryMode==2)||(RecoveryMode==0)){
+if ((recoveryMode==2)||(recoveryMode==0)){
logger.debug("Recovery mode enabled");
RecoveryRecord.searchFile(cluster,configuration);
}
-if (RecoveryMode!=2){
+if (recoveryMode!=2){
for (String bucket:listBucket){
logger.trace("OpenBucket:{}",bucket);
accountingBucket = cluster.openBucket(bucket,password);
@@ -167,7 +203,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
logger.debug("Complete countInsert{}, countDelete{}",countInsert,countDelete);
}
}
@@ -191,10 +227,10 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
DirRoot.mkdir();
}
logger.debug("init folder:{}",Constant.PATH_DIR_BACKUP);
}
/**
* Process a Bucket from startTime over the given interval
* @param bucket
@@ -238,12 +274,16 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
String endAllKeyString = format.format(now.getTime());
String endKeyString = format.format(now.getTime());
//save a record modified into a file and save into a workspace
nowTemp.add(aggType.getCalendarField(), -1*interval);
String startAllKeyString = format.format(nowTemp.getTime());
-WorkSpaceManagement.onSaveBackupFile(accountingBucket,bucket,scope,startAllKeyString, endAllKeyString,aggType);
-//logger.debug("Backup complete startKeyString{}, endKeyString{}",startAllKeyString,endAllKeyString);
+if (backup){
+logger.debug("Start Backup");
+WorkSpaceManagement.onSaveBackupFile(accountingBucket,bucket,scope,startAllKeyString, endAllKeyString,aggType);
+//logger.debug("Backup complete startKeyString{}, endKeyString{}",startAllKeyString,endAllKeyString);
+}
+else
+logger.debug("No Backup required");
List<JsonDocument> documentElaborate=new ArrayList<JsonDocument>();
@@ -269,25 +309,23 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
query.endKey(endKey);
query.reduce(false);
query.inclusiveEnd(false);
logger.debug("View Query: startKey:{} - endKey:{} designDocId:{} - viewName:{}",startKey, endKey,designDocId,viewName);
logger.debug("--{}/{} View Query: startKey:{} - endKey:{} designDocId:{} - viewName:{}",i,interval,startKey, endKey,designDocId,viewName);
ViewResult viewResult = null;
try {
viewResult = accountingBucket.query(query);
} catch (Exception e) {
logger.error("ERROR VIEW",e.getLocalizedMessage());
logger.error("Exception error VIEW",e.getLocalizedMessage(),e);
//throw e;
}
// Iterate through the returned ViewRows
aggregate = new Aggregation();
documentElaborate.clear();
logger.debug("Start elaborate row");
for (ViewRow row : viewResult)
elaborateRow(row,documentElaborate);
logger.debug("End elaborate row");
//the backup file name includes the scope
String nameFileBackup="";
if (scope!=null)
@@ -311,39 +349,46 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
* @throws Exception
*/
protected boolean elaborateRow(ViewRow row ,List<JsonDocument> documentElaborate) throws Exception{
+int i=0;
+int size=aggregate.bufferedRecords.size();
try {
//patch for field of long type
String document=row.value().toString().replace("\":", "=").replace("\"", "");
+i=1;//1
Map<String,? extends Serializable> map = getMapFromString(document);
+i=2;//2
//prepare a document for elaborate
+String identifier=(String) row.document().content().get("id");
+i=5;//5
+JsonDocument documentJson = JsonDocument.create(identifier, row.document().content());
+i=6;//6
@SuppressWarnings("rawtypes")
AggregatedRecord record = (AggregatedRecord)RecordUtility.getRecord(map);
+i=3;//3
aggregate.aggregate(record);
+i=4;//4
//insert an elaborate row into list JsonDocument for memory document elaborate
-String identifier=(String) row.document().content().get("id");
-JsonDocument documentJson = JsonDocument.create(identifier, row.document().content());
documentElaborate.add(documentJson);
+i=7;//7
return true;
}
catch(InvalidValueException ex){
logger.warn("Record is not valid. Anyway, it will be persisted");
logger.warn("InvalidValueException - Record is not valid. Anyway, it will be persisted i:{}",i);
return true;
}
+catch(RuntimeException exr){
+logger.warn("Runtime Exception - Record is not valid. Anyway, it will be persisted i:{}",i);
+return true;
+}
catch (Exception e) {
logger.error("Error elaborateRow", e,e.getLocalizedMessage());
logger.error("record is not elaborated:"+row.toString()+" but it will be persisted");
logger.error("error elaborateRow", e);
logger.error("i:{}",i);
logger.error("size before:{}, after buffer size:{}",size,aggregate.bufferedRecords.size());
//throw e;
return false;
}
}
@@ -354,30 +399,28 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
*/
protected static Map<String, ? extends Serializable> getMapFromString(String serializedMap){
/* Checking line sanity */
-if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){
-return null;
-}
-/* Cleaning prefix and suffix to parse line */
-serializedMap = serializedMap.replace(LINE_FREFIX, "");
-serializedMap = serializedMap.replace(LINE_SUFFIX, "");
-Map<String, Serializable> map = new HashMap<String,Serializable>();
-String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR);
-for (int i=0;i<pairs.length;i++) {
-String pair = pairs[i];
-pair.trim();
-String[] keyValue = pair.split(KEY_VALUE_LINKER);
-String key = keyValue[0].trim();
-Serializable value = keyValue[1].trim();
-map.put(key, value);
-}
-return map;
+if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){
+return null;
+}
+/* Cleaning prefix and suffix to parse line */
+serializedMap = serializedMap.replace(LINE_FREFIX, "");
+serializedMap = serializedMap.replace(LINE_SUFFIX, "");
+Map<String, Serializable> map = new HashMap<String,Serializable>();
+String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR);
+for (int i=0;i<pairs.length;i++) {
+String pair = pairs[i];
+pair.trim();
+String[] keyValue = pair.split(KEY_VALUE_LINKER);
+String key = keyValue[0].trim();
+Serializable value = keyValue[1].trim();
+map.put(key, value);
+}
+return map;
}
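A worked example of what this parser accepts (hypothetical input, assuming LINE_FREFIX is "{", since its definition sits outside the hunk): "{id=abc, usage=5}" yields a map with keys id and usage. Two quirks worth noting: pair.trim() discards its result (Java Strings are immutable), so the later keyValue[...].trim() calls do the actual cleanup, and the sanity check uses &&, so a line is rejected only when it lacks both the prefix and the suffix.

```java
import java.io.Serializable;
import java.util.Map;

// Hypothetical demo; it must live in the plugin's package, since the method is protected.
public class GetMapFromStringDemo {
    public static void main(String[] args) {
        Map<String, ? extends Serializable> map =
                AccountingAggregatorPlugin.getMapFromString("{id=abc, usage=5}");
        System.out.println(map.get("id"));    // abc
        System.out.println(map.get("usage")); // 5 (values remain Strings)
    }
}
```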
/**
* Delete the records that were aggregated and insert the new aggregated record
* If a record cannot be deleted, do not insert the new record and save a backup file
@@ -392,7 +435,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
if (docs.size()!=0){
Integer index=0;
boolean succesfulDelete=false;
logger.trace("Start a delete document:{}",docs.size());
//before elaborate a record, create a backup file
List<JsonDocument> notDeleted = docs;
List<JsonDocument> notInserted = aggregate.reallyFlush();
@@ -406,18 +449,30 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
for (JsonDocument doc: notDeleted){
countDelete ++;
try{
-accountingBucket.remove(doc.id(),PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
+//accountingBucket.remove(doc.id(),PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
+accountingBucket.remove(doc.id(),persisted,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}catch(Exception e){
logger.trace("doc:{} not deleted retry:{}",doc.id(),index);
Thread.sleep(1500);
try{
if (accountingBucket.exists(doc.id()))
notDeletedTemp.add(doc);
}
-catch(Exception ex){
-logger.warn("doc:{} not verify for delete",doc.id());
-}
+catch(Exception ext){
+logger.warn("doc:{} not verify for delete because timeout, retry",doc.id());
+Thread.sleep(6000);
+try{
+if (!accountingBucket.exists(doc.id()))
+notDeletedTemp.add(doc);
+}
+catch(Exception ex) {
+logger.error("doc:{} not delete ({}), problem with exist bucket",doc.id(),doc.toString(),ex);
+logger.error("force insert into list for delete");
+notDeletedTemp.add(doc);
+}
}
}
}
if (notDeletedTemp.isEmpty()){
succesfulDelete=true;
@@ -430,11 +485,9 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
}
if (!succesfulDelete){
logger.error("Error Delete record");
}
logger.debug("Delete complete {}, Start a insert aggregated document",countDelete);
/**
* delete all record and ready for insert a new aggregated record
*/
}
logger.debug("Delete complete:{}, Start a insert aggregated document:{}",countDelete,notInserted.size());
// delete all record and ready for insert a new aggregated record
if (succesfulDelete){
//if successful record delete, delete backup file
ManagementFileBackup.getInstance().onDeleteFile(Constant.FILE_RECORD_NO_AGGREGATE+"_"+nameFile,false);
@@ -445,22 +498,34 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
for (JsonDocument document: notInserted){
countInsert ++;
try{
@SuppressWarnings("unused")
-JsonDocument response = accountingBucket.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
+//JsonDocument response = accountingBucket.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
+JsonDocument response = accountingBucket.upsert(document,persisted,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
catch(Exception e){
logger.trace("record:{} not insert retry:{} ",document.id(),index);
Thread.sleep(1500);
try{
if (!accountingBucket.exists(document.id()))
notInsertedTemp.add(document);
}
-catch(Exception ex){
-logger.warn("doc:{} not verify for inset",document.id());
-}
+catch(Exception ext){
+logger.warn("doc:{} not verify for insert because timeout, retry",document.id(),ext);
+Thread.sleep(3000);
+try{
+if (!accountingBucket.exists(document.id()))
+notInsertedTemp.add(document);
+}
+catch(Exception ex) {
+logger.error("doc:{} not insert ({}), problem with exist bucket",document.id(),document.toString(),ex);
+logger.error("force insert into list for insert");
+notInsertedTemp.add(document);
+}
}
}
}
if (notInsertedTemp.isEmpty()){
-succesfulInsert=true;
+succesfulInsert=true;
}
else {
index++;
@@ -476,7 +541,8 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
ManagementFileBackup.getInstance().onDeleteFile(Constant.FILE_RECORD_AGGREGATE+"_"+nameFile,true);
}
}
}
logger.trace("Insert complete");
}
return true;
}
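The retry shape used for both remove and upsert above boils down to: attempt the operation, and on failure pause, then use exists() to decide whether the write actually landed before re-queueing the document for the next pass. A condensed, self-contained sketch of that idea (class and method names hypothetical; the real code layers a second sleep-and-check on timeout, as the diff shows):

```java
import java.util.concurrent.TimeUnit;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.document.JsonDocument;

public final class UpsertRetrySketch {
    // Returns true when the document is known to be in the bucket.
    static boolean upsertWithVerify(Bucket bucket, JsonDocument doc,
                                    PersistTo persistTo, long timeoutSeconds)
            throws InterruptedException {
        try {
            bucket.upsert(doc, persistTo, timeoutSeconds, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            Thread.sleep(1500); // let the cluster settle before verifying
            try {
                // The write may have landed even though the call timed out.
                return bucket.exists(doc.id());
            } catch (Exception ex) {
                return false; // cannot verify: treat as failed and re-queue
            }
        }
    }
}
```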

WorkSpaceManagement.java

@@ -94,35 +94,52 @@ public class WorkSpaceManagement {
BufferedWriter filebackup =null;
File logFile = new File(namePathFile);
logFile.delete();
-filebackup = new BufferedWriter(new FileWriter(logFile));
+Thread.sleep(500);
+filebackup = new BufferedWriter(new FileWriter(logFile));
+int count = 0;
+int maxTries = 3;
+boolean exitRetry=false;
for (ViewRow row : viewResult){
-if (row.document()!=null){
-if (!row.document().content().toString().isEmpty()){
-filebackup.write(row.document().content().toString());
-filebackup.newLine();
+while(!exitRetry) {
+try {
+if (row.document()!=null){
+if (!row.document().content().toString().isEmpty()){
+filebackup.write(row.document().content().toString());
+filebackup.newLine();
+}
+}
+exitRetry=true;
+} catch (Exception e) {
+logger.error("retry:{}",count);
+logger.error(e.getMessage());
+if (++count == maxTries){
+filebackup.close();
+throw e;
+}
+}
}
}
filebackup.close();
//create a zip file
byte[] buffer = new byte[1024];
FileOutputStream fos = new FileOutputStream(namePathFileZip);
ZipOutputStream zos = new ZipOutputStream(fos);
ZipEntry ze= new ZipEntry(nameFile);
zos.putNextEntry(ze);
FileInputStream in = new FileInputStream(namePathFile);
int len;
while ((len = in.read(buffer)) > 0) {
zos.write(buffer, 0, len);
}
in.close();
zos.closeEntry();
zos.close();
InputStream fileZipStream = new FileInputStream(namePathFileZip);
WorkSpaceManagement.saveItemOnWorkspace(Constant.user,fileZipStream,"complete.zip", "Description", folderStartTimeName.getId());
logger.trace("Save a backup file into workspace; bucket{},scope:{}, startkey:{},endkey:{}, aggregation type:{}",bucket,scope,startKeyString,endKeyString ,aggType.toString());
@@ -212,7 +229,7 @@ public class WorkSpaceManagement {
logger.trace("Save Item on WorkSpace Folder:{}, name:{},description:{}, folderID:{}",projectItem,name,description,folderId);
if (projectItem == null) {
ws.createExternalFile(name, description, null, inputStream, folderId);
}
else{
ws.remove(name, folderId);