diff --git a/pom.xml b/pom.xml
index 054ed27..f9d6520 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,15 +57,15 @@
document-store-lib-postgresql
[1.0.0, 2.0.0-SNAPSHOT)
-
- org.gcube.accounting
- accounting-analytics-persistence-postgresql
- [1.0.0, 2.0.0-SNAPSHOT)
-
+
+
+
+
+
org.gcube.accounting
accounting-analytics
- [3.0.0, 4.0.0-SNAPSHOT)
+ [4.0.0, 5.0.0-SNAPSHOT)
org.slf4j
@@ -135,4 +135,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java
index bfcd68d..bec7c44 100644
--- a/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java
+++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java
@@ -219,7 +219,6 @@ public class Aggregator {
Class extends AggregatedRecord, ?>> clz = RecordUtility.getAggregatedRecordClass(type);
RecordToDBFields recordToDBFields = RecordToDBMapping.getRecordToDBFields(clz);
-
Set requiredFields = clz.newInstance().getRequiredFields();
malformedRecordNumber = 0;
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java
index 89e80ab..f411fdb 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java
@@ -24,9 +24,6 @@ public abstract class DocumentElaboration {
protected static final String ID = Record.ID;
- protected static final int THRESHOLD_FOR_FIVE_PERCENT = 10000;
- protected static final int THRESHOLD_FOR_ONE_PERCENT = 100000;
-
public static final int MAX_RETRY = 7;
protected final AggregationStatus aggregationStatus;
@@ -59,13 +56,9 @@ public abstract class DocumentElaboration {
logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated);
- int percentOfNumberOfRows = (rowToBeElaborated / 10) + 1;
- if(rowToBeElaborated >= THRESHOLD_FOR_FIVE_PERCENT) {
- percentOfNumberOfRows = percentOfNumberOfRows / 2;
- }
-
- if(rowToBeElaborated >= THRESHOLD_FOR_ONE_PERCENT) {
- percentOfNumberOfRows = percentOfNumberOfRows / 5;
+ int numberOfRowsForEachRecoveryPoint = (rowToBeElaborated / 10) + 1;
+ if(numberOfRowsForEachRecoveryPoint>500) {
+ numberOfRowsForEachRecoveryPoint = 500;
}
currentlyElaborated = 0;
@@ -109,7 +102,7 @@ public abstract class DocumentElaboration {
}
++currentlyElaborated;
- if(currentlyElaborated % percentOfNumberOfRows == 0) {
+ if(currentlyElaborated % numberOfRowsForEachRecoveryPoint == 0) {
if(currentlyElaborated>=restartFrom) {
aggregationStatus.setRestartFrom(currentlyElaborated, true);
}
@@ -152,11 +145,12 @@ public abstract class DocumentElaboration {
startTime = Utility.getUTCCalendarInstance();
try {
readFile();
+ aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
}catch (Exception e) {
-
+ throw e;
+ } finally {
+ afterElaboration();
}
- afterElaboration();
- aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
}
protected abstract void elaborateLine(String line) throws Exception;
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
index 549285d..b94e855 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
@@ -113,10 +113,10 @@ public class InsertDocument extends DocumentElaboration {
aggregatorPersistenceDst.insert(record);
++count;
- if(count==100) {
- aggregatorPersistenceDst.commitAndClose();
- count = 0;
- }
+// if(count==100) {
+// aggregatorPersistenceDst.commitAndClose();
+// count = 0;
+// }
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java
index 50fabe1..c91a7d9 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java
@@ -12,7 +12,6 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
-import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
@@ -29,10 +28,8 @@ import org.gcube.accounting.utility.postgresql.RecordToDBFields;
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
-import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
-import org.gcube.documentstore.records.RecordUtility;
import org.postgresql.core.Utils;
/**
@@ -49,16 +46,9 @@ public class PostgreSQLConnector extends PersistencePostgreSQL implements Aggreg
protected Connection connection;
protected PostgreSQLConnector(Class> clazz) throws Exception {
+ super();
this.configuration = new AccountingPersistenceConfiguration(clazz);
- Map>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound();
- for(String typeName : aggregatedRecords.keySet()) {
- try {
- Class extends AggregatedRecord,?>> clz = aggregatedRecords.get(typeName);
- RecordToDBMapping.addRecordToDB(clz, configuration);
- } catch (Exception e) {
- new RuntimeException(e);
- }
- }
+ prepareConnection(configuration);
}
protected Connection getConnection() throws Exception {
diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java
index 2db60dd..51cc12a 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java
@@ -1,14 +1,12 @@
package org.gcube.accounting.aggregator.persistence;
-import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
-
/**
* @author Luca Frosini (ISTI-CNR)
*/
public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceStatus {
protected PostgreSQLConnectorStatus() throws Exception {
- super(AccountingPersistenceQueryPostgreSQL.class);
+ super(AggregatorPersistenceSrc.class);
}
protected PostgreSQLConnectorStatus(Class> clazz) throws Exception {
diff --git a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java
index 39037f4..cb37fe4 100644
--- a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java
+++ b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java
@@ -26,16 +26,17 @@ public class PostgreSQLConnectorTest extends ContextTest {
public PostgreSQLConnectorTest() throws Exception {
aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus();
+ logger.debug("{}", aggregatorPersistenceStatus);
}
- @Ignore
+// @Ignore
@Test
public void getAggregatorPersistenceDst() throws Exception {
AggregatorPersistenceDst dst = AggregatorPersistenceFactory.getAggregatorPersistenceDst();
logger.debug("{}", dst);
}
- @Ignore
+// @Ignore
@Test
public void getAggregatorPersistenceSrc() throws Exception {
AggregatorPersistenceSrc src = AggregatorPersistenceFactory.getAggregatorPersistenceSrc();
diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java
index e431e58..8edc33a 100644
--- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java
+++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java
@@ -74,40 +74,139 @@ public class AccountingAggregatorPluginTest extends ContextTest {
}
}
-
+
@JsonIgnore
@Test
public void aggregateAllServiceUsageRecord() throws Exception {
String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
+// aggregateOneShot(recordType);
aggregateEverything(recordType);
}
+// @JsonIgnore
+ @Test
+ public void aggregateDailyServiceUsageRecord() throws Exception {
+ String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
+
+ boolean forceRestart = true;
+ boolean forceEarlyAggregation = true;
+
+ AggregationType aggregationType = AggregationType.DAILY;
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 6);
+ Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14);
+ while (aggregationStartCalendar.before(end)) {
+ Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
+ aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
+ aggregationStartCalendar = Calendar.getInstance();
+ aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
+ }
+ }
+
+// @JsonIgnore
+ @Test
+ public void aggregateMonthlyFirstHalf2021ServiceUsageRecord() throws Exception {
+ String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
+
+ boolean forceRestart = true;
+ boolean forceEarlyAggregation = true;
+
+ AggregationType aggregationType = AggregationType.MONTHLY;
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.FEBRUARY, 1);
+ Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1);
+ while (aggregationStartCalendar.before(end)) {
+ Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
+ aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
+ aggregationStartCalendar = Calendar.getInstance();
+ aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
+ }
+ }
+
+// @JsonIgnore
+ @Test
+ public void aggregateMonthlySecondHalf2021ServiceUsageRecord() throws Exception {
+ String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
+
+ boolean forceRestart = true;
+ boolean forceEarlyAggregation = true;
+
+ AggregationType aggregationType = AggregationType.MONTHLY;
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.AUGUST, 1);
+ Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1);
+ while (aggregationStartCalendar.before(end)) {
+ Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
+ aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
+ aggregationStartCalendar = Calendar.getInstance();
+ aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
+ }
+ }
+
+
@JsonIgnore
@Test
- public void aggregateAllStorageUsageRecord() throws Exception {
- String recordType = StorageUsageRecord.class.newInstance().getRecordType();
- aggregateEverything(recordType);
+ public void aggregateAllStorageStatusRecord() throws Exception {
+ String recordType = StorageStatusRecord.class.newInstance().getRecordType();
+// aggregateOneShot(recordType);
+// aggregateEverything(recordType);
+ }
+
+// @JsonIgnore
+ @Test
+ public void aggregateMonthlyStorageStatusRecord() throws Exception {
+ String recordType = StorageStatusRecord.class.newInstance().getRecordType();
+ boolean forceRestart = true;
+ boolean forceEarlyAggregation = true;
+
+ AggregationType aggregationType = AggregationType.MONTHLY;
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.JULY, 1);
+ Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1);
+ while (aggregationStartCalendar.before(end)) {
+ Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
+ aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
+ aggregationStartCalendar = Calendar.getInstance();
+ aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
+ }
+ }
+
+// @JsonIgnore
+ @Test
+ public void aggregateSecondHalf2022StorageStatusRecord() throws Exception {
+ String recordType = StorageStatusRecord.class.newInstance().getRecordType();
+
+ boolean forceRestart = true;
+ boolean forceEarlyAggregation = true;
+
+ AggregationType aggregationType = AggregationType.MONTHLY;
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JUNE, 1);
+ Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1);
+ while (aggregationStartCalendar.before(end)) {
+ Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
+ aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
+ aggregationStartCalendar = Calendar.getInstance();
+ aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
+ }
}
@JsonIgnore
@Test
public void aggregateAllJobUsageRecord() throws Exception {
String recordType = JobUsageRecord.class.newInstance().getRecordType();
- aggregateEverything(recordType);
- }
-
- @Test
- public void aggregateAllStorageStatusRecord() throws Exception {
- String recordType = StorageStatusRecord.class.newInstance().getRecordType();
aggregateOneShot(recordType);
- // aggregateEverything(recordType);
+// aggregateEverything(recordType);
+ }
+
+ @JsonIgnore
+ @Test
+ public void aggregateAllStorageUsageRecord() throws Exception {
+ String recordType = StorageUsageRecord.class.newInstance().getRecordType();
+ aggregateOneShot(recordType);
+// aggregateEverything(recordType);
}
public void aggregateOneShot(String recordType) throws Exception {
boolean forceRestart = true;
- boolean forceEarlyAggregation = false;
- AggregationType aggregationType = AggregationType.YEARLY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1);
+ boolean forceEarlyAggregation = true;
+ AggregationType aggregationType = AggregationType.DAILY;
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14);
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
}
@@ -117,9 +216,8 @@ public class AccountingAggregatorPluginTest extends ContextTest {
boolean forceRestart = true;
boolean forceEarlyAggregation = false;
-
AggregationType aggregationType = AggregationType.YEARLY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2015, Calendar.JANUARY, 1);
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2016, Calendar.JANUARY, 1);
Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JANUARY, 1);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);