Migrating aggregator completely to TimescaleDB

This commit is contained in:
Luca Frosini 2021-05-06 13:13:30 +02:00
parent bf5acae417
commit 508f599729
17 changed files with 161 additions and 640 deletions

View File

@ -44,17 +44,10 @@
<groupId>org.gcube.vremanagement</groupId> <groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor-api</artifactId> <artifactId>smart-executor-api</artifactId>
</dependency> </dependency>
<!-- Document Store and Accounting libraries -->
<dependency> <dependency>
<groupId>org.gcube.accounting</groupId> <groupId>org.gcube.accounting</groupId>
<artifactId>accounting-lib</artifactId> <artifactId>accounting-lib</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
<version>2.7.11</version>
</dependency>
<!-- END Document Store and Accounting libraries -->
<dependency> <dependency>
<groupId>org.gcube.common</groupId> <groupId>org.gcube.common</groupId>
<artifactId>storagehub-client-library</artifactId> <artifactId>storagehub-client-library</artifactId>

View File

@ -1,6 +1,7 @@
package org.gcube.accounting.aggregator.aggregation; package org.gcube.accounting.aggregator.aggregation;
import java.io.File; import java.io.File;
import java.sql.ResultSet;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.Calendar; import java.util.Calendar;
import java.util.List; import java.util.List;
@ -15,6 +16,8 @@ import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord; import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.com.fasterxml.jackson.databind.ObjectMapper;
import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.documentstore.exception.InvalidValueException; import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
@ -23,14 +26,6 @@ import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.view.ViewQuery;
import com.couchbase.client.java.view.ViewResult;
import com.couchbase.client.java.view.ViewRow;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
@ -41,77 +36,49 @@ public class Aggregator {
private static final String TMP_SUFFIX = ".tmp"; private static final String TMP_SUFFIX = ".tmp";
protected final AggregationStatus aggregationStatus; protected final AggregationStatus aggregationStatus;
protected final Bucket bucket;
protected final File originalRecordsbackupFile; protected final File originalRecordsbackupFile;
protected final File aggregateRecordsBackupFile; protected final File aggregateRecordsBackupFile;
protected final File malformedRecordsFile; protected final File malformedRecordsFile;
protected int malformedRecordNumber; protected int malformedRecordNumber;
protected ObjectMapper objectMapper;
protected Calendar startTime; protected Calendar startTime;
public Aggregator(AggregationStatus aggregationStatus, Bucket bucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile) { public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
this.aggregationStatus = aggregationStatus; this.aggregationStatus = aggregationStatus;
this.bucket = bucket;
this.originalRecordsbackupFile = originalRecordsbackupFile; this.originalRecordsbackupFile = originalRecordsbackupFile;
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile; this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile); this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile);
this.objectMapper = new ObjectMapper();
} }
public void aggregate() throws Exception { public void aggregate() throws Exception {
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) { if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) {
startTime = Utility.getUTCCalendarInstance(); startTime = Utility.getUTCCalendarInstance();
ViewResult viewResult = getViewResult();
retrieveAndAggregate(viewResult); // TODO query
ResultSet resultSet = null;
retrieveAndAggregate(resultSet);
} }
} }
/**
* Generate a key for map-reduce
* @param key
* @return JsonArray containing the map reduce key
*/
protected JsonArray generateKey(String key){
JsonArray arrayKey = JsonArray.create();
for (String value : key.split("/")){
if (!value.toString().isEmpty()){
arrayKey.add(Integer.parseInt(value));
}
}
return arrayKey;
} protected ResultSet getViewResult() throws Exception {
protected ViewResult getViewResult() throws Exception {
DateFormat dateFormat = aggregationStatus.getAggregationInfo().getAggregationType().getDateFormat(); DateFormat dateFormat = aggregationStatus.getAggregationInfo().getAggregationType().getDateFormat();
String dateStartKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationStartDate()); String dateStartKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationStartDate());
String dateEndKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationEndDate()); String dateEndKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationEndDate());
JsonArray startKey = generateKey(dateStartKey);
JsonArray endKey = generateKey(dateEndKey); // TODO query here
DesignID designid = DesignID.valueOf(bucket.name()); return null;
String designDocId = designid.getDesignName();
String viewName = designid.getViewName();
ViewQuery query = ViewQuery.from(designDocId, viewName);
query.startKey(startKey);
query.endKey(endKey);
query.reduce(false);
query.inclusiveEnd(false);
logger.debug("View Query: designDocId:{} - viewName:{}, startKey:{} - endKey:{} ",
designDocId, viewName, startKey, endKey);
try {
return bucket.query(query);
} catch (Exception e) {
logger.error("Exception error VIEW", e.getLocalizedMessage(), e);
throw e;
}
} }
private static final String USAGE_RECORD_TYPE = "usageRecordType"; private static final String USAGE_RECORD_TYPE = "usageRecordType";
@ -119,19 +86,18 @@ public class Aggregator {
private static final String SIMPLE = "Simple"; private static final String SIMPLE = "Simple";
protected int elaborateRow(ViewRow row, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception { protected int elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception {
try { try {
JsonObject content = row.document().content();
if(content.has(USAGE_RECORD_TYPE)){
if(content.containsKey(USAGE_RECORD_TYPE)){ String recordType = content.get(USAGE_RECORD_TYPE).asText();
String recordType = content.getString(USAGE_RECORD_TYPE); content.remove(USAGE_RECORD_TYPE);
content.removeKey(USAGE_RECORD_TYPE);
content.put(Record.RECORD_TYPE, recordType); content.put(Record.RECORD_TYPE, recordType);
} }
Boolean aggregated = false; Boolean aggregated = false;
if(content.containsKey(AggregatedRecord.CREATION_TIME)) { if(content.has(AggregatedRecord.CREATION_TIME)) {
Object object = content.get(AggregatedRecord.CREATION_TIME); Object object = content.get(AggregatedRecord.CREATION_TIME);
if(object instanceof Double) { if(object instanceof Double) {
Double d = ((Double) object); Double d = ((Double) object);
@ -139,7 +105,7 @@ public class Aggregator {
} }
} }
if(content.containsKey(AggregatedRecord.START_TIME)) { if(content.has(AggregatedRecord.START_TIME)) {
aggregated = true; aggregated = true;
Object object = content.get(AggregatedRecord.START_TIME); Object object = content.get(AggregatedRecord.START_TIME);
if(object instanceof Double) { if(object instanceof Double) {
@ -148,7 +114,7 @@ public class Aggregator {
} }
} }
if(content.containsKey(AggregatedRecord.END_TIME)) { if(content.has(AggregatedRecord.END_TIME)) {
aggregated = true; aggregated = true;
Object object = content.get(AggregatedRecord.END_TIME); Object object = content.get(AggregatedRecord.END_TIME);
if(object instanceof Double) { if(object instanceof Double) {
@ -157,14 +123,14 @@ public class Aggregator {
} }
} }
if(content.containsKey(AggregatedRecord.OPERATION_COUNT)) { if(content.has(AggregatedRecord.OPERATION_COUNT)) {
Object object = content.get(AggregatedRecord.OPERATION_COUNT); Object object = content.get(AggregatedRecord.OPERATION_COUNT);
if(object instanceof Double) { if(object instanceof Double) {
Double d = ((Double) object); Double d = ((Double) object);
content.put(AggregatedRecord.OPERATION_COUNT, d.intValue()); content.put(AggregatedRecord.OPERATION_COUNT, d.intValue());
} }
if(content.getInt(AggregatedRecord.OPERATION_COUNT)>1) { if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) {
aggregated = true; aggregated = true;
} }
} }
@ -173,7 +139,7 @@ public class Aggregator {
content.put(AggregatedRecord.AGGREGATED, true); content.put(AggregatedRecord.AGGREGATED, true);
} }
String recordType = content.getString(Record.RECORD_TYPE); String recordType = content.get(Record.RECORD_TYPE).asText();
if(!aggregated){ if(!aggregated){
if(recordType.startsWith(SIMPLE)){ if(recordType.startsWith(SIMPLE)){
@ -221,7 +187,7 @@ public class Aggregator {
private static final int MAX_RETRY = 3; private static final int MAX_RETRY = 3;
protected void retrieveAndAggregate(ViewResult viewResult) throws Exception { protected void retrieveAndAggregate(ResultSet resultSet) throws Exception {
AggregatorBuffer aggregatorBuffer = new AggregatorBuffer(); AggregatorBuffer aggregatorBuffer = new AggregatorBuffer();
Calendar start = Utility.getUTCCalendarInstance(); Calendar start = Utility.getUTCCalendarInstance();
@ -233,15 +199,19 @@ public class Aggregator {
malformedRecordNumber = 0; malformedRecordNumber = 0;
int originalRecordsCounter = 0; int originalRecordsCounter = 0;
for (ViewRow row : viewResult) { while (resultSet.next()) {
for(int i=1; i<=MAX_RETRY; i++){ for(int i=1; i<=MAX_RETRY; i++){
try { try {
originalRecordsCounter = elaborateRow(row, aggregatorBuffer, originalRecordsCounter); ObjectNode content = objectMapper.createObjectNode();
// todo set data from resultset
originalRecordsCounter = elaborateRow(content, aggregatorBuffer, originalRecordsCounter);
TimeUnit.MILLISECONDS.sleep(3); TimeUnit.MILLISECONDS.sleep(3);
break; break;
}catch (RuntimeException e) { }catch (RuntimeException e) {
if(i==2){ if(i==2){
logger.error("Unable to elaborate {} {}. Tryed {} times.", ViewRow.class.getSimpleName(), row, i, e); logger.error("Unable to elaborate row {}. Tryed {} times.", i, e);
} }
} }
} }
@ -266,8 +236,7 @@ public class Aggregator {
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) { for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
String marshalled = DSMapper.marshal(aggregatedRecord); String marshalled = DSMapper.marshal(aggregatedRecord);
JsonObject jsonObject = JsonObject.fromJson(marshalled); Utility.printLine(aggregateRecordsBackupFileTmp, marshalled);
Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString());
} }
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
@ -312,11 +281,4 @@ public class Aggregator {
aggregatorBuffer.aggregate(aggregatedRecord); aggregatorBuffer.aggregate(aggregatedRecord);
} }
protected JsonDocument getJsonDocument(ViewRow row) {
String identifier = (String) row.document().content().get("id");
JsonDocument jsonDocument = JsonDocument.create(identifier, row.document().content());
logger.trace("{}", jsonDocument.toString());
return jsonDocument;
}
} }

View File

@ -4,11 +4,9 @@ import java.util.Date;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -17,26 +15,24 @@ public class AggregatorManager {
private static Logger logger = LoggerFactory.getLogger(AggregatorManager.class); private static Logger logger = LoggerFactory.getLogger(AggregatorManager.class);
public final static String ACCOUNTING_MANAGER_BUCKET_NAME = "AccountingManager";
protected final AggregationType aggregationType; protected final AggregationType aggregationType;
protected Date aggregationStartDate; protected Date aggregationStartDate;
protected Date aggregationEndDate; protected Date aggregationEndDate;
protected final boolean restartFromLastAggregationDate; protected final boolean restartFromLastAggregationDate;
protected boolean forceEarlyAggregation; protected boolean forceEarlyAggregation;
protected boolean forceRerun; protected boolean forceRerun;
protected boolean forceRestart; protected boolean forceRestart;
public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate, public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate,
Date aggregationStartDate, Date aggregationEndDate) throws Exception { Date aggregationStartDate, Date aggregationEndDate) throws Exception {
this.aggregationType = aggregationType; this.aggregationType = aggregationType;
this.aggregationStartDate = Utility.sanitizeDate(aggregationType, aggregationStartDate); this.aggregationStartDate = Utility.sanitizeDate(aggregationType, aggregationStartDate);
this.aggregationEndDate = aggregationEndDate; this.aggregationEndDate = aggregationEndDate;
this.restartFromLastAggregationDate = restartFromLastAggregationDate; this.restartFromLastAggregationDate = restartFromLastAggregationDate;
this.forceEarlyAggregation = false; this.forceEarlyAggregation = false;
this.forceRerun = false; this.forceRerun = false;
@ -54,7 +50,7 @@ public class AggregatorManager {
public void setForceRestart(boolean forceRestart) { public void setForceRestart(boolean forceRestart) {
this.forceRestart = forceRestart; this.forceRestart = forceRestart;
} }
protected Date getEndDateFromStartDate() { protected Date getEndDateFromStartDate() {
return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1); return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1);
} }
@ -67,56 +63,41 @@ public class AggregatorManager {
return aggregationStatus; return aggregationStatus;
} }
public void elaborate(Date persistStartTime, Date persistEndTime, Class<? extends UsageRecord> usageRecordClass) public void elaborate(Date persistStartTime, Date persistEndTime, String recordType)
throws Exception { throws Exception {
AggregationStatus aggregationStatus = null;
if (restartFromLastAggregationDate) {
AggregationStatus lastAggregationStatus = AggregationStatus.getLast(recordType, aggregationType,
aggregationStartDate, aggregationEndDate);
CouchBaseConnector couchBaseConnector = CouchBaseConnector.getInstance(); // I don't check if this aggregation is COMPLETED because this
// is responsibility of Recovery Process
for (String recordType : couchBaseConnector.getRecordTypes()) { if (lastAggregationStatus != null) {
this.aggregationStartDate = lastAggregationStatus.getAggregationInfo().getAggregationEndDate();
if (usageRecordClass != null && usageRecordClass.newInstance().getRecordType().compareTo(recordType) != 0) { logger.info("Last got AggregationStatus is {}. Restarting from {}",
continue; DSMapper.getObjectMapper().writeValueAsString(lastAggregationStatus),
Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate));
} }
if (recordType.compareTo(ACCOUNTING_MANAGER_BUCKET_NAME) == 0) {
continue;
}
AggregationStatus aggregationStatus = null;
if (restartFromLastAggregationDate) {
AggregationStatus lastAggregationStatus = AggregationStatus.getLast(recordType, aggregationType,
aggregationStartDate, aggregationEndDate);
// I don't check if this aggregation is COMPLETED because this
// is responsibility of Recovery Process
if (lastAggregationStatus != null) {
this.aggregationStartDate = lastAggregationStatus.getAggregationInfo().getAggregationEndDate();
logger.info("Last got AggregationStatus is {}. Restarting from {}",
DSMapper.getObjectMapper().writeValueAsString(lastAggregationStatus),
Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate));
}
}
aggregationStatus = AggregationStatus.getAggregationStatus(recordType, aggregationType,
aggregationStartDate);
if (aggregationStatus == null) {
aggregationStatus = createAggregationStatus(recordType);
}
if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) {
logger.info("Start Date {} is after provided End Date {}. Please check input parameters.",
Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate),
Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate));
return;
}
Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart);
} }
aggregationStatus = AggregationStatus.getAggregationStatus(recordType, aggregationType, aggregationStartDate);
if (aggregationStatus == null) {
aggregationStatus = createAggregationStatus(recordType);
}
if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) {
logger.info("Start Date {} is after provided End Date {}. Please check input parameters.",
Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate),
Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate));
return;
}
Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime);
elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart);
} }
} }

