Compare commits
25 Commits
7948a98365
...
74349c3253
Author | SHA1 | Date |
---|---|---|
Luca Frosini | 74349c3253 | |
Luca Frosini | e2bcfc016a | |
Luca Frosini | 0c7f0cee5e | |
Luca Frosini | 0abad39443 | |
Luca Frosini | 02a67aa9b8 | |
Luca Frosini | daaf484e17 | |
Luca Frosini | 225b0aacb7 | |
Luca Frosini | 5e8f31f384 | |
Luca Frosini | 58529a17ca | |
Luca Frosini | e7759dd97f | |
Luca Frosini | 2c927dda9d | |
Luca Frosini | a80d2b496a | |
Luca Frosini | 3d9fd9d42f | |
Luca Frosini | 0c236e845d | |
Luca Frosini | 210361a0ff | |
Luca Frosini | 13282e6236 | |
Luca Frosini | 3194c529d5 | |
Luca Frosini | feefa712c4 | |
Luca Frosini | e9eed9f889 | |
Luca Frosini | 9ff6bdc135 | |
Luca Frosini | d2a67855c1 | |
Luca Frosini | a11a3ba8e8 | |
Luca Frosini | 28b6498967 | |
Luca Frosini | 750c5ebb19 | |
Luca Frosini | abec9f97f8 |
17
pom.xml
17
pom.xml
|
@ -57,20 +57,25 @@
|
||||||
<artifactId>document-store-lib-postgresql</artifactId>
|
<artifactId>document-store-lib-postgresql</artifactId>
|
||||||
<version>[1.0.0, 2.0.0-SNAPSHOT)</version>
|
<version>[1.0.0, 2.0.0-SNAPSHOT)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<!-- <dependency>-->
|
||||||
<groupId>org.gcube.accounting</groupId>
|
<!-- <groupId>org.gcube.accounting</groupId>-->
|
||||||
<artifactId>accounting-analytics-persistence-postgresql</artifactId>
|
<!-- <artifactId>accounting-analytics-persistence-postgresql</artifactId>-->
|
||||||
<version>[1.0.0, 2.0.0-SNAPSHOT)</version>
|
<!-- <version>[2.0.1-SNAPSHOT, 3.0.0-SNAPSHOT)</version>-->
|
||||||
</dependency>
|
<!-- </dependency>-->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.gcube.accounting</groupId>
|
<groupId>org.gcube.accounting</groupId>
|
||||||
<artifactId>accounting-analytics</artifactId>
|
<artifactId>accounting-analytics</artifactId>
|
||||||
<version>[3.0.0, 4.0.0-SNAPSHOT)</version>
|
<version>[4.0.0, 5.0.0-SNAPSHOT)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.slf4j</groupId>
|
<groupId>org.slf4j</groupId>
|
||||||
<artifactId>slf4j-api</artifactId>
|
<artifactId>slf4j-api</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.gcube.common</groupId>
|
||||||
|
<artifactId>authorization-utils</artifactId>
|
||||||
|
<version>[2.2.0, 3.0.0-SNAPSHOT)</version>
|
||||||
|
</dependency>
|
||||||
<!-- Test Dependencies -->
|
<!-- Test Dependencies -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package org.gcube.accounting.aggregator.aggregation;
|
package org.gcube.accounting.aggregator.aggregation;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.StandardCopyOption;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
|
@ -9,7 +11,8 @@ import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
|
||||||
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceSrc;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationState;
|
import org.gcube.accounting.aggregator.status.AggregationState;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
import org.gcube.accounting.aggregator.utility.Constant;
|
import org.gcube.accounting.aggregator.utility.Constant;
|
||||||
|
@ -43,12 +46,19 @@ public class Aggregator {
|
||||||
protected final File originalRecordsbackupFile;
|
protected final File originalRecordsbackupFile;
|
||||||
protected final File aggregateRecordsBackupFile;
|
protected final File aggregateRecordsBackupFile;
|
||||||
protected final File malformedRecordsFile;
|
protected final File malformedRecordsFile;
|
||||||
|
|
||||||
|
protected int estimatedRecordsNumber;
|
||||||
|
protected int originalRecordsNumber;
|
||||||
protected int malformedRecordNumber;
|
protected int malformedRecordNumber;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
protected ObjectMapper objectMapper;
|
protected ObjectMapper objectMapper;
|
||||||
|
|
||||||
protected Calendar startTime;
|
protected Calendar startTime;
|
||||||
|
|
||||||
|
protected boolean skipAggregation;
|
||||||
|
|
||||||
public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
|
public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
|
||||||
this.aggregationStatus = aggregationStatus;
|
this.aggregationStatus = aggregationStatus;
|
||||||
|
|
||||||
|
@ -59,12 +69,19 @@ public class Aggregator {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSkipAggregation(boolean skipAggregation) {
|
||||||
|
this.skipAggregation = skipAggregation;
|
||||||
|
}
|
||||||
|
|
||||||
public void aggregate() throws Exception {
|
public void aggregate() throws Exception {
|
||||||
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) {
|
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) {
|
||||||
startTime = Utility.getUTCCalendarInstance();
|
startTime = Utility.getUTCCalendarInstance();
|
||||||
|
|
||||||
PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector();
|
AggregatorPersistenceSrc aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc();
|
||||||
ResultSet resultSet = postgreSQLConnector.getResultSetOfRecordToBeAggregated(aggregationStatus);
|
|
||||||
|
estimatedRecordsNumber = aggregatorPersistenceSrc.getEstimatedRecordRecordToBeAggregated(aggregationStatus);
|
||||||
|
logger.info("Estimated records to be aggregated are {}", estimatedRecordsNumber);
|
||||||
|
ResultSet resultSet = aggregatorPersistenceSrc.getResultSetOfRecordToBeAggregated(aggregationStatus);
|
||||||
|
|
||||||
retrieveAndAggregate(resultSet);
|
retrieveAndAggregate(resultSet);
|
||||||
}
|
}
|
||||||
|
@ -75,7 +92,7 @@ public class Aggregator {
|
||||||
private static final String SIMPLE = "Simple";
|
private static final String SIMPLE = "Simple";
|
||||||
|
|
||||||
|
|
||||||
protected int elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception {
|
protected void elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer) throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
if(content.has(USAGE_RECORD_TYPE)){
|
if(content.has(USAGE_RECORD_TYPE)){
|
||||||
|
@ -154,21 +171,27 @@ public class Aggregator {
|
||||||
|
|
||||||
String record = content.toString();
|
String record = content.toString();
|
||||||
|
|
||||||
// Aggregate the Record
|
if(!skipAggregation) {
|
||||||
aggregateRow(aggregatorBuffer, record);
|
// Aggregate the Record
|
||||||
|
aggregateRow(aggregatorBuffer, record);
|
||||||
|
}
|
||||||
|
|
||||||
++originalRecordsCounter;
|
++originalRecordsNumber;
|
||||||
if(originalRecordsCounter%1000==0){
|
if(originalRecordsNumber%1000==0){
|
||||||
int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size();
|
int aggregatedRecordsNumber = 0;
|
||||||
int diff = originalRecordsCounter - aggregatedRecordsNumber;
|
if(!skipAggregation) {
|
||||||
float percentage = (100 * diff) / originalRecordsCounter;
|
aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size();
|
||||||
logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents",
|
}else {
|
||||||
aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage);
|
aggregatedRecordsNumber = originalRecordsNumber;
|
||||||
|
}
|
||||||
|
int diff = originalRecordsNumber - aggregatedRecordsNumber;
|
||||||
|
float percentage = (100 * diff) / originalRecordsNumber;
|
||||||
|
logger.info("{} At the moment, the elaborated original records are {} (Total Estimated Number is {}). The Aggregated records are {}. Difference {}. We are recovering {}% of Records",
|
||||||
|
aggregationStatus.getAggregationInfo(), originalRecordsNumber, estimatedRecordsNumber, aggregatedRecordsNumber, diff, percentage);
|
||||||
}
|
}
|
||||||
|
|
||||||
Utility.printLine(originalRecordsbackupFile, record);
|
Utility.printLine(originalRecordsbackupFile, record);
|
||||||
|
|
||||||
return originalRecordsCounter;
|
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -218,11 +241,10 @@ public class Aggregator {
|
||||||
Class<? extends AggregatedRecord<?, ?>> clz = RecordUtility.getAggregatedRecordClass(type);
|
Class<? extends AggregatedRecord<?, ?>> clz = RecordUtility.getAggregatedRecordClass(type);
|
||||||
RecordToDBFields recordToDBFields = RecordToDBMapping.getRecordToDBFields(clz);
|
RecordToDBFields recordToDBFields = RecordToDBMapping.getRecordToDBFields(clz);
|
||||||
|
|
||||||
|
|
||||||
Set<String> requiredFields = clz.newInstance().getRequiredFields();
|
Set<String> requiredFields = clz.newInstance().getRequiredFields();
|
||||||
|
|
||||||
|
originalRecordsNumber = 0;
|
||||||
malformedRecordNumber = 0;
|
malformedRecordNumber = 0;
|
||||||
int originalRecordsCounter = 0;
|
|
||||||
while (resultSet.next()) {
|
while (resultSet.next()) {
|
||||||
for(int i=1; i<=MAX_RETRY; i++){
|
for(int i=1; i<=MAX_RETRY; i++){
|
||||||
try {
|
try {
|
||||||
|
@ -252,7 +274,7 @@ public class Aggregator {
|
||||||
addProperty(objectNode, recordField, obj);
|
addProperty(objectNode, recordField, obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
originalRecordsCounter = elaborateRow(objectNode, aggregatorBuffer, originalRecordsCounter);
|
elaborateRow(objectNode, aggregatorBuffer);
|
||||||
TimeUnit.MILLISECONDS.sleep(3);
|
TimeUnit.MILLISECONDS.sleep(3);
|
||||||
break;
|
break;
|
||||||
}catch (RuntimeException e) {
|
}catch (RuntimeException e) {
|
||||||
|
@ -269,25 +291,32 @@ public class Aggregator {
|
||||||
logger.debug("{} Elaboration of Records terminated at {}. Duration {}",
|
logger.debug("{} Elaboration of Records terminated at {}. Duration {}",
|
||||||
aggregationStatus.getAggregationInfo(), Constant.DEFAULT_DATE_FORMAT.format(end.getTime()), durationForHuman);
|
aggregationStatus.getAggregationInfo(), Constant.DEFAULT_DATE_FORMAT.format(end.getTime()), durationForHuman);
|
||||||
|
|
||||||
File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(),
|
|
||||||
aggregateRecordsBackupFile.getName() + TMP_SUFFIX);
|
|
||||||
aggregateRecordsBackupFileTmp.delete();
|
|
||||||
|
|
||||||
// Saving Aggregated Record on local file
|
|
||||||
logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(),
|
|
||||||
aggregateRecordsBackupFile);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
|
if(!skipAggregation) {
|
||||||
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
|
File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(),
|
||||||
String marshalled = DSMapper.marshal(aggregatedRecord);
|
aggregateRecordsBackupFile.getName() + TMP_SUFFIX);
|
||||||
Utility.printLine(aggregateRecordsBackupFileTmp, marshalled);
|
aggregateRecordsBackupFileTmp.delete();
|
||||||
|
|
||||||
|
// Saving Aggregated Record on local file
|
||||||
|
logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(),
|
||||||
|
aggregateRecordsBackupFile);
|
||||||
|
|
||||||
|
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
|
||||||
|
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
|
||||||
|
String marshalled = DSMapper.marshal(aggregatedRecord);
|
||||||
|
Utility.printLine(aggregateRecordsBackupFileTmp, marshalled);
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
|
||||||
|
|
||||||
|
aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecords.size(), malformedRecordNumber);
|
||||||
|
}else {
|
||||||
|
Files.copy(originalRecordsbackupFile.toPath(), aggregateRecordsBackupFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
aggregationStatus.setRecordNumbers(originalRecordsNumber, originalRecordsNumber, malformedRecordNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
|
|
||||||
|
|
||||||
aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size(), malformedRecordNumber);
|
|
||||||
aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true);
|
aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,8 @@ public class AggregatorManager {
|
||||||
protected boolean forceRerun;
|
protected boolean forceRerun;
|
||||||
protected boolean forceRestart;
|
protected boolean forceRestart;
|
||||||
|
|
||||||
|
protected boolean skipAggregation;
|
||||||
|
|
||||||
public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate,
|
public AggregatorManager(AggregationType aggregationType, boolean restartFromLastAggregationDate,
|
||||||
Date aggregationStartDate, Date aggregationEndDate) throws Exception {
|
Date aggregationStartDate, Date aggregationEndDate) throws Exception {
|
||||||
this.aggregationType = aggregationType;
|
this.aggregationType = aggregationType;
|
||||||
|
@ -54,6 +56,10 @@ public class AggregatorManager {
|
||||||
this.forceRestart = forceRestart;
|
this.forceRestart = forceRestart;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSkipAggregation(boolean skipAggregation) {
|
||||||
|
this.skipAggregation = skipAggregation;
|
||||||
|
}
|
||||||
|
|
||||||
protected Date getEndDateFromStartDate() {
|
protected Date getEndDateFromStartDate() {
|
||||||
return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1);
|
return Utility.getEndDateFromStartDate(aggregationType, aggregationStartDate, 1);
|
||||||
}
|
}
|
||||||
|
@ -99,7 +105,7 @@ public class AggregatorManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
Elaborator elaborator = new Elaborator(aggregationStatus);
|
Elaborator elaborator = new Elaborator(aggregationStatus);
|
||||||
elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart);
|
elaborator.elaborate(forceEarlyAggregation, forceRerun, forceRestart, skipAggregation);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class Elaborator {
|
||||||
return allowed;
|
return allowed;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void elaborate(boolean forceEarlyAggregation, boolean forceRerun, boolean forceRestart) throws Exception {
|
public void elaborate(boolean forceEarlyAggregation, boolean forceRerun, boolean forceRestart, boolean skipAggregation) throws Exception {
|
||||||
Calendar startTime = Utility.getUTCCalendarInstance();
|
Calendar startTime = Utility.getUTCCalendarInstance();
|
||||||
|
|
||||||
final AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
final AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
||||||
|
@ -164,6 +164,7 @@ public class Elaborator {
|
||||||
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
|
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
|
||||||
|
|
||||||
Aggregator aggregator = new Aggregator(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile);
|
Aggregator aggregator = new Aggregator(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile);
|
||||||
|
aggregator.setSkipAggregation(skipAggregation);
|
||||||
aggregator.aggregate();
|
aggregator.aggregate();
|
||||||
|
|
||||||
Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
|
Persist persist = new Persist(aggregationStatus, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
|
||||||
|
@ -185,6 +186,20 @@ public class Elaborator {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static File getOriginalRecordsBackupFile(File elaborationDirectory, AggregationInfo aggregationInfo) {
|
||||||
|
String recordType = aggregationInfo.getRecordType();
|
||||||
|
Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
|
||||||
|
AggregationType aggregationType = aggregationInfo.getAggregationType();
|
||||||
|
|
||||||
|
DateFormat dateFormat = aggregationType.getDateFormat();
|
||||||
|
String dateString = dateFormat.format(aggregationStartDate);
|
||||||
|
String[] splittedDate = dateString.split(AggregationType.DATE_SEPARATOR);
|
||||||
|
|
||||||
|
String backupFileName = splittedDate[splittedDate.length-1] + "-" + recordType;
|
||||||
|
File originalRecordsbackupFile = new File(elaborationDirectory, backupFileName + ORIGINAL_SUFFIX);
|
||||||
|
return originalRecordsbackupFile;
|
||||||
|
}
|
||||||
|
|
||||||
protected File getOriginalRecordsBackupFile(File elaborationDirectory, String name) throws Exception {
|
protected File getOriginalRecordsBackupFile(File elaborationDirectory, String name) throws Exception {
|
||||||
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
||||||
Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
|
Date aggregationStartDate = aggregationInfo.getAggregationStartDate();
|
||||||
|
|
|
@ -5,7 +5,8 @@ import java.util.List;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
|
||||||
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceStatus;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
import org.gcube.documentstore.records.DSMapper;
|
import org.gcube.documentstore.records.DSMapper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -35,8 +36,8 @@ public class RecoveryManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void recovery() throws Exception {
|
public void recovery() throws Exception {
|
||||||
PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector();
|
AggregatorPersistenceStatus aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus();
|
||||||
List<AggregationStatus> aggregationStatusList = postgreSQLConnector.getUnterminated(recordType, aggregationType, aggregationStartDate, aggregationEndDate, forceRestart);
|
List<AggregationStatus> aggregationStatusList = aggregatorPersistenceStatus.getUnterminated(recordType, aggregationType, aggregationStartDate, aggregationEndDate, forceRestart);
|
||||||
if(aggregationStatusList.size()==0){
|
if(aggregationStatusList.size()==0){
|
||||||
logger.info("Nothing to recover :)");
|
logger.info("Nothing to recover :)");
|
||||||
}
|
}
|
||||||
|
@ -48,7 +49,7 @@ public class RecoveryManager {
|
||||||
logger.info("Going to Recover unterminated elaboration {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
logger.info("Going to Recover unterminated elaboration {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
||||||
|
|
||||||
Elaborator elaborator = new Elaborator(aggregationStatus);
|
Elaborator elaborator = new Elaborator(aggregationStatus);
|
||||||
elaborator.elaborate(true, true, forceRestart);
|
elaborator.elaborate(true, true, forceRestart, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
package org.gcube.accounting.aggregator.persist;
|
package org.gcube.accounting.aggregator.persist;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
|
||||||
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceSrc;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationState;
|
import org.gcube.accounting.aggregator.status.AggregationState;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import org.gcube.com.fasterxml.jackson.databind.node.ArrayNode;
|
||||||
import org.gcube.documentstore.records.DSMapper;
|
import org.gcube.documentstore.records.DSMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -13,22 +16,39 @@ import org.gcube.documentstore.records.DSMapper;
|
||||||
*/
|
*/
|
||||||
public class DeleteDocument extends DocumentElaboration {
|
public class DeleteDocument extends DocumentElaboration {
|
||||||
|
|
||||||
public DeleteDocument(AggregationStatus aggregationStatus, File file){
|
protected AggregatorPersistenceSrc aggregatorPersistenceSrc;
|
||||||
|
protected ArrayNode arrayNode;
|
||||||
|
|
||||||
|
public DeleteDocument(AggregationStatus aggregationStatus, File file) throws Exception{
|
||||||
super(aggregationStatus, AggregationState.DELETED, file, aggregationStatus.getOriginalRecordsNumber());
|
super(aggregationStatus, AggregationState.DELETED, file, aggregationStatus.getOriginalRecordsNumber());
|
||||||
|
arrayNode = DSMapper.getObjectMapper().createArrayNode();
|
||||||
|
aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void elaborateLine(String line) throws Exception {
|
protected void elaborateLine(String line) throws Exception {
|
||||||
JsonNode jsonNode = DSMapper.asJsonNode(line);
|
JsonNode jsonNode = DSMapper.asJsonNode(line);
|
||||||
String id = jsonNode.get(ID).asText();
|
if(aggregatorPersistenceSrc.isBulkDeleteAllowed()) {
|
||||||
logger.trace("Going to delete record with id {}", id);
|
arrayNode.add(jsonNode);
|
||||||
PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector();
|
if(arrayNode.size()>=effectiveMaxRowPerStep) {
|
||||||
postgreSQLConnector.deleteRecord(jsonNode);
|
aggregatorPersistenceSrc.deleteRecords(arrayNode);
|
||||||
|
arrayNode = DSMapper.getObjectMapper().createArrayNode();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
String id = jsonNode.get(ID).asText();
|
||||||
|
logger.trace("Going to delete record with id {}", id);
|
||||||
|
aggregatorPersistenceSrc.deleteRecord(jsonNode);
|
||||||
|
TimeUnit.MILLISECONDS.sleep(2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void afterElaboration() {
|
protected void afterElaboration() throws Exception {
|
||||||
// Nothing to do
|
// Nothing to do
|
||||||
|
if(aggregatorPersistenceSrc.isBulkDeleteAllowed() && arrayNode.size()>0) {
|
||||||
|
aggregatorPersistenceSrc.deleteRecords(arrayNode);
|
||||||
|
arrayNode = DSMapper.getObjectMapper().createArrayNode();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,24 +24,27 @@ public abstract class DocumentElaboration {
|
||||||
|
|
||||||
protected static final String ID = Record.ID;
|
protected static final String ID = Record.ID;
|
||||||
|
|
||||||
protected static final int THRESHOLD_FOR_FIVE_PERCENT = 100000;
|
|
||||||
protected static final int THRESHOLD_FOR_ONE_PERCENT = 1000000;
|
|
||||||
|
|
||||||
public static final int MAX_RETRY = 7;
|
public static final int MAX_RETRY = 7;
|
||||||
|
|
||||||
|
public static final int MAX_ROWS_PER_STEP = 500;
|
||||||
|
|
||||||
protected final AggregationStatus aggregationStatus;
|
protected final AggregationStatus aggregationStatus;
|
||||||
protected final File file;
|
protected final File file;
|
||||||
protected final AggregationState finalAggregationState;
|
protected final AggregationState finalAggregationState;
|
||||||
|
|
||||||
protected final int rowToBeElaborated;
|
protected final int rowToBeElaborated;
|
||||||
|
protected int currentlyElaborated;
|
||||||
|
|
||||||
protected Calendar startTime;
|
protected Calendar startTime;
|
||||||
|
|
||||||
|
protected int effectiveMaxRowPerStep;
|
||||||
|
|
||||||
protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, int rowToBeElaborated) {
|
protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, int rowToBeElaborated) {
|
||||||
this.aggregationStatus = statusManager;
|
this.aggregationStatus = statusManager;
|
||||||
this.finalAggregationState = finalAggregationState;
|
this.finalAggregationState = finalAggregationState;
|
||||||
this.file = file;
|
this.file = file;
|
||||||
this.rowToBeElaborated = rowToBeElaborated;
|
this.rowToBeElaborated = rowToBeElaborated;
|
||||||
|
this.currentlyElaborated = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void readFile() throws Exception {
|
protected void readFile() throws Exception {
|
||||||
|
@ -57,16 +60,17 @@ public abstract class DocumentElaboration {
|
||||||
|
|
||||||
logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated);
|
logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated);
|
||||||
|
|
||||||
int percentOfNumberOfRows = (rowToBeElaborated / 10) + 1;
|
effectiveMaxRowPerStep = (rowToBeElaborated / 10) + 1;
|
||||||
if(rowToBeElaborated >= THRESHOLD_FOR_FIVE_PERCENT) {
|
if(effectiveMaxRowPerStep>MAX_ROWS_PER_STEP) {
|
||||||
percentOfNumberOfRows = percentOfNumberOfRows / 2;
|
effectiveMaxRowPerStep = MAX_ROWS_PER_STEP;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(rowToBeElaborated >= THRESHOLD_FOR_ONE_PERCENT) {
|
currentlyElaborated = 0;
|
||||||
percentOfNumberOfRows = percentOfNumberOfRows / 5;
|
int restartFrom = aggregationStatus.getRestartFrom();
|
||||||
|
if(restartFrom>0) {
|
||||||
|
logger.info("The elaboration will be restarted from record number {}", aggregationStatus.getRestartFrom());
|
||||||
}
|
}
|
||||||
|
|
||||||
int elaborated = 0;
|
|
||||||
String line;
|
String line;
|
||||||
// Read File Line By Line
|
// Read File Line By Line
|
||||||
while((line = br.readLine()) != null) {
|
while((line = br.readLine()) != null) {
|
||||||
|
@ -75,9 +79,11 @@ public abstract class DocumentElaboration {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while(elaborate) {
|
while(elaborate) {
|
||||||
++i;
|
++i;
|
||||||
|
if(currentlyElaborated<restartFrom) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
elaborateLine(line);
|
elaborateLine(line);
|
||||||
TimeUnit.MILLISECONDS.sleep(3);
|
|
||||||
elaborate = false;
|
elaborate = false;
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
if(i != 1) {
|
if(i != 1) {
|
||||||
|
@ -98,23 +104,26 @@ public abstract class DocumentElaboration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
++elaborated;
|
++currentlyElaborated;
|
||||||
if(elaborated % percentOfNumberOfRows == 0) {
|
if(currentlyElaborated % effectiveMaxRowPerStep == 0) {
|
||||||
int elaboratedPercentage = elaborated * 100 / rowToBeElaborated;
|
if(currentlyElaborated>=restartFrom) {
|
||||||
|
aggregationStatus.setRestartFrom(currentlyElaborated, true);
|
||||||
|
}
|
||||||
|
int elaboratedPercentage = currentlyElaborated * 100 / rowToBeElaborated;
|
||||||
logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(),
|
logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(),
|
||||||
elaborated, rowToBeElaborated, elaboratedPercentage);
|
currentlyElaborated, rowToBeElaborated, elaboratedPercentage);
|
||||||
}
|
}
|
||||||
if(elaborated > rowToBeElaborated) {
|
if(currentlyElaborated > rowToBeElaborated) {
|
||||||
throw new Exception("Elaborated file line is number " + elaborated + " > " + rowToBeElaborated
|
throw new Exception("Elaborated file line is number " + currentlyElaborated + " > " + rowToBeElaborated
|
||||||
+ " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
|
+ " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(elaborated != rowToBeElaborated) {
|
if(currentlyElaborated != rowToBeElaborated) {
|
||||||
throw new Exception("Elaborated file line is number " + elaborated + " != " + rowToBeElaborated
|
throw new Exception("Elaborated file line is number " + currentlyElaborated + " != " + rowToBeElaborated
|
||||||
+ "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
|
+ "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), elaborated,
|
logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), currentlyElaborated,
|
||||||
rowToBeElaborated, 100);
|
rowToBeElaborated, 100);
|
||||||
|
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
|
@ -137,9 +146,14 @@ public abstract class DocumentElaboration {
|
||||||
|
|
||||||
public void elaborate() throws Exception {
|
public void elaborate() throws Exception {
|
||||||
startTime = Utility.getUTCCalendarInstance();
|
startTime = Utility.getUTCCalendarInstance();
|
||||||
readFile();
|
try {
|
||||||
afterElaboration();
|
readFile();
|
||||||
aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
|
aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
|
||||||
|
}catch (Exception e) {
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
afterElaboration();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void elaborateLine(String line) throws Exception;
|
protected abstract void elaborateLine(String line) throws Exception;
|
||||||
|
|
|
@ -6,17 +6,18 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.elaboration.Elaborator;
|
import org.gcube.accounting.aggregator.elaboration.Elaborator;
|
||||||
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceDst;
|
||||||
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationState;
|
import org.gcube.accounting.aggregator.status.AggregationState;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
import org.gcube.accounting.aggregator.utility.Utility;
|
import org.gcube.accounting.aggregator.utility.Utility;
|
||||||
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
|
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
|
||||||
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
|
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
|
||||||
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
|
|
||||||
import org.gcube.com.fasterxml.jackson.core.JsonProcessingException;
|
import org.gcube.com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
||||||
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
|
|
||||||
import org.gcube.documentstore.records.DSMapper;
|
import org.gcube.documentstore.records.DSMapper;
|
||||||
import org.gcube.documentstore.records.Record;
|
import org.gcube.documentstore.records.Record;
|
||||||
import org.gcube.documentstore.records.RecordUtility;
|
import org.gcube.documentstore.records.RecordUtility;
|
||||||
|
@ -42,7 +43,8 @@ public class InsertDocument extends DocumentElaboration {
|
||||||
|
|
||||||
protected boolean serviceUsageRecordElaboration;
|
protected boolean serviceUsageRecordElaboration;
|
||||||
protected File calledMethodCSVFile;
|
protected File calledMethodCSVFile;
|
||||||
protected PersistencePostgreSQL persistencePostgreSQL;
|
|
||||||
|
protected AggregatorPersistenceDst aggregatorPersistenceDst;
|
||||||
|
|
||||||
protected int count;
|
protected int count;
|
||||||
|
|
||||||
|
@ -56,9 +58,7 @@ public class InsertDocument extends DocumentElaboration {
|
||||||
File destinationFolder = file.getParentFile();
|
File destinationFolder = file.getParentFile();
|
||||||
calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
|
calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
|
||||||
|
|
||||||
AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class);
|
aggregatorPersistenceDst = AggregatorPersistenceFactory.getAggregatorPersistenceDst();
|
||||||
persistencePostgreSQL = new PersistencePostgreSQL();
|
|
||||||
persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration);
|
|
||||||
|
|
||||||
count = 0;
|
count = 0;
|
||||||
}
|
}
|
||||||
|
@ -111,19 +111,20 @@ public class InsertDocument extends DocumentElaboration {
|
||||||
JsonNode jsonNode = analyseLine(line);
|
JsonNode jsonNode = analyseLine(line);
|
||||||
Record record = RecordUtility.getRecord(jsonNode.toString());
|
Record record = RecordUtility.getRecord(jsonNode.toString());
|
||||||
|
|
||||||
persistencePostgreSQL.insert(record);
|
aggregatorPersistenceDst.insert(record);
|
||||||
++count;
|
++count;
|
||||||
|
|
||||||
if(count==100) {
|
// if(count==100) {
|
||||||
persistencePostgreSQL.commitAndClose();
|
// aggregatorPersistenceDst.commitAndClose();
|
||||||
count = 0;
|
// count = 0;
|
||||||
}
|
// }
|
||||||
|
|
||||||
|
TimeUnit.MILLISECONDS.sleep(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void afterElaboration() throws Exception {
|
protected void afterElaboration() throws Exception {
|
||||||
persistencePostgreSQL.commitAndClose();
|
aggregatorPersistenceDst.commitAndClose();
|
||||||
count = 0;
|
count = 0;
|
||||||
|
|
||||||
if(serviceUsageRecordElaboration) {
|
if(serviceUsageRecordElaboration) {
|
||||||
|
|
|
@ -5,8 +5,4 @@ package org.gcube.accounting.aggregator.persistence;
|
||||||
*/
|
*/
|
||||||
public interface AggregatorPersistence {
|
public interface AggregatorPersistence {
|
||||||
|
|
||||||
public static final int KEY_VALUES_LIMIT = 25;
|
|
||||||
|
|
||||||
public void prepareConnection(AggregatorPersitenceConfiguration configuration) throws Exception;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,14 +4,13 @@ import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Luca Frosini (ISTI-CNR)
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
* @author Alessandro Pieve (ISTI-CNR)
|
|
||||||
*/
|
*/
|
||||||
public class AggregatorPersitenceConfiguration extends AccountingPersistenceConfiguration {
|
public class AggregatorPersistenceConfiguration extends AccountingPersistenceConfiguration {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default Constructor
|
* Default Constructor
|
||||||
*/
|
*/
|
||||||
public AggregatorPersitenceConfiguration() {
|
public AggregatorPersistenceConfiguration() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,7 +19,7 @@ public class AggregatorPersitenceConfiguration extends AccountingPersistenceConf
|
||||||
* @throws Exception if fails
|
* @throws Exception if fails
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings({ "rawtypes" })
|
@SuppressWarnings({ "rawtypes" })
|
||||||
public AggregatorPersitenceConfiguration(Class<?> persistence) throws Exception {
|
public AggregatorPersistenceConfiguration(Class<?> persistence) throws Exception {
|
||||||
super((Class) persistence);
|
super((Class) persistence);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
import org.gcube.documentstore.records.Record;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR)
|
||||||
|
*/
|
||||||
|
public interface AggregatorPersistenceDst extends AggregatorPersistence {
|
||||||
|
|
||||||
|
public void insert(Record record) throws Exception;
|
||||||
|
|
||||||
|
public void commitAndClose() throws Exception;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
|
*/
|
||||||
|
public class AggregatorPersistenceFactory {
|
||||||
|
|
||||||
|
private static AggregatorPersistenceStatus aggregatorPersistenceStatus;
|
||||||
|
|
||||||
|
private static AggregatorPersistenceStatusSrc aggregatorPersistenceStatusSrc;
|
||||||
|
private static AggregatorPersistenceStatusDst aggregatorPersistenceStatusDst;
|
||||||
|
|
||||||
|
private static AggregatorPersistenceSrc aggregatorPersistenceSrc;
|
||||||
|
private static AggregatorPersistenceDst aggregatorPersistenceDst;
|
||||||
|
|
||||||
|
public static AggregatorPersistenceStatus getAggregatorPersistenceStatus() throws Exception {
|
||||||
|
if(aggregatorPersistenceStatus == null) {
|
||||||
|
aggregatorPersistenceStatus = new PostgreSQLConnectorStatus();
|
||||||
|
}
|
||||||
|
return aggregatorPersistenceStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static AggregatorPersistenceStatusSrc getAggregatorPersistenceStatusSrc() throws Exception {
|
||||||
|
if(aggregatorPersistenceStatusSrc == null) {
|
||||||
|
aggregatorPersistenceStatusSrc = new PostgreSQLConnectorStatusSrc();
|
||||||
|
}
|
||||||
|
return aggregatorPersistenceStatusSrc;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static AggregatorPersistenceStatusDst getAggregatorPersistenceStatusDst() throws Exception {
|
||||||
|
if(aggregatorPersistenceStatusDst == null) {
|
||||||
|
aggregatorPersistenceStatusDst = new PostgreSQLConnectorStatusDst();
|
||||||
|
}
|
||||||
|
return aggregatorPersistenceStatusDst;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AggregatorPersistenceSrc getAggregatorPersistenceSrc() throws Exception {
|
||||||
|
if(aggregatorPersistenceSrc == null) {
|
||||||
|
aggregatorPersistenceSrc = new PostgreSQLConnectorSrc();
|
||||||
|
}
|
||||||
|
return aggregatorPersistenceSrc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AggregatorPersistenceDst getAggregatorPersistenceDst() throws Exception {
|
||||||
|
if(aggregatorPersistenceDst == null) {
|
||||||
|
aggregatorPersistenceDst = new PostgreSQLConnectorDst();
|
||||||
|
}
|
||||||
|
return aggregatorPersistenceDst;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
|
||||||
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
|
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import org.gcube.com.fasterxml.jackson.databind.node.ArrayNode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR)
|
||||||
|
*/
|
||||||
|
public interface AggregatorPersistenceSrc extends AggregatorPersistence {
|
||||||
|
|
||||||
|
public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception;
|
||||||
|
|
||||||
|
public void deleteRecord(JsonNode jsonNode) throws Exception;
|
||||||
|
|
||||||
|
public boolean isBulkDeleteAllowed();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It must be implemented only and only if isBulkDeleteAllowed()
|
||||||
|
* return true. It must raise UnsupportedOperationException if bulk delete is not allowed
|
||||||
|
*/
|
||||||
|
public void deleteRecords(ArrayNode array) throws UnsupportedOperationException, Exception;
|
||||||
|
|
||||||
|
public int getEstimatedRecordRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR)
|
||||||
|
*/
|
||||||
|
public interface AggregatorPersistenceStatus {
|
||||||
|
|
||||||
|
public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType,
|
||||||
|
Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception;
|
||||||
|
|
||||||
|
public List<AggregationStatus> getAll() throws Exception;
|
||||||
|
|
||||||
|
public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType,
|
||||||
|
Date aggregationStartDate) throws Exception;
|
||||||
|
|
||||||
|
public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate,
|
||||||
|
Date aggregationEndDate) throws Exception;
|
||||||
|
|
||||||
|
public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,10 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR)
|
||||||
|
* Used only to migrate status to move database.
|
||||||
|
* Do not use for normal operation of aggregation
|
||||||
|
*/
|
||||||
|
interface AggregatorPersistenceStatusDst extends AggregatorPersistenceStatus {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,10 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR)
|
||||||
|
* Used only to migrate status to move database.
|
||||||
|
* Do not use for normal operation of aggregation
|
||||||
|
*/
|
||||||
|
interface AggregatorPersistenceStatusSrc extends AggregatorPersistenceStatus {
|
||||||
|
|
||||||
|
}
|
|
@ -12,9 +12,9 @@ import java.util.ArrayList;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
|
@ -23,55 +23,46 @@ import org.gcube.accounting.aggregator.status.AggregationStateEvent;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
import org.gcube.accounting.aggregator.utility.Constant;
|
import org.gcube.accounting.aggregator.utility.Constant;
|
||||||
import org.gcube.accounting.aggregator.utility.Utility;
|
import org.gcube.accounting.aggregator.utility.Utility;
|
||||||
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
|
|
||||||
import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
|
|
||||||
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
|
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
|
||||||
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
|
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
|
||||||
import org.gcube.accounting.utility.postgresql.RecordToDBFields;
|
import org.gcube.accounting.utility.postgresql.RecordToDBFields;
|
||||||
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
|
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
|
||||||
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
||||||
import org.gcube.documentstore.records.AggregatedRecord;
|
import org.gcube.com.fasterxml.jackson.databind.node.ArrayNode;
|
||||||
|
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
|
||||||
import org.gcube.documentstore.records.DSMapper;
|
import org.gcube.documentstore.records.DSMapper;
|
||||||
import org.gcube.documentstore.records.Record;
|
import org.gcube.documentstore.records.Record;
|
||||||
import org.gcube.documentstore.records.RecordUtility;
|
|
||||||
import org.postgresql.core.Utils;
|
import org.postgresql.core.Utils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Luca Frosini (ISTI-CNR)
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
*/
|
*/
|
||||||
public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
public class PostgreSQLConnector extends PersistencePostgreSQL implements AggregatorPersistenceSrc, AggregatorPersistenceDst, AggregatorPersistenceStatus {
|
||||||
|
|
||||||
|
public static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
|
||||||
|
|
||||||
private static final String UTC_TIME_ZONE = "UTC";
|
private static final String UTC_TIME_ZONE = "UTC";
|
||||||
public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE);
|
public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE);
|
||||||
|
|
||||||
|
protected AccountingPersistenceConfiguration configuration;
|
||||||
protected Connection connection;
|
protected Connection connection;
|
||||||
|
|
||||||
private static PostgreSQLConnector postgreSQLConnector;
|
protected PostgreSQLConnector(Class<?> clazz) throws Exception {
|
||||||
|
super();
|
||||||
public static PostgreSQLConnector getPostgreSQLConnector() throws Exception {
|
this.configuration = new AccountingPersistenceConfiguration(clazz);
|
||||||
if(postgreSQLConnector == null) {
|
prepareConnection(configuration);
|
||||||
postgreSQLConnector = new PostgreSQLConnector();
|
|
||||||
}
|
|
||||||
return postgreSQLConnector;
|
|
||||||
}
|
|
||||||
|
|
||||||
private PostgreSQLConnector() throws Exception {
|
|
||||||
this.configuration = new AccountingPersistenceBackendQueryConfiguration(AccountingPersistenceQueryPostgreSQL.class);
|
|
||||||
Map<String, Class<? extends AggregatedRecord<?,?>>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound();
|
|
||||||
for(String typeName : aggregatedRecords.keySet()) {
|
|
||||||
try {
|
|
||||||
Class<? extends AggregatedRecord<?,?>> clz = aggregatedRecords.get(typeName);
|
|
||||||
RecordToDBMapping.addRecordToDB(clz, configuration);
|
|
||||||
} catch (Exception e) {
|
|
||||||
new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Connection getConnection() throws Exception {
|
protected Connection getConnection() throws Exception {
|
||||||
if(connection==null) {
|
if(connection==null || connection.isClosed()) {
|
||||||
|
String url = configuration.getProperty(AccountingPersistenceConfiguration.URL_PROPERTY_KEY);
|
||||||
|
|
||||||
|
if(connection!=null && connection.isClosed()) {
|
||||||
|
logger.warn("The connection was closed. We should investigate why. Going to reconnect to {}.", url);
|
||||||
|
}
|
||||||
|
|
||||||
Class.forName("org.postgresql.Driver");
|
Class.forName("org.postgresql.Driver");
|
||||||
String url = configuration.getProperty(AccountingPersistenceQueryPostgreSQL.URL_PROPERTY_KEY);
|
|
||||||
String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY);
|
String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY);
|
||||||
String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY);
|
String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY);
|
||||||
|
|
||||||
|
@ -126,7 +117,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
return getQuotedString(serializable.toString());
|
return getQuotedString(serializable.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getInsertAggregationStatusQuery(AggregationStatus aggregationStatus, boolean upsert) throws SQLException {
|
protected String getInsertAggregationStatusQuery(AggregationStatus aggregationStatus, boolean upsert) throws SQLException {
|
||||||
StringBuffer stringBuffer = new StringBuffer();
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
|
|
||||||
stringBuffer.append("INSERT INTO ");
|
stringBuffer.append("INSERT INTO ");
|
||||||
|
@ -134,6 +125,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
stringBuffer.append(" (id, ");
|
stringBuffer.append(" (id, ");
|
||||||
stringBuffer.append("record_type, aggregation_type, aggregation_start_date, aggregation_end_date, ");
|
stringBuffer.append("record_type, aggregation_type, aggregation_start_date, aggregation_end_date, ");
|
||||||
stringBuffer.append("original_records_number, aggregated_records_number, recovered_records_number, malformed_records_number, percentage, ");
|
stringBuffer.append("original_records_number, aggregated_records_number, recovered_records_number, malformed_records_number, percentage, ");
|
||||||
|
stringBuffer.append("restart_from, ");
|
||||||
stringBuffer.append("context, current_aggregation_state, last_update_time, previous)");
|
stringBuffer.append("context, current_aggregation_state, last_update_time, previous)");
|
||||||
stringBuffer.append(" VALUES (");
|
stringBuffer.append(" VALUES (");
|
||||||
stringBuffer.append(getValue(aggregationStatus.getUUID().toString()));
|
stringBuffer.append(getValue(aggregationStatus.getUUID().toString()));
|
||||||
|
@ -158,6 +150,8 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
stringBuffer.append(getValue(aggregationStatus.getMalformedRecordNumber()));
|
stringBuffer.append(getValue(aggregationStatus.getMalformedRecordNumber()));
|
||||||
stringBuffer.append(", ");
|
stringBuffer.append(", ");
|
||||||
stringBuffer.append(getValue(aggregationStatus.getPercentage()));
|
stringBuffer.append(getValue(aggregationStatus.getPercentage()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getRestartFrom()));
|
||||||
|
|
||||||
stringBuffer.append(", ");
|
stringBuffer.append(", ");
|
||||||
stringBuffer.append(getValue(aggregationStatus.getContext()));
|
stringBuffer.append(getValue(aggregationStatus.getContext()));
|
||||||
|
@ -176,8 +170,14 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
|
|
||||||
if(upsert) {
|
if(upsert) {
|
||||||
stringBuffer.append(") ON CONFLICT (id) DO UPDATE SET ");
|
stringBuffer.append(") ON CONFLICT (id) DO UPDATE SET ");
|
||||||
stringBuffer.append("original_records_number=EXCLUDED.original_records_number, aggregated_records_number=EXCLUDED.aggregated_records_number, recovered_records_number=EXCLUDED.recovered_records_number, malformed_records_number=EXCLUDED.malformed_records_number, percentage=EXCLUDED.percentage, ");
|
stringBuffer.append("original_records_number=EXCLUDED.original_records_number, ");
|
||||||
stringBuffer.append("current_aggregation_state=EXCLUDED.current_aggregation_state, last_update_time=EXCLUDED.last_update_time, previous=EXCLUDED.previous;");
|
stringBuffer.append("aggregated_records_number=EXCLUDED.aggregated_records_number, ");
|
||||||
|
stringBuffer.append("recovered_records_number=EXCLUDED.recovered_records_number, ");
|
||||||
|
stringBuffer.append("malformed_records_number=EXCLUDED.malformed_records_number, ");
|
||||||
|
stringBuffer.append("percentage=EXCLUDED.percentage, ");
|
||||||
|
stringBuffer.append("restart_from=EXCLUDED.restart_from, ");
|
||||||
|
stringBuffer.append("current_aggregation_state=EXCLUDED.current_aggregation_state, ");
|
||||||
|
stringBuffer.append("last_update_time=EXCLUDED.last_update_time, previous=EXCLUDED.previous;");
|
||||||
}else {
|
}else {
|
||||||
stringBuffer.append(");");
|
stringBuffer.append(");");
|
||||||
}
|
}
|
||||||
|
@ -185,7 +185,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
return stringBuffer.toString();
|
return stringBuffer.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getInsertAggregationStateQuery(AggregationStatus aggregationStatus) throws SQLException {
|
protected String getInsertAggregationStateQuery(AggregationStatus aggregationStatus) throws SQLException {
|
||||||
StringBuffer stringBuffer = new StringBuffer();
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
for(AggregationStateEvent aggregationStateEvent : aggregationStatus.getAggregationStateEvents()) {
|
for(AggregationStateEvent aggregationStateEvent : aggregationStatus.getAggregationStateEvents()) {
|
||||||
stringBuffer.append("INSERT INTO ");
|
stringBuffer.append("INSERT INTO ");
|
||||||
|
@ -204,16 +204,32 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
return stringBuffer.toString();
|
return stringBuffer.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception {
|
public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception {
|
||||||
Connection connection = getConnection();
|
for(int i=0; i<3; i++){
|
||||||
Statement statement = connection.createStatement();
|
try {
|
||||||
String sqlCommand = getInsertAggregationStatusQuery(aggregationStatus, true);
|
Connection connection = getConnection();
|
||||||
statement.executeUpdate(sqlCommand);
|
Statement statement = connection.createStatement();
|
||||||
sqlCommand = getInsertAggregationStateQuery(aggregationStatus);
|
String sqlCommand = getInsertAggregationStatusQuery(aggregationStatus, true);
|
||||||
statement.executeUpdate(sqlCommand);
|
statement.executeUpdate(sqlCommand);
|
||||||
statement.close();
|
sqlCommand = getInsertAggregationStateQuery(aggregationStatus);
|
||||||
connection.commit();
|
statement.executeUpdate(sqlCommand);
|
||||||
|
statement.close();
|
||||||
|
connection.commit();
|
||||||
|
break;
|
||||||
|
}catch (Throwable e) {
|
||||||
|
if(i<3) {
|
||||||
|
long delay = TimeUnit.MILLISECONDS.toMillis(100);
|
||||||
|
logger.error("Unable to upsert aggregation status at attemp {}. Retrying in {} millis.", i, delay, e);
|
||||||
|
Thread.sleep(delay);
|
||||||
|
}else {
|
||||||
|
logger.error("Unable to upsert aggregation status.", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// private Calendar getCalendar(ResultSet resultSet, String columnLabel) throws SQLException {
|
// private Calendar getCalendar(ResultSet resultSet, String columnLabel) throws SQLException {
|
||||||
|
@ -230,7 +246,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
return date;
|
return date;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception {
|
protected AggregationStatus getAggregationStatusFromResultSet(ResultSet resultSet) throws Exception {
|
||||||
String recordType = resultSet.getString("record_type");
|
String recordType = resultSet.getString("record_type");
|
||||||
String aggregationTypeString = resultSet.getString("aggregation_type");
|
String aggregationTypeString = resultSet.getString("aggregation_type");
|
||||||
AggregationType aggregationType = AggregationType.valueOf(aggregationTypeString);
|
AggregationType aggregationType = AggregationType.valueOf(aggregationTypeString);
|
||||||
|
@ -250,6 +266,9 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
int malformedRecordNumber = resultSet.getInt("malformed_records_number");
|
int malformedRecordNumber = resultSet.getInt("malformed_records_number");
|
||||||
aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecordsNumber, malformedRecordNumber);
|
aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecordsNumber, malformedRecordNumber);
|
||||||
|
|
||||||
|
int restartFrom = resultSet.getInt("restart_from");
|
||||||
|
aggregationStatus.setRestartFrom(restartFrom, false);
|
||||||
|
|
||||||
String context = resultSet.getString("context");
|
String context = resultSet.getString("context");
|
||||||
aggregationStatus.setContext(context);
|
aggregationStatus.setContext(context);
|
||||||
|
|
||||||
|
@ -265,6 +284,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
return aggregationStatus;
|
return aggregationStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
|
public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -276,7 +296,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
* aggregation_start_date >= '2017-05-01 00:00:00.000 +0000' AND
|
* aggregation_start_date >= '2017-05-01 00:00:00.000 +0000' AND
|
||||||
* aggregation_start_date <= '2017-05-31 00:00:00.000 +0000'
|
* aggregation_start_date <= '2017-05-31 00:00:00.000 +0000'
|
||||||
*
|
*
|
||||||
* ORDER BY last_update_time DESC LIMIT 1
|
* ORDER BY aggregation_start_date DESC LIMIT 1
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -328,6 +348,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
// return getUnterminated(null, null, aggregationStartDate, aggregationEndDate, forceRestart);
|
// return getUnterminated(null, null, aggregationStartDate, aggregationEndDate, forceRestart);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
@Override
|
||||||
public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{
|
public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception{
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -405,6 +426,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public List<AggregationStatus> getAll() throws Exception{
|
public List<AggregationStatus> getAll() throws Exception{
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -415,8 +437,8 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
|
|
||||||
StringBuffer stringBuffer = new StringBuffer();
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
stringBuffer.append("SELECT * ");
|
stringBuffer.append("SELECT * ");
|
||||||
stringBuffer.append("FROM aggregation_status");
|
stringBuffer.append("FROM aggregation_status ");
|
||||||
stringBuffer.append("ORDER BY aggregation_start_date ASC");
|
stringBuffer.append("ORDER BY aggregation_start_date ASC");
|
||||||
|
|
||||||
Connection connection = getConnection();
|
Connection connection = getConnection();
|
||||||
Statement statement = connection.createStatement();
|
Statement statement = connection.createStatement();
|
||||||
|
@ -438,7 +460,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
|
public AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -489,6 +511,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
return aggregationStatus;
|
return aggregationStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void deleteRecord(JsonNode jsonNode) throws Exception {
|
public void deleteRecord(JsonNode jsonNode) throws Exception {
|
||||||
Record record = DSMapper.unmarshal(Record.class, jsonNode.toString());
|
Record record = DSMapper.unmarshal(Record.class, jsonNode.toString());
|
||||||
Class<? extends Record> clz = record.getClass();
|
Class<? extends Record> clz = record.getClass();
|
||||||
|
@ -517,6 +540,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception {
|
public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception {
|
||||||
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
||||||
|
|
||||||
|
@ -548,4 +572,81 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
return resultSet;
|
return resultSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getEstimatedRecordRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception {
|
||||||
|
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
||||||
|
|
||||||
|
String tableName = RecordToDBFields.getKey(aggregationInfo.getRecordType());
|
||||||
|
|
||||||
|
String startTimeColumnName = RecordToDBFields.getKey(AggregatedUsageRecord.START_TIME);
|
||||||
|
|
||||||
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
|
stringBuffer.append("SELECT COUNT(*)");
|
||||||
|
stringBuffer.append(" FROM ");
|
||||||
|
stringBuffer.append(tableName);
|
||||||
|
stringBuffer.append(" WHERE ");
|
||||||
|
stringBuffer.append(startTimeColumnName);
|
||||||
|
stringBuffer.append(" >= ");
|
||||||
|
stringBuffer.append(getValue(aggregationInfo.getAggregationStartDate()));
|
||||||
|
stringBuffer.append(" AND ");
|
||||||
|
stringBuffer.append(startTimeColumnName);
|
||||||
|
stringBuffer.append(" < ");
|
||||||
|
stringBuffer.append(getValue(aggregationInfo.getAggregationEndDate()));
|
||||||
|
|
||||||
|
Connection connection = getConnection();
|
||||||
|
Statement statement = connection.createStatement();
|
||||||
|
|
||||||
|
String sqlQuery = stringBuffer.toString();
|
||||||
|
|
||||||
|
logger.trace("Going to request the following query: {}", sqlQuery);
|
||||||
|
ResultSet resultSet = statement.executeQuery(sqlQuery);
|
||||||
|
|
||||||
|
resultSet.next();
|
||||||
|
return resultSet.getInt(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isBulkDeleteAllowed() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteRecords(ArrayNode array) throws UnsupportedOperationException, Exception {
|
||||||
|
if(array.size()<1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Record record = DSMapper.unmarshal(Record.class, array.get(0).toString());
|
||||||
|
Class<? extends Record> clz = record.getClass();
|
||||||
|
String type = RecordToDBMapping.getRecordTypeByClass(clz);
|
||||||
|
String tableName = RecordToDBFields.getKey(type);
|
||||||
|
|
||||||
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
|
stringBuffer.append("DELETE ");
|
||||||
|
stringBuffer.append("FROM ");
|
||||||
|
stringBuffer.append(tableName);
|
||||||
|
stringBuffer.append(" WHERE ");
|
||||||
|
String id = record.getId();
|
||||||
|
stringBuffer.append("id = ");
|
||||||
|
stringBuffer.append(getValue(id));
|
||||||
|
|
||||||
|
for(int i=1; i<array.size(); i++) {
|
||||||
|
stringBuffer.append(" OR ");
|
||||||
|
id = array.get(i).get(Record.ID).asText();
|
||||||
|
stringBuffer.append("id = ");
|
||||||
|
stringBuffer.append(getValue(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Connection connection = getConnection();
|
||||||
|
Statement statement = connection.createStatement();
|
||||||
|
|
||||||
|
String sqlCommand = stringBuffer.toString();
|
||||||
|
logger.trace("Going to execute {}", sqlCommand);
|
||||||
|
statement.execute(sqlCommand);
|
||||||
|
|
||||||
|
statement.close();
|
||||||
|
connection.commit();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
|
*/
|
||||||
|
public class PostgreSQLConnectorDst extends PostgreSQLConnector implements AggregatorPersistenceDst {
|
||||||
|
|
||||||
|
protected PostgreSQLConnectorDst() throws Exception {
|
||||||
|
super(AggregatorPersistenceDst.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
|
*/
|
||||||
|
public class PostgreSQLConnectorSrc extends PostgreSQLConnector implements AggregatorPersistenceSrc {
|
||||||
|
|
||||||
|
protected PostgreSQLConnectorSrc() throws Exception {
|
||||||
|
super(AggregatorPersistenceSrc.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
|
*/
|
||||||
|
public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceStatus {
|
||||||
|
|
||||||
|
protected PostgreSQLConnectorStatus() throws Exception {
|
||||||
|
super(AggregatorPersistenceSrc.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected PostgreSQLConnectorStatus(Class<?> clazz) throws Exception {
|
||||||
|
super(clazz);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
|
*/
|
||||||
|
class PostgreSQLConnectorStatusDst extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusDst {
|
||||||
|
|
||||||
|
protected PostgreSQLConnectorStatusDst() throws Exception {
|
||||||
|
super(AggregatorPersistenceDst.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI-CNR)
|
||||||
|
*/
|
||||||
|
class PostgreSQLConnectorStatusSrc extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusSrc {
|
||||||
|
|
||||||
|
protected PostgreSQLConnectorStatusSrc() throws Exception {
|
||||||
|
super(AggregatorPersistenceSrc.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -61,6 +61,20 @@ public class AccountingAggregatorPlugin extends Plugin {
|
||||||
public static final String FORCE_RERUN = "forceRerun";
|
public static final String FORCE_RERUN = "forceRerun";
|
||||||
public static final String FORCE_RESTART = "forceRestart";
|
public static final String FORCE_RESTART = "forceRestart";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This parameter is used to skip aggregation. Default is false.
|
||||||
|
* It is used mainly to move the data from a db/table to another
|
||||||
|
* when the data has been already aggregated and any attempt to aggregate again
|
||||||
|
* does not have any effect.
|
||||||
|
* Please note that aggregation could take a lot of time when the amount of
|
||||||
|
* not aggregable records is more than 50.000/60.000.
|
||||||
|
* The reason is simple. Any records must be compared with the previous 50.000
|
||||||
|
* before deciding that is not aggregable and to be inserted in the list.
|
||||||
|
* Furthermore it requires a lot of memory.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public static final String SKIP_AGGREGATION = "skipAggregation";
|
||||||
|
|
||||||
public enum ElaborationType {
|
public enum ElaborationType {
|
||||||
AGGREGATE, // Aggregate
|
AGGREGATE, // Aggregate
|
||||||
RECOVERY // Recover unterminated executions
|
RECOVERY // Recover unterminated executions
|
||||||
|
@ -128,6 +142,8 @@ public class AccountingAggregatorPlugin extends Plugin {
|
||||||
boolean forceRerun = false;
|
boolean forceRerun = false;
|
||||||
boolean forceRestart = false;
|
boolean forceRestart = false;
|
||||||
|
|
||||||
|
boolean skipAggregation = false;
|
||||||
|
|
||||||
if (inputs == null || inputs.isEmpty()) {
|
if (inputs == null || inputs.isEmpty()) {
|
||||||
throw new IllegalArgumentException("The can only be launched providing valid input parameters");
|
throw new IllegalArgumentException("The can only be launched providing valid input parameters");
|
||||||
}
|
}
|
||||||
|
@ -187,10 +203,16 @@ public class AccountingAggregatorPlugin extends Plugin {
|
||||||
throw new IllegalArgumentException("Aggregation Start Date cannot be found. Please provide it as parameter or set '" + RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER + "' input parameter to 'true'.");
|
throw new IllegalArgumentException("Aggregation Start Date cannot be found. Please provide it as parameter or set '" + RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER + "' input parameter to 'true'.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(inputs.containsKey(SKIP_AGGREGATION)) {
|
||||||
|
skipAggregation = (boolean) inputs.get(SKIP_AGGREGATION);
|
||||||
|
}
|
||||||
|
|
||||||
AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate);
|
AggregatorManager aggregatorManager = new AggregatorManager(aggregationType, restartFromLastAggregationDate, aggregationStartDate, aggregationEndDate);
|
||||||
aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation);
|
aggregatorManager.setForceEarlyAggregation(forceEarlyAggregation);
|
||||||
aggregatorManager.setForceRerun(forceRerun);
|
aggregatorManager.setForceRerun(forceRerun);
|
||||||
aggregatorManager.setForceRestart(forceRestart);
|
aggregatorManager.setForceRestart(forceRestart);
|
||||||
|
aggregatorManager.setSkipAggregation(skipAggregation);
|
||||||
|
|
||||||
// aggregatorManager.elaborate(persistStartTime, persistEndTime, recordType);
|
// aggregatorManager.elaborate(persistStartTime, persistEndTime, recordType);
|
||||||
aggregatorManager.elaborate(recordType);
|
aggregatorManager.elaborate(recordType);
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ import java.util.UUID;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
|
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
|
||||||
import org.gcube.accounting.aggregator.utility.Constant;
|
import org.gcube.accounting.aggregator.utility.Constant;
|
||||||
import org.gcube.accounting.aggregator.utility.Utility;
|
import org.gcube.accounting.aggregator.utility.Utility;
|
||||||
import org.gcube.com.fasterxml.jackson.annotation.JsonFormat;
|
import org.gcube.com.fasterxml.jackson.annotation.JsonFormat;
|
||||||
|
@ -43,19 +43,22 @@ public class AggregationStatus {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
protected float percentage;
|
protected float percentage;
|
||||||
|
|
||||||
@JsonProperty(required=false)
|
@JsonProperty(required = false)
|
||||||
protected String context;
|
protected String context;
|
||||||
|
|
||||||
@JsonProperty(required=false)
|
@JsonProperty
|
||||||
|
protected int restartFrom;
|
||||||
|
|
||||||
|
@JsonProperty(required = false)
|
||||||
protected AggregationStatus previous;
|
protected AggregationStatus previous;
|
||||||
|
|
||||||
// Last observed status
|
// Last observed status
|
||||||
@JsonFormat(shape= JsonFormat.Shape.STRING)
|
@JsonFormat(shape = JsonFormat.Shape.STRING)
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
protected AggregationState aggregationState;
|
protected AggregationState aggregationState;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@JsonFormat(shape= JsonFormat.Shape.STRING, pattern = Constant.DATETIME_PATTERN)
|
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Constant.DATETIME_PATTERN)
|
||||||
protected Calendar lastUpdateTime;
|
protected Calendar lastUpdateTime;
|
||||||
|
|
||||||
// List of Status Event Changes
|
// List of Status Event Changes
|
||||||
|
@ -63,18 +66,25 @@ public class AggregationStatus {
|
||||||
protected List<AggregationStateEvent> aggregationStateEvents;
|
protected List<AggregationStateEvent> aggregationStateEvents;
|
||||||
|
|
||||||
// Needed for Jackon Unmarshalling
|
// Needed for Jackon Unmarshalling
|
||||||
protected AggregationStatus(){}
|
protected AggregationStatus() {
|
||||||
|
|
||||||
public static AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
|
|
||||||
return PostgreSQLConnector.getPostgreSQLConnector().getLast(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, boolean forceEarlyAggregation) throws Exception{
|
public static AggregationStatus getLast(String recordType, AggregationType aggregationType,
|
||||||
return PostgreSQLConnector.getPostgreSQLConnector().getUnterminated(recordType, aggregationType, null, null, forceEarlyAggregation);
|
Date aggregationStartDate, Date aggregationEndDate) throws Exception {
|
||||||
|
return AggregatorPersistenceFactory.getAggregatorPersistenceStatus().getLast(recordType, aggregationType,
|
||||||
|
aggregationStartDate, aggregationEndDate);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
|
public static List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType,
|
||||||
return PostgreSQLConnector.getPostgreSQLConnector().getAggregationStatus(recordType, aggregationType, aggregationStartDate);
|
boolean forceEarlyAggregation) throws Exception {
|
||||||
|
return AggregatorPersistenceFactory.getAggregatorPersistenceStatus().getUnterminated(recordType,
|
||||||
|
aggregationType, null, null, forceEarlyAggregation);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType,
|
||||||
|
Date aggregationStartDate) throws Exception {
|
||||||
|
return AggregatorPersistenceFactory.getAggregatorPersistenceStatus().getAggregationStatus(recordType,
|
||||||
|
aggregationType, aggregationStartDate);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AggregationStatus(AggregationInfo aggregationInfo) throws Exception {
|
public AggregationStatus(AggregationInfo aggregationInfo) throws Exception {
|
||||||
|
@ -83,6 +93,7 @@ public class AggregationStatus {
|
||||||
this.uuid = UUID.randomUUID();
|
this.uuid = UUID.randomUUID();
|
||||||
this.malformedRecordNumber = 0;
|
this.malformedRecordNumber = 0;
|
||||||
this.previous = null;
|
this.previous = null;
|
||||||
|
this.restartFrom = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AggregationStatus(AggregationStatus aggregationStatus) throws Exception {
|
public AggregationStatus(AggregationStatus aggregationStatus) throws Exception {
|
||||||
|
@ -91,38 +102,43 @@ public class AggregationStatus {
|
||||||
this.uuid = aggregationStatus.getUUID();
|
this.uuid = aggregationStatus.getUUID();
|
||||||
this.malformedRecordNumber = 0;
|
this.malformedRecordNumber = 0;
|
||||||
this.previous = aggregationStatus;
|
this.previous = aggregationStatus;
|
||||||
|
this.restartFrom = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AggregationInfo getAggregationInfo() {
|
public AggregationInfo getAggregationInfo() {
|
||||||
return aggregationInfo;
|
return aggregationInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void setAggregationState(AggregationState aggregationState, Calendar startTime, boolean sync) throws Exception {
|
public synchronized void setAggregationState(AggregationState aggregationState, Calendar startTime, boolean sync)
|
||||||
|
throws Exception {
|
||||||
Calendar endTime = Utility.getUTCCalendarInstance();
|
Calendar endTime = Utility.getUTCCalendarInstance();
|
||||||
|
|
||||||
logger.info("Going to Set {} for {} to {}. StartTime {}, EndTime {} [Duration : {}]",
|
logger.info("Going to Set {} for {} to {}. StartTime {}, EndTime {} [Duration : {}]",
|
||||||
AggregationState.class.getSimpleName(),
|
AggregationState.class.getSimpleName(), aggregationInfo, aggregationState.name(),
|
||||||
aggregationInfo, aggregationState.name(),
|
|
||||||
Constant.DEFAULT_DATE_FORMAT.format(startTime.getTime()),
|
Constant.DEFAULT_DATE_FORMAT.format(startTime.getTime()),
|
||||||
Constant.DEFAULT_DATE_FORMAT.format(endTime.getTime()),
|
Constant.DEFAULT_DATE_FORMAT.format(endTime.getTime()),
|
||||||
Utility.getHumanReadableDuration(endTime.getTimeInMillis() - startTime.getTimeInMillis()));
|
Utility.getHumanReadableDuration(endTime.getTimeInMillis() - startTime.getTimeInMillis()));
|
||||||
|
|
||||||
this.aggregationState = aggregationState;
|
this.aggregationState = aggregationState;
|
||||||
this.lastUpdateTime = endTime;
|
this.lastUpdateTime = endTime;
|
||||||
|
this.restartFrom = 0;
|
||||||
|
|
||||||
|
|
||||||
AggregationStateEvent aggregationStatusEvent = new AggregationStateEvent(aggregationState, startTime, endTime);
|
AggregationStateEvent aggregationStatusEvent = new AggregationStateEvent(aggregationState, startTime, endTime);
|
||||||
aggregationStateEvents.add(aggregationStatusEvent);
|
aggregationStateEvents.add(aggregationStatusEvent);
|
||||||
|
|
||||||
if(sync){
|
if (sync) {
|
||||||
PostgreSQLConnector.getPostgreSQLConnector().upsertAggregationStatus(this);
|
AggregatorPersistenceFactory.getAggregatorPersistenceStatus().upsertAggregationStatus(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRecordNumbers(int originalRecordsNumber, int aggregatedRecordsNumber, int malformedRecordNumber) {
|
public void setRecordNumbers(int originalRecordsNumber, int aggregatedRecordsNumber, int malformedRecordNumber) {
|
||||||
this.recoveredRecordNumber = originalRecordsNumber - aggregatedRecordsNumber;
|
this.recoveredRecordNumber = originalRecordsNumber - aggregatedRecordsNumber;
|
||||||
this.percentage = originalRecordsNumber!=0 ? (100 * recoveredRecordNumber) / originalRecordsNumber : 0;
|
this.percentage = originalRecordsNumber != 0 ? (100 * recoveredRecordNumber) / originalRecordsNumber : 0;
|
||||||
logger.info("Original records are {} ({} were malformed). Aggregated records are {}. Difference {}. We recover {}% of Documents",
|
logger.info(
|
||||||
originalRecordsNumber, malformedRecordNumber, aggregatedRecordsNumber, recoveredRecordNumber, percentage);
|
"Original records are {} ({} were malformed). Aggregated records are {}. Difference {}. We recover {}% of Documents",
|
||||||
|
originalRecordsNumber, malformedRecordNumber, aggregatedRecordsNumber, recoveredRecordNumber,
|
||||||
|
percentage);
|
||||||
this.malformedRecordNumber = malformedRecordNumber;
|
this.malformedRecordNumber = malformedRecordNumber;
|
||||||
this.originalRecordsNumber = originalRecordsNumber;
|
this.originalRecordsNumber = originalRecordsNumber;
|
||||||
this.aggregatedRecordsNumber = aggregatedRecordsNumber;
|
this.aggregatedRecordsNumber = aggregatedRecordsNumber;
|
||||||
|
@ -132,7 +148,6 @@ public class AggregationStatus {
|
||||||
return uuid;
|
return uuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void setUUID(UUID uuid) {
|
public void setUUID(UUID uuid) {
|
||||||
this.uuid = uuid;
|
this.uuid = uuid;
|
||||||
}
|
}
|
||||||
|
@ -177,6 +192,18 @@ public class AggregationStatus {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getRestartFrom() {
|
||||||
|
return restartFrom;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRestartFrom(int restartFrom, boolean sync) throws Exception {
|
||||||
|
this.restartFrom = restartFrom;
|
||||||
|
if (sync) {
|
||||||
|
AggregatorPersistenceFactory.getAggregatorPersistenceStatus().upsertAggregationStatus(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public AggregationStatus getPrevious() {
|
public AggregationStatus getPrevious() {
|
||||||
return previous;
|
return previous;
|
||||||
}
|
}
|
||||||
|
@ -199,8 +226,8 @@ public class AggregationStatus {
|
||||||
|
|
||||||
public void updateLastUpdateTime(boolean sync) throws Exception {
|
public void updateLastUpdateTime(boolean sync) throws Exception {
|
||||||
this.lastUpdateTime = Utility.getUTCCalendarInstance();
|
this.lastUpdateTime = Utility.getUTCCalendarInstance();
|
||||||
if(sync){
|
if (sync) {
|
||||||
PostgreSQLConnector.getPostgreSQLConnector().upsertAggregationStatus(this);
|
AggregatorPersistenceFactory.getAggregatorPersistenceStatus().upsertAggregationStatus(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,12 +14,7 @@ import java.util.TimeZone;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin;
|
import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin;
|
||||||
import org.gcube.common.authorization.client.Constants;
|
import org.gcube.common.authorization.utils.manager.SecretManagerProvider;
|
||||||
import org.gcube.common.authorization.library.AuthorizationEntry;
|
|
||||||
import org.gcube.common.authorization.library.provider.AuthorizationProvider;
|
|
||||||
import org.gcube.common.authorization.library.provider.ClientInfo;
|
|
||||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
|
||||||
import org.gcube.common.authorization.library.utils.Caller;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -31,8 +26,7 @@ public class Utility {
|
||||||
private static Logger logger = LoggerFactory.getLogger(Utility.class);
|
private static Logger logger = LoggerFactory.getLogger(Utility.class);
|
||||||
|
|
||||||
public static String getCurrentContext() throws Exception {
|
public static String getCurrentContext() throws Exception {
|
||||||
String token = SecurityTokenProvider.instance.get();
|
return SecretManagerProvider.instance.get().getContext();
|
||||||
return Constants.authorizationService().get(token).getContext();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -170,29 +164,8 @@ public class Utility {
|
||||||
return aggregationEndDate.getTime();
|
return aggregationEndDate.getTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static ClientInfo getClientInfo() throws Exception {
|
public static String getUsername() {
|
||||||
Caller caller = AuthorizationProvider.instance.get();
|
return SecretManagerProvider.instance.get().getUser().getUsername();
|
||||||
if(caller!=null){
|
|
||||||
return caller.getClient();
|
|
||||||
}else{
|
|
||||||
String token = SecurityTokenProvider.instance.get();
|
|
||||||
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
|
|
||||||
return authorizationEntry.getClientInfo();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String getUsername() throws Exception{
|
|
||||||
try {
|
|
||||||
ClientInfo clientInfo = getClientInfo();
|
|
||||||
String clientId = clientInfo.getId();
|
|
||||||
if (clientId != null && clientId.compareTo("") != 0) {
|
|
||||||
return clientId;
|
|
||||||
}
|
|
||||||
throw new Exception("Username null or empty");
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Unable to retrieve user.");
|
|
||||||
throw new Exception("Unable to retrieve user.", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -7,14 +7,14 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import org.gcube.common.authorization.client.Constants;
|
import org.gcube.common.authorization.utils.manager.SecretManager;
|
||||||
import org.gcube.common.authorization.client.exceptions.ObjectNotFound;
|
import org.gcube.common.authorization.utils.manager.SecretManagerProvider;
|
||||||
import org.gcube.common.authorization.library.AuthorizationEntry;
|
import org.gcube.common.authorization.utils.secret.JWTSecret;
|
||||||
import org.gcube.common.authorization.library.provider.AuthorizationProvider;
|
import org.gcube.common.authorization.utils.secret.Secret;
|
||||||
import org.gcube.common.authorization.library.provider.ClientInfo;
|
import org.gcube.common.authorization.utils.secret.SecretUtility;
|
||||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
import org.gcube.common.keycloak.KeycloakClientFactory;
|
||||||
import org.gcube.common.authorization.library.utils.Caller;
|
import org.gcube.common.keycloak.KeycloakClientHelper;
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
import org.gcube.common.keycloak.model.TokenResponse;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -22,82 +22,154 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Luca Frosini (ISTI - CNR)
|
* @author Luca Frosini (ISTI - CNR)
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public class ContextTest {
|
public class ContextTest {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ContextTest.class);
|
private static final Logger logger = LoggerFactory.getLogger(ContextTest.class);
|
||||||
|
|
||||||
protected static Properties properties;
|
protected static final String CONFIG_INI_FILENAME = "config.ini";
|
||||||
protected static final String PROPERTIES_FILENAME = "token.properties";
|
|
||||||
|
|
||||||
public static final String ROOT_DEV_SCOPE;
|
public static final String DEFAULT_TEST_SCOPE;
|
||||||
public static final String VO_DEFAULT_TEST_SCOPE;
|
|
||||||
public static final String VRE_DEFAULT_TEST_SCOPE;
|
|
||||||
|
|
||||||
public static final String VO_DEFAULT_TEST_SCOPE_ANOTHER_USER;
|
public static final String GCUBE;
|
||||||
|
public static final String DEVNEXT;
|
||||||
|
public static final String NEXTNEXT;
|
||||||
|
public static final String DEVSEC;
|
||||||
|
public static final String DEVVRE;
|
||||||
|
|
||||||
|
private static final String ROOT_PROD;
|
||||||
|
private static final String ROOT_PRE;
|
||||||
|
|
||||||
|
protected static final Properties properties;
|
||||||
|
|
||||||
|
public static final String TYPE_PROPERTY_KEY = "type";
|
||||||
|
public static final String USERNAME_PROPERTY_KEY = "username";
|
||||||
|
public static final String PASSWORD_PROPERTY_KEY = "password";
|
||||||
|
public static final String CLIENT_ID_PROPERTY_KEY = "clientId";
|
||||||
|
|
||||||
static {
|
static {
|
||||||
properties = new Properties();
|
GCUBE = "/gcube";
|
||||||
InputStream input = ContextTest.class.getClassLoader().getResourceAsStream(PROPERTIES_FILENAME);
|
DEVNEXT = GCUBE + "/devNext";
|
||||||
|
NEXTNEXT = DEVNEXT + "/NextNext";
|
||||||
|
DEVSEC = GCUBE + "/devsec";
|
||||||
|
DEVVRE = DEVSEC + "/devVRE";
|
||||||
|
|
||||||
|
ROOT_PROD = "/d4science.research-infrastructures.eu";
|
||||||
|
ROOT_PRE = "/pred4s";
|
||||||
|
|
||||||
|
DEFAULT_TEST_SCOPE = GCUBE;
|
||||||
|
|
||||||
|
|
||||||
|
properties = new Properties();
|
||||||
|
InputStream input = ContextTest.class.getClassLoader().getResourceAsStream(CONFIG_INI_FILENAME);
|
||||||
try {
|
try {
|
||||||
// load the properties file
|
// load the properties file
|
||||||
properties.load(input);
|
properties.load(input);
|
||||||
} catch(IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// PARENT_DEFAULT_TEST_SCOPE = "/pred4s"
|
}
|
||||||
// DEFAULT_TEST_SCOPE_NAME = PARENT_DEFAULT_TEST_SCOPE + "/preprod";
|
|
||||||
// ALTERNATIVE_TEST_SCOPE = DEFAULT_TEST_SCOPE_NAME + "/preVRE";
|
private enum Type{
|
||||||
|
USER, CLIENT_ID
|
||||||
|
};
|
||||||
|
|
||||||
|
public static void set(Secret secret) throws Exception {
|
||||||
|
SecretManagerProvider.instance.reset();
|
||||||
|
SecretManager secretManager = new SecretManager();
|
||||||
|
secretManager.addSecret(secret);
|
||||||
|
SecretManagerProvider.instance.set(secretManager);
|
||||||
|
SecretManagerProvider.instance.get().set();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setContextByName(String fullContextName) throws Exception {
|
||||||
|
logger.debug("Going to set credentials for context {}", fullContextName);
|
||||||
|
Secret secret = getSecretByContextName(fullContextName);
|
||||||
|
set(secret);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
ROOT_DEV_SCOPE = "/gcube";
|
private static TokenResponse getJWTAccessToken(String context) throws Exception {
|
||||||
VO_DEFAULT_TEST_SCOPE = ROOT_DEV_SCOPE + "/devNext";
|
Type type = Type.valueOf(properties.get(TYPE_PROPERTY_KEY).toString());
|
||||||
VRE_DEFAULT_TEST_SCOPE = VO_DEFAULT_TEST_SCOPE + "/NextNext";
|
|
||||||
|
|
||||||
VO_DEFAULT_TEST_SCOPE_ANOTHER_USER = "lucio.lelii_" + VO_DEFAULT_TEST_SCOPE;
|
TokenResponse tr = null;
|
||||||
|
|
||||||
|
int index = context.indexOf('/', 1);
|
||||||
|
String root = context.substring(0, index == -1 ? context.length() : index);
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case CLIENT_ID:
|
||||||
|
String clientId = properties.getProperty(CLIENT_ID_PROPERTY_KEY);
|
||||||
|
String clientSecret = properties.getProperty(root);
|
||||||
|
|
||||||
|
tr = KeycloakClientFactory.newInstance().queryUMAToken(context, clientId, clientSecret, context, null);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case USER:
|
||||||
|
default:
|
||||||
|
String username = properties.getProperty(USERNAME_PROPERTY_KEY);
|
||||||
|
String password = properties.getProperty(PASSWORD_PROPERTY_KEY);
|
||||||
|
|
||||||
|
switch (root) {
|
||||||
|
case "/gcube":
|
||||||
|
default:
|
||||||
|
clientId = "next.d4science.org";
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "/pred4s":
|
||||||
|
clientId = "pre.d4science.org";
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "/d4science.research-infrastructures.eu":
|
||||||
|
clientId = "services.d4science.org";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
clientSecret = null;
|
||||||
|
|
||||||
|
tr = KeycloakClientHelper.getTokenForUser(context, username, password);
|
||||||
|
break;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return tr;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Secret getSecretByContextName(String context) throws Exception {
|
||||||
|
TokenResponse tr = getJWTAccessToken(context);
|
||||||
|
Secret secret = new JWTSecret(tr.getAccessToken());
|
||||||
|
return secret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setContext(String token) throws Exception {
|
||||||
|
Secret secret = getSecret(token);
|
||||||
|
set(secret);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Secret getSecret(String token) throws Exception {
|
||||||
|
Secret secret = SecretUtility.getSecretByTokenString(token);
|
||||||
|
return secret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getUser() {
|
||||||
|
String user = "UNKNOWN";
|
||||||
try {
|
try {
|
||||||
setContextByName(VO_DEFAULT_TEST_SCOPE);
|
user = SecretManagerProvider.instance.get().getUser().getUsername();
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
throw new RuntimeException(e);
|
logger.error("Unable to retrieve user. {} will be used", user);
|
||||||
}
|
}
|
||||||
}
|
return user;
|
||||||
|
|
||||||
public static String getCurrentScope(String token) throws ObjectNotFound, Exception {
|
|
||||||
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
|
|
||||||
String context = authorizationEntry.getContext();
|
|
||||||
logger.info("Context of token {} is {}", token, context);
|
|
||||||
return context;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void setContextByName(String fullContextName) throws ObjectNotFound, Exception {
|
|
||||||
String token = ContextTest.properties.getProperty(fullContextName);
|
|
||||||
setContext(token);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void setContext(String token) throws ObjectNotFound, Exception {
|
|
||||||
SecurityTokenProvider.instance.set(token);
|
|
||||||
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
|
|
||||||
ClientInfo clientInfo = authorizationEntry.getClientInfo();
|
|
||||||
logger.debug("User : {} - Type : {}", clientInfo.getId(), clientInfo.getType().name());
|
|
||||||
String qualifier = authorizationEntry.getQualifier();
|
|
||||||
Caller caller = new Caller(clientInfo, qualifier);
|
|
||||||
AuthorizationProvider.instance.set(caller);
|
|
||||||
ScopeProvider.instance.set(getCurrentScope(token));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void beforeClass() throws Exception {
|
public static void beforeClass() throws Exception {
|
||||||
setContextByName(VO_DEFAULT_TEST_SCOPE);
|
setContextByName(DEFAULT_TEST_SCOPE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void afterClass() throws Exception {
|
public static void afterClass() throws Exception {
|
||||||
SecurityTokenProvider.instance.reset();
|
SecretManagerProvider.instance.reset();
|
||||||
ScopeProvider.instance.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,129 @@
|
||||||
|
package org.gcube.accounting.aggregator.file;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.Calendar;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.gcube.accounting.aggregator.ContextTest;
|
||||||
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
||||||
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
|
import org.gcube.accounting.aggregator.directory.FileSystemDirectoryStructure;
|
||||||
|
import org.gcube.accounting.aggregator.elaboration.Elaborator;
|
||||||
|
import org.gcube.accounting.aggregator.utility.Utility;
|
||||||
|
import org.gcube.accounting.datamodel.usagerecords.StorageStatusRecord;
|
||||||
|
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import org.gcube.documentstore.records.DSMapper;
|
||||||
|
import org.gcube.documentstore.records.Record;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Luca Frosini (ISTI - CNR)
|
||||||
|
* This class has been used to eliminates duplicates from a file
|
||||||
|
* because the aggregation was started concurrently twice.
|
||||||
|
*/
|
||||||
|
public class EliminateDuplicates extends ContextTest {
|
||||||
|
|
||||||
|
private static Logger logger = LoggerFactory.getLogger(EliminateDuplicates.class);
|
||||||
|
|
||||||
|
public final static String ORIGINAL_NO_DUPLICATES_SUFFIX = ".original-no-duplicates.json";
|
||||||
|
|
||||||
|
protected AggregationInfo aggregationInfo;
|
||||||
|
protected File originalRecordsbackupFile;
|
||||||
|
protected File noDuplicatesRecordsbackupFile;
|
||||||
|
|
||||||
|
protected int readLines;
|
||||||
|
protected int discardedLines;
|
||||||
|
protected int uniqueLines;
|
||||||
|
protected Set<String> ids;
|
||||||
|
|
||||||
|
protected File getAggregatedRecordsBackupFile() throws Exception {
|
||||||
|
File aggregateRecordsBackupFile = new File(originalRecordsbackupFile.getParentFile(),
|
||||||
|
originalRecordsbackupFile.getName().replace(Elaborator.ORIGINAL_SUFFIX, ORIGINAL_NO_DUPLICATES_SUFFIX));
|
||||||
|
return aggregateRecordsBackupFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
@Test
|
||||||
|
public void eliminateDuplicates() throws Exception {
|
||||||
|
ids = new HashSet<>();
|
||||||
|
String recordType = StorageStatusRecord.class.getSimpleName();
|
||||||
|
AggregationType aggregationType = AggregationType.MONTHLY;
|
||||||
|
Calendar start = Utility.getAggregationStartCalendar(2022, Calendar.JANUARY, 1);
|
||||||
|
Calendar end = Utility.getEndCalendarFromStartCalendar(aggregationType, start, 1);
|
||||||
|
Date aggregationStartDate = start.getTime();
|
||||||
|
Date aggregationEndDate = end.getTime();
|
||||||
|
aggregationInfo = new AggregationInfo(recordType, aggregationType, aggregationStartDate, aggregationEndDate);
|
||||||
|
FileSystemDirectoryStructure fileSystemDirectoryStructure = new FileSystemDirectoryStructure();
|
||||||
|
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
|
||||||
|
originalRecordsbackupFile = Elaborator.getOriginalRecordsBackupFile(elaborationDirectory, aggregationInfo);
|
||||||
|
noDuplicatesRecordsbackupFile = getAggregatedRecordsBackupFile();
|
||||||
|
noDuplicatesRecordsbackupFile.delete();
|
||||||
|
// readFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void elaborateLine(String line) throws Exception {
|
||||||
|
JsonNode jsonNode = DSMapper.asJsonNode(line);
|
||||||
|
String id = jsonNode.get(Record.ID).asText();
|
||||||
|
if(!ids.contains(id)) {
|
||||||
|
ids.add(id);
|
||||||
|
++uniqueLines;
|
||||||
|
Utility.printLine(noDuplicatesRecordsbackupFile, line);
|
||||||
|
}else {
|
||||||
|
logger.trace("Record with id {} was already found, it will be discarded.", id);
|
||||||
|
++discardedLines;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void readFile() throws Exception {
|
||||||
|
FileInputStream fstream = null;
|
||||||
|
DataInputStream in = null;
|
||||||
|
BufferedReader br = null;
|
||||||
|
try {
|
||||||
|
// Open the file that is the first // command line parameter
|
||||||
|
fstream = new FileInputStream(originalRecordsbackupFile);
|
||||||
|
// Get the object of DataInputStream
|
||||||
|
in = new DataInputStream(fstream);
|
||||||
|
br = new BufferedReader(new InputStreamReader(in));
|
||||||
|
|
||||||
|
readLines = 0;
|
||||||
|
discardedLines = 0;
|
||||||
|
uniqueLines = 0;
|
||||||
|
|
||||||
|
String line;
|
||||||
|
// Read File Line By Line
|
||||||
|
while((line = br.readLine()) != null) {
|
||||||
|
elaborateLine(line);
|
||||||
|
++readLines;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Original records are {}. Unique records are {}. Discarded duplicates records are {}", readLines, uniqueLines, discardedLines);
|
||||||
|
|
||||||
|
Assert.assertTrue(readLines == (uniqueLines+discardedLines));
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if(br != null) {
|
||||||
|
br.close();
|
||||||
|
}
|
||||||
|
if(in != null) {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
if(fstream != null) {
|
||||||
|
fstream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -13,6 +13,7 @@ import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
import org.gcube.accounting.aggregator.utility.Utility;
|
import org.gcube.accounting.aggregator.utility.Utility;
|
||||||
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
|
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
|
||||||
import org.gcube.documentstore.records.DSMapper;
|
import org.gcube.documentstore.records.DSMapper;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -21,21 +22,36 @@ public class PostgreSQLConnectorTest extends ContextTest {
|
||||||
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(PostgreSQLConnectorTest.class);
|
private static Logger logger = LoggerFactory.getLogger(PostgreSQLConnectorTest.class);
|
||||||
|
|
||||||
protected PostgreSQLConnector postgreSQLConnector;
|
protected AggregatorPersistenceStatus aggregatorPersistenceStatus;
|
||||||
|
|
||||||
public PostgreSQLConnectorTest() throws Exception {
|
public PostgreSQLConnectorTest() throws Exception {
|
||||||
postgreSQLConnector = PostgreSQLConnector.getPostgreSQLConnector();
|
aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus();
|
||||||
|
logger.info("{}", aggregatorPersistenceStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Ignore
|
||||||
|
@Test
|
||||||
|
public void getAggregatorPersistenceDst() throws Exception {
|
||||||
|
AggregatorPersistenceDst dst = AggregatorPersistenceFactory.getAggregatorPersistenceDst();
|
||||||
|
logger.debug("{}", dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Ignore
|
||||||
|
@Test
|
||||||
|
public void getAggregatorPersistenceSrc() throws Exception {
|
||||||
|
AggregatorPersistenceSrc src = AggregatorPersistenceFactory.getAggregatorPersistenceSrc();
|
||||||
|
logger.debug("{}", src);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getLastTest() throws Exception {
|
public void getLastTest() throws Exception {
|
||||||
AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
|
AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
|
||||||
logger.debug("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getUnterminatedTest() throws Exception{
|
public void getUnterminatedTest() throws Exception{
|
||||||
List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null, true);
|
List<AggregationStatus> aggregationStatuses = aggregatorPersistenceStatus.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null, true);
|
||||||
for(AggregationStatus aggregationStatus : aggregationStatuses){
|
for(AggregationStatus aggregationStatus : aggregationStatuses){
|
||||||
logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
logger.debug("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
||||||
}
|
}
|
||||||
|
@ -43,10 +59,10 @@ public class PostgreSQLConnectorTest extends ContextTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getLastTestWithConstraint() throws Exception {
|
public void getLastTestWithConstraint() throws Exception {
|
||||||
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1);
|
Calendar aggregationStart = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 02);
|
||||||
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 31);
|
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 03);
|
||||||
|
|
||||||
AggregationStatus aggregationStatus = postgreSQLConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
|
AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime());
|
||||||
logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
logger.info("Last : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,7 +71,7 @@ public class PostgreSQLConnectorTest extends ContextTest {
|
||||||
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1);
|
Calendar aggregationStart = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 1);
|
||||||
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30);
|
Calendar aggregationEnd = Utility.getAggregationStartCalendar(2017, Calendar.APRIL, 30);
|
||||||
|
|
||||||
List<AggregationStatus> aggregationStatuses = postgreSQLConnector.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime(), true);
|
List<AggregationStatus> aggregationStatuses = aggregatorPersistenceStatus.getUnterminated(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStart.getTime(), aggregationEnd.getTime(), true);
|
||||||
for(AggregationStatus aggregationStatus : aggregationStatuses){
|
for(AggregationStatus aggregationStatus : aggregationStatuses){
|
||||||
logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
logger.info("Unterminated : {}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
||||||
}
|
}
|
||||||
|
@ -63,12 +79,12 @@ public class PostgreSQLConnectorTest extends ContextTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getAggregationStatusTest() throws Exception{
|
public void getAggregationStatusTest() throws Exception{
|
||||||
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15);
|
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 02);
|
||||||
AggregationStatus aggregationStatus = postgreSQLConnector.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime());
|
AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getAggregationStatus(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, aggregationStartCalendar.getTime());
|
||||||
logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
logger.info("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
//@Test
|
||||||
public void aggregationStatusTest() throws Exception {
|
public void aggregationStatusTest() throws Exception {
|
||||||
int toRemove = -36;
|
int toRemove = -36;
|
||||||
|
|
||||||
|
@ -121,7 +137,7 @@ public class PostgreSQLConnectorTest extends ContextTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
//@Test
|
||||||
public void createStartedElaboration() throws Exception {
|
public void createStartedElaboration() throws Exception {
|
||||||
|
|
||||||
Calendar start = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15);
|
Calendar start = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15);
|
||||||
|
@ -152,4 +168,38 @@ public class PostgreSQLConnectorTest extends ContextTest {
|
||||||
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void analyseAggregationStatus(AggregationStatus aggregationStatus) throws Exception {
|
||||||
|
if(aggregationStatus.getPrevious()!=null) {
|
||||||
|
analyseAggregationStatus(aggregationStatus.getPrevious());
|
||||||
|
}
|
||||||
|
|
||||||
|
AggregatorPersistenceStatusDst apsDst = AggregatorPersistenceFactory.getAggregatorPersistenceStatusDst();
|
||||||
|
|
||||||
|
if(aggregationStatus.getOriginalRecordsNumber()!=0) {
|
||||||
|
apsDst.upsertAggregationStatus(aggregationStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Ignore
|
||||||
|
@Test
|
||||||
|
public void moveStatusFromSrcToDst() throws Exception {
|
||||||
|
AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
|
||||||
|
List<AggregationStatus> aggregationStatuses = apsSrc.getAll();
|
||||||
|
for(AggregationStatus aggregationStatus : aggregationStatuses) {
|
||||||
|
analyseAggregationStatus(aggregationStatus);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Ignore
|
||||||
|
@Test
|
||||||
|
public void testAggregatorPersistenceStatusSrcAndDst() throws Exception {
|
||||||
|
AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
|
||||||
|
logger.debug("{}", apsSrc);
|
||||||
|
AggregatorPersistenceStatusDst apsDst = AggregatorPersistenceFactory.getAggregatorPersistenceStatusDst();
|
||||||
|
logger.debug("{}", apsDst);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,9 +2,7 @@ package org.gcube.accounting.aggregator.plugin;
|
||||||
|
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.ContextTest;
|
import org.gcube.accounting.aggregator.ContextTest;
|
||||||
|
@ -15,7 +13,7 @@ import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
|
||||||
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
|
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
|
||||||
import org.gcube.accounting.datamodel.usagerecords.StorageStatusRecord;
|
import org.gcube.accounting.datamodel.usagerecords.StorageStatusRecord;
|
||||||
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
|
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
|
||||||
import org.gcube.com.fasterxml.jackson.annotation.JsonIgnore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -24,10 +22,8 @@ public class AccountingAggregatorPluginTest extends ContextTest {
|
||||||
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(AccountingAggregatorPluginTest.class);
|
private static Logger logger = LoggerFactory.getLogger(AccountingAggregatorPluginTest.class);
|
||||||
|
|
||||||
public static final String ROOT_PROD = "/d4science.research-infrastructures.eu";
|
|
||||||
|
|
||||||
private void aggregate(String recordType, AggregationType aggregationType, Calendar aggregationStartCalendar,
|
private void aggregate(String recordType, AggregationType aggregationType, Calendar aggregationStartCalendar,
|
||||||
Calendar aggregationEndCalendar) throws Exception {
|
Calendar aggregationEndCalendar, boolean forceRerun, boolean forceEarlyAggregation, boolean skipAggregation) throws Exception {
|
||||||
Map<String, Object> inputs = new HashMap<String, Object>();
|
Map<String, Object> inputs = new HashMap<String, Object>();
|
||||||
|
|
||||||
inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name());
|
inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name());
|
||||||
|
@ -38,10 +34,12 @@ public class AccountingAggregatorPluginTest extends ContextTest {
|
||||||
inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER,
|
inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER,
|
||||||
aggregationStartCalendar == null);
|
aggregationStartCalendar == null);
|
||||||
|
|
||||||
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, false);
|
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, forceEarlyAggregation);
|
||||||
inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, false);
|
inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, forceRerun);
|
||||||
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, false);
|
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, false);
|
||||||
|
|
||||||
|
inputs.put(AccountingAggregatorPlugin.SKIP_AGGREGATION, skipAggregation);
|
||||||
|
|
||||||
if (aggregationStartCalendar != null) {
|
if (aggregationStartCalendar != null) {
|
||||||
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT
|
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT
|
||||||
.format(aggregationStartCalendar.getTime());
|
.format(aggregationStartCalendar.getTime());
|
||||||
|
@ -77,136 +75,110 @@ public class AccountingAggregatorPluginTest extends ContextTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonIgnore
|
@Ignore
|
||||||
@Test
|
@Test
|
||||||
public void aggregateJobUsageRecord() throws Exception {
|
public void aggregateAllServiceUsageRecord() throws Exception {
|
||||||
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
|
||||||
// ContextTest.setContextByName(ROOT_PROD);
|
|
||||||
|
|
||||||
String recordType = JobUsageRecord.class.newInstance().getRecordType();
|
|
||||||
|
|
||||||
boolean allAgregationTypes = true;
|
|
||||||
|
|
||||||
if (!allAgregationTypes) {
|
|
||||||
AggregationType aggregationType = AggregationType.DAILY;
|
|
||||||
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 10);
|
|
||||||
// Calendar aggregationEndCalendar =
|
|
||||||
// Utility.getEndCalendarFromStartCalendar(aggregationType,
|
|
||||||
// aggregationStartCalendar, 1);
|
|
||||||
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
|
|
||||||
|
|
||||||
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar);
|
|
||||||
} else {
|
|
||||||
for (AggregationType at : AggregationType.values()) {
|
|
||||||
aggregate(recordType, at, null, null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonIgnore
|
|
||||||
@Test
|
|
||||||
public void aggregateStorageStatusRecord() throws Exception {
|
|
||||||
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
|
||||||
// ContextTest.setContextByName(ROOT_PROD);
|
|
||||||
|
|
||||||
String recordType = StorageStatusRecord.class.newInstance().getRecordType();
|
|
||||||
|
|
||||||
boolean allAgregationTypes = false;
|
|
||||||
|
|
||||||
if (!allAgregationTypes) {
|
|
||||||
AggregationType aggregationType = AggregationType.DAILY;
|
|
||||||
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 10);
|
|
||||||
// Calendar aggregationEndCalendar =
|
|
||||||
// Utility.getEndCalendarFromStartCalendar(aggregationType,
|
|
||||||
// aggregationStartCalendar, 1);
|
|
||||||
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
|
|
||||||
|
|
||||||
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar);
|
|
||||||
} else {
|
|
||||||
for (AggregationType at : AggregationType.values()) {
|
|
||||||
aggregate(recordType, at, null, null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonIgnore
|
|
||||||
@Test
|
|
||||||
public void aggregateStorageUsageRecord() throws Exception {
|
|
||||||
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
|
||||||
// ContextTest.setContextByName(ROOT_PROD);
|
|
||||||
|
|
||||||
String recordType = StorageUsageRecord.class.newInstance().getRecordType();
|
|
||||||
|
|
||||||
boolean allAgregationTypes = false;
|
|
||||||
|
|
||||||
if (!allAgregationTypes) {
|
|
||||||
AggregationType aggregationType = AggregationType.DAILY;
|
|
||||||
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 10);
|
|
||||||
// Calendar aggregationEndCalendar =
|
|
||||||
// Utility.getEndCalendarFromStartCalendar(aggregationType,
|
|
||||||
// aggregationStartCalendar, 1);
|
|
||||||
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
|
|
||||||
|
|
||||||
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar);
|
|
||||||
} else {
|
|
||||||
for (AggregationType at : AggregationType.values()) {
|
|
||||||
aggregate(recordType, at, null, null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonIgnore
|
|
||||||
@Test
|
|
||||||
public void aggregateService() throws Exception {
|
|
||||||
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
|
||||||
// ContextTest.setContextByName(ROOT_PROD);
|
|
||||||
|
|
||||||
String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
|
String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
|
||||||
|
// aggregateOneShot(recordType);
|
||||||
boolean allAgregationTypes = true;
|
// aggregateEverything(recordType);
|
||||||
|
// restart(recordType);
|
||||||
if (!allAgregationTypes) {
|
|
||||||
AggregationType aggregationType = AggregationType.DAILY;
|
|
||||||
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 10);
|
|
||||||
// Calendar aggregationEndCalendar =
|
|
||||||
// Utility.getEndCalendarFromStartCalendar(aggregationType,
|
|
||||||
// aggregationStartCalendar, 1);
|
|
||||||
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
|
|
||||||
|
|
||||||
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar);
|
|
||||||
} else {
|
|
||||||
for (AggregationType at : AggregationType.values()) {
|
|
||||||
aggregate(recordType, at, null, null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonIgnore
|
@Ignore
|
||||||
@Test
|
@Test
|
||||||
public void aggregateAll() throws Exception {
|
public void aggregateAllStorageStatusRecord() throws Exception {
|
||||||
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
String recordType = StorageStatusRecord.class.newInstance().getRecordType();
|
||||||
// ContextTest.setContextByName(ROOT_PROD);
|
// aggregateOneShot(recordType);
|
||||||
|
// aggregateEverything(recordType);
|
||||||
|
// restart(recordType);
|
||||||
|
}
|
||||||
|
|
||||||
Set<String> allRecordTypes = new HashSet<>();
|
@Ignore
|
||||||
allRecordTypes.add(ServiceUsageRecord.class.newInstance().getRecordType());
|
@Test
|
||||||
allRecordTypes.add(JobUsageRecord.class.newInstance().getRecordType());
|
public void aggregateAllJobUsageRecord() throws Exception {
|
||||||
allRecordTypes.add(StorageUsageRecord.class.newInstance().getRecordType());
|
String recordType = JobUsageRecord.class.newInstance().getRecordType();
|
||||||
allRecordTypes.add(StorageStatusRecord.class.newInstance().getRecordType());
|
// aggregateOneShot(recordType);
|
||||||
|
// aggregateEverything(recordType);
|
||||||
|
// restart(recordType);
|
||||||
|
}
|
||||||
|
|
||||||
for (AggregationType at : AggregationType.values()) {
|
@Ignore
|
||||||
for(String recordType : allRecordTypes) {
|
@Test
|
||||||
aggregate(recordType, at, null, null);
|
public void aggregateAllStorageUsageRecord() throws Exception {
|
||||||
}
|
String recordType = StorageUsageRecord.class.newInstance().getRecordType();
|
||||||
|
// aggregateOneShot(recordType);
|
||||||
|
// aggregateEverything(recordType);
|
||||||
|
// restart(recordType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void restart(String recordType) throws Exception {
|
||||||
|
boolean forceRestart = true;
|
||||||
|
boolean forceEarlyAggregation = true;
|
||||||
|
|
||||||
|
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.MARCH, 4);
|
||||||
|
AggregationType aggregationType = AggregationType.DAILY;
|
||||||
|
Calendar now = Calendar.getInstance();
|
||||||
|
Calendar end = Utility.getAggregationStartCalendar(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DAY_OF_MONTH)+1);
|
||||||
|
// end = Utility.getAggregationStartCalendar(2024, Calendar.MARCH, 1);
|
||||||
|
while (aggregationStartCalendar.before(end)) {
|
||||||
|
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
|
||||||
|
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
|
||||||
|
aggregationStartCalendar = Calendar.getInstance();
|
||||||
|
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void aggregateOneShot(String recordType) throws Exception {
|
||||||
|
boolean forceRestart = true;
|
||||||
|
boolean forceEarlyAggregation = true;
|
||||||
|
AggregationType aggregationType = AggregationType.DAILY;
|
||||||
|
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 29);
|
||||||
|
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
|
||||||
|
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void aggregateEverything(String recordType) throws Exception {
|
||||||
|
boolean forceRestart = true;
|
||||||
|
boolean forceEarlyAggregation = false;
|
||||||
|
|
||||||
|
AggregationType aggregationType = AggregationType.YEARLY;
|
||||||
|
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2016, Calendar.JANUARY, 1);
|
||||||
|
Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JANUARY, 1);
|
||||||
|
while (aggregationStartCalendar.before(end)) {
|
||||||
|
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
|
||||||
|
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
|
||||||
|
aggregationStartCalendar = Calendar.getInstance();
|
||||||
|
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregationType = AggregationType.MONTHLY;
|
||||||
|
aggregationStartCalendar = Calendar.getInstance();
|
||||||
|
aggregationStartCalendar.setTimeInMillis(end.getTimeInMillis());
|
||||||
|
end = Utility.getAggregationStartCalendar(2023, Calendar.DECEMBER, 1);
|
||||||
|
while (aggregationStartCalendar.before(end)) {
|
||||||
|
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
|
||||||
|
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation, false);
|
||||||
|
aggregationStartCalendar = Calendar.getInstance();
|
||||||
|
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregationType = AggregationType.DAILY;
|
||||||
|
aggregationStartCalendar = end;
|
||||||
|
Calendar now = Calendar.getInstance();
|
||||||
|
end = Utility.getAggregationStartCalendar(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DAY_OF_MONTH));
|
||||||
|
// end = Utility.getAggregationStartCalendar(2024, Calendar.MARCH, 1);
|
||||||
|
while (aggregationStartCalendar.before(end)) {
|
||||||
|
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
|
||||||
|
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true, false);
|
||||||
|
aggregationStartCalendar = Calendar.getInstance();
|
||||||
|
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonIgnore
|
@Ignore
|
||||||
@Test
|
@Test
|
||||||
public void testRecovery() throws Exception {
|
public void testRecovery() throws Exception {
|
||||||
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
|
||||||
// ContextTest.setContextByName(ROOT_PROD);
|
|
||||||
|
|
||||||
Map<String, Object> inputs = new HashMap<String, Object>();
|
Map<String, Object> inputs = new HashMap<String, Object>();
|
||||||
|
|
||||||
AggregationType aggregationType = AggregationType.DAILY;
|
AggregationType aggregationType = AggregationType.DAILY;
|
||||||
|
|
|
@ -6,7 +6,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.gcube.accounting.aggregator.ContextTest;
|
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
import org.gcube.accounting.aggregator.elaboration.Elaborator;
|
import org.gcube.accounting.aggregator.elaboration.Elaborator;
|
||||||
|
@ -268,12 +267,9 @@ public class MyTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final String ROOT_PROD = "/d4science.research-infrastructures.eu";
|
|
||||||
|
|
||||||
@Ignore
|
@Ignore
|
||||||
@Test
|
@Test
|
||||||
public void testStorageHub() throws Exception {
|
public void testStorageHub() throws Exception {
|
||||||
ContextTest.setContextByName(ROOT_PROD);
|
|
||||||
FolderContainer destinationFolder = null;
|
FolderContainer destinationFolder = null;
|
||||||
StorageHubClient storageHubClient = new StorageHubClient();
|
StorageHubClient storageHubClient = new StorageHubClient();
|
||||||
FolderContainer parent = storageHubClient.getWSRoot();
|
FolderContainer parent = storageHubClient.getWSRoot();
|
||||||
|
|
|
@ -9,9 +9,6 @@ public class WorkSpaceManagementTest extends ContextTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreatedirectory() throws Exception {
|
public void testCreatedirectory() throws Exception {
|
||||||
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
|
||||||
//ContextTest.setContextByName(AccountingAggregatorPluginTest.ROOT_PROD);
|
|
||||||
|
|
||||||
WorkSpaceManagement workSpaceManagement = WorkSpaceManagement.getInstance();
|
WorkSpaceManagement workSpaceManagement = WorkSpaceManagement.getInstance();
|
||||||
|
|
||||||
FolderContainer root = workSpaceManagement.getWorkspaceRoot();
|
FolderContainer root = workSpaceManagement.getWorkspaceRoot();
|
||||||
|
|
|
@ -5,3 +5,5 @@
|
||||||
/token.properties
|
/token.properties
|
||||||
/d4science.research-infrastructures.eu.gcubekey
|
/d4science.research-infrastructures.eu.gcubekey
|
||||||
/CalledMethods/
|
/CalledMethods/
|
||||||
|
/config.ini
|
||||||
|
/pred4s.gcubekey
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
</appender>
|
</appender>
|
||||||
|
|
||||||
|
|
||||||
<logger name="org.gcube" level="INFO" />
|
<logger name="org.gcube" level="ERROR" />
|
||||||
<logger name="org.gcube.accounting.aggregator" level="INFO" />
|
<logger name="org.gcube.accounting.aggregator" level="INFO" />
|
||||||
|
|
||||||
<root level="WARN">
|
<root level="WARN">
|
||||||
|
|
Loading…
Reference in New Issue