From d5352602efecb3adb9e2f733166ad02b21f98484 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Mon, 8 Nov 2021 14:57:35 +0100 Subject: [PATCH] Implementation seems working --- .../aggregator/aggregation/Aggregator.java | 92 +++++++++---- .../aggregator/persist/DeleteDocument.java | 5 +- .../aggregator/persist/InsertDocument.java | 5 - .../persistence/PostgreSQLConnector.java | 123 +++++++++++++++--- .../plugin/AccountingAggregatorPlugin.java | 10 +- .../aggregator/utility/Utility.java | 11 +- .../AccountingAggregatorPluginTest.java | 82 ++++++++++-- 7 files changed, 254 insertions(+), 74 deletions(-) diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index af7fd6f..d2544e8 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -2,12 +2,14 @@ package org.gcube.accounting.aggregator.aggregation; import java.io.File; import java.sql.ResultSet; -import java.text.DateFormat; +import java.time.OffsetDateTime; import java.util.Calendar; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; @@ -16,6 +18,8 @@ import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; +import org.gcube.accounting.utility.postgresql.RecordToDBFields; +import org.gcube.accounting.utility.postgresql.RecordToDBMapping; import org.gcube.com.fasterxml.jackson.databind.ObjectMapper; import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode; import org.gcube.documentstore.exception.InvalidValueException; @@ -59,32 +63,13 @@ public class Aggregator { if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) { startTime = Utility.getUTCCalendarInstance(); - - - // TODO query - ResultSet resultSet = null; - - + PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); + ResultSet resultSet = postgreSQLConnector.getResultSetOfRecordToBeAggregated(aggregationStatus); retrieveAndAggregate(resultSet); } } - - protected ResultSet getViewResult() throws Exception { - - DateFormat dateFormat = aggregationStatus.getAggregationInfo().getAggregationType().getDateFormat(); - - String dateStartKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationStartDate()); - String dateEndKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationEndDate()); - - - // TODO query here - - return null; - - } - private static final String USAGE_RECORD_TYPE = "usageRecordType"; private static final String SINGLE = "Single"; private static final String SIMPLE = "Simple"; @@ -191,6 +176,32 @@ public class Aggregator { private static final int MAX_RETRY = 3; + + protected void addProperty(ObjectNode objectNode, String key, Object value) { + + if(value instanceof Number) { + + if(value instanceof Integer) { + objectNode.put(key, (int) value); + return; + } + + Long longValue = Long.valueOf(value.toString()); + objectNode.put(key, longValue); + return; + } + + objectNode.put(key, (String) value.toString()); + + } + + protected Calendar getCalendar(OffsetDateTime offsetDateTime) { + Calendar calendar = Calendar.getInstance(); + long epochMillis = offsetDateTime.toInstant().toEpochMilli(); + calendar.setTimeInMillis(epochMillis); + return calendar; + } + protected void retrieveAndAggregate(ResultSet resultSet) throws Exception { AggregatorBuffer aggregatorBuffer = new AggregatorBuffer(); @@ -201,16 +212,47 @@ public class Aggregator { aggregateRecordsBackupFile.delete(); malformedRecordsFile.delete(); + + AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo(); + String type = aggregationInfo.getRecordType(); + Class> clz = RecordUtility.getAggregatedRecordClass(type); + RecordToDBFields recordToDBFields = RecordToDBMapping.getRecordToDBFields(clz); + + + Set requiredFields = clz.newInstance().getRequiredFields(); + malformedRecordNumber = 0; int originalRecordsCounter = 0; while (resultSet.next()) { for(int i=1; i<=MAX_RETRY; i++){ try { - ObjectNode content = objectMapper.createObjectNode(); + ObjectNode objectNode = objectMapper.createObjectNode(); - // todo set data from resultset + addProperty(objectNode, Record.RECORD_TYPE, type); - originalRecordsCounter = elaborateRow(content, aggregatorBuffer, originalRecordsCounter); + + for(String recordField : requiredFields) { + String tableField = recordToDBFields.getTableField(recordField); + + Object obj = null; + + switch (recordField) { + case AggregatedRecord.START_TIME: case AggregatedRecord.END_TIME: case AggregatedRecord.CREATION_TIME: + OffsetDateTime offsetDateTime = resultSet.getObject(tableField, OffsetDateTime.class); + Calendar calendar = getCalendar(offsetDateTime); + obj = calendar.getTimeInMillis(); + break; + + default: + obj = resultSet.getObject(tableField); + break; + } + + + addProperty(objectNode, recordField, obj); + } + + originalRecordsCounter = elaborateRow(objectNode, aggregatorBuffer, originalRecordsCounter); TimeUnit.MILLISECONDS.sleep(3); break; }catch (RuntimeException e) { diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java index c05ca47..4f6c035 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java @@ -2,6 +2,7 @@ package org.gcube.accounting.aggregator.persist; import java.io.File; +import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.com.fasterxml.jackson.databind.JsonNode; @@ -20,7 +21,9 @@ public class DeleteDocument extends DocumentElaboration { protected void elaborateLine(String line) throws Exception { JsonNode jsonNode = DSMapper.asJsonNode(line); String id = jsonNode.get(ID).asText(); - + logger.trace("Going to delete record with id {}", id); + PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector(); + postgreSQLConnector.deleteRecord(jsonNode); } @Override diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java index da795ba..8660ed3 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java @@ -109,11 +109,6 @@ public class InsertDocument extends DocumentElaboration { @Override protected void elaborateLine(String line) throws Exception { JsonNode jsonNode = analyseLine(line); - /* - * String id = jsonObject.getString(ID); - * JsonDocument jsonDocument = JsonDocument.create(id, jsonObject); - * bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); - */ Record record = RecordUtility.getRecord(jsonNode.toString()); persistencePostgreSQL.insert(record); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index 4b4878b..9d08140 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -6,11 +6,13 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.TimeZone; import java.util.UUID; @@ -23,7 +25,15 @@ import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL; +import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; +import org.gcube.accounting.utility.postgresql.RecordToDBFields; +import org.gcube.accounting.utility.postgresql.RecordToDBMapping; +import org.gcube.com.fasterxml.jackson.databind.JsonNode; +import org.gcube.documentstore.records.AggregatedRecord; +import org.gcube.documentstore.records.DSMapper; +import org.gcube.documentstore.records.Record; +import org.gcube.documentstore.records.RecordUtility; import org.postgresql.core.Utils; /** @@ -47,6 +57,15 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { private PostgreSQLConnector() throws Exception { this.configuration = new AccountingPersistenceBackendQueryConfiguration(AccountingPersistenceQueryPostgreSQL.class); + Map>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound(); + for(String typeName : aggregatedRecords.keySet()) { + try { + Class> clz = aggregatedRecords.get(typeName); + RecordToDBMapping.addRecordToDB(clz, configuration); + } catch (Exception e) { + new RuntimeException(e); + } + } } protected Connection getConnection() throws Exception { @@ -84,8 +103,9 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { Calendar calendar = (Calendar) serializable; SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATETIME_PATTERN); simpleDateFormat.setTimeZone(DEFAULT_TIME_ZONE); - String date = simpleDateFormat.format(calendar.getTime()); - return getQuotedString(date); + Date date = calendar.getTime(); + String dateString = simpleDateFormat.format(date); + return getQuotedString(dateString); } if(serializable instanceof Date) { @@ -186,7 +206,8 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception { - Statement statement = getConnection().createStatement(); + Connection connection = getConnection(); + Statement statement = connection.createStatement(); String sqlCommand = getInsertAggregationStatusQuery(aggregationStatus, true); statement.executeUpdate(sqlCommand); sqlCommand = getInsertAggregationStateQuery(aggregationStatus); @@ -202,14 +223,21 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { // return calendar; // } - private AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception { + private Date getDateFromResultSet(ResultSet resultSet, String columnLabel) throws Exception { + String dateString = resultSet.getString(columnLabel); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssX"); + Date date = dateFormat.parse(dateString); + return date; + } + + public AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception { String recordType = resultSet.getString("record_type"); String aggregationTypeString = resultSet.getString("aggregation_type"); AggregationType aggregationType = AggregationType.valueOf(aggregationTypeString); - Date aggregationStartDate = resultSet.getDate("aggregation_start_date"); - Date aggregationEndDate = resultSet.getDate("aggregation_end_date"); + Date aggregationStartDate = getDateFromResultSet(resultSet,"aggregation_start_date"); + Date aggregationEndDate = getDateFromResultSet(resultSet,"aggregation_end_date"); AggregationInfo aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate, aggregationEndDate); AggregationStatus aggregationStatus = new AggregationStatus(aggregationInfo); @@ -280,10 +308,10 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { Connection connection = getConnection(); Statement statement = connection.createStatement(); - String sqlCommand = stringBuffer.toString(); + String sqlQuery = stringBuffer.toString(); - logger.trace("Going to request the following query: {}", sqlCommand); - ResultSet resultSet = statement.executeQuery(sqlCommand); + logger.trace("Going to request the following query: {}", sqlQuery); + ResultSet resultSet = statement.executeQuery(sqlQuery); AggregationStatus aggregationStatus = null; @@ -351,10 +379,10 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { Connection connection = getConnection(); Statement statement = connection.createStatement(); - String sqlCommand = stringBuffer.toString(); + String sqlQuery = stringBuffer.toString(); - logger.trace("Going to request the following query: {}", sqlCommand); - ResultSet resultSet = statement.executeQuery(sqlCommand); + logger.trace("Going to request the following query: {}", sqlQuery); + ResultSet resultSet = statement.executeQuery(sqlQuery); List aggregationStatuses = new ArrayList<>(); @@ -383,10 +411,10 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { Connection connection = getConnection(); Statement statement = connection.createStatement(); - String sqlCommand = stringBuffer.toString(); + String sqlQuery = stringBuffer.toString(); - logger.trace("Going to request the following query: {}", sqlCommand); - ResultSet resultSet = statement.executeQuery(sqlCommand); + logger.trace("Going to request the following query: {}", sqlQuery); + ResultSet resultSet = statement.executeQuery(sqlQuery); List aggregationStatuses = new ArrayList<>(); @@ -436,10 +464,10 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { Connection connection = getConnection(); Statement statement = connection.createStatement(); - String sqlCommand = stringBuffer.toString(); + String sqlQuery = stringBuffer.toString(); - logger.trace("Going to request the following query: {}", sqlCommand); - ResultSet resultSet = statement.executeQuery(sqlCommand); + logger.trace("Going to request the following query: {}", sqlQuery); + ResultSet resultSet = statement.executeQuery(sqlQuery); AggregationStatus aggregationStatus = null; @@ -450,5 +478,64 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL { return aggregationStatus; } + + public void deleteRecord(JsonNode jsonNode) throws Exception { + Record record = DSMapper.unmarshal(Record.class, jsonNode.toString()); + Class clz = record.getClass(); + String type = RecordToDBMapping.getRecordTypeByClass(clz); + String tableName = RecordToDBFields.getKey(type); + + String id = jsonNode.get(Record.ID).asText(); + + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("DELETE "); + stringBuffer.append("FROM "); + stringBuffer.append(tableName); + stringBuffer.append(" WHERE "); + stringBuffer.append("id = "); + stringBuffer.append(getValue(id)); + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + String sqlCommand = stringBuffer.toString(); + logger.trace("Going to execute {}", sqlCommand); + statement.execute(sqlCommand); + + statement.close(); + connection.commit(); + + } + + public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception { + AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo(); + + String tableName = RecordToDBFields.getKey(aggregationInfo.getRecordType()); + + String startTimeColumnName = RecordToDBFields.getKey(AggregatedUsageRecord.START_TIME); + + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("SELECT *"); + stringBuffer.append(" FROM "); + stringBuffer.append(tableName); + stringBuffer.append(" WHERE "); + stringBuffer.append(startTimeColumnName); + stringBuffer.append(" >= "); + stringBuffer.append(getValue(aggregationInfo.getAggregationStartDate())); + stringBuffer.append(" AND "); + stringBuffer.append(startTimeColumnName); + stringBuffer.append(" < "); + stringBuffer.append(getValue(aggregationInfo.getAggregationEndDate())); + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + String sqlQuery = stringBuffer.toString(); + + logger.trace("Going to request the following query: {}", sqlQuery); + ResultSet resultSet = statement.executeQuery(sqlQuery); + + return resultSet; + } } diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java index 9bb3260..db5baab 100644 --- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java +++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java @@ -151,12 +151,14 @@ public class AccountingAggregatorPlugin extends Plugin { if (inputs.containsKey(AGGREGATION_START_DATE_INPUT_PARAMETER)) { String aggregationStartDateString = (String) inputs.get(AGGREGATION_START_DATE_INPUT_PARAMETER); - aggregationStartDate = AGGREGATION_START_END_DATE_UTC_DATE_FORMAT.parse(aggregationStartDateString + " " + UTC); + aggregationStartDateString = aggregationStartDateString + " " + UTC; + aggregationStartDate = AGGREGATION_START_END_DATE_UTC_DATE_FORMAT.parse(aggregationStartDateString); } if (inputs.containsKey(AGGREGATION_END_DATE_INPUT_PARAMETER)) { String aggregationEndDateString = (String) inputs.get(AGGREGATION_END_DATE_INPUT_PARAMETER); - aggregationEndDate = AGGREGATION_START_END_DATE_UTC_DATE_FORMAT.parse(aggregationEndDateString + " " + UTC); + aggregationEndDateString = aggregationEndDateString + " " + UTC; + aggregationEndDate = AGGREGATION_START_END_DATE_UTC_DATE_FORMAT.parse(aggregationEndDateString); } @@ -167,10 +169,6 @@ public class AccountingAggregatorPlugin extends Plugin { } aggregationType = AggregationType.valueOf((String) inputs.get(AGGREGATION_TYPE_INPUT_PARAMETER)); - if (inputs.containsKey(AGGREGATION_START_DATE_INPUT_PARAMETER)) { - String aggregationStartDateString = (String) inputs.get(AGGREGATION_START_DATE_INPUT_PARAMETER); - aggregationStartDate = AGGREGATION_START_END_DATE_UTC_DATE_FORMAT.parse(aggregationStartDateString + " " + UTC); - } if(inputs.containsKey(RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER)){ restartFromLastAggregationDate = (boolean) inputs.get(RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER); diff --git a/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java b/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java index ef31e79..65979a9 100644 --- a/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java +++ b/src/main/java/org/gcube/accounting/aggregator/utility/Utility.java @@ -93,15 +93,10 @@ public class Utility { Date date = new Date(); persistTimeString = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format( date) + " " + persistTimeString + " " + LOCALE_DATE_FORMAT.format(date); - // Local Date Format (not UTC) - DateFormat dateFormat = new SimpleDateFormat( - AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT_PATTERN + " " - + AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT_PATTERN); - - Date persistTime = dateFormat.parse(persistTimeString); - - return persistTime; + DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm Z"); + Date ret = dateFormat.parse(persistTimeString); + return ret; } public static boolean isTimeElapsed(Calendar now, Date date) throws ParseException { diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java index 403da08..1c90e27 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java @@ -13,6 +13,7 @@ import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.StorageStatusRecord; import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord; +import org.gcube.com.fasterxml.jackson.annotation.JsonIgnore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,10 +25,11 @@ public class AccountingAggregatorPluginTest extends ContextTest { public static final String ROOT_PROD = "/d4science.research-infrastructures.eu"; + @JsonIgnore @Test public void aggregateJob() throws Exception { - //ContextTest.setContextByName(ROOT_DEV_SCOPE); - ContextTest.setContextByName(ROOT_PROD); + ContextTest.setContextByName(ROOT_DEV_SCOPE); + // ContextTest.setContextByName(ROOT_PROD); Map inputs = new HashMap(); @@ -79,10 +81,11 @@ public class AccountingAggregatorPluginTest extends ContextTest { } + @JsonIgnore @Test public void aggregateStorageStatus() throws Exception { - //ContextTest.setContextByName(ROOT_DEV_SCOPE); - ContextTest.setContextByName(ROOT_PROD); + ContextTest.setContextByName(ROOT_DEV_SCOPE); + // ContextTest.setContextByName(ROOT_PROD); Map inputs = new HashMap(); @@ -134,11 +137,11 @@ public class AccountingAggregatorPluginTest extends ContextTest { } - + @JsonIgnore @Test public void aggregateStorageUsage() throws Exception { - //ContextTest.setContextByName(ROOT_DEV_SCOPE); - ContextTest.setContextByName(ROOT_PROD); + ContextTest.setContextByName(ROOT_DEV_SCOPE); + // ContextTest.setContextByName(ROOT_PROD); Map inputs = new HashMap(); @@ -192,13 +195,69 @@ public class AccountingAggregatorPluginTest extends ContextTest { @Test - public void aggregateService() throws Exception { + public void aggregateYearlyServiceProd() throws Exception { //ContextTest.setContextByName(ROOT_DEV_SCOPE); ContextTest.setContextByName(ROOT_PROD); Map inputs = new HashMap(); + AggregationType aggregationType = AggregationType.YEARLY; + //type aggregation + inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name()); + + inputs.put(AccountingAggregatorPlugin.ELABORATION_TYPE_INPUT_PARAMETER, ElaborationType.AGGREGATE.name()); + + + inputs.put(AccountingAggregatorPlugin.PERSIST_START_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(0, 10)); + inputs.put(AccountingAggregatorPlugin.PERSIST_END_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(23, 59)); + + + inputs.put(AccountingAggregatorPlugin.RECORD_TYPE_INPUT_PARAMETER, ServiceUsageRecord.class.newInstance().getRecordType()); + + inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER, false); + + inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, true); + inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, true); + inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true); + + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2015, Calendar.JANUARY, 1); + String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime()); + logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); + inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); + + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 3); + // Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1); + /* + String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime()); + logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate); + inputs.put(AccountingAggregatorPlugin.AGGREGATION_END_DATE_INPUT_PARAMETER, aggregationEndDate); + */ + + AccountingAggregatorPlugin plugin = new AccountingAggregatorPlugin(); + logger.debug("Going to launch {} with inputs {}", plugin.getName(), inputs); + + + + while(aggregationStartCalendar.before(aggregationEndCalendar)) { + plugin.launch(inputs); + //Thread.sleep(TimeUnit.MINUTES.toMillis(1)); + aggregationStartCalendar.add(aggregationType.getCalendarField(), 1); + aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime()); + inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); + } + + } + + // @JsonIgnore + @Test + public void aggregateService() throws Exception { + ContextTest.setContextByName(ROOT_DEV_SCOPE); + //ContextTest.setContextByName(ROOT_PROD); + + Map inputs = new HashMap(); + + AggregationType aggregationType = AggregationType.MONTHLY; //type aggregation inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name()); @@ -218,13 +277,13 @@ public class AccountingAggregatorPluginTest extends ContextTest { inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, true); inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true); - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.JUNE, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.SEPTEMBER, 1); String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime()); logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); - // Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(AggregationType.MONTHLY, aggregationStartCalendar, 1); - Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1); + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + // Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1); /* String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime()); logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate); @@ -256,6 +315,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { } */ + @JsonIgnore @Test public void testRecovery() throws Exception { ContextTest.setContextByName(ROOT_DEV_SCOPE);