View File

@ -10,9 +10,6 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.aggregation.Aggregator; import org.gcube.accounting.aggregator.aggregation.Aggregator;
import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure; import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure;
import org.gcube.accounting.aggregator.persist.Persist; import org.gcube.accounting.aggregator.persist.Persist;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector.SUFFIX;
import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin;
import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Constant;
@ -21,8 +18,6 @@ import org.gcube.documentstore.records.DSMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
@ -167,14 +162,13 @@ public class Elaborator {
FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure(); FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure();
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate); File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src); // Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src);
// Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst); // Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst);
File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType); File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType);
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile); File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
Aggregator aggregator = new Aggregator(aggregationStatus, srcBucket, originalRecordsbackupFile, Aggregator aggregator = new Aggregator(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile);
aggregateRecordsBackupFile);
aggregator.aggregate(); aggregator.aggregate();
@ -186,7 +180,7 @@ public class Elaborator {
*/ */
// if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) { // if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
// Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); // Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
Persist persist = new Persist(aggregationStatus, srcBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
persist.recover(); persist.recover();
/* /*
}else{ }else{

View File

@ -4,7 +4,7 @@ import java.util.Date;
import java.util.List; import java.util.List;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -28,7 +28,10 @@ public class RecoveryManager {
} }
public void recovery() throws Exception { public void recovery() throws Exception {
List<AggregationStatus> aggregationStatusList = CouchBaseConnector.getUnterminated(aggregationStartDate, aggregationEndDate); PostgreSQLConnector postgreSQLConnector = new PostgreSQLConnector();
// TODO
List<AggregationStatus> aggregationStatusList = postgreSQLConnector.getUnterminated(aggregationStartDate, aggregationEndDate);
if(aggregationStatusList.size()==0){ if(aggregationStatusList.size()==0){
logger.info("Nothing to recover :)"); logger.info("Nothing to recover :)");
} }

View File

@ -1,35 +1,26 @@
package org.gcube.accounting.aggregator.persist; package org.gcube.accounting.aggregator.persist;
import java.io.File; import java.io.File;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.java.Bucket; import org.gcube.documentstore.records.DSMapper;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
public class DeleteDocument extends DocumentElaboration { public class DeleteDocument extends DocumentElaboration {
public DeleteDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){ public DeleteDocument(AggregationStatus aggregationStatus, File file){
super(aggregationStatus, AggregationState.DELETED, file, bucket, aggregationStatus.getOriginalRecordsNumber()); super(aggregationStatus, AggregationState.DELETED, file, aggregationStatus.getOriginalRecordsNumber());
} }
@Override @Override
protected void elaborateLine(String line) throws Exception { protected void elaborateLine(String line) throws Exception {
JsonObject jsonObject = JsonObject.fromJson(line); JsonNode jsonNode = DSMapper.asJsonNode(line);
String id = jsonObject.getString(ID); String id = jsonNode.get(ID).asText();
try {
bucket.remove(id, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}catch (DocumentDoesNotExistException e) {
// OK it can happen when the delete procedure were started but was interrupted
}
} }
@Override @Override

View File

@ -15,8 +15,6 @@ import org.gcube.documentstore.records.Record;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
@ -33,19 +31,16 @@ public abstract class DocumentElaboration {
protected final AggregationStatus aggregationStatus; protected final AggregationStatus aggregationStatus;
protected final File file; protected final File file;
protected final Bucket bucket;
protected final AggregationState finalAggregationState; protected final AggregationState finalAggregationState;
protected final int rowToBeElaborated; protected final int rowToBeElaborated;
protected Calendar startTime; protected Calendar startTime;
protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, int rowToBeElaborated) {
Bucket bucket, int rowToBeElaborated) {
this.aggregationStatus = statusManager; this.aggregationStatus = statusManager;
this.finalAggregationState = finalAggregationState; this.finalAggregationState = finalAggregationState;
this.file = file; this.file = file;
this.bucket = bucket;
this.rowToBeElaborated = rowToBeElaborated; this.rowToBeElaborated = rowToBeElaborated;
} }

View File

@ -1,6 +1,7 @@
package org.gcube.accounting.aggregator.persist; package org.gcube.accounting.aggregator.persist;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -13,12 +14,13 @@ import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.com.fasterxml.jackson.core.JsonProcessingException;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.persistence.PersistencePostgreSQL; import org.gcube.documentstore.persistence.PersistencePostgreSQL;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility; import org.gcube.documentstore.records.RecordUtility;
import com.couchbase.client.java.document.json.JsonObject;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
@ -47,7 +49,7 @@ public class InsertDocument extends DocumentElaboration {
// public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){ // public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{ public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{
// super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber()); // super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
super(aggregationStatus, AggregationState.ADDED, file, null, aggregationStatus.getAggregatedRecordsNumber()); super(aggregationStatus, AggregationState.ADDED, file, aggregationStatus.getAggregatedRecordsNumber());
serviceUsageRecordElaboration = false; serviceUsageRecordElaboration = false;
serviceClassName_calledMethods = new TreeMap<String,Map<String,Integer>>(); serviceClassName_calledMethods = new TreeMap<String,Map<String,Integer>>();
unparsableLines = new ArrayList<>(); unparsableLines = new ArrayList<>();
@ -61,18 +63,18 @@ public class InsertDocument extends DocumentElaboration {
count = 0; count = 0;
} }
protected String getKey(JsonObject jsonObject) { protected String getKey(JsonNode jsonNode) {
String serviceClass = jsonObject.getString(ServiceUsageRecord.SERVICE_CLASS); String serviceClass = jsonNode.get(ServiceUsageRecord.SERVICE_CLASS).asText();
String serviceName = jsonObject.getString(ServiceUsageRecord.SERVICE_NAME); String serviceName = jsonNode.get(ServiceUsageRecord.SERVICE_NAME).asText();
return serviceClass + "," + serviceName; return serviceClass + "," + serviceName;
} }
protected void addServiceClassName_calledMethods(JsonObject jsonObject) { protected void addServiceClassName_calledMethods(JsonNode jsonNode) {
String key = getKey(jsonObject); String key = getKey(jsonNode);
String calledMethod = jsonObject.getString(ServiceUsageRecord.CALLED_METHOD); String calledMethod = jsonNode.get(ServiceUsageRecord.CALLED_METHOD).asText();
int operationCount = 0; int operationCount = 0;
try { try {
operationCount = jsonObject.getInt(AggregatedServiceUsageRecord.OPERATION_COUNT); operationCount = jsonNode.get(AggregatedServiceUsageRecord.OPERATION_COUNT).asInt();
}catch (Exception e) { }catch (Exception e) {
logger.error("", e); logger.error("", e);
// the record was not an Aggregated ServiceUsageRecord // the record was not an Aggregated ServiceUsageRecord
@ -92,27 +94,27 @@ public class InsertDocument extends DocumentElaboration {
} }
} }
protected JsonObject analyseLine(String line) { protected JsonNode analyseLine(String line) throws JsonProcessingException, IOException {
JsonObject jsonObject = JsonObject.fromJson(line); JsonNode jsonNode = DSMapper.asJsonNode(line);
if(serviceUsageRecordElaboration) { if(serviceUsageRecordElaboration) {
try { try {
addServiceClassName_calledMethods(jsonObject); addServiceClassName_calledMethods(jsonNode);
}catch (Throwable e) { }catch (Throwable e) {
unparsableLines.add(line); unparsableLines.add(line);
} }
} }
return jsonObject; return jsonNode;
} }
@Override @Override
protected void elaborateLine(String line) throws Exception { protected void elaborateLine(String line) throws Exception {
JsonObject jsonObject = analyseLine(line); JsonNode jsonNode = analyseLine(line);
/* /*
* String id = jsonObject.getString(ID); * String id = jsonObject.getString(ID);
* JsonDocument jsonDocument = JsonDocument.create(id, jsonObject); * JsonDocument jsonDocument = JsonDocument.create(id, jsonObject);
* bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); * bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
*/ */
Record record = RecordUtility.getRecord(jsonObject.toString()); Record record = RecordUtility.getRecord(jsonNode.toString());
persistencePostgreSQL.insert(record); persistencePostgreSQL.insert(record);
++count; ++count;

View File

@ -16,8 +16,6 @@ import org.gcube.common.storagehub.client.dsl.FolderContainer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
@ -27,7 +25,7 @@ public class Persist {
protected final AggregationStatus aggregationStatus; protected final AggregationStatus aggregationStatus;
protected final Bucket originalRecordBucket; // protected final Bucket originalRecordBucket;
// protected final Bucket aggregatedRecordBucket; // protected final Bucket aggregatedRecordBucket;
protected final File originalRecordsbackupFile; protected final File originalRecordsbackupFile;
@ -40,12 +38,11 @@ public class Persist {
Bucket originalRecordBucket, Bucket aggregatedRecordBucket, Bucket originalRecordBucket, Bucket aggregatedRecordBucket,
File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) { File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
*/ */
public Persist(AggregationStatus aggregationStatus, public Persist(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
Bucket originalRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
super(); super();
this.aggregationStatus = aggregationStatus; this.aggregationStatus = aggregationStatus;
this.originalRecordBucket = originalRecordBucket; // this.originalRecordBucket = originalRecordBucket;
// this.aggregatedRecordBucket = aggregatedRecordBucket; // this.aggregatedRecordBucket = aggregatedRecordBucket;
this.originalRecordsbackupFile = originalRecordsbackupFile; this.originalRecordsbackupFile = originalRecordsbackupFile;
@ -94,7 +91,7 @@ public class Persist {
// For Each original row stored on file it remove them from Bucket. // For Each original row stored on file it remove them from Bucket.
// At the end of elaboration set AgrgegationStatus to DELETED // At the end of elaboration set AgrgegationStatus to DELETED
// Then save the file in Workspace and set AgrgegationStatus to COMPLETED // Then save the file in Workspace and set AgrgegationStatus to COMPLETED
DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile, originalRecordBucket); DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile);
deleteDocument.elaborate(); deleteDocument.elaborate();
} }
// InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket); // InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);

View File

@ -3,7 +3,8 @@ package org.gcube.accounting.aggregator.persistence;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
/** /**
* @author Alessandro Pieve (ISTI - CNR) * @author Luca Frosini (ISTI-CNR)
* @author Alessandro Pieve (ISTI-CNR)
*/ */
public class AggregatorPersitenceConfiguration extends AccountingPersistenceConfiguration { public class AggregatorPersitenceConfiguration extends AccountingPersistenceConfiguration {

View File

@ -1,408 +0,0 @@
package org.gcube.accounting.aggregator.persistence;
import static com.couchbase.client.java.query.Select.select;
import static com.couchbase.client.java.query.dsl.Expression.s;
import static com.couchbase.client.java.query.dsl.Expression.x;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.error.DocumentAlreadyExistsException;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow;
import com.couchbase.client.java.query.Statement;
import com.couchbase.client.java.query.dsl.Expression;
import com.couchbase.client.java.query.dsl.Sort;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class CouchBaseConnector {
private static Logger logger = LoggerFactory.getLogger(CouchBaseConnector.class);
public static final long MAX_REQUEST_LIFE_TIME = TimeUnit.SECONDS.toMillis(120);
public static final long KEEP_ALIVE_INTERVAL = TimeUnit.HOURS.toMillis(1);
public static final long AUTO_RELEASE_AFTER = TimeUnit.HOURS.toMillis(1);
public static final long VIEW_TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(120);
public static final long CONNECTION_TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(15);
public static final long CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
private static final String URL_PROPERTY_KEY = "URL";
private static final String PASSWORD_PROPERTY_KEY = "password";
public final static String ACCOUNTING_MANAGER_BUCKET_NAME = "AccountingManager";
/* The environment configuration */
protected static final CouchbaseEnvironment ENV;
protected static final PersistTo PERSIST_TO;
static {
ENV = DefaultCouchbaseEnvironment.builder()
.connectTimeout(CouchBaseConnector.CONNECTION_TIMEOUT)
.maxRequestLifetime(CouchBaseConnector.MAX_REQUEST_LIFE_TIME)
.queryTimeout(CouchBaseConnector.CONNECTION_TIMEOUT)
.viewTimeout(CouchBaseConnector.VIEW_TIMEOUT_BUCKET)
.keepAliveInterval(CouchBaseConnector.KEEP_ALIVE_INTERVAL)
.kvTimeout(5000)
.autoreleaseAfter(CouchBaseConnector.AUTO_RELEASE_AFTER).build();
PERSIST_TO = PersistTo.MASTER;
}
protected static CouchBaseConnector couchBaseConnector;
protected AggregatorPersitenceConfiguration configuration;
protected Cluster cluster;
protected Map<String,Bucket> connectionMap;
protected Map<String, Class<? extends Record>> recordTypeMap;
public synchronized static CouchBaseConnector getInstance() throws Exception{
if(couchBaseConnector==null){
couchBaseConnector = new CouchBaseConnector();
}
return couchBaseConnector;
}
protected CouchBaseConnector() throws Exception {
this.configuration = new AggregatorPersitenceConfiguration(AggregatorPersistence.class);
this.cluster = getCluster();
createConnectionMap();
}
private Cluster getCluster() throws Exception {
try {
String url = configuration.getProperty(URL_PROPERTY_KEY);
return CouchbaseCluster.create(ENV, url);
} catch (Exception e) {
throw e;
}
}
public static enum SUFFIX {
src, dst
};
private static String getBucketKey(String recordType, AggregationType aggregationType, SUFFIX suffix){
return recordType + "-" + aggregationType.name() + "-" + suffix.name();
}
private Map<String,Bucket> createConnectionMap() throws Exception {
connectionMap = new HashMap<>();
recordTypeMap = new HashMap<>();
try {
Bucket b = cluster.openBucket(
ACCOUNTING_MANAGER_BUCKET_NAME,
configuration.getProperty(PASSWORD_PROPERTY_KEY));
connectionMap.put(ACCOUNTING_MANAGER_BUCKET_NAME, b);
}catch (Exception e) {
logger.error("Unable to open Bucket used for Accounting Aggregation Management", e);
throw e;
}
Map<String, Class<? extends Record>> recordClasses = RecordUtility.getRecordClassesFound();
for (Class<? extends Record> recordClass : recordClasses.values()) {
Record recordInstance = recordClass.newInstance();
if (recordInstance instanceof UsageRecord && !(recordInstance instanceof AggregatedUsageRecord<?,?>)) {
String recordType = recordInstance.getRecordType();
recordTypeMap.put(recordType, recordClass);
for(AggregationType aggregationType : AggregationType.values()){
for(SUFFIX suffix : SUFFIX.values()){
logger.debug("Trying to get the Bucket for {} {} {}", suffix, recordType, aggregationType);
String bucketKey = getBucketKey(recordType, aggregationType, suffix);
String bucketName = configuration.getProperty(bucketKey);
logger.debug("Bucket for {} {} {} is {}. Going to open it.", suffix, recordType, aggregationType, bucketName);
try {
Bucket bucket = cluster.openBucket(bucketName, configuration.getProperty(PASSWORD_PROPERTY_KEY));
connectionMap.put(bucketKey, bucket);
}catch (Exception e) {
logger.warn("Unable to open Bucket {} for {} {} {}. This normally means that is not configured.", bucketName, suffix, recordType, aggregationType, recordClass);
}
}
}
}
}
return connectionMap;
}
public Set<String> getConnectionMapKeys(){
return connectionMap.keySet();
}
public Set<String> getRecordTypes(){
return recordTypeMap.keySet();
}
public Bucket getBucket(String recordType, AggregationType aggregationType, SUFFIX suffix){
return connectionMap.get(getBucketKey(recordType, aggregationType, suffix));
}
public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
/*
* SELECT *
* FROM AccountingManager
* WHERE
* `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
* `aggregationInfo`.`aggregationType` = "DAILY" AND
* `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000"
* `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000"
* ORDER BY `aggregationInfo`.`aggregationStartDate` DESC LIMIT 1
*/
Expression expression = x("`aggregationInfo`.`recordType`").eq(s(recordType));
expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name())));
String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`";
if(aggregationStartDate!=null){
expression = expression.and(x(aggregationStartDateField).gte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate))));
}
if(aggregationEndDate!=null){
expression = expression.and(x(aggregationStartDateField).lte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate))));
}
Sort sort = Sort.desc(aggregationStartDateField);
Statement statement = select("*").from(bucket.name()).where(expression).orderBy(sort).limit(1);
logger.trace("Going to query : {}", statement.toString());
N1qlQueryResult result = bucket.query(statement);
if (!result.finalSuccess()) {
logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
return null;
}
List<N1qlQueryRow> rows = result.allRows();
if(rows.size()>1){
String error = String.format("More than one Document found for query %. This is really strange and should not occur. Please contact the Administrator.", statement.toString());
logger.error(error);
throw new Exception(error);
}
if(rows.size()==1){
N1qlQueryRow row = rows.get(0);
try {
JsonObject jsonObject = row.value().getObject(bucket.name());
logger.trace("JsonObject : {}", jsonObject.toString());
return DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
} catch (Exception e) {
logger.warn("Unable to elaborate result for {}", row.toString());
}
}
return null;
}
public static List<AggregationStatus> getUnterminated(Date aggregationStartDate, Date aggregationEndDate) throws Exception{
return getUnterminated(null, null, aggregationStartDate, aggregationEndDate);
}
public static List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
/*
* SELECT *
* FROM AccountingManager
* WHERE
* `aggregationState` != "COMPLETED" AND
* `lastUpdateTime` < "2017-07-31 09:31:10.984 +0000" AND
* `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
* `aggregationInfo`.`aggregationType` = "DAILY" AND
* `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000"
* `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000"
*
* ORDER BY `aggregationInfo`.`aggregationStartDate` ASC
*/
Calendar now = Utility.getUTCCalendarInstance();
now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED);
Expression expression = x("`aggregationState`").ne(s(AggregationState.COMPLETED.name()));
expression = expression.and(x("`lastUpdateTime`").lt(s(Constant.DEFAULT_DATE_FORMAT.format(now.getTime()))));
if(recordType!=null){
expression = expression.and(x("`aggregationInfo`.`recordType`").eq(s(recordType)));
}
if(aggregationType!=null){
expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name())));
}
String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`";
if(aggregationStartDate!=null){
expression = expression.and(x(aggregationStartDateField).gte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate))));
}
if(aggregationEndDate!=null){
expression = expression.and(x(aggregationStartDateField).lte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate))));
}
Sort sort = Sort.asc(aggregationStartDateField);
Statement statement = select("*").from(bucket.name()).where(expression).orderBy(sort);
logger.trace("Going to query : {}", statement.toString());
N1qlQueryResult result = bucket.query(statement);
if (!result.finalSuccess()) {
logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
return null;
}
List<N1qlQueryRow> rows = result.allRows();
List<AggregationStatus> aggregationStatuses = new ArrayList<>(rows.size());
for(N1qlQueryRow row: rows){
try {
JsonObject jsonObject = row.value().getObject(bucket.name());
logger.trace("JsonObject : {}", jsonObject.toString());
AggregationStatus aggregationStatus = DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
aggregationStatuses.add(aggregationStatus);
} catch (Exception e) {
logger.warn("Unable to elaborate result for {}", row.toString());
}
}
return aggregationStatuses;
}
public static List<AggregationStatus> getAll() throws Exception{
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
/*
* SELECT *
* FROM AccountingManager
* ORDER BY `aggregationInfo`.`aggregationStartDate` ASC
*/
String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`";
Sort sort = Sort.asc(aggregationStartDateField);
Statement statement = select("*").from(bucket.name()).orderBy(sort);
logger.trace("Going to query : {}", statement.toString());
N1qlQueryResult result = bucket.query(statement);
if (!result.finalSuccess()) {
logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
return null;
}
List<N1qlQueryRow> rows = result.allRows();
List<AggregationStatus> aggregationStatuses = new ArrayList<>(rows.size());
for(N1qlQueryRow row: rows){
try {
JsonObject jsonObject = row.value().getObject(bucket.name());
logger.trace("JsonObject : {}", jsonObject.toString());
AggregationStatus aggregationStatus = DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
aggregationStatuses.add(aggregationStatus);
} catch (Exception e) {
logger.warn("Unable to elaborate result for {}", row.toString());
}
}
return aggregationStatuses;
}
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
/*
* SELECT *
* FROM AccountingManager
* WHERE
* `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
* `aggregationInfo`.`aggregationType` = "DAILY" AND
* `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000"
*/
Expression expression = x("`aggregationInfo`.`recordType`").eq(s(recordType));
expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name())));
expression = expression.and(x("`aggregationInfo`.`aggregationStartDate`").eq(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate))));
Statement statement = select("*").from(bucket.name()).where(expression);
logger.trace("Going to query : {}", statement.toString());
N1qlQueryResult result = bucket.query(statement);
if (!result.finalSuccess()) {
logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
return null;
}
List<N1qlQueryRow> rows = result.allRows();
if(rows.size()>1){
String error = String.format("More than one Document found for query %s. This is really strange and should not occur. Please contact the Administrator.", statement.toString());
logger.error(error);
throw new Exception(error);
}
if(rows.size()==1){
N1qlQueryRow row = rows.get(0);
try {
JsonObject jsonObject = row.value().getObject(bucket.name());
logger.trace("JsonObject : {}", jsonObject.toString());
return DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
} catch (Exception e) {
logger.warn("Unable to elaborate result for {}", row.toString());
}
}
return null;
}
public static void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception{
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
JsonObject jsonObject = JsonObject.fromJson(DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
JsonDocument jsonDocument = JsonDocument.create(aggregationStatus.getUUID().toString(), jsonObject);
try{
bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}catch (DocumentAlreadyExistsException e) {
// OK it can happen when the insert procedure were started but was interrupted
}
}
}

