From 210361a0ff0ad7bd54f0205828e7360547c446ea Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Fri, 16 Feb 2024 17:06:49 +0100 Subject: [PATCH] Fixing plugin --- pom.xml | 14 +- .../aggregator/aggregation/Aggregator.java | 1 - .../persist/DocumentElaboration.java | 22 ++- .../aggregator/persist/InsertDocument.java | 8 +- .../persistence/PostgreSQLConnector.java | 14 +- .../PostgreSQLConnectorStatus.java | 4 +- .../persistence/PostgreSQLConnectorTest.java | 5 +- .../AccountingAggregatorPluginTest.java | 130 +++++++++++++++--- 8 files changed, 139 insertions(+), 59 deletions(-) diff --git a/pom.xml b/pom.xml index 054ed27..f9d6520 100644 --- a/pom.xml +++ b/pom.xml @@ -57,15 +57,15 @@ document-store-lib-postgresql [1.0.0, 2.0.0-SNAPSHOT) - - org.gcube.accounting - accounting-analytics-persistence-postgresql - [1.0.0, 2.0.0-SNAPSHOT) - + + + + + org.gcube.accounting accounting-analytics - [3.0.0, 4.0.0-SNAPSHOT) + [4.0.0, 5.0.0-SNAPSHOT) org.slf4j @@ -135,4 +135,4 @@ - \ No newline at end of file + diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java index bfcd68d..bec7c44 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java @@ -219,7 +219,6 @@ public class Aggregator { Class> clz = RecordUtility.getAggregatedRecordClass(type); RecordToDBFields recordToDBFields = RecordToDBMapping.getRecordToDBFields(clz); - Set requiredFields = clz.newInstance().getRequiredFields(); malformedRecordNumber = 0; diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java index 89e80ab..f411fdb 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java @@ -24,9 +24,6 @@ public abstract class DocumentElaboration { protected static final String ID = Record.ID; - protected static final int THRESHOLD_FOR_FIVE_PERCENT = 10000; - protected static final int THRESHOLD_FOR_ONE_PERCENT = 100000; - public static final int MAX_RETRY = 7; protected final AggregationStatus aggregationStatus; @@ -59,13 +56,9 @@ public abstract class DocumentElaboration { logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated); - int percentOfNumberOfRows = (rowToBeElaborated / 10) + 1; - if(rowToBeElaborated >= THRESHOLD_FOR_FIVE_PERCENT) { - percentOfNumberOfRows = percentOfNumberOfRows / 2; - } - - if(rowToBeElaborated >= THRESHOLD_FOR_ONE_PERCENT) { - percentOfNumberOfRows = percentOfNumberOfRows / 5; + int numberOfRowsForEachRecoveryPoint = (rowToBeElaborated / 10) + 1; + if(numberOfRowsForEachRecoveryPoint>500) { + numberOfRowsForEachRecoveryPoint = 500; } currentlyElaborated = 0; @@ -109,7 +102,7 @@ public abstract class DocumentElaboration { } ++currentlyElaborated; - if(currentlyElaborated % percentOfNumberOfRows == 0) { + if(currentlyElaborated % numberOfRowsForEachRecoveryPoint == 0) { if(currentlyElaborated>=restartFrom) { aggregationStatus.setRestartFrom(currentlyElaborated, true); } @@ -152,11 +145,12 @@ public abstract class DocumentElaboration { startTime = Utility.getUTCCalendarInstance(); try { readFile(); + aggregationStatus.setAggregationState(finalAggregationState, startTime, true); }catch (Exception e) { - + throw e; + } finally { + afterElaboration(); } - afterElaboration(); - aggregationStatus.setAggregationState(finalAggregationState, startTime, true); } protected abstract void elaborateLine(String line) throws Exception; diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java index 549285d..b94e855 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java @@ -113,10 +113,10 @@ public class InsertDocument extends DocumentElaboration { aggregatorPersistenceDst.insert(record); ++count; - if(count==100) { - aggregatorPersistenceDst.commitAndClose(); - count = 0; - } +// if(count==100) { +// aggregatorPersistenceDst.commitAndClose(); +// count = 0; +// } } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index 50fabe1..c91a7d9 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -12,7 +12,6 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.TimeZone; import java.util.UUID; @@ -29,10 +28,8 @@ import org.gcube.accounting.utility.postgresql.RecordToDBFields; import org.gcube.accounting.utility.postgresql.RecordToDBMapping; import org.gcube.com.fasterxml.jackson.databind.JsonNode; import org.gcube.documentstore.persistence.PersistencePostgreSQL; -import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; -import org.gcube.documentstore.records.RecordUtility; import org.postgresql.core.Utils; /** @@ -49,16 +46,9 @@ public class PostgreSQLConnector extends PersistencePostgreSQL implements Aggreg protected Connection connection; protected PostgreSQLConnector(Class clazz) throws Exception { + super(); this.configuration = new AccountingPersistenceConfiguration(clazz); - Map>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound(); - for(String typeName : aggregatedRecords.keySet()) { - try { - Class> clz = aggregatedRecords.get(typeName); - RecordToDBMapping.addRecordToDB(clz, configuration); - } catch (Exception e) { - new RuntimeException(e); - } - } + prepareConnection(configuration); } protected Connection getConnection() throws Exception { diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java index 2db60dd..51cc12a 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java @@ -1,14 +1,12 @@ package org.gcube.accounting.aggregator.persistence; -import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL; - /** * @author Luca Frosini (ISTI-CNR) */ public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceStatus { protected PostgreSQLConnectorStatus() throws Exception { - super(AccountingPersistenceQueryPostgreSQL.class); + super(AggregatorPersistenceSrc.class); } protected PostgreSQLConnectorStatus(Class clazz) throws Exception { diff --git a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java index 39037f4..cb37fe4 100644 --- a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java @@ -26,16 +26,17 @@ public class PostgreSQLConnectorTest extends ContextTest { public PostgreSQLConnectorTest() throws Exception { aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus(); + logger.debug("{}", aggregatorPersistenceStatus); } - @Ignore +// @Ignore @Test public void getAggregatorPersistenceDst() throws Exception { AggregatorPersistenceDst dst = AggregatorPersistenceFactory.getAggregatorPersistenceDst(); logger.debug("{}", dst); } - @Ignore +// @Ignore @Test public void getAggregatorPersistenceSrc() throws Exception { AggregatorPersistenceSrc src = AggregatorPersistenceFactory.getAggregatorPersistenceSrc(); diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java index e431e58..8edc33a 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java @@ -74,40 +74,139 @@ public class AccountingAggregatorPluginTest extends ContextTest { } } - + @JsonIgnore @Test public void aggregateAllServiceUsageRecord() throws Exception { String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); +// aggregateOneShot(recordType); aggregateEverything(recordType); } +// @JsonIgnore + @Test + public void aggregateDailyServiceUsageRecord() throws Exception { + String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); + + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + AggregationType aggregationType = AggregationType.DAILY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 6); + Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } + } + +// @JsonIgnore + @Test + public void aggregateMonthlyFirstHalf2021ServiceUsageRecord() throws Exception { + String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); + + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + AggregationType aggregationType = AggregationType.MONTHLY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.FEBRUARY, 1); + Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } + } + +// @JsonIgnore + @Test + public void aggregateMonthlySecondHalf2021ServiceUsageRecord() throws Exception { + String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); + + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + AggregationType aggregationType = AggregationType.MONTHLY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.AUGUST, 1); + Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } + } + + @JsonIgnore @Test - public void aggregateAllStorageUsageRecord() throws Exception { - String recordType = StorageUsageRecord.class.newInstance().getRecordType(); - aggregateEverything(recordType); + public void aggregateAllStorageStatusRecord() throws Exception { + String recordType = StorageStatusRecord.class.newInstance().getRecordType(); +// aggregateOneShot(recordType); +// aggregateEverything(recordType); + } + +// @JsonIgnore + @Test + public void aggregateMonthlyStorageStatusRecord() throws Exception { + String recordType = StorageStatusRecord.class.newInstance().getRecordType(); + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + AggregationType aggregationType = AggregationType.MONTHLY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1); + Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } + } + +// @JsonIgnore + @Test + public void aggregateSecondHalf2022StorageStatusRecord() throws Exception { + String recordType = StorageStatusRecord.class.newInstance().getRecordType(); + + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + AggregationType aggregationType = AggregationType.MONTHLY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JUNE, 1); + Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } } @JsonIgnore @Test public void aggregateAllJobUsageRecord() throws Exception { String recordType = JobUsageRecord.class.newInstance().getRecordType(); - aggregateEverything(recordType); - } - - @Test - public void aggregateAllStorageStatusRecord() throws Exception { - String recordType = StorageStatusRecord.class.newInstance().getRecordType(); aggregateOneShot(recordType); - // aggregateEverything(recordType); +// aggregateEverything(recordType); + } + + @JsonIgnore + @Test + public void aggregateAllStorageUsageRecord() throws Exception { + String recordType = StorageUsageRecord.class.newInstance().getRecordType(); + aggregateOneShot(recordType); +// aggregateEverything(recordType); } public void aggregateOneShot(String recordType) throws Exception { boolean forceRestart = true; - boolean forceEarlyAggregation = false; - AggregationType aggregationType = AggregationType.YEARLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1); + boolean forceEarlyAggregation = true; + AggregationType aggregationType = AggregationType.DAILY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14); Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation); } @@ -117,9 +216,8 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceRestart = true; boolean forceEarlyAggregation = false; - AggregationType aggregationType = AggregationType.YEARLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2015, Calendar.JANUARY, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2016, Calendar.JANUARY, 1); Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JANUARY, 1); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);