diff --git a/pom.xml b/pom.xml
index 22811db..d799cb9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,7 @@
org.gcube.accounting
accounting-aggregator-se-plugin
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
Accounting Aggregator
Accounting Aggregator Smart Executor Plugin
diff --git a/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java b/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java
index 9ff70cf..3c3b4e7 100644
--- a/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java
+++ b/src/main/java/org/gcube/accounting/aggregator/configuration/Constant.java
@@ -25,7 +25,9 @@ public class Constant {
- public static final Integer CONNECTION_TIMEOUT=10;
- public static final Integer NUM_RETRY=5;
- public static final Integer CONNECTION_TIMEOUT_BUCKET=10;
+ public static final Integer CONNECTION_TIMEOUT=15;
+ public static final Integer NUM_RETRY=6;
+ public static final Integer CONNECTION_TIMEOUT_BUCKET=15;
+ public static final Integer VIEW_TIMEOUT_BUCKET=120;
+ public static final Integer MAX_REQUEST_LIFE_TIME=120;
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java b/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java
index 58177a8..4794dd1 100644
--- a/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java
+++ b/src/main/java/org/gcube/accounting/aggregator/madeaggregation/Aggregation.java
@@ -32,9 +32,13 @@ public class Aggregation {
//count buffer records
protected int totalBufferedRecords;
- //list Aggregate record
- protected Map>> bufferedRecords = new HashMap>>();
+ //list Aggregate record
+ //TODO RIMETTERE A PROTECTED
+ public Map>> bufferedRecords = new HashMap>>();
+
+
+
public Aggregation() {
super();
}
@@ -70,57 +74,52 @@ public class Aggregation {
List> records;
if(this.bufferedRecords.containsKey(recordType)){
records = this.bufferedRecords.get(recordType);
- //logger.debug("value endtime{}, type endtime{}",record.getEndTime());
boolean found = false;
for(AggregatedRecord bufferedRecord : records){
if(!(bufferedRecord instanceof AggregatedRecord)){
continue;
}
-
AggregationUtility util = new AggregationUtility(bufferedRecord);
- //verify a record is aggregable
-
+ //verify a record is aggregable
//logger.debug("record: {}",record.toString());
if (util.isAggregable(record)){
try {
-
AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord;
// TODO check compatibility using getAggregable
- bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
+ logger.debug("if -- madeAggregation aggregate");
+ bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
//patch for not changed a creation time
//bufferedAggregatedRecord.setCreationTime(bufferedAggregatedRecord.getStartTime());
bufferedAggregatedRecord.setCreationTime(record.getCreationTime());
found = true;
break;
} catch(NotAggregatableRecordsExceptions e) {
- logger.trace("{} is not usable for aggregation", bufferedRecord);
+ logger.debug("{} is not usable for aggregation", bufferedRecord);
}
}
-
}
if(!found){
//logger.debug("Aggregated Record not found with execption");
+ logger.debug("if -- madeAggregation not found with execption add");
records.add(record);
totalBufferedRecords++;
return;
}
}else{
-
//logger.debug("else if record contains "+recordType);
records = new ArrayList>();
try {
- records.add(getAggregatedRecord(record));
+ logger.debug("else -- add getAggregatedRecord");
+ records.add(getAggregatedRecord(record));
} catch (Exception e) {
+ logger.debug("else -- add Exception");
records.add(record);
}
totalBufferedRecords++;
this.bufferedRecords.put(recordType, records);
- }
-
-
-
+ }
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java
index 09310ec..42cd76a 100644
--- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java
+++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java
@@ -61,11 +61,16 @@ public class AccountingAggregatorPlugin extends Plugin documentElaborate=new ArrayList();
@@ -269,25 +309,23 @@ public class AccountingAggregatorPlugin extends Plugin documentElaborate) throws Exception{
-
+ int i=0;
+ int size=aggregate.bufferedRecords.size();
try {
//patch for field of long type
String document=row.value().toString().replace("\":", "=").replace("\"", "");
-
+ i=1;//1
Map map = getMapFromString(document);
-
+ i=2;//2
+ //prepare a document for elaborate
+ String identifier=(String) row.document().content().get("id");
+ i=5;//5
+ JsonDocument documentJson = JsonDocument.create(identifier, row.document().content());
+ i=6;//6
+
@SuppressWarnings("rawtypes")
AggregatedRecord record = (AggregatedRecord)RecordUtility.getRecord(map);
-
+ i=3;//3
aggregate.aggregate(record);
-
+ i=4;//4
//insert an elaborate row into list JsonDocument for memory document elaborate
-
- String identifier=(String) row.document().content().get("id");
-
- JsonDocument documentJson = JsonDocument.create(identifier, row.document().content());
-
documentElaborate.add(documentJson);
-
+ i=7;//7
return true;
}
catch(InvalidValueException ex){
- logger.warn("Record is not valid. Anyway, it will be persisted");
+ logger.warn("InvalidValueException - Record is not valid. Anyway, it will be persisted i:{}",i);
+ return true;
+ }
+ catch(RuntimeException exr){
+ logger.warn("Runtime Exception -Record is not valid. Anyway, it will be persisted i:{}",i);
return true;
}
catch (Exception e) {
-
- logger.error("Error elaborateRow", e,e.getLocalizedMessage());
+ logger.error("record is not elaborated:"+row.toString()+" but it will be persisted");
+ logger.error("error elaborateRow", e);
+ logger.error("i:{}",i);
+ logger.error("size before:{}, after buffer size:{}",size,aggregate.bufferedRecords.size());
//throw e;
return false;
}
-
}
@@ -354,30 +399,28 @@ public class AccountingAggregatorPlugin extends Plugin getMapFromString(String serializedMap){
/* Checking line sanity */
- if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){
- return null;
- }
-
- /* Cleaning prefix and suffix to parse line */
- serializedMap = serializedMap.replace(LINE_FREFIX, "");
- serializedMap = serializedMap.replace(LINE_SUFFIX, "");
-
- Map map = new HashMap();
-
- String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR);
- for (int i=0;i map = new HashMap();
+
+ String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR);
+ for (int i=0;i notDeleted = docs;
List notInserted = aggregate.reallyFlush();
@@ -406,18 +449,30 @@ public class AccountingAggregatorPlugin extends Plugin 0) {
zos.write(buffer, 0, len);
}
-
+
in.close();
zos.closeEntry();
zos.close();
-
+
InputStream fileZipStream = new FileInputStream(namePathFileZip);
WorkSpaceManagement.saveItemOnWorkspace(Constant.user,fileZipStream,"complete.zip", "Description", folderStartTimeName.getId());
logger.trace("Save a backup file into workspace; bucket{},scope:{}, startkey:{},endkey:{}, aggregation type:{}",bucket,scope,startKeyString,endKeyString ,aggType.toString());
@@ -212,7 +229,7 @@ public class WorkSpaceManagement {
logger.trace("Save Item on WorkSpace Folder:{}, name:{},description:{}, folderID:{}",projectItem,name,description,folderId);
if (projectItem == null) {
ws.createExternalFile(name, description, null, inputStream, folderId);
-
+
}
else{
ws.remove(name, folderId);