View File

@ -21,6 +21,9 @@ import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersisten
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.postgresql.core.Utils; import org.postgresql.core.Utils;
/**
* @author Luca Frosini (ISTI-CNR)
*/
public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
private static final String UTC_TIME_ZONE = "UTC"; private static final String UTC_TIME_ZONE = "UTC";
@ -219,7 +222,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
} }
public static List<AggregationStatus> getAll() throws Exception{ public List<AggregationStatus> getAll() throws Exception{
/* /*
* SELECT * * SELECT *
@ -235,7 +238,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
} }
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
/* /*
* SELECT * * SELECT *
@ -246,7 +249,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
* `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000" * `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000"
*/ */
return null; return null;
} }
} }

View File

@ -130,6 +130,7 @@ public class AccountingAggregatorPlugin extends Plugin {
Date persistStartTime = null; Date persistStartTime = null;
Date persistEndTime = null; Date persistEndTime = null;
String recordType = null;
Class<? extends UsageRecord> usageRecordClass = null; Class<? extends UsageRecord> usageRecordClass = null;
boolean forceEarlyAggregation = false; boolean forceEarlyAggregation = false;
@ -192,14 +193,16 @@ public class AccountingAggregatorPlugin extends Plugin {
} }
if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) { if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) {
usageRecordClass = (Class<? extends UsageRecord>) RecordUtility.getRecordClass((String) inputs.get(RECORD_TYPE_INPUT_PARAMETER)); recordType = (String) inputs.get(RECORD_TYPE_INPUT_PARAMETER);
usageRecordClass = (Class<? extends UsageRecord>) RecordUtility.getRecordClass(recordType);
logger.debug("Requested record type is {} which is implemented by {}", recordType, usageRecordClass);
} }
AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate); AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate);
aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation); aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation);
aggregatorManager.setForceRerun(forceRerun); aggregatorManager.setForceRerun(forceRerun);
aggregatorManager.setForceRestart(forceRestart); aggregatorManager.setForceRestart(forceRestart);
aggregatorManager.elaborate(persistStartTime, persistEndTime, usageRecordClass); aggregatorManager.elaborate(persistStartTime, persistEndTime, recordType);
break; break;

