From ec7773313ab0548470c364c9bc635d3615e16515 Mon Sep 17 00:00:00 2001 From: Alessandro Pieve Date: Wed, 25 Jan 2017 14:06:16 +0000 Subject: [PATCH] Fix warning and retry git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@141773 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 2 +- .../aggregator/configuration/Constant.java | 8 +- .../madeaggregation/Aggregation.java | 31 ++- .../plugin/AccountingAggregatorPlugin.java | 234 +++++++++++------- .../plugin/WorkSpaceManagement.java | 37 ++- 5 files changed, 198 insertions(+), 114 deletions(-) diff --git a/pom.xml b/pom.xml index 22811db..d799cb9 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ org.gcube.accounting accounting-aggregator-se-plugin - 1.0.0-SNAPSHOT + 1.0.1-SNAPSHOT Accounting Aggregator Accounting Aggregator Smart Executor Plugin diff --git a/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java b/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java index 9ff70cf..3c3b4e7 100644 --- a/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java +++ b/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java @@ -25,7 +25,9 @@ public class Constant { - public static final Integer CONNECTION_TIMEOUT=10; - public static final Integer NUM_RETRY=5; - public static final Integer CONNECTION_TIMEOUT_BUCKET=10; + public static final Integer CONNECTION_TIMEOUT=15; + public static final Integer NUM_RETRY=6; + public static final Integer CONNECTION_TIMEOUT_BUCKET=15; + public static final Integer VIEW_TIMEOUT_BUCKET=120; + public static final Integer MAX_REQUEST_LIFE_TIME=120; } diff --git a/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java b/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java index 58177a8..4794dd1 100644 --- a/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java +++ b/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java @@ -32,9 +32,13 @@ public class Aggregation { //count buffer records protected int totalBufferedRecords; - //list Aggregate record - protected Map>> bufferedRecords = new HashMap>>(); + //list Aggregate record + //TODO RIMETTERE A PROTECTED + public Map>> bufferedRecords = new HashMap>>(); + + + public Aggregation() { super(); } @@ -70,57 +74,52 @@ public class Aggregation { List> records; if(this.bufferedRecords.containsKey(recordType)){ records = this.bufferedRecords.get(recordType); - //logger.debug("value endtime{}, type endtime{}",record.getEndTime()); boolean found = false; for(AggregatedRecord bufferedRecord : records){ if(!(bufferedRecord instanceof AggregatedRecord)){ continue; } - AggregationUtility util = new AggregationUtility(bufferedRecord); - //verify a record is aggregable - + //verify a record is aggregable //logger.debug("record: {}",record.toString()); if (util.isAggregable(record)){ try { - AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord; // TODO check compatibility using getAggregable - bufferedAggregatedRecord.aggregate((AggregatedRecord) record); + logger.debug("if -- madeAggregation aggregate"); + bufferedAggregatedRecord.aggregate((AggregatedRecord) record); //patch for not changed a creation time //bufferedAggregatedRecord.setCreationTime(bufferedAggregatedRecord.getStartTime()); bufferedAggregatedRecord.setCreationTime(record.getCreationTime()); found = true; break; } catch(NotAggregatableRecordsExceptions e) { - logger.trace("{} is not usable for aggregation", bufferedRecord); + logger.debug("{} is not usable for aggregation", bufferedRecord); } } - } if(!found){ //logger.debug("Aggregated Record not found with execption"); + logger.debug("if -- madeAggregation not found with execption add"); records.add(record); totalBufferedRecords++; return; } }else{ - //logger.debug("else if record contains "+recordType); records = new ArrayList>(); try { - records.add(getAggregatedRecord(record)); + logger.debug("else -- add getAggregatedRecord"); + records.add(getAggregatedRecord(record)); } catch (Exception e) { + logger.debug("else -- add Exception"); records.add(record); } totalBufferedRecords++; this.bufferedRecords.put(recordType, records); - } - - - + } } diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java index 09310ec..42cd76a 100644 --- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java +++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java @@ -61,11 +61,16 @@ public class AccountingAggregatorPlugin extends Plugin documentElaborate=new ArrayList(); @@ -269,25 +309,23 @@ public class AccountingAggregatorPlugin extends Plugin documentElaborate) throws Exception{ - + int i=0; + int size=aggregate.bufferedRecords.size(); try { //patch for field of long type String document=row.value().toString().replace("\":", "=").replace("\"", ""); - + i=1;//1 Map map = getMapFromString(document); - + i=2;//2 + //prepare a document for elaborate + String identifier=(String) row.document().content().get("id"); + i=5;//5 + JsonDocument documentJson = JsonDocument.create(identifier, row.document().content()); + i=6;//6 + @SuppressWarnings("rawtypes") AggregatedRecord record = (AggregatedRecord)RecordUtility.getRecord(map); - + i=3;//3 aggregate.aggregate(record); - + i=4;//4 //insert an elaborate row into list JsonDocument for memory document elaborate - - String identifier=(String) row.document().content().get("id"); - - JsonDocument documentJson = JsonDocument.create(identifier, row.document().content()); - documentElaborate.add(documentJson); - + i=7;//7 return true; } catch(InvalidValueException ex){ - logger.warn("Record is not valid. Anyway, it will be persisted"); + logger.warn("InvalidValueException - Record is not valid. Anyway, it will be persisted i:{}",i); + return true; + } + catch(RuntimeException exr){ + logger.warn("Runtime Exception -Record is not valid. Anyway, it will be persisted i:{}",i); return true; } catch (Exception e) { - - logger.error("Error elaborateRow", e,e.getLocalizedMessage()); + logger.error("record is not elaborated:"+row.toString()+" but it will be persisted"); + logger.error("error elaborateRow", e); + logger.error("i:{}",i); + logger.error("size before:{}, after buffer size:{}",size,aggregate.bufferedRecords.size()); //throw e; return false; } - } @@ -354,30 +399,28 @@ public class AccountingAggregatorPlugin extends Plugin getMapFromString(String serializedMap){ /* Checking line sanity */ - if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){ - return null; - } - - /* Cleaning prefix and suffix to parse line */ - serializedMap = serializedMap.replace(LINE_FREFIX, ""); - serializedMap = serializedMap.replace(LINE_SUFFIX, ""); - - Map map = new HashMap(); - - String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR); - for (int i=0;i map = new HashMap(); + + String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR); + for (int i=0;i notDeleted = docs; List notInserted = aggregate.reallyFlush(); @@ -406,18 +449,30 @@ public class AccountingAggregatorPlugin extends Plugin 0) { zos.write(buffer, 0, len); } - + in.close(); zos.closeEntry(); zos.close(); - + InputStream fileZipStream = new FileInputStream(namePathFileZip); WorkSpaceManagement.saveItemOnWorkspace(Constant.user,fileZipStream,"complete.zip", "Description", folderStartTimeName.getId()); logger.trace("Save a backup file into workspace; bucket{},scope:{}, startkey:{},endkey:{}, aggregation type:{}",bucket,scope,startKeyString,endKeyString ,aggType.toString()); @@ -212,7 +229,7 @@ public class WorkSpaceManagement { logger.trace("Save Item on WorkSpace Folder:{}, name:{},description:{}, folderID:{}",projectItem,name,description,folderId); if (projectItem == null) { ws.createExternalFile(name, description, null, inputStream, folderId); - + } else{ ws.remove(name, folderId);