From 58529a17cad9d8745836ebb135d2a4659887fff8 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Thu, 29 Feb 2024 15:28:00 +0100 Subject: [PATCH] Improved code --- pom.xml | 2 +- .../aggregator/elaboration/Elaborator.java | 14 ++ .../PostgreSQLConnectorStatusDst.java | 2 +- .../PostgreSQLConnectorStatusSrc.java | 2 +- .../aggregator/file/EliminateDuplicates.java | 129 +++++++++++++++++ .../persistence/PostgreSQLConnectorTest.java | 10 +- .../AccountingAggregatorPluginTest.java | 132 +++++------------- 7 files changed, 185 insertions(+), 106 deletions(-) create mode 100644 src/test/java/org/gcube/accounting/aggregator/file/EliminateDuplicates.java diff --git a/pom.xml b/pom.xml index e7d1d69..f9d6520 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.gcube.tools maven-parent - 1.2.0 + 1.1.0 org.gcube.accounting diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java index 531d737..bac2155 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java @@ -186,6 +186,20 @@ public class Elaborator { } + public static File getOriginalRecordsBackupFile(File elaborationDirectory, AggregationInfo aggregationInfo) { + String recordType = aggregationInfo.getRecordType(); + Date aggregationStartDate = aggregationInfo.getAggregationStartDate(); + AggregationType aggregationType = aggregationInfo.getAggregationType(); + + DateFormat dateFormat = aggregationType.getDateFormat(); + String dateString = dateFormat.format(aggregationStartDate); + String[] splittedDate = dateString.split(AggregationType.DATE_SEPARATOR); + + String backupFileName = splittedDate[splittedDate.length-1] + "-" + recordType; + File originalRecordsbackupFile = new File(elaborationDirectory, backupFileName + ORIGINAL_SUFFIX); + return originalRecordsbackupFile; + } + protected File getOriginalRecordsBackupFile(File elaborationDirectory, String name) throws Exception { AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo(); Date aggregationStartDate = aggregationInfo.getAggregationStartDate(); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java index 0b24cd7..87572c3 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java @@ -6,7 +6,7 @@ package org.gcube.accounting.aggregator.persistence; class PostgreSQLConnectorStatusDst extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusDst { protected PostgreSQLConnectorStatusDst() throws Exception { - super(AggregatorPersistenceSrc.class); + super(AggregatorPersistenceDst.class); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java index c490b51..aaa5fe4 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java @@ -6,7 +6,7 @@ package org.gcube.accounting.aggregator.persistence; class PostgreSQLConnectorStatusSrc extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusSrc { protected PostgreSQLConnectorStatusSrc() throws Exception { - super(AggregatorPersistenceSrc.class); + super(AggregatorPersistenceDst.class); } } diff --git a/src/test/java/org/gcube/accounting/aggregator/file/EliminateDuplicates.java b/src/test/java/org/gcube/accounting/aggregator/file/EliminateDuplicates.java new file mode 100644 index 0000000..cb61338 --- /dev/null +++ b/src/test/java/org/gcube/accounting/aggregator/file/EliminateDuplicates.java @@ -0,0 +1,129 @@ +package org.gcube.accounting.aggregator.file; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.util.Calendar; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +import org.gcube.accounting.aggregator.ContextTest; +import org.gcube.accounting.aggregator.aggregation.AggregationInfo; +import org.gcube.accounting.aggregator.aggregation.AggregationType; +import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure; +import org.gcube.accounting.aggregator.elaboration.Elaborator; +import org.gcube.accounting.aggregator.utility.Utility; +import org.gcube.accounting.datamodel.usagerecords.StorageStatusRecord; +import org.gcube.com.fasterxml.jackson.databind.JsonNode; +import org.gcube.documentstore.records.DSMapper; +import org.gcube.documentstore.records.Record; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Luca Frosini (ISTI - CNR) + * This class has been used to eliminates duplicates from a file + * because the aggregation was started concurrently twice. + */ +public class EliminateDuplicates extends ContextTest { + + private static Logger logger = LoggerFactory.getLogger(EliminateDuplicates.class); + + public final static String ORIGINAL_NO_DUPLICATES_SUFFIX = ".original-no-duplicates.json"; + + protected AggregationInfo aggregationInfo; + protected File originalRecordsbackupFile; + protected File noDuplicatesRecordsbackupFile; + + protected int readLines; + protected int discardedLines; + protected int uniqueLines; + protected Set ids; + + protected File getAggregatedRecordsBackupFile() throws Exception { + File aggregateRecordsBackupFile = new File(originalRecordsbackupFile.getParentFile(), + originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, ORIGINAL_NO_DUPLICATES_SUFFIX)); + return aggregateRecordsBackupFile; + } + + @Ignore + @Test + public void eliminateDuplicates() throws Exception { + ids = new HashSet<>(); + String recordType = StorageStatusRecord.class.getSimpleName(); + AggregationType aggregationType = AggregationType.MONTHLY; + Calendar start = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1); + Calendar end = Utility.getEndCalendarFromStartCalendar(aggregationType, start, 1); + Date aggregationStartDate = start.getTime(); + Date aggregationEndDate = end.getTime(); + aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate, aggregationEndDate); + FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure(); + File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate); + originalRecordsbackupFile = Elaborator.getOriginalRecordsBackupFile(elaborationDirectory, aggregationInfo); + noDuplicatesRecordsbackupFile = getAggregatedRecordsBackupFile(); + noDuplicatesRecordsbackupFile.delete(); + // readFile(); + } + + protected void elaborateLine(String line) throws Exception { + JsonNode jsonNode = DSMapper.asJsonNode(line); + String id = jsonNode.get(Record.ID).asText(); + if(!ids.contains(id)) { + ids.add(id); + ++uniqueLines; + Utility.printLine(noDuplicatesRecordsbackupFile, line); + }else { + logger.trace("Record with id {} was already found, it will be discarded.", id); + ++discardedLines; + } + + } + + + protected void readFile() throws Exception { + FileInputStream fstream = null; + DataInputStream in = null; + BufferedReader br = null; + try { + // Open the file that is the first // command line parameter + fstream = new FileInputStream(originalRecordsbackupFile); + // Get the object of DataInputStream + in = new DataInputStream(fstream); + br = new BufferedReader(new InputStreamReader(in)); + + readLines = 0; + discardedLines = 0; + uniqueLines = 0; + + String line; + // Read File Line By Line + while((line = br.readLine()) != null) { + elaborateLine(line); + ++readLines; + } + + logger.info("Original records are {}. Unique records are {}. Discarded duplicates records are {}", readLines, uniqueLines, discardedLines); + + Assert.assertTrue(readLines == (uniqueLines+discardedLines)); + + } finally { + if(br != null) { + br.close(); + } + if(in != null) { + in.close(); + } + if(fstream != null) { + fstream.close(); + } + + } + + } +} diff --git a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java index cb37fe4..c00ba3d 100644 --- a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java @@ -184,18 +184,18 @@ public class PostgreSQLConnectorTest extends ContextTest { } @Ignore -// @Test + @Test public void moveStatusFromSrcToDst() throws Exception { - ContextTest.setContextByName(GCUBE); + ContextTest.setContextByName(ROOT_PROD); AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc(); List aggregationStatuses = apsSrc.getAll(); for(AggregationStatus aggregationStatus : aggregationStatuses) { - analyseAggregationStatus(aggregationStatus); +// analyseAggregationStatus(aggregationStatus); } } - @Ignore -// @Test +// @Ignore + @Test public void testAggregatorPersistenceStatusSrcAndDst() throws Exception { AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc(); logger.debug("{}", apsSrc); diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java index f1d7a6c..56827f6 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java @@ -2,7 +2,9 @@ package org.gcube.accounting.aggregator.plugin; import java.util.Calendar; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.gcube.accounting.aggregator.ContextTest; @@ -77,103 +79,37 @@ public class AccountingAggregatorPluginTest extends ContextTest { } -// @Ignore + @Ignore + @Test + public void aggregateRecordDaily() throws Exception { + Set types = new HashSet<>(); + types.add(ServiceUsageRecord.class.newInstance().getRecordType()); + types.add(StorageUsageRecord.class.newInstance().getRecordType()); + types.add(JobUsageRecord.class.newInstance().getRecordType()); + + boolean forceRestart = true; + boolean forceEarlyAggregation = true; + + for(String recordType : types) { + AggregationType aggregationType = AggregationType.DAILY; + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29); + Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.MARCH, 1); + while (aggregationStartCalendar.before(end)) { + Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); + aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); + aggregationStartCalendar = Calendar.getInstance(); + aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); + } + } + } + + @Ignore @Test public void aggregateAllServiceUsageRecord() throws Exception { String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); - aggregateOneShot(recordType); +// aggregateOneShot(recordType); // aggregateEverything(recordType); } - - @Test - public void aggregateServiceUsageRecordDaily() throws Exception { - String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); - - boolean forceRestart = true; - boolean forceEarlyAggregation = true; - - AggregationType aggregationType = AggregationType.DAILY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 26); - Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27); - while (aggregationStartCalendar.before(end)) { - Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); - aggregationStartCalendar = Calendar.getInstance(); - aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); - } - } - - @Test - public void aggregateMonthlySecondHalf2022ServiceUsageRecord() throws Exception { - String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); - - boolean forceRestart = true; - boolean forceEarlyAggregation = true; - - AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.DECEMBER, 1); - Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1); - while (aggregationStartCalendar.before(end)) { - Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); - aggregationStartCalendar = Calendar.getInstance(); - aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); - } - } - - @Test - public void aggregateMonthlyFirstHalf2023ServiceUsageRecord() throws Exception { - String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); - - boolean forceRestart = true; - boolean forceEarlyAggregation = true; - - AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.SEPTEMBER, 1); - Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.OCTOBER, 1); - while (aggregationStartCalendar.before(end)) { - Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); - aggregationStartCalendar = Calendar.getInstance(); - aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); - } - } - - @Test - public void aggregateFirstQuarter2022StorageStatusRecord() throws Exception { - String recordType = StorageStatusRecord.class.newInstance().getRecordType(); - - boolean forceRestart = true; - boolean forceEarlyAggregation = true; - - AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1); - Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.FEBRUARY, 1); - while (aggregationStartCalendar.before(end)) { - Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); - aggregationStartCalendar = Calendar.getInstance(); - aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); - } - } - - @Test - public void aggregateSecondQuarter2022StorageStatusRecord() throws Exception { - String recordType = StorageStatusRecord.class.newInstance().getRecordType(); - - boolean forceRestart = true; - boolean forceEarlyAggregation = true; - - AggregationType aggregationType = AggregationType.MONTHLY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.MAY, 1); - Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.JULY, 1); - while (aggregationStartCalendar.before(end)) { - Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); - aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); - aggregationStartCalendar = Calendar.getInstance(); - aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis()); - } - } @Ignore @Test @@ -183,19 +119,19 @@ public class AccountingAggregatorPluginTest extends ContextTest { // aggregateEverything(recordType); } -// @Ignore + @Ignore @Test public void aggregateAllJobUsageRecord() throws Exception { String recordType = JobUsageRecord.class.newInstance().getRecordType(); - aggregateOneShot(recordType); +// aggregateOneShot(recordType); // aggregateEverything(recordType); } -// @Ignore + @Ignore @Test public void aggregateAllStorageUsageRecord() throws Exception { String recordType = StorageUsageRecord.class.newInstance().getRecordType(); - aggregateOneShot(recordType); +// aggregateOneShot(recordType); // aggregateEverything(recordType); } @@ -204,7 +140,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { boolean forceRestart = true; boolean forceEarlyAggregation = true; AggregationType aggregationType = AggregationType.DAILY; - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29); Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); } @@ -237,7 +173,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { aggregationType = AggregationType.DAILY; aggregationStartCalendar = end; - end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14); + end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29); while (aggregationStartCalendar.before(end)) { Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true, false);