From 508f59972956cfa935cb76faa74154513a4a24da Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Thu, 6 May 2021 13:13:30 +0200 Subject: [PATCH] Migrating aggregator completely to TimescaleDB --- pom.xml | 7 - .../aggregator/aggregation/Aggregator.java | 116 ++--- .../elaboration/AggregatorManager.java | 87 ++-- .../aggregator/elaboration/Elaborator.java | 12 +- .../elaboration/RecoveryManager.java | 7 +- .../aggregator/persist/DeleteDocument.java | 23 +- .../persist/DocumentElaboration.java | 7 +- .../aggregator/persist/InsertDocument.java | 34 +- .../aggregator/persist/Persist.java | 11 +- .../AggregatorPersitenceConfiguration.java | 3 +- .../persistence/CouchBaseConnector.java | 408 ------------------ .../persistence/PostgreSQLConnector.java | 9 +- .../plugin/AccountingAggregatorPlugin.java | 7 +- .../aggregator/status/AggregationStatus.java | 22 +- .../plugin/CouchBaseConnectorTest.java | 13 +- .../accounting/aggregator/plugin/MyTest.java | 24 +- .../recover/RecoverOriginalRecords.java | 11 +- 17 files changed, 161 insertions(+), 640 deletions(-) delete mode 100644 src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java diff --git a/pom.xml b/pom.xml index 0f3f12f..b73250f 100644 --- a/pom.xml +++ b/pom.xml @@ -44,17 +44,10 @@ org.gcube.vremanagement smart-executor-api - org.gcube.accounting accounting-lib - - com.couchbase.client - java-client - 2.7.11 - - org.gcube.common storagehub-client-library diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index e5782e2..769950b 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -1,6 +1,7 @@ package org.gcube.accounting.aggregator.aggregation; import java.io.File; +import java.sql.ResultSet; import java.text.DateFormat; import java.util.Calendar; import java.util.List; @@ -15,6 +16,8 @@ import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; +import org.gcube.com.fasterxml.jackson.databind.ObjectMapper; +import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode; import org.gcube.documentstore.exception.InvalidValueException; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.DSMapper; @@ -23,14 +26,6 @@ import org.gcube.documentstore.records.RecordUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.couchbase.client.java.Bucket; -import com.couchbase.client.java.document.JsonDocument; -import com.couchbase.client.java.document.json.JsonArray; -import com.couchbase.client.java.document.json.JsonObject; -import com.couchbase.client.java.view.ViewQuery; -import com.couchbase.client.java.view.ViewResult; -import com.couchbase.client.java.view.ViewRow; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -41,77 +36,49 @@ public class Aggregator { private static final String TMP_SUFFIX = ".tmp"; protected final AggregationStatus aggregationStatus; - protected final Bucket bucket; protected final File originalRecordsbackupFile; protected final File aggregateRecordsBackupFile; protected final File malformedRecordsFile; protected int malformedRecordNumber; + protected ObjectMapper objectMapper; + protected Calendar startTime; - public Aggregator(AggregationStatus aggregationStatus, Bucket bucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile) { + public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) { this.aggregationStatus = aggregationStatus; - this.bucket = bucket; this.originalRecordsbackupFile = originalRecordsbackupFile; this.aggregateRecordsBackupFile = aggregateRecordsBackupFile; this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile); + this.objectMapper = new ObjectMapper(); + } public void aggregate() throws Exception { if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) { startTime = Utility.getUTCCalendarInstance(); - ViewResult viewResult = getViewResult(); - retrieveAndAggregate(viewResult); + + // TODO query + ResultSet resultSet = null; + + retrieveAndAggregate(resultSet); } } - /** - * Generate a key for map-reduce - * @param key - * @return JsonArray containing the map reduce key - */ - protected JsonArray generateKey(String key){ - JsonArray arrayKey = JsonArray.create(); - for (String value : key.split("/")){ - if (!value.toString().isEmpty()){ - arrayKey.add(Integer.parseInt(value)); - } - } - return arrayKey; - } - - protected ViewResult getViewResult() throws Exception { + protected ResultSet getViewResult() throws Exception { DateFormat dateFormat = aggregationStatus.getAggregationInfo().getAggregationType().getDateFormat(); String dateStartKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationStartDate()); String dateEndKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationEndDate()); - JsonArray startKey = generateKey(dateStartKey); - JsonArray endKey = generateKey(dateEndKey); - - DesignID designid = DesignID.valueOf(bucket.name()); - String designDocId = designid.getDesignName(); - - String viewName = designid.getViewName(); - - ViewQuery query = ViewQuery.from(designDocId, viewName); - query.startKey(startKey); - query.endKey(endKey); - query.reduce(false); - query.inclusiveEnd(false); - - logger.debug("View Query: designDocId:{} - viewName:{}, startKey:{} - endKey:{} ", - designDocId, viewName, startKey, endKey); - - try { - return bucket.query(query); - } catch (Exception e) { - logger.error("Exception error VIEW", e.getLocalizedMessage(), e); - throw e; - } + + // TODO query here + + return null; + } private static final String USAGE_RECORD_TYPE = "usageRecordType"; @@ -119,19 +86,18 @@ public class Aggregator { private static final String SIMPLE = "Simple"; - protected int elaborateRow(ViewRow row, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception { + protected int elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception { try { - JsonObject content = row.document().content(); - - if(content.containsKey(USAGE_RECORD_TYPE)){ - String recordType = content.getString(USAGE_RECORD_TYPE); - content.removeKey(USAGE_RECORD_TYPE); + + if(content.has(USAGE_RECORD_TYPE)){ + String recordType = content.get(USAGE_RECORD_TYPE).asText(); + content.remove(USAGE_RECORD_TYPE); content.put(Record.RECORD_TYPE, recordType); } Boolean aggregated = false; - if(content.containsKey(AggregatedRecord.CREATION_TIME)) { + if(content.has(AggregatedRecord.CREATION_TIME)) { Object object = content.get(AggregatedRecord.CREATION_TIME); if(object instanceof Double) { Double d = ((Double) object); @@ -139,7 +105,7 @@ public class Aggregator { } } - if(content.containsKey(AggregatedRecord.START_TIME)) { + if(content.has(AggregatedRecord.START_TIME)) { aggregated = true; Object object = content.get(AggregatedRecord.START_TIME); if(object instanceof Double) { @@ -148,7 +114,7 @@ public class Aggregator { } } - if(content.containsKey(AggregatedRecord.END_TIME)) { + if(content.has(AggregatedRecord.END_TIME)) { aggregated = true; Object object = content.get(AggregatedRecord.END_TIME); if(object instanceof Double) { @@ -157,14 +123,14 @@ public class Aggregator { } } - if(content.containsKey(AggregatedRecord.OPERATION_COUNT)) { + if(content.has(AggregatedRecord.OPERATION_COUNT)) { Object object = content.get(AggregatedRecord.OPERATION_COUNT); if(object instanceof Double) { Double d = ((Double) object); content.put(AggregatedRecord.OPERATION_COUNT, d.intValue()); } - if(content.getInt(AggregatedRecord.OPERATION_COUNT)>1) { + if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) { aggregated = true; } } @@ -173,7 +139,7 @@ public class Aggregator { content.put(AggregatedRecord.AGGREGATED, true); } - String recordType = content.getString(Record.RECORD_TYPE); + String recordType = content.get(Record.RECORD_TYPE).asText(); if(!aggregated){ if(recordType.startsWith(SIMPLE)){ @@ -221,7 +187,7 @@ public class Aggregator { private static final int MAX_RETRY = 3; - protected void retrieveAndAggregate(ViewResult viewResult) throws Exception { + protected void retrieveAndAggregate(ResultSet resultSet) throws Exception { AggregatorBuffer aggregatorBuffer = new AggregatorBuffer(); Calendar start = Utility.getUTCCalendarInstance(); @@ -233,15 +199,19 @@ public class Aggregator { malformedRecordNumber = 0; int originalRecordsCounter = 0; - for (ViewRow row : viewResult) { + while (resultSet.next()) { for(int i=1; i<=MAX_RETRY; i++){ try { - originalRecordsCounter = elaborateRow(row, aggregatorBuffer, originalRecordsCounter); + ObjectNode content = objectMapper.createObjectNode(); + + // todo set data from resultset + + originalRecordsCounter = elaborateRow(content, aggregatorBuffer, originalRecordsCounter); TimeUnit.MILLISECONDS.sleep(3); break; }catch (RuntimeException e) { if(i==2){ - logger.error("Unable to elaborate {} {}. Tryed {} times.", ViewRow.class.getSimpleName(), row, i, e); + logger.error("Unable to elaborate row {}. Tryed {} times.", i, e); } } } @@ -266,8 +236,7 @@ public class Aggregator { List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); for (AggregatedRecord aggregatedRecord : aggregatedRecords) { String marshalled = DSMapper.marshal(aggregatedRecord); - JsonObject jsonObject = JsonObject.fromJson(marshalled); - Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString()); + Utility.printLine(aggregateRecordsBackupFileTmp, marshalled); } aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); @@ -312,11 +281,4 @@ public class Aggregator { aggregatorBuffer.aggregate(aggregatedRecord); } - protected JsonDocument getJsonDocument(ViewRow row) { - String identifier = (String) row.document().content().get("id"); - JsonDocument jsonDocument = JsonDocument.create(identifier, row.document().content()); - logger.trace("{}", jsonDocument.toString()); - return jsonDocument; - } - } diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java index 5bfd764..f74b73e 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/AggregatorManager.java @@ -4,11 +4,9 @@ import java.util.Date; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationType; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; -import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.documentstore.records.DSMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,26 +15,24 @@ public class AggregatorManager { private static Logger logger = LoggerFactory.getLogger(AggregatorManager.class); - public final static String ACCOUNTING_MANAGER_BUCKET_NAME = "AccountingManager"; - protected final AggregationType aggregationType; protected Date aggregationStartDate; protected Date aggregationEndDate; - + protected final boolean restartFromLastAggregationDate; protected boolean forceEarlyAggregation; protected boolean forceRerun; protected boolean forceRestart; - + public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate, Date aggregationStartDate, Date aggregationEndDate) throws Exception { this.aggregationType = aggregationType; this.aggregationStartDate = Utility.sanitizeDate(aggregationType, aggregationStartDate); - + this.aggregationEndDate = aggregationEndDate; - + this.restartFromLastAggregationDate = restartFromLastAggregationDate; this.forceEarlyAggregation = false; this.forceRerun = false; @@ -54,7 +50,7 @@ public class AggregatorManager { public void setForceRestart(boolean forceRestart) { this.forceRestart = forceRestart; } - + protected Date getEndDateFromStartDate() { return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1); } @@ -67,56 +63,41 @@ public class AggregatorManager { return aggregationStatus; } - public void elaborate(Date persistStartTime, Date persistEndTime, Class usageRecordClass) + public void elaborate(Date persistStartTime, Date persistEndTime, String recordType) throws Exception { + + AggregationStatus aggregationStatus = null; + if (restartFromLastAggregationDate) { + AggregationStatus lastAggregationStatus = AggregationStatus.getLast(recordType, aggregationType, + aggregationStartDate, aggregationEndDate); - CouchBaseConnector couchBaseConnector = CouchBaseConnector.getInstance(); - - for (String recordType : couchBaseConnector.getRecordTypes()) { - - if (usageRecordClass != null && usageRecordClass.newInstance().getRecordType().compareTo(recordType) != 0) { - continue; + // I don't check if this aggregation is COMPLETED because this + // is responsibility of Recovery Process + if (lastAggregationStatus != null) { + this.aggregationStartDate = lastAggregationStatus.getAggregationInfo().getAggregationEndDate(); + logger.info("Last got AggregationStatus is {}. Restarting from {}", + DSMapper.getObjectMapper().writeValueAsString(lastAggregationStatus), + Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate)); } - if (recordType.compareTo(ACCOUNTING_MANAGER_BUCKET_NAME) == 0) { - continue; - } - - AggregationStatus aggregationStatus = null; - if (restartFromLastAggregationDate) { - AggregationStatus lastAggregationStatus = AggregationStatus.getLast(recordType, aggregationType, - aggregationStartDate, aggregationEndDate); - - // I don't check if this aggregation is COMPLETED because this - // is responsibility of Recovery Process - if (lastAggregationStatus != null) { - this.aggregationStartDate = lastAggregationStatus.getAggregationInfo().getAggregationEndDate(); - logger.info("Last got AggregationStatus is {}. Restarting from {}", - DSMapper.getObjectMapper().writeValueAsString(lastAggregationStatus), - Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate)); - } - - } - - aggregationStatus = AggregationStatus.getAggregationStatus(recordType, aggregationType, - aggregationStartDate); - - if (aggregationStatus == null) { - aggregationStatus = createAggregationStatus(recordType); - } - - if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) { - logger.info("Start Date {} is after provided End Date {}. Please check input parameters.", - Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate), - Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate)); - return; - } - - Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime); - elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart); - } + aggregationStatus = AggregationStatus.getAggregationStatus(recordType, aggregationType, aggregationStartDate); + + if (aggregationStatus == null) { + aggregationStatus = createAggregationStatus(recordType); + } + + if (aggregationEndDate != null && aggregationStartDate.after(aggregationEndDate)) { + logger.info("Start Date {} is after provided End Date {}. Please check input parameters.", + Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate), + Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate)); + return; + } + + Elaborator elaborator = new Elaborator(aggregationStatus, persistStartTime, persistEndTime); + elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart); + } } diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java index a42d95e..a3e4056 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java @@ -10,9 +10,6 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.aggregation.Aggregator; import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure; import org.gcube.accounting.aggregator.persist.Persist; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector.SUFFIX; -import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; @@ -21,8 +18,6 @@ import org.gcube.documentstore.records.DSMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.couchbase.client.java.Bucket; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -167,14 +162,13 @@ public class Elaborator { FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure(); File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate); - Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src); + // Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src); // Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst); File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType); File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile); - Aggregator aggregator = new Aggregator(aggregationStatus, srcBucket, originalRecordsbackupFile, - aggregateRecordsBackupFile); + Aggregator aggregator = new Aggregator(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile); aggregator.aggregate(); @@ -186,7 +180,7 @@ public class Elaborator { */ // if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) { // Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); - Persist persist = new Persist(aggregationStatus, srcBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); + Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); persist.recover(); /* }else{ diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java index 7548c33..84117f2 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/RecoveryManager.java @@ -4,7 +4,7 @@ import java.util.Date; import java.util.List; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; +import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.documentstore.records.DSMapper; import org.slf4j.Logger; @@ -28,7 +28,10 @@ public class RecoveryManager { } public void recovery() throws Exception { - List aggregationStatusList = CouchBaseConnector.getUnterminated(aggregationStartDate, aggregationEndDate); + PostgreSQLConnector postgreSQLConnector = new PostgreSQLConnector(); + // TODO + + List aggregationStatusList = postgreSQLConnector.getUnterminated(aggregationStartDate, aggregationEndDate); if(aggregationStatusList.size()==0){ logger.info("Nothing to recover :)"); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java index e5322a0..c05ca47 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java @@ -1,35 +1,26 @@ package org.gcube.accounting.aggregator.persist; import java.io.File; -import java.util.concurrent.TimeUnit; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; - -import com.couchbase.client.java.Bucket; -import com.couchbase.client.java.PersistTo; -import com.couchbase.client.java.document.json.JsonObject; -import com.couchbase.client.java.error.DocumentDoesNotExistException; +import org.gcube.com.fasterxml.jackson.databind.JsonNode; +import org.gcube.documentstore.records.DSMapper; /** * @author Luca Frosini (ISTI - CNR) */ public class DeleteDocument extends DocumentElaboration { - public DeleteDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){ - super(aggregationStatus, AggregationState.DELETED, file, bucket, aggregationStatus.getOriginalRecordsNumber()); + public DeleteDocument(AggregationStatus aggregationStatus, File file){ + super(aggregationStatus, AggregationState.DELETED, file, aggregationStatus.getOriginalRecordsNumber()); } @Override protected void elaborateLine(String line) throws Exception { - JsonObject jsonObject = JsonObject.fromJson(line); - String id = jsonObject.getString(ID); - try { - bucket.remove(id, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); - }catch (DocumentDoesNotExistException e) { - // OK it can happen when the delete procedure were started but was interrupted - } + JsonNode jsonNode = DSMapper.asJsonNode(line); + String id = jsonNode.get(ID).asText(); + } @Override diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java index 2a40c3c..f087106 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java @@ -15,8 +15,6 @@ import org.gcube.documentstore.records.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.couchbase.client.java.Bucket; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -33,19 +31,16 @@ public abstract class DocumentElaboration { protected final AggregationStatus aggregationStatus; protected final File file; - protected final Bucket bucket; protected final AggregationState finalAggregationState; protected final int rowToBeElaborated; protected Calendar startTime; - protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, - Bucket bucket, int rowToBeElaborated) { + protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, int rowToBeElaborated) { this.aggregationStatus = statusManager; this.finalAggregationState = finalAggregationState; this.file = file; - this.bucket = bucket; this.rowToBeElaborated = rowToBeElaborated; } diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java index f25d5a4..da795ba 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java @@ -1,6 +1,7 @@ package org.gcube.accounting.aggregator.persist; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -13,12 +14,13 @@ import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; +import org.gcube.com.fasterxml.jackson.core.JsonProcessingException; +import org.gcube.com.fasterxml.jackson.databind.JsonNode; import org.gcube.documentstore.persistence.PersistencePostgreSQL; +import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; -import com.couchbase.client.java.document.json.JsonObject; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -47,7 +49,7 @@ public class InsertDocument extends DocumentElaboration { // public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){ public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{ // super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber()); - super(aggregationStatus, AggregationState.ADDED, file, null, aggregationStatus.getAggregatedRecordsNumber()); + super(aggregationStatus, AggregationState.ADDED, file, aggregationStatus.getAggregatedRecordsNumber()); serviceUsageRecordElaboration = false; serviceClassName_calledMethods = new TreeMap>(); unparsableLines = new ArrayList<>(); @@ -61,18 +63,18 @@ public class InsertDocument extends DocumentElaboration { count = 0; } - protected String getKey(JsonObject jsonObject) { - String serviceClass = jsonObject.getString(ServiceUsageRecord.SERVICE_CLASS); - String serviceName = jsonObject.getString(ServiceUsageRecord.SERVICE_NAME); + protected String getKey(JsonNode jsonNode) { + String serviceClass = jsonNode.get(ServiceUsageRecord.SERVICE_CLASS).asText(); + String serviceName = jsonNode.get(ServiceUsageRecord.SERVICE_NAME).asText(); return serviceClass + "," + serviceName; } - protected void addServiceClassName_calledMethods(JsonObject jsonObject) { - String key = getKey(jsonObject); - String calledMethod = jsonObject.getString(ServiceUsageRecord.CALLED_METHOD); + protected void addServiceClassName_calledMethods(JsonNode jsonNode) { + String key = getKey(jsonNode); + String calledMethod = jsonNode.get(ServiceUsageRecord.CALLED_METHOD).asText(); int operationCount = 0; try { - operationCount = jsonObject.getInt(AggregatedServiceUsageRecord.OPERATION_COUNT); + operationCount = jsonNode.get(AggregatedServiceUsageRecord.OPERATION_COUNT).asInt(); }catch (Exception e) { logger.error("", e); // the record was not an Aggregated ServiceUsageRecord @@ -92,27 +94,27 @@ public class InsertDocument extends DocumentElaboration { } } - protected JsonObject analyseLine(String line) { - JsonObject jsonObject = JsonObject.fromJson(line); + protected JsonNode analyseLine(String line) throws JsonProcessingException, IOException { + JsonNode jsonNode = DSMapper.asJsonNode(line); if(serviceUsageRecordElaboration) { try { - addServiceClassName_calledMethods(jsonObject); + addServiceClassName_calledMethods(jsonNode); }catch (Throwable e) { unparsableLines.add(line); } } - return jsonObject; + return jsonNode; } @Override protected void elaborateLine(String line) throws Exception { - JsonObject jsonObject = analyseLine(line); + JsonNode jsonNode = analyseLine(line); /* * String id = jsonObject.getString(ID); * JsonDocument jsonDocument = JsonDocument.create(id, jsonObject); * bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); */ - Record record = RecordUtility.getRecord(jsonObject.toString()); + Record record = RecordUtility.getRecord(jsonNode.toString()); persistencePostgreSQL.insert(record); ++count; diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java index 9d7803e..67cbbd2 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java @@ -16,8 +16,6 @@ import org.gcube.common.storagehub.client.dsl.FolderContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.couchbase.client.java.Bucket; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -27,7 +25,7 @@ public class Persist { protected final AggregationStatus aggregationStatus; - protected final Bucket originalRecordBucket; + // protected final Bucket originalRecordBucket; // protected final Bucket aggregatedRecordBucket; protected final File originalRecordsbackupFile; @@ -40,12 +38,11 @@ public class Persist { Bucket originalRecordBucket, Bucket aggregatedRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) { */ - public Persist(AggregationStatus aggregationStatus, - Bucket originalRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) { + public Persist(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) { super(); this.aggregationStatus = aggregationStatus; - this.originalRecordBucket = originalRecordBucket; + // this.originalRecordBucket = originalRecordBucket; // this.aggregatedRecordBucket = aggregatedRecordBucket; this.originalRecordsbackupFile = originalRecordsbackupFile; @@ -94,7 +91,7 @@ public class Persist { // For Each original row stored on file it remove them from Bucket. // At the end of elaboration set AgrgegationStatus to DELETED // Then save the file in Workspace and set AgrgegationStatus to COMPLETED - DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile, originalRecordBucket); + DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile); deleteDocument.elaborate(); } // InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java index f1dfa2e..28963f5 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java @@ -3,7 +3,8 @@ package org.gcube.accounting.aggregator.persistence; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; /** - * @author Alessandro Pieve (ISTI - CNR) + * @author Luca Frosini (ISTI-CNR) + * @author Alessandro Pieve (ISTI-CNR) */ public class AggregatorPersitenceConfiguration extends AccountingPersistenceConfiguration { diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java deleted file mode 100644 index 4f9bb42..0000000 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/CouchBaseConnector.java +++ /dev/null @@ -1,408 +0,0 @@ -package org.gcube.accounting.aggregator.persistence; - -import static com.couchbase.client.java.query.Select.select; -import static com.couchbase.client.java.query.dsl.Expression.s; -import static com.couchbase.client.java.query.dsl.Expression.x; - -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.gcube.accounting.aggregator.aggregation.AggregationType; -import org.gcube.accounting.aggregator.status.AggregationState; -import org.gcube.accounting.aggregator.status.AggregationStatus; -import org.gcube.accounting.aggregator.utility.Constant; -import org.gcube.accounting.aggregator.utility.Utility; -import org.gcube.accounting.datamodel.AggregatedUsageRecord; -import org.gcube.accounting.datamodel.UsageRecord; -import org.gcube.documentstore.records.DSMapper; -import org.gcube.documentstore.records.Record; -import org.gcube.documentstore.records.RecordUtility; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.couchbase.client.java.Bucket; -import com.couchbase.client.java.Cluster; -import com.couchbase.client.java.CouchbaseCluster; -import com.couchbase.client.java.PersistTo; -import com.couchbase.client.java.document.JsonDocument; -import com.couchbase.client.java.document.json.JsonObject; -import com.couchbase.client.java.env.CouchbaseEnvironment; -import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; -import com.couchbase.client.java.error.DocumentAlreadyExistsException; -import com.couchbase.client.java.query.N1qlQueryResult; -import com.couchbase.client.java.query.N1qlQueryRow; -import com.couchbase.client.java.query.Statement; -import com.couchbase.client.java.query.dsl.Expression; -import com.couchbase.client.java.query.dsl.Sort; - -/** - * @author Luca Frosini (ISTI - CNR) - */ -public class CouchBaseConnector { - - private static Logger logger = LoggerFactory.getLogger(CouchBaseConnector.class); - - public static final long MAX_REQUEST_LIFE_TIME = TimeUnit.SECONDS.toMillis(120); - public static final long KEEP_ALIVE_INTERVAL = TimeUnit.HOURS.toMillis(1); - public static final long AUTO_RELEASE_AFTER = TimeUnit.HOURS.toMillis(1); - public static final long VIEW_TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(120); - public static final long CONNECTION_TIMEOUT_BUCKET = TimeUnit.SECONDS.toMillis(15); - public static final long CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(15); - - private static final String URL_PROPERTY_KEY = "URL"; - private static final String PASSWORD_PROPERTY_KEY = "password"; - - public final static String ACCOUNTING_MANAGER_BUCKET_NAME = "AccountingManager"; - - /* The environment configuration */ - protected static final CouchbaseEnvironment ENV; - protected static final PersistTo PERSIST_TO; - - static { - ENV = DefaultCouchbaseEnvironment.builder() - .connectTimeout(CouchBaseConnector.CONNECTION_TIMEOUT) - .maxRequestLifetime(CouchBaseConnector.MAX_REQUEST_LIFE_TIME) - .queryTimeout(CouchBaseConnector.CONNECTION_TIMEOUT) - .viewTimeout(CouchBaseConnector.VIEW_TIMEOUT_BUCKET) - .keepAliveInterval(CouchBaseConnector.KEEP_ALIVE_INTERVAL) - .kvTimeout(5000) - .autoreleaseAfter(CouchBaseConnector.AUTO_RELEASE_AFTER).build(); - - PERSIST_TO = PersistTo.MASTER; - - } - - protected static CouchBaseConnector couchBaseConnector; - - protected AggregatorPersitenceConfiguration configuration; - protected Cluster cluster; - protected Map connectionMap; - protected Map> recordTypeMap; - - public synchronized static CouchBaseConnector getInstance() throws Exception{ - if(couchBaseConnector==null){ - couchBaseConnector = new CouchBaseConnector(); - } - return couchBaseConnector; - } - - protected CouchBaseConnector() throws Exception { - this.configuration = new AggregatorPersitenceConfiguration(AggregatorPersistence.class); - this.cluster = getCluster(); - createConnectionMap(); - } - - - private Cluster getCluster() throws Exception { - try { - String url = configuration.getProperty(URL_PROPERTY_KEY); - return CouchbaseCluster.create(ENV, url); - } catch (Exception e) { - throw e; - } - } - - public static enum SUFFIX { - src, dst - }; - - private static String getBucketKey(String recordType, AggregationType aggregationType, SUFFIX suffix){ - return recordType + "-" + aggregationType.name() + "-" + suffix.name(); - } - - private Map createConnectionMap() throws Exception { - connectionMap = new HashMap<>(); - recordTypeMap = new HashMap<>(); - - try { - Bucket b = cluster.openBucket( - ACCOUNTING_MANAGER_BUCKET_NAME, - configuration.getProperty(PASSWORD_PROPERTY_KEY)); - connectionMap.put(ACCOUNTING_MANAGER_BUCKET_NAME, b); - }catch (Exception e) { - logger.error("Unable to open Bucket used for Accounting Aggregation Management", e); - throw e; - } - - Map> recordClasses = RecordUtility.getRecordClassesFound(); - for (Class recordClass : recordClasses.values()) { - Record recordInstance = recordClass.newInstance(); - if (recordInstance instanceof UsageRecord && !(recordInstance instanceof AggregatedUsageRecord)) { - String recordType = recordInstance.getRecordType(); - recordTypeMap.put(recordType, recordClass); - - for(AggregationType aggregationType : AggregationType.values()){ - for(SUFFIX suffix : SUFFIX.values()){ - logger.debug("Trying to get the Bucket for {} {} {}", suffix, recordType, aggregationType); - String bucketKey = getBucketKey(recordType, aggregationType, suffix); - String bucketName = configuration.getProperty(bucketKey); - logger.debug("Bucket for {} {} {} is {}. Going to open it.", suffix, recordType, aggregationType, bucketName); - try { - Bucket bucket = cluster.openBucket(bucketName, configuration.getProperty(PASSWORD_PROPERTY_KEY)); - connectionMap.put(bucketKey, bucket); - }catch (Exception e) { - logger.warn("Unable to open Bucket {} for {} {} {}. This normally means that is not configured.", bucketName, suffix, recordType, aggregationType, recordClass); - } - } - } - } - } - - return connectionMap; - } - - public Set getConnectionMapKeys(){ - return connectionMap.keySet(); - } - - public Set getRecordTypes(){ - return recordTypeMap.keySet(); - } - - public Bucket getBucket(String recordType, AggregationType aggregationType, SUFFIX suffix){ - return connectionMap.get(getBucketKey(recordType, aggregationType, suffix)); - } - - public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ - Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME); - - /* - * SELECT * - * FROM AccountingManager - * WHERE - * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND - * `aggregationInfo`.`aggregationType` = "DAILY" AND - * `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000" - * `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000" - * ORDER BY `aggregationInfo`.`aggregationStartDate` DESC LIMIT 1 - */ - - Expression expression = x("`aggregationInfo`.`recordType`").eq(s(recordType)); - expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name()))); - - - String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`"; - if(aggregationStartDate!=null){ - expression = expression.and(x(aggregationStartDateField).gte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate)))); - } - - if(aggregationEndDate!=null){ - expression = expression.and(x(aggregationStartDateField).lte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate)))); - } - - Sort sort = Sort.desc(aggregationStartDateField); - - Statement statement = select("*").from(bucket.name()).where(expression).orderBy(sort).limit(1); - - logger.trace("Going to query : {}", statement.toString()); - - N1qlQueryResult result = bucket.query(statement); - if (!result.finalSuccess()) { - logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors()); - return null; - } - - List rows = result.allRows(); - - if(rows.size()>1){ - String error = String.format("More than one Document found for query %. This is really strange and should not occur. Please contact the Administrator.", statement.toString()); - logger.error(error); - throw new Exception(error); - } - - if(rows.size()==1){ - N1qlQueryRow row = rows.get(0); - try { - JsonObject jsonObject = row.value().getObject(bucket.name()); - logger.trace("JsonObject : {}", jsonObject.toString()); - return DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class); - } catch (Exception e) { - logger.warn("Unable to elaborate result for {}", row.toString()); - } - } - - return null; - } - - public static List getUnterminated(Date aggregationStartDate, Date aggregationEndDate) throws Exception{ - return getUnterminated(null, null, aggregationStartDate, aggregationEndDate); - } - - public static List getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ - Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME); - - /* - * SELECT * - * FROM AccountingManager - * WHERE - * `aggregationState` != "COMPLETED" AND - * `lastUpdateTime` < "2017-07-31 09:31:10.984 +0000" AND - * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND - * `aggregationInfo`.`aggregationType` = "DAILY" AND - * `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000" - * `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000" - * - * ORDER BY `aggregationInfo`.`aggregationStartDate` ASC - */ - - Calendar now = Utility.getUTCCalendarInstance(); - now.add(Constant.CALENDAR_FIELD_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED, -Constant.UNIT_TO_SUBSTRACT_TO_CONSIDER_UNTERMINATED); - - Expression expression = x("`aggregationState`").ne(s(AggregationState.COMPLETED.name())); - expression = expression.and(x("`lastUpdateTime`").lt(s(Constant.DEFAULT_DATE_FORMAT.format(now.getTime())))); - - if(recordType!=null){ - expression = expression.and(x("`aggregationInfo`.`recordType`").eq(s(recordType))); - } - - if(aggregationType!=null){ - expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name()))); - } - - String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`"; - if(aggregationStartDate!=null){ - expression = expression.and(x(aggregationStartDateField).gte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate)))); - } - - if(aggregationEndDate!=null){ - expression = expression.and(x(aggregationStartDateField).lte(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationEndDate)))); - } - - Sort sort = Sort.asc(aggregationStartDateField); - - Statement statement = select("*").from(bucket.name()).where(expression).orderBy(sort); - - logger.trace("Going to query : {}", statement.toString()); - - N1qlQueryResult result = bucket.query(statement); - if (!result.finalSuccess()) { - logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors()); - return null; - } - - List rows = result.allRows(); - List aggregationStatuses = new ArrayList<>(rows.size()); - for(N1qlQueryRow row: rows){ - try { - JsonObject jsonObject = row.value().getObject(bucket.name()); - logger.trace("JsonObject : {}", jsonObject.toString()); - AggregationStatus aggregationStatus = DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class); - aggregationStatuses.add(aggregationStatus); - } catch (Exception e) { - logger.warn("Unable to elaborate result for {}", row.toString()); - } - } - - return aggregationStatuses; - - } - - public static List getAll() throws Exception{ - Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME); - - /* - * SELECT * - * FROM AccountingManager - * ORDER BY `aggregationInfo`.`aggregationStartDate` ASC - */ - - - String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`"; - - Sort sort = Sort.asc(aggregationStartDateField); - - Statement statement = select("*").from(bucket.name()).orderBy(sort); - - logger.trace("Going to query : {}", statement.toString()); - - N1qlQueryResult result = bucket.query(statement); - if (!result.finalSuccess()) { - logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors()); - return null; - } - - List rows = result.allRows(); - List aggregationStatuses = new ArrayList<>(rows.size()); - for(N1qlQueryRow row: rows){ - try { - JsonObject jsonObject = row.value().getObject(bucket.name()); - logger.trace("JsonObject : {}", jsonObject.toString()); - AggregationStatus aggregationStatus = DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class); - aggregationStatuses.add(aggregationStatus); - } catch (Exception e) { - logger.warn("Unable to elaborate result for {}", row.toString()); - } - } - - return aggregationStatuses; - - } - - - public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ - Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME); - - /* - * SELECT * - * FROM AccountingManager - * WHERE - * `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND - * `aggregationInfo`.`aggregationType` = "DAILY" AND - * `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000" - */ - - Expression expression = x("`aggregationInfo`.`recordType`").eq(s(recordType)); - expression = expression.and(x("`aggregationInfo`.`aggregationType`").eq(s(aggregationType.name()))); - - expression = expression.and(x("`aggregationInfo`.`aggregationStartDate`").eq(s(Constant.DEFAULT_DATE_FORMAT.format(aggregationStartDate)))); - - Statement statement = select("*").from(bucket.name()).where(expression); - - logger.trace("Going to query : {}", statement.toString()); - - N1qlQueryResult result = bucket.query(statement); - if (!result.finalSuccess()) { - logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors()); - return null; - } - - List rows = result.allRows(); - - if(rows.size()>1){ - String error = String.format("More than one Document found for query %s. This is really strange and should not occur. Please contact the Administrator.", statement.toString()); - logger.error(error); - throw new Exception(error); - } - - if(rows.size()==1){ - N1qlQueryRow row = rows.get(0); - try { - JsonObject jsonObject = row.value().getObject(bucket.name()); - logger.trace("JsonObject : {}", jsonObject.toString()); - return DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class); - } catch (Exception e) { - logger.warn("Unable to elaborate result for {}", row.toString()); - } - } - - return null; - } - - - public static void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception{ - Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME); - JsonObject jsonObject = JsonObject.fromJson(DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); - JsonDocument jsonDocument = JsonDocument.create(aggregationStatus.getUUID().toString(), jsonObject); - try{ - bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); - }catch (DocumentAlreadyExistsException e) { - // OK it can happen when the insert procedure were started but was interrupted - } - } -} diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index 1637b89..f1e9af4 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -21,6 +21,9 @@ import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersisten import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.postgresql.core.Utils; +/** + * @author Luca Frosini (ISTI-CNR) + */ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { private static final String UTC_TIME_ZONE = "UTC"; @@ -219,7 +222,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { } - public static List getAll() throws Exception{ + public List getAll() throws Exception{ /* * SELECT * @@ -235,7 +238,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { } - public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ + public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ /* * SELECT * @@ -246,7 +249,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { * `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000" */ - return null; + return null; } } diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java index de616eb..9bb3260 100644 --- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java +++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java @@ -130,6 +130,7 @@ public class AccountingAggregatorPlugin extends Plugin { Date persistStartTime = null; Date persistEndTime = null; + String recordType = null; Class usageRecordClass = null; boolean forceEarlyAggregation = false; @@ -192,14 +193,16 @@ public class AccountingAggregatorPlugin extends Plugin { } if (inputs.containsKey(RECORD_TYPE_INPUT_PARAMETER)) { - usageRecordClass = (Class) RecordUtility.getRecordClass((String) inputs.get(RECORD_TYPE_INPUT_PARAMETER)); + recordType = (String) inputs.get(RECORD_TYPE_INPUT_PARAMETER); + usageRecordClass = (Class) RecordUtility.getRecordClass(recordType); + logger.debug("Requested record type is {} which is implemented by {}", recordType, usageRecordClass); } AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate); aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation); aggregatorManager.setForceRerun(forceRerun); aggregatorManager.setForceRestart(forceRestart); - aggregatorManager.elaborate(persistStartTime, persistEndTime, usageRecordClass); + aggregatorManager.elaborate(persistStartTime, persistEndTime, recordType); break; diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java index bbcd44a..6a7c731 100644 --- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java @@ -8,7 +8,7 @@ import java.util.UUID; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationType; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; +import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.com.fasterxml.jackson.annotation.JsonFormat; @@ -62,20 +62,26 @@ public class AggregationStatus { @JsonProperty protected List aggregationStateEvents; + private static PostgreSQLConnector postgreSQLConnector; + + static { + // TODO + postgreSQLConnector = null; + } + // Needed for Jackon Unmarshalling - @SuppressWarnings("unused") - private AggregationStatus(){} + protected AggregationStatus(){} public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{ - return CouchBaseConnector.getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate); + return postgreSQLConnector.getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate); } public static List getUnterminated(String recordType, AggregationType aggregationType) throws Exception{ - return CouchBaseConnector.getUnterminated(recordType, aggregationType, null, null); + return postgreSQLConnector.getUnterminated(recordType, aggregationType, null, null); } public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{ - return CouchBaseConnector.getAggregationStatus(recordType, aggregationType, aggregationStartDate); + return postgreSQLConnector.getAggregationStatus(recordType, aggregationType, aggregationStartDate); } public AggregationStatus(AggregationInfo aggregationInfo) throws Exception { @@ -115,7 +121,7 @@ public class AggregationStatus { aggregationStateEvents.add(aggregationStatusEvent); if(sync){ - CouchBaseConnector.upsertAggregationStatus(this); + postgreSQLConnector.upsertAggregationStatus(this); } } @@ -188,7 +194,7 @@ public class AggregationStatus { public void updateLastUpdateTime(boolean sync) throws Exception { this.lastUpdateTime = Utility.getUTCCalendarInstance(); if(sync){ - CouchBaseConnector.upsertAggregationStatus(this); + postgreSQLConnector.upsertAggregationStatus(this); } } diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java index ab034ed..a5379e3 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/CouchBaseConnectorTest.java @@ -6,7 +6,6 @@ import java.util.List; import org.gcube.accounting.aggregator.aggregation.AggregationInfo; import org.gcube.accounting.aggregator.aggregation.AggregationType; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; @@ -43,7 +42,7 @@ public class CouchBaseConnectorTest extends ContextTest { @Test public void insertAllInPostgreSQL() throws Exception { ContextTest.setContextByName(ROOT_DEV_SCOPE); - List aggregationStatuses = CouchBaseConnector.getAll(); + List aggregationStatuses = postgreSQLConnector.getAll(); for(AggregationStatus aggregationStatus : aggregationStatuses) { analyseAggregationStatus(aggregationStatus); } @@ -51,13 +50,13 @@ public class CouchBaseConnectorTest extends ContextTest { @Test public void getLastTest() throws Exception { - AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); + AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); logger.debug("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @Test public void getUnterminatedTest() throws Exception{ - List aggregationStatuses = CouchBaseConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); + List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); for(AggregationStatus aggregationStatus : aggregationStatuses){ logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @@ -68,7 +67,7 @@ public class CouchBaseConnectorTest extends ContextTest { Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1); Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 31); - AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); + AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @@ -77,7 +76,7 @@ public class CouchBaseConnectorTest extends ContextTest { Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1); Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30); - List aggregationStatuses = CouchBaseConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); + List aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime()); for(AggregationStatus aggregationStatus : aggregationStatuses){ logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } @@ -86,7 +85,7 @@ public class CouchBaseConnectorTest extends ContextTest { @Test public void getAggregationStatusTest() throws Exception{ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15); - AggregationStatus aggregationStatus = CouchBaseConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime()); + AggregationStatus aggregationStatus = postgreSQLConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime()); logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java index 6f04e71..2293382 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/MyTest.java @@ -19,6 +19,7 @@ import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord; +import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode; import org.gcube.common.storagehub.client.dsl.ContainerType; import org.gcube.common.storagehub.client.dsl.FolderContainer; import org.gcube.common.storagehub.client.dsl.ItemContainer; @@ -26,6 +27,7 @@ import org.gcube.common.storagehub.client.dsl.ListResolver; import org.gcube.common.storagehub.client.dsl.StorageHubClient; import org.gcube.common.storagehub.model.items.Item; import org.gcube.documentstore.records.AggregatedRecord; +import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; import org.junit.Ignore; @@ -33,8 +35,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.couchbase.client.java.document.json.JsonObject; - public class MyTest { private static Logger logger = LoggerFactory.getLogger(Elaborator.class); @@ -66,17 +66,17 @@ public class MyTest { String jsonString = "{\"operationCount\":41.0,\"creationTime\":1.454284803916E12,\"consumerId\":\"wps.statisticalmanager\",\"resourceOwner\":\"wps.statisticalmanager\",\"recordType\":\"SingleStorageUsageRecord\",\"dataType\":\"STORAGE\",\"_rev\":\"1-05c467553141723f51dad0fa1aab2ce0\",\"resourceURI\":\"56aea0025caf8b5b69c0071e\",\"providerURI\":\"data.d4science.org\",\"resourceScope\":\"/d4science.research-infrastructures.eu\",\"dataVolume\":95807.0,\"scope\":\"/d4science.research-infrastructures.eu\",\"startTime\":1.454284803059E12,\"operationType\":\"CREATE\",\"endTime\":1.454284803059E12,\"id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"_id\":\"ff5f2669-0abb-45e9-99df-f401db579680\",\"operationResult\":\"SUCCESS\"};"; - JsonObject content = JsonObject.fromJson(jsonString); + ObjectNode content = (ObjectNode) DSMapper.asJsonNode(jsonString); - if(content.containsKey(USAGE_RECORD_TYPE)){ - String recordType = content.getString(USAGE_RECORD_TYPE); - content.removeKey(USAGE_RECORD_TYPE); + if(content.has(USAGE_RECORD_TYPE)){ + String recordType = content.get(USAGE_RECORD_TYPE).asText(); + content.remove(USAGE_RECORD_TYPE); content.put(Record.RECORD_TYPE, recordType); } Boolean aggregated = false; - if(content.containsKey(AggregatedRecord.CREATION_TIME)) { + if(content.has(AggregatedRecord.CREATION_TIME)) { Object object = content.get(AggregatedRecord.CREATION_TIME); if(object instanceof Double) { Double d = ((Double) object); @@ -84,7 +84,7 @@ public class MyTest { } } - if(content.containsKey(AggregatedRecord.START_TIME)) { + if(content.has(AggregatedRecord.START_TIME)) { aggregated = true; Object object = content.get(AggregatedRecord.START_TIME); if(object instanceof Double) { @@ -93,7 +93,7 @@ public class MyTest { } } - if(content.containsKey(AggregatedRecord.END_TIME)) { + if(content.has(AggregatedRecord.END_TIME)) { aggregated = true; Object object = content.get(AggregatedRecord.END_TIME); if(object instanceof Double) { @@ -102,14 +102,14 @@ public class MyTest { } } - if(content.containsKey(AggregatedRecord.OPERATION_COUNT)) { + if(content.has(AggregatedRecord.OPERATION_COUNT)) { Object object = content.get(AggregatedRecord.OPERATION_COUNT); if(object instanceof Double) { Double d = ((Double) object); content.put(AggregatedRecord.OPERATION_COUNT, d.intValue()); } - if(content.getInt(AggregatedRecord.OPERATION_COUNT)>1) { + if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) { aggregated = true; } } @@ -118,7 +118,7 @@ public class MyTest { content.put(AggregatedRecord.AGGREGATED, true); } - String recordType = content.getString(Record.RECORD_TYPE); + String recordType = content.get(Record.RECORD_TYPE).asText(); if(!aggregated){ if(recordType.startsWith(SIMPLE)){ diff --git a/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java b/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java index 0eb4b94..74f332e 100644 --- a/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java +++ b/src/test/java/org/gcube/accounting/aggregator/recover/RecoverOriginalRecords.java @@ -13,6 +13,7 @@ import org.gcube.accounting.aggregator.aggregation.AggregatorBuffer; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; +import org.gcube.com.fasterxml.jackson.databind.JsonNode; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; @@ -20,8 +21,6 @@ import org.gcube.documentstore.records.RecordUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.couchbase.client.java.document.json.JsonObject; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -96,8 +95,8 @@ public class RecoverOriginalRecords { } protected void elaborateLine(String line) throws Exception { - JsonObject jsonObject = JsonObject.fromJson(line); - String id = jsonObject.getString(ID); + JsonNode jsonNode = DSMapper.asJsonNode(line); + String id = jsonNode.get(ID).asText(); if(uniqueRecords.containsKey(id)){ logger.trace("Duplicated Original Record with ID {}", id); Utility.printLine(duplicatedFile, line); @@ -122,8 +121,8 @@ public class RecoverOriginalRecords { List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); for (AggregatedRecord aggregatedRecord : aggregatedRecords) { String marshalled = DSMapper.marshal(aggregatedRecord); - JsonObject jsonObject = JsonObject.fromJson(marshalled); - Utility.printLine(aggregatedFile, jsonObject.toString()); + JsonNode jsonNode = DSMapper.asJsonNode(marshalled); + Utility.printLine(aggregatedFile, jsonNode.toString()); aggregated++; } }