diff --git a/pom.xml b/pom.xml
index e7d1d69..f9d6520 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
org.gcube.tools
maven-parent
- 1.2.0
+ 1.1.0
org.gcube.accounting
diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
index 531d737..bac2155 100644
--- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
+++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
@@ -186,6 +186,20 @@ public class Elaborator {
}
+ public static File getOriginalRecordsBackupFile(File elaborationDirectory, AggregationInfo aggregationInfo) {
+ String recordType = aggregationInfo.getRecordType();
+ Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
+ AggregationType aggregationType = aggregationInfo.getAggregationType();
+
+ DateFormat dateFormat = aggregationType.getDateFormat();
+ String dateString = dateFormat.format(aggregationStartDate);
+ String[] splittedDate = dateString.split(AggregationType.DATE_SEPARATOR);
+
+ String backupFileName = splittedDate[splittedDate.length-1] + "-" + recordType;
+ File originalRecordsbackupFile = new File(elaborationDirectory, backupFileName + ORIGINAL_SUFFIX);
+ return originalRecordsbackupFile;
+ }
+
protected File getOriginalRecordsBackupFile(File elaborationDirectory, String name) throws Exception {
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java
index 0b24cd7..87572c3 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java
@@ -6,7 +6,7 @@ package org.gcube.accounting.aggregator.persistence;
class PostgreSQLConnectorStatusDst extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusDst {
protected PostgreSQLConnectorStatusDst() throws Exception {
- super(AggregatorPersistenceSrc.class);
+ super(AggregatorPersistenceDst.class);
}
diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java
index c490b51..aaa5fe4 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java
@@ -6,7 +6,7 @@ package org.gcube.accounting.aggregator.persistence;
class PostgreSQLConnectorStatusSrc extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusSrc {
protected PostgreSQLConnectorStatusSrc() throws Exception {
- super(AggregatorPersistenceSrc.class);
+ super(AggregatorPersistenceDst.class);
}
}
diff --git a/src/test/java/org/gcube/accounting/aggregator/file/EliminateDuplicates.java b/src/test/java/org/gcube/accounting/aggregator/file/EliminateDuplicates.java
new file mode 100644
index 0000000..cb61338
--- /dev/null
+++ b/src/test/java/org/gcube/accounting/aggregator/file/EliminateDuplicates.java
@@ -0,0 +1,129 @@
+package org.gcube.accounting.aggregator.file;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.gcube.accounting.aggregator.ContextTest;
+import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
+import org.gcube.accounting.aggregator.aggregation.AggregationType;
+import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure;
+import org.gcube.accounting.aggregator.elaboration.Elaborator;
+import org.gcube.accounting.aggregator.utility.Utility;
+import org.gcube.accounting.datamodel.usagerecords.StorageStatusRecord;
+import org.gcube.com.fasterxml.jackson.databind.JsonNode;
+import org.gcube.documentstore.records.DSMapper;
+import org.gcube.documentstore.records.Record;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Luca Frosini (ISTI - CNR)
+ * This class has been used to eliminates duplicates from a file
+ * because the aggregation was started concurrently twice.
+ */
+public class EliminateDuplicates extends ContextTest {
+
+ private static Logger logger = LoggerFactory.getLogger(EliminateDuplicates.class);
+
+ public final static String ORIGINAL_NO_DUPLICATES_SUFFIX = ".original-no-duplicates.json";
+
+ protected AggregationInfo aggregationInfo;
+ protected File originalRecordsbackupFile;
+ protected File noDuplicatesRecordsbackupFile;
+
+ protected int readLines;
+ protected int discardedLines;
+ protected int uniqueLines;
+ protected Set ids;
+
+ protected File getAggregatedRecordsBackupFile() throws Exception {
+ File aggregateRecordsBackupFile = new File(originalRecordsbackupFile.getParentFile(),
+ originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, ORIGINAL_NO_DUPLICATES_SUFFIX));
+ return aggregateRecordsBackupFile;
+ }
+
+ @Ignore
+ @Test
+ public void eliminateDuplicates() throws Exception {
+ ids = new HashSet<>();
+ String recordType = StorageStatusRecord.class.getSimpleName();
+ AggregationType aggregationType = AggregationType.MONTHLY;
+ Calendar start = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1);
+ Calendar end = Utility.getEndCalendarFromStartCalendar(aggregationType, start, 1);
+ Date aggregationStartDate = start.getTime();
+ Date aggregationEndDate = end.getTime();
+ aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
+ FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure();
+ File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
+ originalRecordsbackupFile = Elaborator.getOriginalRecordsBackupFile(elaborationDirectory, aggregationInfo);
+ noDuplicatesRecordsbackupFile = getAggregatedRecordsBackupFile();
+ noDuplicatesRecordsbackupFile.delete();
+ // readFile();
+ }
+
+ protected void elaborateLine(String line) throws Exception {
+ JsonNode jsonNode = DSMapper.asJsonNode(line);
+ String id = jsonNode.get(Record.ID).asText();
+ if(!ids.contains(id)) {
+ ids.add(id);
+ ++uniqueLines;
+ Utility.printLine(noDuplicatesRecordsbackupFile, line);
+ }else {
+ logger.trace("Record with id {} was already found, it will be discarded.", id);
+ ++discardedLines;
+ }
+
+ }
+
+
+ protected void readFile() throws Exception {
+ FileInputStream fstream = null;
+ DataInputStream in = null;
+ BufferedReader br = null;
+ try {
+ // Open the file that is the first // command line parameter
+ fstream = new FileInputStream(originalRecordsbackupFile);
+ // Get the object of DataInputStream
+ in = new DataInputStream(fstream);
+ br = new BufferedReader(new InputStreamReader(in));
+
+ readLines = 0;
+ discardedLines = 0;
+ uniqueLines = 0;
+
+ String line;
+ // Read File Line By Line
+ while((line = br.readLine()) != null) {
+ elaborateLine(line);
+ ++readLines;
+ }
+
+ logger.info("Original records are {}. Unique records are {}. Discarded duplicates records are {}", readLines, uniqueLines, discardedLines);
+
+ Assert.assertTrue(readLines == (uniqueLines+discardedLines));
+
+ } finally {
+ if(br != null) {
+ br.close();
+ }
+ if(in != null) {
+ in.close();
+ }
+ if(fstream != null) {
+ fstream.close();
+ }
+
+ }
+
+ }
+}
diff --git a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java
index cb37fe4..c00ba3d 100644
--- a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java
+++ b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java
@@ -184,18 +184,18 @@ public class PostgreSQLConnectorTest extends ContextTest {
}
@Ignore
-// @Test
+ @Test
public void moveStatusFromSrcToDst() throws Exception {
- ContextTest.setContextByName(GCUBE);
+ ContextTest.setContextByName(ROOT_PROD);
AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
List aggregationStatuses = apsSrc.getAll();
for(AggregationStatus aggregationStatus : aggregationStatuses) {
- analyseAggregationStatus(aggregationStatus);
+// analyseAggregationStatus(aggregationStatus);
}
}
- @Ignore
-// @Test
+// @Ignore
+ @Test
public void testAggregatorPersistenceStatusSrcAndDst() throws Exception {
AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
logger.debug("{}", apsSrc);
diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java
index f1d7a6c..56827f6 100644
--- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java
+++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java
@@ -2,7 +2,9 @@ package org.gcube.accounting.aggregator.plugin;
import java.util.Calendar;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.ContextTest;
@@ -77,103 +79,37 @@ public class AccountingAggregatorPluginTest extends ContextTest {
}
-// @Ignore
+ @Ignore
+ @Test
+ public void aggregateRecordDaily() throws Exception {
+ Set types = new HashSet<>();
+ types.add(ServiceUsageRecord.class.newInstance().getRecordType());
+ types.add(StorageUsageRecord.class.newInstance().getRecordType());
+ types.add(JobUsageRecord.class.newInstance().getRecordType());
+
+ boolean forceRestart = true;
+ boolean forceEarlyAggregation = true;
+
+ for(String recordType : types) {
+ AggregationType aggregationType = AggregationType.DAILY;
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29);
+ Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.MARCH, 1);
+ while (aggregationStartCalendar.before(end)) {
+ Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
+ aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
+ aggregationStartCalendar = Calendar.getInstance();
+ aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
+ }
+ }
+ }
+
+ @Ignore
@Test
public void aggregateAllServiceUsageRecord() throws Exception {
String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
- aggregateOneShot(recordType);
+// aggregateOneShot(recordType);
// aggregateEverything(recordType);
}
-
- @Test
- public void aggregateServiceUsageRecordDaily() throws Exception {
- String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
-
- boolean forceRestart = true;
- boolean forceEarlyAggregation = true;
-
- AggregationType aggregationType = AggregationType.DAILY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 26);
- Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27);
- while (aggregationStartCalendar.before(end)) {
- Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
- aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
- aggregationStartCalendar = Calendar.getInstance();
- aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
- }
- }
-
- @Test
- public void aggregateMonthlySecondHalf2022ServiceUsageRecord() throws Exception {
- String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
-
- boolean forceRestart = true;
- boolean forceEarlyAggregation = true;
-
- AggregationType aggregationType = AggregationType.MONTHLY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.DECEMBER, 1);
- Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1);
- while (aggregationStartCalendar.before(end)) {
- Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
- aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
- aggregationStartCalendar = Calendar.getInstance();
- aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
- }
- }
-
- @Test
- public void aggregateMonthlyFirstHalf2023ServiceUsageRecord() throws Exception {
- String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
-
- boolean forceRestart = true;
- boolean forceEarlyAggregation = true;
-
- AggregationType aggregationType = AggregationType.MONTHLY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.SEPTEMBER, 1);
- Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.OCTOBER, 1);
- while (aggregationStartCalendar.before(end)) {
- Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
- aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
- aggregationStartCalendar = Calendar.getInstance();
- aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
- }
- }
-
- @Test
- public void aggregateFirstQuarter2022StorageStatusRecord() throws Exception {
- String recordType = StorageStatusRecord.class.newInstance().getRecordType();
-
- boolean forceRestart = true;
- boolean forceEarlyAggregation = true;
-
- AggregationType aggregationType = AggregationType.MONTHLY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1);
- Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.FEBRUARY, 1);
- while (aggregationStartCalendar.before(end)) {
- Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
- aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
- aggregationStartCalendar = Calendar.getInstance();
- aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
- }
- }
-
- @Test
- public void aggregateSecondQuarter2022StorageStatusRecord() throws Exception {
- String recordType = StorageStatusRecord.class.newInstance().getRecordType();
-
- boolean forceRestart = true;
- boolean forceEarlyAggregation = true;
-
- AggregationType aggregationType = AggregationType.MONTHLY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.MAY, 1);
- Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.JULY, 1);
- while (aggregationStartCalendar.before(end)) {
- Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
- aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
- aggregationStartCalendar = Calendar.getInstance();
- aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
- }
- }
@Ignore
@Test
@@ -183,19 +119,19 @@ public class AccountingAggregatorPluginTest extends ContextTest {
// aggregateEverything(recordType);
}
-// @Ignore
+ @Ignore
@Test
public void aggregateAllJobUsageRecord() throws Exception {
String recordType = JobUsageRecord.class.newInstance().getRecordType();
- aggregateOneShot(recordType);
+// aggregateOneShot(recordType);
// aggregateEverything(recordType);
}
-// @Ignore
+ @Ignore
@Test
public void aggregateAllStorageUsageRecord() throws Exception {
String recordType = StorageUsageRecord.class.newInstance().getRecordType();
- aggregateOneShot(recordType);
+// aggregateOneShot(recordType);
// aggregateEverything(recordType);
}
@@ -204,7 +140,7 @@ public class AccountingAggregatorPluginTest extends ContextTest {
boolean forceRestart = true;
boolean forceEarlyAggregation = true;
AggregationType aggregationType = AggregationType.DAILY;
- Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27);
+ Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29);
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
}
@@ -237,7 +173,7 @@ public class AccountingAggregatorPluginTest extends ContextTest {
aggregationType = AggregationType.DAILY;
aggregationStartCalendar = end;
- end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14);
+ end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true, false);