View File

@ -8,7 +8,7 @@ import java.util.UUID;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.com.fasterxml.jackson.annotation.JsonFormat; import org.gcube.com.fasterxml.jackson.annotation.JsonFormat;
@ -62,20 +62,26 @@ public class AggregationStatus {
@JsonProperty @JsonProperty
protected List<AggregationStateEvent> aggregationStateEvents; protected List<AggregationStateEvent> aggregationStateEvents;
private static PostgreSQLConnector postgreSQLConnector;
static {
// TODO
postgreSQLConnector = null;
}
// Needed for Jackon Unmarshalling // Needed for Jackon Unmarshalling
@SuppressWarnings("unused") protected AggregationStatus(){}
private AggregationStatus(){}
public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
return CouchBaseConnector.getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate); return postgreSQLConnector.getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
} }
public static List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType) throws Exception{ public static List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType) throws Exception{
return CouchBaseConnector.getUnterminated(recordType, aggregationType, null, null); return postgreSQLConnector.getUnterminated(recordType, aggregationType, null, null);
} }
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
return CouchBaseConnector.getAggregationStatus(recordType, aggregationType, aggregationStartDate); return postgreSQLConnector.getAggregationStatus(recordType, aggregationType, aggregationStartDate);
} }
public AggregationStatus(AggregationInfo aggregationInfo) throws Exception { public AggregationStatus(AggregationInfo aggregationInfo) throws Exception {
@ -115,7 +121,7 @@ public class AggregationStatus {
aggregationStateEvents.add(aggregationStatusEvent); aggregationStateEvents.add(aggregationStatusEvent);
if(sync){ if(sync){
CouchBaseConnector.upsertAggregationStatus(this); postgreSQLConnector.upsertAggregationStatus(this);
} }
} }
@ -188,7 +194,7 @@ public class AggregationStatus {
public void updateLastUpdateTime(boolean sync) throws Exception { public void updateLastUpdateTime(boolean sync) throws Exception {
this.lastUpdateTime = Utility.getUTCCalendarInstance(); this.lastUpdateTime = Utility.getUTCCalendarInstance();
if(sync){ if(sync){
CouchBaseConnector.upsertAggregationStatus(this); postgreSQLConnector.upsertAggregationStatus(this);
} }
} }

View File

@ -6,7 +6,6 @@ import java.util.List;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
@ -43,7 +42,7 @@ public class CouchBaseConnectorTest extends ContextTest {
@Test @Test
public void insertAllInPostgreSQL() throws Exception { public void insertAllInPostgreSQL() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); ContextTest.setContextByName(ROOT_DEV_SCOPE);
List<AggregationStatus> aggregationStatuses = CouchBaseConnector.getAll(); List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getAll();
for(AggregationStatus aggregationStatus : aggregationStatuses) { for(AggregationStatus aggregationStatus : aggregationStatuses) {
analyseAggregationStatus(aggregationStatus); analyseAggregationStatus(aggregationStatus);
} }
@ -51,13 +50,13 @@ public class CouchBaseConnectorTest extends ContextTest {
@Test @Test
public void getLastTest() throws Exception { public void getLastTest() throws Exception {
AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
logger.debug("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); logger.debug("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
} }
@Test @Test
public void getUnterminatedTest() throws Exception{ public void getUnterminatedTest() throws Exception{
List<AggregationStatus> aggregationStatuses = CouchBaseConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
for(AggregationStatus aggregationStatus : aggregationStatuses){ for(AggregationStatus aggregationStatus : aggregationStatuses){
logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
} }
@ -68,7 +67,7 @@ public class CouchBaseConnectorTest extends ContextTest {
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1); Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1);
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 31); Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 31);
AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
} }
@ -77,7 +76,7 @@ public class CouchBaseConnectorTest extends ContextTest {
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1); Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1);
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30); Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30);
List<AggregationStatus> aggregationStatuses = CouchBaseConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
for(AggregationStatus aggregationStatus : aggregationStatuses){ for(AggregationStatus aggregationStatus : aggregationStatuses){
logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
} }
@ -86,7 +85,7 @@ public class CouchBaseConnectorTest extends ContextTest {
@Test @Test
public void getAggregationStatusTest() throws Exception{ public void getAggregationStatusTest() throws Exception{
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15); Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15);
AggregationStatus aggregationStatus = CouchBaseConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime()); AggregationStatus aggregationStatus = postgreSQLConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime());
logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
} }

