Improved code

This commit is contained in:
Luca Frosini 2024-02-29 15:28:00 +01:00
parent e7759dd97f
commit 58529a17ca
7 changed files with 185 additions and 106 deletions

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>org.gcube.tools</groupId> <groupId>org.gcube.tools</groupId>
<artifactId>maven-parent</artifactId> <artifactId>maven-parent</artifactId>
<version>1.2.0</version> <version>1.1.0</version>
</parent> </parent>
<groupId>org.gcube.accounting</groupId> <groupId>org.gcube.accounting</groupId>

View File

@ -186,6 +186,20 @@ public class Elaborator {
} }
public static File getOriginalRecordsBackupFile(File elaborationDirectory, AggregationInfo aggregationInfo) {
String recordType = aggregationInfo.getRecordType();
Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
AggregationType aggregationType = aggregationInfo.getAggregationType();
DateFormat dateFormat = aggregationType.getDateFormat();
String dateString = dateFormat.format(aggregationStartDate);
String[] splittedDate = dateString.split(AggregationType.DATE_SEPARATOR);
String backupFileName = splittedDate[splittedDate.length-1] + "-" + recordType;
File originalRecordsbackupFile = new File(elaborationDirectory, backupFileName + ORIGINAL_SUFFIX);
return originalRecordsbackupFile;
}
protected File getOriginalRecordsBackupFile(File elaborationDirectory, String name) throws Exception { protected File getOriginalRecordsBackupFile(File elaborationDirectory, String name) throws Exception {
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo(); AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
Date aggregationStartDate = aggregationInfo.getAggregationStartDate(); Date aggregationStartDate = aggregationInfo.getAggregationStartDate();

View File

@ -6,7 +6,7 @@ package org.gcube.accounting.aggregator.persistence;
class PostgreSQLConnectorStatusDst extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusDst { class PostgreSQLConnectorStatusDst extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusDst {
protected PostgreSQLConnectorStatusDst() throws Exception { protected PostgreSQLConnectorStatusDst() throws Exception {
super(AggregatorPersistenceSrc.class); super(AggregatorPersistenceDst.class);
} }

View File

@ -6,7 +6,7 @@ package org.gcube.accounting.aggregator.persistence;
class PostgreSQLConnectorStatusSrc extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusSrc { class PostgreSQLConnectorStatusSrc extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusSrc {
protected PostgreSQLConnectorStatusSrc() throws Exception { protected PostgreSQLConnectorStatusSrc() throws Exception {
super(AggregatorPersistenceSrc.class); super(AggregatorPersistenceDst.class);
} }
} }

View File

@ -0,0 +1,129 @@
package org.gcube.accounting.aggregator.file;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import org.gcube.accounting.aggregator.ContextTest;
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure;
import org.gcube.accounting.aggregator.elaboration.Elaborator;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.usagerecords.StorageStatusRecord;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
* This class has been used to eliminates duplicates from a file
* because the aggregation was started concurrently twice.
*/
public class EliminateDuplicates extends ContextTest {
private static Logger logger = LoggerFactory.getLogger(EliminateDuplicates.class);
public final static String ORIGINAL_NO_DUPLICATES_SUFFIX = ".original-no-duplicates.json";
protected AggregationInfo aggregationInfo;
protected File originalRecordsbackupFile;
protected File noDuplicatesRecordsbackupFile;
protected int readLines;
protected int discardedLines;
protected int uniqueLines;
protected Set<String> ids;
protected File getAggregatedRecordsBackupFile() throws Exception {
File aggregateRecordsBackupFile = new File(originalRecordsbackupFile.getParentFile(),
originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, ORIGINAL_NO_DUPLICATES_SUFFIX));
return aggregateRecordsBackupFile;
}
@Ignore
@Test
public void eliminateDuplicates() throws Exception {
ids = new HashSet<>();
String recordType = StorageStatusRecord.class.getSimpleName();
AggregationType aggregationType = AggregationType.MONTHLY;
Calendar start = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1);
Calendar end = Utility.getEndCalendarFromStartCalendar(aggregationType, start, 1);
Date aggregationStartDate = start.getTime();
Date aggregationEndDate = end.getTime();
aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure();
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
originalRecordsbackupFile = Elaborator.getOriginalRecordsBackupFile(elaborationDirectory, aggregationInfo);
noDuplicatesRecordsbackupFile = getAggregatedRecordsBackupFile();
noDuplicatesRecordsbackupFile.delete();
// readFile();
}
protected void elaborateLine(String line) throws Exception {
JsonNode jsonNode = DSMapper.asJsonNode(line);
String id = jsonNode.get(Record.ID).asText();
if(!ids.contains(id)) {
ids.add(id);
++uniqueLines;
Utility.printLine(noDuplicatesRecordsbackupFile, line);
}else {
logger.trace("Record with id {} was already found, it will be discarded.", id);
++discardedLines;
}
}
protected void readFile() throws Exception {
FileInputStream fstream = null;
DataInputStream in = null;
BufferedReader br = null;
try {
// Open the file that is the first // command line parameter
fstream = new FileInputStream(originalRecordsbackupFile);
// Get the object of DataInputStream
in = new DataInputStream(fstream);
br = new BufferedReader(new InputStreamReader(in));
readLines = 0;
discardedLines = 0;
uniqueLines = 0;
String line;
// Read File Line By Line
while((line = br.readLine()) != null) {
elaborateLine(line);
++readLines;
}
logger.info("Original records are {}. Unique records are {}. Discarded duplicates records are {}", readLines, uniqueLines, discardedLines);
Assert.assertTrue(readLines == (uniqueLines+discardedLines));
} finally {
if(br != null) {
br.close();
}
if(in != null) {
in.close();
}
if(fstream != null) {
fstream.close();
}
}
}
}

View File

@ -184,18 +184,18 @@ public class PostgreSQLConnectorTest extends ContextTest {
} }
@Ignore @Ignore
// @Test @Test
public void moveStatusFromSrcToDst() throws Exception { public void moveStatusFromSrcToDst() throws Exception {
ContextTest.setContextByName(GCUBE); ContextTest.setContextByName(ROOT_PROD);
AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc(); AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
List<AggregationStatus> aggregationStatuses = apsSrc.getAll(); List<AggregationStatus> aggregationStatuses = apsSrc.getAll();
for(AggregationStatus aggregationStatus : aggregationStatuses) { for(AggregationStatus aggregationStatus : aggregationStatuses) {
analyseAggregationStatus(aggregationStatus); // analyseAggregationStatus(aggregationStatus);
} }
} }
@Ignore // @Ignore
// @Test @Test
public void testAggregatorPersistenceStatusSrcAndDst() throws Exception { public void testAggregatorPersistenceStatusSrcAndDst() throws Exception {
AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc(); AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
logger.debug("{}", apsSrc); logger.debug("{}", apsSrc);

View File

@ -2,7 +2,9 @@ package org.gcube.accounting.aggregator.plugin;
import java.util.Calendar; import java.util.Calendar;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.ContextTest; import org.gcube.accounting.aggregator.ContextTest;
@ -77,103 +79,37 @@ public class AccountingAggregatorPluginTest extends ContextTest {
} }
// @Ignore @Ignore
@Test
public void aggregateRecordDaily() throws Exception {
Set<String> types = new HashSet<>();
types.add(ServiceUsageRecord.class.newInstance().getRecordType());
types.add(StorageUsageRecord.class.newInstance().getRecordType());
types.add(JobUsageRecord.class.newInstance().getRecordType());
boolean forceRestart = true;
boolean forceEarlyAggregation = true;
for(String recordType : types) {
AggregationType aggregationType = AggregationType.DAILY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29);
Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.MARCH, 1);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
}
}
}
@Ignore
@Test @Test
public void aggregateAllServiceUsageRecord() throws Exception { public void aggregateAllServiceUsageRecord() throws Exception {
String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
aggregateOneShot(recordType); // aggregateOneShot(recordType);
// aggregateEverything(recordType); // aggregateEverything(recordType);
} }
@Test
public void aggregateServiceUsageRecordDaily() throws Exception {
String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
boolean forceRestart = true;
boolean forceEarlyAggregation = true;
AggregationType aggregationType = AggregationType.DAILY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 26);
Calendar end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
}
}
@Test
public void aggregateMonthlySecondHalf2022ServiceUsageRecord() throws Exception {
String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
boolean forceRestart = true;
boolean forceEarlyAggregation = true;
AggregationType aggregationType = AggregationType.MONTHLY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.DECEMBER, 1);
Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.JANUARY, 1);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
}
}
@Test
public void aggregateMonthlyFirstHalf2023ServiceUsageRecord() throws Exception {
String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
boolean forceRestart = true;
boolean forceEarlyAggregation = true;
AggregationType aggregationType = AggregationType.MONTHLY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.SEPTEMBER, 1);
Calendar end = Utility.getAggregationStartCalendar(2023, Calendar.OCTOBER, 1);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
}
}
@Test
public void aggregateFirstQuarter2022StorageStatusRecord() throws Exception {
String recordType = StorageStatusRecord.class.newInstance().getRecordType();
boolean forceRestart = true;
boolean forceEarlyAggregation = true;
AggregationType aggregationType = AggregationType.MONTHLY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1);
Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.FEBRUARY, 1);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
}
}
@Test
public void aggregateSecondQuarter2022StorageStatusRecord() throws Exception {
String recordType = StorageStatusRecord.class.newInstance().getRecordType();
boolean forceRestart = true;
boolean forceEarlyAggregation = true;
AggregationType aggregationType = AggregationType.MONTHLY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2022, Calendar.MAY, 1);
Calendar end = Utility.getAggregationStartCalendar(2022, Calendar.JULY, 1);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
}
}
@Ignore @Ignore
@Test @Test
@ -183,19 +119,19 @@ public class AccountingAggregatorPluginTest extends ContextTest {
// aggregateEverything(recordType); // aggregateEverything(recordType);
} }
// @Ignore @Ignore
@Test @Test
public void aggregateAllJobUsageRecord() throws Exception { public void aggregateAllJobUsageRecord() throws Exception {
String recordType = JobUsageRecord.class.newInstance().getRecordType(); String recordType = JobUsageRecord.class.newInstance().getRecordType();
aggregateOneShot(recordType); // aggregateOneShot(recordType);
// aggregateEverything(recordType); // aggregateEverything(recordType);
} }
// @Ignore @Ignore
@Test @Test
public void aggregateAllStorageUsageRecord() throws Exception { public void aggregateAllStorageUsageRecord() throws Exception {
String recordType = StorageUsageRecord.class.newInstance().getRecordType(); String recordType = StorageUsageRecord.class.newInstance().getRecordType();
aggregateOneShot(recordType); // aggregateOneShot(recordType);
// aggregateEverything(recordType); // aggregateEverything(recordType);
} }
@ -204,7 +140,7 @@ public class AccountingAggregatorPluginTest extends ContextTest {
boolean forceRestart = true; boolean forceRestart = true;
boolean forceEarlyAggregation = true; boolean forceEarlyAggregation = true;
AggregationType aggregationType = AggregationType.DAILY; AggregationType aggregationType = AggregationType.DAILY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 27); Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29);
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
} }
@ -237,7 +173,7 @@ public class AccountingAggregatorPluginTest extends ContextTest {
aggregationType = AggregationType.DAILY; aggregationType = AggregationType.DAILY;
aggregationStartCalendar = end; aggregationStartCalendar = end;
end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 14); end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29);
while (aggregationStartCalendar.before(end)) { while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1); Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true, false); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true, false);