diff --git a/pom.xml b/pom.xml
index 0f3f12f..b73250f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,17 +44,10 @@
org.gcube.vremanagement
smart-executor-api
-
org.gcube.accounting
accounting-lib
-
- com.couchbase.client
- java-client
- 2.7.11
-
-
org.gcube.common
storagehub-client-library
diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java
index e5782e2..769950b 100644
--- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java
+++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java
@@ -1,6 +1,7 @@
package org.gcube.accounting.aggregator.aggregation;
import java.io.File;
+import java.sql.ResultSet;
import java.text.DateFormat;
import java.util.Calendar;
import java.util.List;
@@ -15,6 +16,8 @@ import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
+import org.gcube.com.fasterxml.jackson.databind.ObjectMapper;
+import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
@@ -23,14 +26,6 @@ import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.couchbase.client.java.Bucket;
-import com.couchbase.client.java.document.JsonDocument;
-import com.couchbase.client.java.document.json.JsonArray;
-import com.couchbase.client.java.document.json.JsonObject;
-import com.couchbase.client.java.view.ViewQuery;
-import com.couchbase.client.java.view.ViewResult;
-import com.couchbase.client.java.view.ViewRow;
-
/**
* @author Luca Frosini (ISTI - CNR)
*/
@@ -41,77 +36,49 @@ public class Aggregator {
private static final String TMP_SUFFIX = ".tmp";
protected final AggregationStatus aggregationStatus;
- protected final Bucket bucket;
protected final File originalRecordsbackupFile;
protected final File aggregateRecordsBackupFile;
protected final File malformedRecordsFile;
protected int malformedRecordNumber;
+ protected ObjectMapper objectMapper;
+
protected Calendar startTime;
- public Aggregator(AggregationStatus aggregationStatus, Bucket bucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
+ public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
this.aggregationStatus = aggregationStatus;
- this.bucket = bucket;
this.originalRecordsbackupFile = originalRecordsbackupFile;
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile);
+ this.objectMapper = new ObjectMapper();
+
}
public void aggregate() throws Exception {
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) {
startTime = Utility.getUTCCalendarInstance();
- ViewResult viewResult = getViewResult();
- retrieveAndAggregate(viewResult);
+
+ // TODO query
+ ResultSet resultSet = null;
+
+ retrieveAndAggregate(resultSet);
}
}
- /**
- * Generate a key for map-reduce
- * @param key
- * @return JsonArray containing the map reduce key
- */
- protected JsonArray generateKey(String key){
- JsonArray arrayKey = JsonArray.create();
- for (String value : key.split("/")){
- if (!value.toString().isEmpty()){
- arrayKey.add(Integer.parseInt(value));
- }
- }
- return arrayKey;
- }
-
- protected ViewResult getViewResult() throws Exception {
+ protected ResultSet getViewResult() throws Exception {
DateFormat dateFormat = aggregationStatus.getAggregationInfo().getAggregationType().getDateFormat();
String dateStartKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationStartDate());
String dateEndKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationEndDate());
- JsonArray startKey = generateKey(dateStartKey);
- JsonArray endKey = generateKey(dateEndKey);
-
- DesignID designid = DesignID.valueOf(bucket.name());
- String designDocId = designid.getDesignName();
-
- String viewName = designid.getViewName();
-
- ViewQuery query = ViewQuery.from(designDocId, viewName);
- query.startKey(startKey);
- query.endKey(endKey);
- query.reduce(false);
- query.inclusiveEnd(false);
-
- logger.debug("View Query: designDocId:{} - viewName:{}, startKey:{} - endKey:{} ",
- designDocId, viewName, startKey, endKey);
-
- try {
- return bucket.query(query);
- } catch (Exception e) {
- logger.error("Exception error VIEW", e.getLocalizedMessage(), e);
- throw e;
- }
+
+ // TODO query here
+
+ return null;
+
}
private static final String USAGE_RECORD_TYPE = "usageRecordType";
@@ -119,19 +86,18 @@ public class Aggregator {
private static final String SIMPLE = "Simple";
- protected int elaborateRow(ViewRow row, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception {
+ protected int elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception {
try {
- JsonObject content = row.document().content();
-
- if(content.containsKey(USAGE_RECORD_TYPE)){
- String recordType = content.getString(USAGE_RECORD_TYPE);
- content.removeKey(USAGE_RECORD_TYPE);
+
+ if(content.has(USAGE_RECORD_TYPE)){
+ String recordType = content.get(USAGE_RECORD_TYPE).asText();
+ content.remove(USAGE_RECORD_TYPE);
content.put(Record.RECORD_TYPE, recordType);
}
Boolean aggregated = false;
- if(content.containsKey(AggregatedRecord.CREATION_TIME)) {
+ if(content.has(AggregatedRecord.CREATION_TIME)) {
Object object = content.get(AggregatedRecord.CREATION_TIME);
if(object instanceof Double) {
Double d = ((Double) object);
@@ -139,7 +105,7 @@ public class Aggregator {
}
}
- if(content.containsKey(AggregatedRecord.START_TIME)) {
+ if(content.has(AggregatedRecord.START_TIME)) {
aggregated = true;
Object object = content.get(AggregatedRecord.START_TIME);
if(object instanceof Double) {
@@ -148,7 +114,7 @@ public class Aggregator {
}
}
- if(content.containsKey(AggregatedRecord.END_TIME)) {
+ if(content.has(AggregatedRecord.END_TIME)) {
aggregated = true;
Object object = content.get(AggregatedRecord.END_TIME);
if(object instanceof Double) {
@@ -157,14 +123,14 @@ public class Aggregator {
}
}
- if(content.containsKey(AggregatedRecord.OPERATION_COUNT)) {
+ if(content.has(AggregatedRecord.OPERATION_COUNT)) {
Object object = content.get(AggregatedRecord.OPERATION_COUNT);
if(object instanceof Double) {
Double d = ((Double) object);
content.put(AggregatedRecord.OPERATION_COUNT, d.intValue());
}
- if(content.getInt(AggregatedRecord.OPERATION_COUNT)>1) {
+ if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) {
aggregated = true;
}
}
@@ -173,7 +139,7 @@ public class Aggregator {
content.put(AggregatedRecord.AGGREGATED, true);
}
- String recordType = content.getString(Record.RECORD_TYPE);
+ String recordType = content.get(Record.RECORD_TYPE).asText();
if(!aggregated){
if(recordType.startsWith(SIMPLE)){
@@ -221,7 +187,7 @@ public class Aggregator {
private static final int MAX_RETRY = 3;
- protected void retrieveAndAggregate(ViewResult viewResult) throws Exception {
+ protected void retrieveAndAggregate(ResultSet resultSet) throws Exception {
AggregatorBuffer aggregatorBuffer = new AggregatorBuffer();
Calendar start = Utility.getUTCCalendarInstance();
@@ -233,15 +199,19 @@ public class Aggregator {
malformedRecordNumber = 0;
int originalRecordsCounter = 0;
- for (ViewRow row : viewResult) {
+ while (resultSet.next()) {
for(int i=1; i<=MAX_RETRY; i++){
try {
- originalRecordsCounter = elaborateRow(row, aggregatorBuffer, originalRecordsCounter);
+ ObjectNode content = objectMapper.createObjectNode();
+
+ // todo set data from resultset
+
+ originalRecordsCounter = elaborateRow(content, aggregatorBuffer, originalRecordsCounter);
TimeUnit.MILLISECONDS.sleep(3);
break;
}catch (RuntimeException e) {
if(i==2){
- logger.error("Unable to elaborate {} {}. Tryed {} times.", ViewRow.class.getSimpleName(), row, i, e);
+ logger.error("Unable to elaborate row {}. Tryed {} times.", i, e);
}
}
}
@@ -266,8 +236,7 @@ public class Aggregator {
List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
for (AggregatedRecord, ?> aggregatedRecord : aggregatedRecords) {
String marshalled = DSMapper.marshal(aggregatedRecord);
- JsonObject jsonObject = JsonObject.fromJson(marshalled);
- Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString());
+ Utility.printLine(aggregateRecordsBackupFileTmp, marshalled);
}
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
@@ -312,11 +281,4 @@ public class Aggregator {
aggregatorBuffer.aggregate(aggregatedRecord);
}
- protected JsonDocument getJsonDocument(ViewRow row) {
- String identifier = (String) row.document().content().get("id");
- JsonDocument jsonDocument = JsonDocument.create(identifier, row.document().content());
- logger.trace("{}", jsonDocument.toString());
- return jsonDocument;
- }
-
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java
index 5bfd764..f74b73e 100644
--- a/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java
+++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java
@@ -4,11 +4,9 @@ import java.util.Date;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
-import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.documentstore.records.DSMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,26 +15,24 @@ public class AggregatorManager {
private static Logger logger = LoggerFactory.getLogger(AggregatorManager.class);
- public final static String ACCOUNTING_MANAGER_BUCKET_NAME = "AccountingManager";
-
protected final AggregationType aggregationType;
protected Date aggregationStartDate;
protected Date aggregationEndDate;
-
+
protected final boolean restartFromLastAggregationDate;
protected boolean forceEarlyAggregation;
protected boolean forceRerun;
protected boolean forceRestart;
-
+
public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate,
Date aggregationStartDate, Date aggregationEndDate) throws Exception {
this.aggregationType = aggregationType;
this.aggregationStartDate = Utility.sanitizeDate(aggregationType, aggregationStartDate);
-
+
this.aggregationEndDate = aggregationEndDate;
-
+
this.restartFromLastAggregationDate = restartFromLastAggregationDate;
this.forceEarlyAggregation = false;
this.forceRerun = false;
@@ -54,7 +50,7 @@ public class AggregatorManager {
public void setForceRestart(boolean forceRestart) {
this.forceRestart = forceRestart;
}
-
+
protected Date getEndDateFromStartDate() {
return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1);
}
@@ -67,56 +63,41 @@ public class AggregatorManager {
return aggregationStatus;
}
- public void elaborate(Date persistStartTime, Date persistEndTime, Class extends UsageRecord> usageRecordClass)
+ public void elaborate(Date persistStartTime, Date persistEndTime, String recordType)
throws Exception {
+
+ AggregationStatus aggregationStatus = null;
+ if (restartFromLastAggregationDate) {
+ AggregationStatus lastAggregationStatus = AggregationStatus.getLast(recordType, aggregationType,
+ aggregationStartDate, aggregationEndDate);
- CouchBaseConnector couchBaseConnector = CouchBaseConnector.getInstance();
-
- for (String recordType : couchBaseConnector.getRecordTypes()) {
-
- if (usageRecordClass != null && usageRecordClass.newInstance().getRecordType().compareTo(recordType) != 0) {
- continue;
+ // I don't check if this aggregation is COMPLETED because this
+ // is responsibility of Recovery Process
+ if (lastAggregationStatus != null) {
+ this.aggregationStartDate = lastAggregationStatus.getAggregationInfo().getAggregationEndDate();
+ logger.info("Last got AggregationStatus is {}. Restarting from {}",
+ DSMapper.getObjectMapper().writeValueAsString(lastAggregationStatus),
+ Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate));
}
- if (recordType.compareTo(ACCOUNTING_MANAGER_BUCKET_NAME) == 0) {
- continue;
- }
-
- AggregationStatus aggregationStatus = null;
- if (restartFromLastAggregationDate) {
- AggregationStatus lastAggregationStatus = AggregationStatus.getLast(recordType, aggregationType,
- aggregationStartDate, aggregationEndDate);
-
- // I don't check if this aggregation is COMPLETED because this
- // is responsibility of Recovery Process
- if (lastAggregationStatus != null) {
- this.aggregationStartDate = lastAggregationStatus.getAggregationInfo().getAggregationEndDate();
- logger.info("Last got AggregationStatus is {}. Restarting from {}",
- DSMapper.getObjectMapper().writeValueAsString(lastAggregationStatus),
- Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate));
- }
-
- }
-
- aggregationStatus = AggregationStatus.getAggregationStatus(recordType, aggregationType,
- aggregationStartDate);
-
- if (aggregationStatus == null) {
- aggregationStatus = createAggregationStatus(recordType);
- }
-
- if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) {
- logger.info("Start Date {} is after provided End Date {}. Please check input parameters.",
- Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate),
- Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate));
- return;
- }
-
- Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
- elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart);
-
}
+ aggregationStatus = AggregationStatus.getAggregationStatus(recordType, aggregationType, aggregationStartDate);
+
+ if (aggregationStatus == null) {
+ aggregationStatus = createAggregationStatus(recordType);
+ }
+
+ if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) {
+ logger.info("Start Date {} is after provided End Date {}. Please check input parameters.",
+ Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate),
+ Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate));
+ return;
+ }
+
+ Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
+ elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart);
+
}
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
index a42d95e..a3e4056 100644
--- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
+++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
@@ -10,9 +10,6 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.aggregation.Aggregator;
import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure;
import org.gcube.accounting.aggregator.persist.Persist;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector.SUFFIX;
-import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
@@ -21,8 +18,6 @@ import org.gcube.documentstore.records.DSMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.couchbase.client.java.Bucket;
-
/**
* @author Luca Frosini (ISTI - CNR)
*/
@@ -167,14 +162,13 @@ public class Elaborator {
FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure();
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
- Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src);
+ // Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src);
// Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst);
File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType);
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
- Aggregator aggregator = new Aggregator(aggregationStatus, srcBucket, originalRecordsbackupFile,
- aggregateRecordsBackupFile);
+ Aggregator aggregator = new Aggregator(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile);
aggregator.aggregate();
@@ -186,7 +180,7 @@ public class Elaborator {
*/
// if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
// Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
- Persist persist = new Persist(aggregationStatus, srcBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
+ Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
persist.recover();
/*
}else{
diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java
index 7548c33..84117f2 100644
--- a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java
+++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java
@@ -4,7 +4,7 @@ import java.util.Date;
import java.util.List;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
+import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.documentstore.records.DSMapper;
import org.slf4j.Logger;
@@ -28,7 +28,10 @@ public class RecoveryManager {
}
public void recovery() throws Exception {
- List aggregationStatusList = CouchBaseConnector.getUnterminated(aggregationStartDate, aggregationEndDate);
+ PostgreSQLConnector postgreSQLConnector = new PostgreSQLConnector();
+ // TODO
+
+ List aggregationStatusList = postgreSQLConnector.getUnterminated(aggregationStartDate, aggregationEndDate);
if(aggregationStatusList.size()==0){
logger.info("Nothing to recover :)");
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java
index e5322a0..c05ca47 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java
@@ -1,35 +1,26 @@
package org.gcube.accounting.aggregator.persist;
import java.io.File;
-import java.util.concurrent.TimeUnit;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
-
-import com.couchbase.client.java.Bucket;
-import com.couchbase.client.java.PersistTo;
-import com.couchbase.client.java.document.json.JsonObject;
-import com.couchbase.client.java.error.DocumentDoesNotExistException;
+import org.gcube.com.fasterxml.jackson.databind.JsonNode;
+import org.gcube.documentstore.records.DSMapper;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class DeleteDocument extends DocumentElaboration {
- public DeleteDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
- super(aggregationStatus, AggregationState.DELETED, file, bucket, aggregationStatus.getOriginalRecordsNumber());
+ public DeleteDocument(AggregationStatus aggregationStatus, File file){
+ super(aggregationStatus, AggregationState.DELETED, file, aggregationStatus.getOriginalRecordsNumber());
}
@Override
protected void elaborateLine(String line) throws Exception {
- JsonObject jsonObject = JsonObject.fromJson(line);
- String id = jsonObject.getString(ID);
- try {
- bucket.remove(id, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
- }catch (DocumentDoesNotExistException e) {
- // OK it can happen when the delete procedure were started but was interrupted
- }
+ JsonNode jsonNode = DSMapper.asJsonNode(line);
+ String id = jsonNode.get(ID).asText();
+
}
@Override
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java
index 2a40c3c..f087106 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java
@@ -15,8 +15,6 @@ import org.gcube.documentstore.records.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.couchbase.client.java.Bucket;
-
/**
* @author Luca Frosini (ISTI - CNR)
*/
@@ -33,19 +31,16 @@ public abstract class DocumentElaboration {
protected final AggregationStatus aggregationStatus;
protected final File file;
- protected final Bucket bucket;
protected final AggregationState finalAggregationState;
protected final int rowToBeElaborated;
protected Calendar startTime;
- protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file,
- Bucket bucket, int rowToBeElaborated) {
+ protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, int rowToBeElaborated) {
this.aggregationStatus = statusManager;
this.finalAggregationState = finalAggregationState;
this.file = file;
- this.bucket = bucket;
this.rowToBeElaborated = rowToBeElaborated;
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
index f25d5a4..da795ba 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
@@ -1,6 +1,7 @@
package org.gcube.accounting.aggregator.persist;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -13,12 +14,13 @@ import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
+import org.gcube.com.fasterxml.jackson.core.JsonProcessingException;
+import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
+import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
-import com.couchbase.client.java.document.json.JsonObject;
-
/**
* @author Luca Frosini (ISTI - CNR)
*/
@@ -47,7 +49,7 @@ public class InsertDocument extends DocumentElaboration {
// public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{
// super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
- super(aggregationStatus, AggregationState.ADDED, file, null, aggregationStatus.getAggregatedRecordsNumber());
+ super(aggregationStatus, AggregationState.ADDED, file, aggregationStatus.getAggregatedRecordsNumber());
serviceUsageRecordElaboration = false;
serviceClassName_calledMethods = new TreeMap>();
unparsableLines = new ArrayList<>();
@@ -61,18 +63,18 @@ public class InsertDocument extends DocumentElaboration {
count = 0;
}
- protected String getKey(JsonObject jsonObject) {
- String serviceClass = jsonObject.getString(ServiceUsageRecord.SERVICE_CLASS);
- String serviceName = jsonObject.getString(ServiceUsageRecord.SERVICE_NAME);
+ protected String getKey(JsonNode jsonNode) {
+ String serviceClass = jsonNode.get(ServiceUsageRecord.SERVICE_CLASS).asText();
+ String serviceName = jsonNode.get(ServiceUsageRecord.SERVICE_NAME).asText();
return serviceClass + "," + serviceName;
}
- protected void addServiceClassName_calledMethods(JsonObject jsonObject) {
- String key = getKey(jsonObject);
- String calledMethod = jsonObject.getString(ServiceUsageRecord.CALLED_METHOD);
+ protected void addServiceClassName_calledMethods(JsonNode jsonNode) {
+ String key = getKey(jsonNode);
+ String calledMethod = jsonNode.get(ServiceUsageRecord.CALLED_METHOD).asText();
int operationCount = 0;
try {
- operationCount = jsonObject.getInt(AggregatedServiceUsageRecord.OPERATION_COUNT);
+ operationCount = jsonNode.get(AggregatedServiceUsageRecord.OPERATION_COUNT).asInt();
}catch (Exception e) {
logger.error("", e);
// the record was not an Aggregated ServiceUsageRecord
@@ -92,27 +94,27 @@ public class InsertDocument extends DocumentElaboration {
}
}
- protected JsonObject analyseLine(String line) {
- JsonObject jsonObject = JsonObject.fromJson(line);
+ protected JsonNode analyseLine(String line) throws JsonProcessingException, IOException {
+ JsonNode jsonNode = DSMapper.asJsonNode(line);
if(serviceUsageRecordElaboration) {
try {
- addServiceClassName_calledMethods(jsonObject);
+ addServiceClassName_calledMethods(jsonNode);
}catch (Throwable e) {
unparsableLines.add(line);
}
}
- return jsonObject;
+ return jsonNode;
}
@Override
protected void elaborateLine(String line) throws Exception {
- JsonObject jsonObject = analyseLine(line);
+ JsonNode jsonNode = analyseLine(line);
/*
* String id = jsonObject.getString(ID);
* JsonDocument jsonDocument = JsonDocument.create(id, jsonObject);
* bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
*/
- Record record = RecordUtility.getRecord(jsonObject.toString());
+ Record record = RecordUtility.getRecord(jsonNode.toString());
persistencePostgreSQL.insert(record);
++count;
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java
index 9d7803e..67cbbd2 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java
@@ -16,8 +16,6 @@ import org.gcube.common.storagehub.client.dsl.FolderContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.couchbase.client.java.Bucket;
-
/**
* @author Luca Frosini (ISTI - CNR)
*/
@@ -27,7 +25,7 @@ public class Persist {
protected final AggregationStatus aggregationStatus;
- protected final Bucket originalRecordBucket;
+ // protected final Bucket originalRecordBucket;
// protected final Bucket aggregatedRecordBucket;
protected final File originalRecordsbackupFile;
@@ -40,12 +38,11 @@ public class Persist {
Bucket originalRecordBucket, Bucket aggregatedRecordBucket,
File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
*/
- public Persist(AggregationStatus aggregationStatus,
- Bucket originalRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
+ public Persist(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
super();
this.aggregationStatus = aggregationStatus;
- this.originalRecordBucket = originalRecordBucket;
+ // this.originalRecordBucket = originalRecordBucket;
// this.aggregatedRecordBucket = aggregatedRecordBucket;
this.originalRecordsbackupFile = originalRecordsbackupFile;
@@ -94,7 +91,7 @@ public class Persist {
// For Each original row stored on file it remove them from Bucket.
// At the end of elaboration set AgrgegationStatus to DELETED
// Then save the file in Workspace and set AgrgegationStatus to COMPLETED
- DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile, originalRecordBucket);
+ DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile);
deleteDocument.elaborate();
}
// InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java
index f1dfa2e..28963f5 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java
@@ -3,7 +3,8 @@ package org.gcube.accounting.aggregator.persistence;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
/**
- * @author Alessandro Pieve (ISTI - CNR)
+ * @author Luca Frosini (ISTI-CNR)
+ * @author Alessandro Pieve (ISTI-CNR)
*/
public class AggregatorPersitenceConfiguration extends AccountingPersistenceConfiguration {
diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java
deleted file mode 100644
index 4f9bb42..0000000
--- a/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java
+++ /dev/null
@@ -1,408 +0,0 @@
-package org.gcube.accounting.aggregator.persistence;
-
-import static com.couchbase.client.java.query.Select.select;
-import static com.couchbase.client.java.query.dsl.Expression.s;
-import static com.couchbase.client.java.query.dsl.Expression.x;
-
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.gcube.accounting.aggregator.aggregation.AggregationType;
-import org.gcube.accounting.aggregator.status.AggregationState;
-import org.gcube.accounting.aggregator.status.AggregationStatus;
-import org.gcube.accounting.aggregator.utility.Constant;
-import org.gcube.accounting.aggregator.utility.Utility;
-import org.gcube.accounting.datamodel.AggregatedUsageRecord;
-import org.gcube.accounting.datamodel.UsageRecord;
-import org.gcube.documentstore.records.DSMapper;
-import org.gcube.documentstore.records.Record;
-import org.gcube.documentstore.records.RecordUtility;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.couchbase.client.java.Bucket;
-import com.couchbase.client.java.Cluster;
-import com.couchbase.client.java.CouchbaseCluster;
-import com.couchbase.client.java.PersistTo;
-import com.couchbase.client.java.document.JsonDocument;
-import com.couchbase.client.java.document.json.JsonObject;
-import com.couchbase.client.java.env.CouchbaseEnvironment;
-import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
-import com.couchbase.client.java.error.DocumentAlreadyExistsException;
-import com.couchbase.client.java.query.N1qlQueryResult;
-import com.couchbase.client.java.query.N1qlQueryRow;
-import com.couchbase.client.java.query.Statement;
-import com.couchbase.client.java.query.dsl.Expression;
-import com.couchbase.client.java.query.dsl.Sort;
-
-/**
- * @author Luca Frosini (ISTI - CNR)
- */
-public class CouchBaseConnector {
-
- private static Logger logger = LoggerFactory.getLogger(CouchBaseConnector.class);
-
- public static final long MAX_REQUEST_LIFE_TIME = TimeUnit.SECONDS.toMillis(120);
- public static final long KEEP_ALIVE_INTERVAL = TimeUnit.HOURS.toMillis(1);
- public static final long AUTO_RELEASE_AFTER = TimeUnit.HOURS.toMillis(1);
- public static final long VIEW_TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(120);
- public static final long CONNECTION_TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(15);
- public static final long CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
-
- private static final String URL_PROPERTY_KEY = "URL";
- private static final String PASSWORD_PROPERTY_KEY = "password";
-
- public final static String ACCOUNTING_MANAGER_BUCKET_NAME = "AccountingManager";
-
- /* The environment configuration */
- protected static final CouchbaseEnvironment ENV;
- protected static final PersistTo PERSIST_TO;
-
- static {
- ENV = DefaultCouchbaseEnvironment.builder()
- .connectTimeout(CouchBaseConnector.CONNECTION_TIMEOUT)
- .maxRequestLifetime(CouchBaseConnector.MAX_REQUEST_LIFE_TIME)
- .queryTimeout(CouchBaseConnector.CONNECTION_TIMEOUT)
- .viewTimeout(CouchBaseConnector.VIEW_TIMEOUT_BUCKET)
- .keepAliveInterval(CouchBaseConnector.KEEP_ALIVE_INTERVAL)
- .kvTimeout(5000)
- .autoreleaseAfter(CouchBaseConnector.AUTO_RELEASE_AFTER).build();
-
- PERSIST_TO = PersistTo.MASTER;
-
- }
-
- protected static CouchBaseConnector couchBaseConnector;
-
- protected AggregatorPersitenceConfiguration configuration;
- protected Cluster cluster;
- protected Map connectionMap;
- protected Map> recordTypeMap;
-
- public synchronized static CouchBaseConnector getInstance() throws Exception{
- if(couchBaseConnector==null){
- couchBaseConnector = new CouchBaseConnector();
- }
- return couchBaseConnector;
- }
-
- protected CouchBaseConnector() throws Exception {
- this.configuration = new AggregatorPersitenceConfiguration(AggregatorPersistence.class);
- this.cluster = getCluster();
- createConnectionMap();
- }
-
-
- private Cluster getCluster() throws Exception {
- try {
- String url = configuration.getProperty(URL_PROPERTY_KEY);
- return CouchbaseCluster.create(ENV, url);
- } catch (Exception e) {
- throw e;
- }
- }
-
- public static enum SUFFIX {
- src, dst
- };
-
- private static String getBucketKey(String recordType, AggregationType aggregationType, SUFFIX suffix){
- return recordType + "-" + aggregationType.name() + "-" + suffix.name();
- }
-
- private Map createConnectionMap() throws Exception {
- connectionMap = new HashMap<>();
- recordTypeMap = new HashMap<>();
-
- try {
- Bucket b = cluster.openBucket(
- ACCOUNTING_MANAGER_BUCKET_NAME,
- configuration.getProperty(PASSWORD_PROPERTY_KEY));
- connectionMap.put(ACCOUNTING_MANAGER_BUCKET_NAME, b);
- }catch (Exception e) {
- logger.error("Unable to open Bucket used for Accounting Aggregation Management", e);
- throw e;
- }
-
- Map> recordClasses = RecordUtility.getRecordClassesFound();
- for (Class extends Record> recordClass : recordClasses.values()) {
- Record recordInstance = recordClass.newInstance();
- if (recordInstance instanceof UsageRecord && !(recordInstance instanceof AggregatedUsageRecord,?>)) {
- String recordType = recordInstance.getRecordType();
- recordTypeMap.put(recordType, recordClass);
-
- for(AggregationType aggregationType : AggregationType.values()){
- for(SUFFIX suffix : SUFFIX.values()){
- logger.debug("Trying to get the Bucket for {} {} {}", suffix, recordType, aggregationType);
- String bucketKey = getBucketKey(recordType, aggregationType, suffix);
- String bucketName = configuration.getProperty(bucketKey);
- logger.debug("Bucket for {} {} {} is {}. Going to open it.", suffix, recordType, aggregationType, bucketName);
- try {
- Bucket bucket = cluster.openBucket(bucketName, configuration.getProperty(PASSWORD_PROPERTY_KEY));
- connectionMap.put(bucketKey, bucket);
- }catch (Exception e) {
- logger.warn("Unable to open Bucket {} for {} {} {}. This normally means that is not configured.", bucketName, suffix, recordType, aggregationType, recordClass);
- }
- }
- }
- }
- }
-
- return connectionMap;
- }
-
- public Set getConnectionMapKeys(){
- return connectionMap.keySet();
- }
-
- public Set getRecordTypes(){
- return recordTypeMap.keySet();
- }
-
- public Bucket getBucket(String recordType, AggregationType aggregationType, SUFFIX suffix){
- return connectionMap.get(getBucketKey(recordType, aggregationType, suffix));
- }
-
- public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
- Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
-
- /*
- * SELECT *
- * FROM AccountingManager
- * WHERE
- * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
- * `aggregationInfo`.`aggregationType` = "DAILY" AND
- * `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000"
- * `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000"
- * ORDER BY `aggregationInfo`.`aggregationStartDate` DESC LIMIT 1
- */
-
- Expression expression = x("`aggregationInfo`.`recordType`").eq(s(recordType));
- expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name())));
-
-
- String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`";
- if(aggregationStartDate!=null){
- expression = expression.and(x(aggregationStartDateField).gte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate))));
- }
-
- if(aggregationEndDate!=null){
- expression = expression.and(x(aggregationStartDateField).lte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate))));
- }
-
- Sort sort = Sort.desc(aggregationStartDateField);
-
- Statement statement = select("*").from(bucket.name()).where(expression).orderBy(sort).limit(1);
-
- logger.trace("Going to query : {}", statement.toString());
-
- N1qlQueryResult result = bucket.query(statement);
- if (!result.finalSuccess()) {
- logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
- return null;
- }
-
- List rows = result.allRows();
-
- if(rows.size()>1){
- String error = String.format("More than one Document found for query %. This is really strange and should not occur. Please contact the Administrator.", statement.toString());
- logger.error(error);
- throw new Exception(error);
- }
-
- if(rows.size()==1){
- N1qlQueryRow row = rows.get(0);
- try {
- JsonObject jsonObject = row.value().getObject(bucket.name());
- logger.trace("JsonObject : {}", jsonObject.toString());
- return DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
- } catch (Exception e) {
- logger.warn("Unable to elaborate result for {}", row.toString());
- }
- }
-
- return null;
- }
-
- public static List getUnterminated(Date aggregationStartDate, Date aggregationEndDate) throws Exception{
- return getUnterminated(null, null, aggregationStartDate, aggregationEndDate);
- }
-
- public static List getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
- Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
-
- /*
- * SELECT *
- * FROM AccountingManager
- * WHERE
- * `aggregationState` != "COMPLETED" AND
- * `lastUpdateTime` < "2017-07-31 09:31:10.984 +0000" AND
- * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
- * `aggregationInfo`.`aggregationType` = "DAILY" AND
- * `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000"
- * `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000"
- *
- * ORDER BY `aggregationInfo`.`aggregationStartDate` ASC
- */
-
- Calendar now = Utility.getUTCCalendarInstance();
- now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED);
-
- Expression expression = x("`aggregationState`").ne(s(AggregationState.COMPLETED.name()));
- expression = expression.and(x("`lastUpdateTime`").lt(s(Constant.DEFAULT_DATE_FORMAT.format(now.getTime()))));
-
- if(recordType!=null){
- expression = expression.and(x("`aggregationInfo`.`recordType`").eq(s(recordType)));
- }
-
- if(aggregationType!=null){
- expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name())));
- }
-
- String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`";
- if(aggregationStartDate!=null){
- expression = expression.and(x(aggregationStartDateField).gte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate))));
- }
-
- if(aggregationEndDate!=null){
- expression = expression.and(x(aggregationStartDateField).lte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate))));
- }
-
- Sort sort = Sort.asc(aggregationStartDateField);
-
- Statement statement = select("*").from(bucket.name()).where(expression).orderBy(sort);
-
- logger.trace("Going to query : {}", statement.toString());
-
- N1qlQueryResult result = bucket.query(statement);
- if (!result.finalSuccess()) {
- logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
- return null;
- }
-
- List rows = result.allRows();
- List aggregationStatuses = new ArrayList<>(rows.size());
- for(N1qlQueryRow row: rows){
- try {
- JsonObject jsonObject = row.value().getObject(bucket.name());
- logger.trace("JsonObject : {}", jsonObject.toString());
- AggregationStatus aggregationStatus = DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
- aggregationStatuses.add(aggregationStatus);
- } catch (Exception e) {
- logger.warn("Unable to elaborate result for {}", row.toString());
- }
- }
-
- return aggregationStatuses;
-
- }
-
- public static List getAll() throws Exception{
- Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
-
- /*
- * SELECT *
- * FROM AccountingManager
- * ORDER BY `aggregationInfo`.`aggregationStartDate` ASC
- */
-
-
- String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`";
-
- Sort sort = Sort.asc(aggregationStartDateField);
-
- Statement statement = select("*").from(bucket.name()).orderBy(sort);
-
- logger.trace("Going to query : {}", statement.toString());
-
- N1qlQueryResult result = bucket.query(statement);
- if (!result.finalSuccess()) {
- logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
- return null;
- }
-
- List rows = result.allRows();
- List aggregationStatuses = new ArrayList<>(rows.size());
- for(N1qlQueryRow row: rows){
- try {
- JsonObject jsonObject = row.value().getObject(bucket.name());
- logger.trace("JsonObject : {}", jsonObject.toString());
- AggregationStatus aggregationStatus = DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
- aggregationStatuses.add(aggregationStatus);
- } catch (Exception e) {
- logger.warn("Unable to elaborate result for {}", row.toString());
- }
- }
-
- return aggregationStatuses;
-
- }
-
-
- public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
- Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
-
- /*
- * SELECT *
- * FROM AccountingManager
- * WHERE
- * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
- * `aggregationInfo`.`aggregationType` = "DAILY" AND
- * `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000"
- */
-
- Expression expression = x("`aggregationInfo`.`recordType`").eq(s(recordType));
- expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name())));
-
- expression = expression.and(x("`aggregationInfo`.`aggregationStartDate`").eq(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate))));
-
- Statement statement = select("*").from(bucket.name()).where(expression);
-
- logger.trace("Going to query : {}", statement.toString());
-
- N1qlQueryResult result = bucket.query(statement);
- if (!result.finalSuccess()) {
- logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
- return null;
- }
-
- List rows = result.allRows();
-
- if(rows.size()>1){
- String error = String.format("More than one Document found for query %s. This is really strange and should not occur. Please contact the Administrator.", statement.toString());
- logger.error(error);
- throw new Exception(error);
- }
-
- if(rows.size()==1){
- N1qlQueryRow row = rows.get(0);
- try {
- JsonObject jsonObject = row.value().getObject(bucket.name());
- logger.trace("JsonObject : {}", jsonObject.toString());
- return DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
- } catch (Exception e) {
- logger.warn("Unable to elaborate result for {}", row.toString());
- }
- }
-
- return null;
- }
-
-
- public static void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception{
- Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
- JsonObject jsonObject = JsonObject.fromJson(DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
- JsonDocument jsonDocument = JsonDocument.create(aggregationStatus.getUUID().toString(), jsonObject);
- try{
- bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
- }catch (DocumentAlreadyExistsException e) {
- // OK it can happen when the insert procedure were started but was interrupted
- }
- }
-}
diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java
index 1637b89..f1e9af4 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java
@@ -21,6 +21,9 @@ import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersisten
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.postgresql.core.Utils;
+/**
+ * @author Luca Frosini (ISTI-CNR)
+ */
public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
private static final String UTC_TIME_ZONE = "UTC";
@@ -219,7 +222,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
}
- public static List getAll() throws Exception{
+ public List getAll() throws Exception{
/*
* SELECT *
@@ -235,7 +238,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
}
- public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
+ public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
/*
* SELECT *
@@ -246,7 +249,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
* `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000"
*/
- return null;
+ return null;
}
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java
index de616eb..9bb3260 100644
--- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java
+++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java
@@ -130,6 +130,7 @@ public class AccountingAggregatorPlugin extends Plugin {
Date persistStartTime = null;
Date persistEndTime = null;
+ String recordType = null;
Class extends UsageRecord> usageRecordClass = null;
boolean forceEarlyAggregation = false;
@@ -192,14 +193,16 @@ public class AccountingAggregatorPlugin extends Plugin {
}
if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) {
- usageRecordClass = (Class extends UsageRecord>) RecordUtility.getRecordClass((String) inputs.get(RECORD_TYPE_INPUT_PARAMETER));
+ recordType = (String) inputs.get(RECORD_TYPE_INPUT_PARAMETER);
+ usageRecordClass = (Class extends UsageRecord>) RecordUtility.getRecordClass(recordType);
+ logger.debug("Requested record type is {} which is implemented by {}", recordType, usageRecordClass);
}
AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate);
aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation);
aggregatorManager.setForceRerun(forceRerun);
aggregatorManager.setForceRestart(forceRestart);
- aggregatorManager.elaborate(persistStartTime, persistEndTime, usageRecordClass);
+ aggregatorManager.elaborate(persistStartTime, persistEndTime, recordType);
break;
diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java
index bbcd44a..6a7c731 100644
--- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java
+++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java
@@ -8,7 +8,7 @@ import java.util.UUID;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
+import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.com.fasterxml.jackson.annotation.JsonFormat;
@@ -62,20 +62,26 @@ public class AggregationStatus {
@JsonProperty
protected List aggregationStateEvents;
+ private static PostgreSQLConnector postgreSQLConnector;
+
+ static {
+ // TODO
+ postgreSQLConnector = null;
+ }
+
// Needed for Jackon Unmarshalling
- @SuppressWarnings("unused")
- private AggregationStatus(){}
+ protected AggregationStatus(){}
public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
- return CouchBaseConnector.getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
+ return postgreSQLConnector.getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
}
public static List getUnterminated(String recordType, AggregationType aggregationType) throws Exception{
- return CouchBaseConnector.getUnterminated(recordType, aggregationType, null, null);
+ return postgreSQLConnector.getUnterminated(recordType, aggregationType, null, null);
}
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
- return CouchBaseConnector.getAggregationStatus(recordType, aggregationType, aggregationStartDate);
+ return postgreSQLConnector.getAggregationStatus(recordType, aggregationType, aggregationStartDate);
}
public AggregationStatus(AggregationInfo aggregationInfo) throws Exception {
@@ -115,7 +121,7 @@ public class AggregationStatus {
aggregationStateEvents.add(aggregationStatusEvent);
if(sync){
- CouchBaseConnector.upsertAggregationStatus(this);
+ postgreSQLConnector.upsertAggregationStatus(this);
}
}
@@ -188,7 +194,7 @@ public class AggregationStatus {
public void updateLastUpdateTime(boolean sync) throws Exception {
this.lastUpdateTime = Utility.getUTCCalendarInstance();
if(sync){
- CouchBaseConnector.upsertAggregationStatus(this);
+ postgreSQLConnector.upsertAggregationStatus(this);
}
}
diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java
index ab034ed..a5379e3 100644
--- a/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java
+++ b/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java
@@ -6,7 +6,6 @@ import java.util.List;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
@@ -43,7 +42,7 @@ public class CouchBaseConnectorTest extends ContextTest {
@Test
public void insertAllInPostgreSQL() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE);
- List aggregationStatuses = CouchBaseConnector.getAll();
+ List aggregationStatuses = postgreSQLConnector.getAll();
for(AggregationStatus aggregationStatus : aggregationStatuses) {
analyseAggregationStatus(aggregationStatus);
}
@@ -51,13 +50,13 @@ public class CouchBaseConnectorTest extends ContextTest {
@Test
public void getLastTest() throws Exception {
- AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
+ AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
logger.debug("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}
@Test
public void getUnterminatedTest() throws Exception{
- List aggregationStatuses = CouchBaseConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
+ List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
for(AggregationStatus aggregationStatus : aggregationStatuses){
logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}
@@ -68,7 +67,7 @@ public class CouchBaseConnectorTest extends ContextTest {
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1);
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 31);
- AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
+ AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}
@@ -77,7 +76,7 @@ public class CouchBaseConnectorTest extends ContextTest {
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1);
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30);
- List aggregationStatuses = CouchBaseConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
+ List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
for(AggregationStatus aggregationStatus : aggregationStatuses){
logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}
@@ -86,7 +85,7 @@ public class CouchBaseConnectorTest extends ContextTest {
@Test
public void getAggregationStatusTest() throws Exception{
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15);
- AggregationStatus aggregationStatus = CouchBaseConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime());
+ AggregationStatus aggregationStatus = postgreSQLConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime());
logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
}
diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java
index 6f04e71..2293382 100644
--- a/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java
+++ b/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java
@@ -19,6 +19,7 @@ import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
+import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.common.storagehub.client.dsl.ContainerType;
import org.gcube.common.storagehub.client.dsl.FolderContainer;
import org.gcube.common.storagehub.client.dsl.ItemContainer;
@@ -26,6 +27,7 @@ import org.gcube.common.storagehub.client.dsl.ListResolver;
import org.gcube.common.storagehub.client.dsl.StorageHubClient;
import org.gcube.common.storagehub.model.items.Item;
import org.gcube.documentstore.records.AggregatedRecord;
+import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.junit.Ignore;
@@ -33,8 +35,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.couchbase.client.java.document.json.JsonObject;
-
public class MyTest {
private static Logger logger = LoggerFactory.getLogger(Elaborator.class);
@@ -66,17 +66,17 @@ public class MyTest {
String jsonString = "{\"operationCount\":41.0,\"creationTime\":1.454284803916E12,\"consumerId\":\"wps.statisticalmanager\",\"resourceOwner\":\"wps.statisticalmanager\",\"recordType\":\"SingleStorageUsageRecord\",\"dataType\":\"STORAGE\",\"_rev\":\"1-05c467553141723f51dad0fa1aab2ce0\",\"resourceURI\":\"56aea0025caf8b5b69c0071e\",\"providerURI\":\"data.d4science.org\",\"resourceScope\":\"/d4science.research-infrastructures.eu\",\"dataVolume\":95807.0,\"scope\":\"/d4science.research-infrastructures.eu\",\"startTime\":1.454284803059E12,\"operationType\":\"CREATE\",\"endTime\":1.454284803059E12,\"id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"_id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"operationResult\":\"SUCCESS\"};";
- JsonObject content = JsonObject.fromJson(jsonString);
+ ObjectNode content = (ObjectNode) DSMapper.asJsonNode(jsonString);
- if(content.containsKey(USAGE_RECORD_TYPE)){
- String recordType = content.getString(USAGE_RECORD_TYPE);
- content.removeKey(USAGE_RECORD_TYPE);
+ if(content.has(USAGE_RECORD_TYPE)){
+ String recordType = content.get(USAGE_RECORD_TYPE).asText();
+ content.remove(USAGE_RECORD_TYPE);
content.put(Record.RECORD_TYPE, recordType);
}
Boolean aggregated = false;
- if(content.containsKey(AggregatedRecord.CREATION_TIME)) {
+ if(content.has(AggregatedRecord.CREATION_TIME)) {
Object object = content.get(AggregatedRecord.CREATION_TIME);
if(object instanceof Double) {
Double d = ((Double) object);
@@ -84,7 +84,7 @@ public class MyTest {
}
}
- if(content.containsKey(AggregatedRecord.START_TIME)) {
+ if(content.has(AggregatedRecord.START_TIME)) {
aggregated = true;
Object object = content.get(AggregatedRecord.START_TIME);
if(object instanceof Double) {
@@ -93,7 +93,7 @@ public class MyTest {
}
}
- if(content.containsKey(AggregatedRecord.END_TIME)) {
+ if(content.has(AggregatedRecord.END_TIME)) {
aggregated = true;
Object object = content.get(AggregatedRecord.END_TIME);
if(object instanceof Double) {
@@ -102,14 +102,14 @@ public class MyTest {
}
}
- if(content.containsKey(AggregatedRecord.OPERATION_COUNT)) {
+ if(content.has(AggregatedRecord.OPERATION_COUNT)) {
Object object = content.get(AggregatedRecord.OPERATION_COUNT);
if(object instanceof Double) {
Double d = ((Double) object);
content.put(AggregatedRecord.OPERATION_COUNT, d.intValue());
}
- if(content.getInt(AggregatedRecord.OPERATION_COUNT)>1) {
+ if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) {
aggregated = true;
}
}
@@ -118,7 +118,7 @@ public class MyTest {
content.put(AggregatedRecord.AGGREGATED, true);
}
- String recordType = content.getString(Record.RECORD_TYPE);
+ String recordType = content.get(Record.RECORD_TYPE).asText();
if(!aggregated){
if(recordType.startsWith(SIMPLE)){
diff --git a/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java b/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java
index 0eb4b94..74f332e 100644
--- a/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java
+++ b/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java
@@ -13,6 +13,7 @@ import org.gcube.accounting.aggregator.aggregation.AggregatorBuffer;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
+import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
@@ -20,8 +21,6 @@ import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.couchbase.client.java.document.json.JsonObject;
-
/**
* @author Luca Frosini (ISTI - CNR)
*/
@@ -96,8 +95,8 @@ public class RecoverOriginalRecords {
}
protected void elaborateLine(String line) throws Exception {
- JsonObject jsonObject = JsonObject.fromJson(line);
- String id = jsonObject.getString(ID);
+ JsonNode jsonNode = DSMapper.asJsonNode(line);
+ String id = jsonNode.get(ID).asText();
if(uniqueRecords.containsKey(id)){
logger.trace("Duplicated Original Record with ID {}", id);
Utility.printLine(duplicatedFile, line);
@@ -122,8 +121,8 @@ public class RecoverOriginalRecords {
List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
for (AggregatedRecord, ?> aggregatedRecord : aggregatedRecords) {
String marshalled = DSMapper.marshal(aggregatedRecord);
- JsonObject jsonObject = JsonObject.fromJson(marshalled);
- Utility.printLine(aggregatedFile, jsonObject.toString());
+ JsonNode jsonNode = DSMapper.asJsonNode(marshalled);
+ Utility.printLine(aggregatedFile, jsonNode.toString());
aggregated++;
}
}