View File

@ -19,6 +19,7 @@ import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord; import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.common.storagehub.client.dsl.ContainerType; import org.gcube.common.storagehub.client.dsl.ContainerType;
import org.gcube.common.storagehub.client.dsl.FolderContainer; import org.gcube.common.storagehub.client.dsl.FolderContainer;
import org.gcube.common.storagehub.client.dsl.ItemContainer; import org.gcube.common.storagehub.client.dsl.ItemContainer;
@ -26,6 +27,7 @@ import org.gcube.common.storagehub.client.dsl.ListResolver;
import org.gcube.common.storagehub.client.dsl.StorageHubClient; import org.gcube.common.storagehub.client.dsl.StorageHubClient;
import org.gcube.common.storagehub.model.items.Item; import org.gcube.common.storagehub.model.items.Item;
import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility; import org.gcube.documentstore.records.RecordUtility;
import org.junit.Ignore; import org.junit.Ignore;
@ -33,8 +35,6 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.couchbase.client.java.document.json.JsonObject;
public class MyTest { public class MyTest {
private static Logger logger = LoggerFactory.getLogger(Elaborator.class); private static Logger logger = LoggerFactory.getLogger(Elaborator.class);
@ -66,17 +66,17 @@ public class MyTest {
String jsonString = "{\"operationCount\":41.0,\"creationTime\":1.454284803916E12,\"consumerId\":\"wps.statisticalmanager\",\"resourceOwner\":\"wps.statisticalmanager\",\"recordType\":\"SingleStorageUsageRecord\",\"dataType\":\"STORAGE\",\"_rev\":\"1-05c467553141723f51dad0fa1aab2ce0\",\"resourceURI\":\"56aea0025caf8b5b69c0071e\",\"providerURI\":\"data.d4science.org\",\"resourceScope\":\"/d4science.research-infrastructures.eu\",\"dataVolume\":95807.0,\"scope\":\"/d4science.research-infrastructures.eu\",\"startTime\":1.454284803059E12,\"operationType\":\"CREATE\",\"endTime\":1.454284803059E12,\"id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"_id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"operationResult\":\"SUCCESS\"};"; String jsonString = "{\"operationCount\":41.0,\"creationTime\":1.454284803916E12,\"consumerId\":\"wps.statisticalmanager\",\"resourceOwner\":\"wps.statisticalmanager\",\"recordType\":\"SingleStorageUsageRecord\",\"dataType\":\"STORAGE\",\"_rev\":\"1-05c467553141723f51dad0fa1aab2ce0\",\"resourceURI\":\"56aea0025caf8b5b69c0071e\",\"providerURI\":\"data.d4science.org\",\"resourceScope\":\"/d4science.research-infrastructures.eu\",\"dataVolume\":95807.0,\"scope\":\"/d4science.research-infrastructures.eu\",\"startTime\":1.454284803059E12,\"operationType\":\"CREATE\",\"endTime\":1.454284803059E12,\"id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"_id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"operationResult\":\"SUCCESS\"};";
JsonObject content = JsonObject.fromJson(jsonString); ObjectNode content = (ObjectNode) DSMapper.asJsonNode(jsonString);
if(content.containsKey(USAGE_RECORD_TYPE)){ if(content.has(USAGE_RECORD_TYPE)){
String recordType = content.getString(USAGE_RECORD_TYPE); String recordType = content.get(USAGE_RECORD_TYPE).asText();
content.removeKey(USAGE_RECORD_TYPE); content.remove(USAGE_RECORD_TYPE);
content.put(Record.RECORD_TYPE, recordType); content.put(Record.RECORD_TYPE, recordType);
} }
Boolean aggregated = false; Boolean aggregated = false;
if(content.containsKey(AggregatedRecord.CREATION_TIME)) { if(content.has(AggregatedRecord.CREATION_TIME)) {
Object object = content.get(AggregatedRecord.CREATION_TIME); Object object = content.get(AggregatedRecord.CREATION_TIME);
if(object instanceof Double) { if(object instanceof Double) {
Double d = ((Double) object); Double d = ((Double) object);
@ -84,7 +84,7 @@ public class MyTest {
} }
} }
if(content.containsKey(AggregatedRecord.START_TIME)) { if(content.has(AggregatedRecord.START_TIME)) {
aggregated = true; aggregated = true;
Object object = content.get(AggregatedRecord.START_TIME); Object object = content.get(AggregatedRecord.START_TIME);
if(object instanceof Double) { if(object instanceof Double) {
@ -93,7 +93,7 @@ public class MyTest {
} }
} }
if(content.containsKey(AggregatedRecord.END_TIME)) { if(content.has(AggregatedRecord.END_TIME)) {
aggregated = true; aggregated = true;
Object object = content.get(AggregatedRecord.END_TIME); Object object = content.get(AggregatedRecord.END_TIME);
if(object instanceof Double) { if(object instanceof Double) {
@ -102,14 +102,14 @@ public class MyTest {
} }
} }
if(content.containsKey(AggregatedRecord.OPERATION_COUNT)) { if(content.has(AggregatedRecord.OPERATION_COUNT)) {
Object object = content.get(AggregatedRecord.OPERATION_COUNT); Object object = content.get(AggregatedRecord.OPERATION_COUNT);
if(object instanceof Double) { if(object instanceof Double) {
Double d = ((Double) object); Double d = ((Double) object);
content.put(AggregatedRecord.OPERATION_COUNT, d.intValue()); content.put(AggregatedRecord.OPERATION_COUNT, d.intValue());
} }
if(content.getInt(AggregatedRecord.OPERATION_COUNT)>1) { if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) {
aggregated = true; aggregated = true;
} }
} }
@ -118,7 +118,7 @@ public class MyTest {
content.put(AggregatedRecord.AGGREGATED, true); content.put(AggregatedRecord.AGGREGATED, true);
} }
String recordType = content.getString(Record.RECORD_TYPE); String recordType = content.get(Record.RECORD_TYPE).asText();
if(!aggregated){ if(!aggregated){
if(recordType.startsWith(SIMPLE)){ if(recordType.startsWith(SIMPLE)){

View File

@ -13,6 +13,7 @@ import org.gcube.accounting.aggregator.aggregation.AggregatorBuffer;
import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.Record;
@ -20,8 +21,6 @@ import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.couchbase.client.java.document.json.JsonObject;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
@ -96,8 +95,8 @@ public class RecoverOriginalRecords {
} }
protected void elaborateLine(String line) throws Exception { protected void elaborateLine(String line) throws Exception {
JsonObject jsonObject = JsonObject.fromJson(line); JsonNode jsonNode = DSMapper.asJsonNode(line);
String id = jsonObject.getString(ID); String id = jsonNode.get(ID).asText();
if(uniqueRecords.containsKey(id)){ if(uniqueRecords.containsKey(id)){
logger.trace("Duplicated Original Record with ID {}", id); logger.trace("Duplicated Original Record with ID {}", id);
Utility.printLine(duplicatedFile, line); Utility.printLine(duplicatedFile, line);
@ -122,8 +121,8 @@ public class RecoverOriginalRecords {
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) { for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
String marshalled = DSMapper.marshal(aggregatedRecord); String marshalled = DSMapper.marshal(aggregatedRecord);
JsonObject jsonObject = JsonObject.fromJson(marshalled); JsonNode jsonNode = DSMapper.asJsonNode(marshalled);
Utility.printLine(aggregatedFile, jsonObject.toString()); Utility.printLine(aggregatedFile, jsonNode.toString());
aggregated++; aggregated++;
} }
} }