Added procedure to migrate AccountignManager documents
This commit is contained in:
parent
f7c89321ee
commit
91ce1233ed
5
pom.xml
5
pom.xml
|
@ -65,6 +65,11 @@
|
||||||
<artifactId>document-store-lib-postgresql</artifactId>
|
<artifactId>document-store-lib-postgresql</artifactId>
|
||||||
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
|
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.gcube.accounting</groupId>
|
||||||
|
<artifactId>accounting-analytics-persistence-postgresql</artifactId>
|
||||||
|
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.slf4j</groupId>
|
<groupId>org.slf4j</groupId>
|
||||||
<artifactId>slf4j-api</artifactId>
|
<artifactId>slf4j-api</artifactId>
|
||||||
|
|
|
@ -303,6 +303,48 @@ public class CouchBaseConnector {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static List<AggregationStatus> getAll() throws Exception{
|
||||||
|
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SELECT *
|
||||||
|
* FROM AccountingManager
|
||||||
|
* ORDER BY `aggregationInfo`.`aggregationStartDate` ASC
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
String aggregationStartDateField = "`aggregationInfo`.`aggregationStartDate`";
|
||||||
|
|
||||||
|
Sort sort = Sort.asc(aggregationStartDateField);
|
||||||
|
|
||||||
|
Statement statement = select("*").from(bucket.name()).orderBy(sort);
|
||||||
|
|
||||||
|
logger.trace("Going to query : {}", statement.toString());
|
||||||
|
|
||||||
|
N1qlQueryResult result = bucket.query(statement);
|
||||||
|
if (!result.finalSuccess()) {
|
||||||
|
logger.debug("{} failed : {}", N1qlQueryResult.class.getSimpleName(), result.errors());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<N1qlQueryRow> rows = result.allRows();
|
||||||
|
List<AggregationStatus> aggregationStatuses = new ArrayList<>(rows.size());
|
||||||
|
for(N1qlQueryRow row: rows){
|
||||||
|
try {
|
||||||
|
JsonObject jsonObject = row.value().getObject(bucket.name());
|
||||||
|
logger.trace("JsonObject : {}", jsonObject.toString());
|
||||||
|
AggregationStatus aggregationStatus = DSMapper.getObjectMapper().readValue(jsonObject.toString(), AggregationStatus.class);
|
||||||
|
aggregationStatuses.add(aggregationStatus);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("Unable to elaborate result for {}", row.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return aggregationStatuses;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
|
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
|
||||||
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
|
Bucket bucket = CouchBaseConnector.getInstance().connectionMap.get(CouchBaseConnector.ACCOUNTING_MANAGER_BUCKET_NAME);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,252 @@
|
||||||
|
package org.gcube.accounting.aggregator.persistence;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DriverManager;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Calendar;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.TimeZone;
|
||||||
|
|
||||||
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
||||||
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
|
import org.gcube.accounting.aggregator.status.AggregationStateEvent;
|
||||||
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
|
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
|
||||||
|
import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
|
||||||
|
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
|
||||||
|
import org.postgresql.core.Utils;
|
||||||
|
|
||||||
|
public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL {
|
||||||
|
|
||||||
|
private static final String UTC_TIME_ZONE = "UTC";
|
||||||
|
public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE);
|
||||||
|
|
||||||
|
public PostgreSQLConnector() throws Exception {
|
||||||
|
this.configuration = new AccountingPersistenceBackendQueryConfiguration(AccountingPersistenceQueryPostgreSQL.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Connection getConnection() throws Exception {
|
||||||
|
Class.forName("org.postgresql.Driver");
|
||||||
|
String url = configuration.getProperty(AccountingPersistenceQueryPostgreSQL.URL_PROPERTY_KEY);
|
||||||
|
String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY);
|
||||||
|
String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY);
|
||||||
|
|
||||||
|
Connection connection = DriverManager.getConnection(url, username, password);
|
||||||
|
logger.trace("Database {} opened successfully", url);
|
||||||
|
connection.setAutoCommit(false);
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getQuotedString(String string) throws SQLException {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
builder.append("'");
|
||||||
|
Utils.escapeLiteral(builder, string, false);
|
||||||
|
builder.append("'");
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getValue(Serializable serializable) throws SQLException {
|
||||||
|
if(serializable == null) {
|
||||||
|
return "null";
|
||||||
|
}
|
||||||
|
|
||||||
|
if(serializable instanceof Number) {
|
||||||
|
return serializable.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
if(serializable instanceof Calendar) {
|
||||||
|
Calendar calendar = (Calendar) serializable;
|
||||||
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATETIME_PATTERN);
|
||||||
|
simpleDateFormat.setTimeZone(DEFAULT_TIME_ZONE);
|
||||||
|
String date = simpleDateFormat.format(calendar.getTime());
|
||||||
|
return getQuotedString(date);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(serializable instanceof Date) {
|
||||||
|
Date date = (Date) serializable;
|
||||||
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATETIME_PATTERN);
|
||||||
|
simpleDateFormat.setTimeZone(DEFAULT_TIME_ZONE);
|
||||||
|
String dateString = simpleDateFormat.format(date);
|
||||||
|
return getQuotedString(dateString);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if(serializable instanceof Enum) {
|
||||||
|
Enum<?> e = (Enum<?>) serializable;
|
||||||
|
return getQuotedString(e.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
// String, URI etc
|
||||||
|
return getQuotedString(serializable.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getInsertAggregationStatusQuery(AggregationStatus aggregationStatus, boolean upsert) throws SQLException {
|
||||||
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
|
|
||||||
|
stringBuffer.append("INSERT INTO ");
|
||||||
|
stringBuffer.append("aggregation_status");
|
||||||
|
stringBuffer.append(" (id, ");
|
||||||
|
stringBuffer.append("record_type, aggregation_type, aggregation_start_date, aggregation_end_date, ");
|
||||||
|
stringBuffer.append("original_records_number, aggregated_records_number, recovered_records_number, malformed_records_number, percentage, ");
|
||||||
|
stringBuffer.append("context, current_aggregation_state, last_update_time, previous)");
|
||||||
|
stringBuffer.append(" VALUES (");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getUUID().toString()));
|
||||||
|
|
||||||
|
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationInfo.getRecordType()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationInfo.getAggregationType()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationInfo.getAggregationStartDate()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationInfo.getAggregationEndDate()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getOriginalRecordsNumber()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getAggregatedRecordsNumber()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getRecoveredRecordNumber()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getMalformedRecordNumber()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getPercentage()));
|
||||||
|
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getContext()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getAggregationState()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getLastUpdateTime()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
|
||||||
|
AggregationStatus previous = aggregationStatus.getPrevious();
|
||||||
|
if(previous!=null) {
|
||||||
|
stringBuffer.append(getValue(previous.getUUID().toString()));
|
||||||
|
}else {
|
||||||
|
stringBuffer.append(getValue(null));
|
||||||
|
}
|
||||||
|
|
||||||
|
if(upsert) {
|
||||||
|
stringBuffer.append(") ON CONFLICT (id) DO UPDATE SET ");
|
||||||
|
stringBuffer.append("original_records_number=EXCLUDED.original_records_number, aggregated_records_number=EXCLUDED.aggregated_records_number, recovered_records_number=EXCLUDED.recovered_records_number, malformed_records_number=EXCLUDED.malformed_records_number, percentage=EXCLUDED.percentage, ");
|
||||||
|
stringBuffer.append("current_aggregation_state=EXCLUDED.current_aggregation_state, last_update_time=EXCLUDED.last_update_time, previous=EXCLUDED.previous;");
|
||||||
|
}else {
|
||||||
|
stringBuffer.append(");");
|
||||||
|
}
|
||||||
|
|
||||||
|
return stringBuffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getInsertAggregationStateQuery(AggregationStatus aggregationStatus) throws SQLException {
|
||||||
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
|
for(AggregationStateEvent aggregationStateEvent : aggregationStatus.getAggregationStateEvents()) {
|
||||||
|
stringBuffer.append("INSERT INTO ");
|
||||||
|
stringBuffer.append("aggregation_status_event");
|
||||||
|
stringBuffer.append(" ( aggregation_state, start_time, end_time, aggregation_status)");
|
||||||
|
stringBuffer.append(" VALUES (");
|
||||||
|
stringBuffer.append(getValue(aggregationStateEvent.getAggregationState()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStateEvent.getStartTime()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStateEvent.getEndTime()));
|
||||||
|
stringBuffer.append(", ");
|
||||||
|
stringBuffer.append(getValue(aggregationStatus.getUUID().toString()));
|
||||||
|
stringBuffer.append(") ON CONFLICT DO NOTHING;");
|
||||||
|
}
|
||||||
|
return stringBuffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void upsertAggregationStatus(AggregationStatus aggregationStatus) throws Exception {
|
||||||
|
Connection connection = getConnection();
|
||||||
|
Statement statement = connection.createStatement();
|
||||||
|
String sqlCommand = getInsertAggregationStatusQuery(aggregationStatus, true);
|
||||||
|
statement.executeUpdate(sqlCommand);
|
||||||
|
sqlCommand = getInsertAggregationStateQuery(aggregationStatus);
|
||||||
|
statement.executeUpdate(sqlCommand);
|
||||||
|
statement.close();
|
||||||
|
connection.commit();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public AggregationStatus getLast(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SELECT *
|
||||||
|
* FROM aggregation_status
|
||||||
|
* WHERE
|
||||||
|
* `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
|
||||||
|
* `aggregationInfo`.`aggregationType` = "DAILY" AND
|
||||||
|
* `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000"
|
||||||
|
* `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000"
|
||||||
|
* ORDER BY `aggregationInfo`.`aggregationStartDate` DESC LIMIT 1
|
||||||
|
*/
|
||||||
|
|
||||||
|
return null;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<AggregationStatus> getUnterminated(Date aggregationStartDate, Date aggregationEndDate) throws Exception{
|
||||||
|
return getUnterminated(null, null, aggregationStartDate, aggregationEndDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate) throws Exception{
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SELECT *
|
||||||
|
* FROM AccountingManager
|
||||||
|
* WHERE
|
||||||
|
* `aggregationState` != "COMPLETED" AND
|
||||||
|
* `lastUpdateTime` < "2017-07-31 09:31:10.984 +0000" AND
|
||||||
|
* `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
|
||||||
|
* `aggregationInfo`.`aggregationType` = "DAILY" AND
|
||||||
|
* `aggregationInfo`.`aggregationStartDate` >= "2017-05-01 00:00:00.000 +0000"
|
||||||
|
* `aggregationInfo`.`aggregationStartDate` <= "2017-05-31 00:00:00.000 +0000"
|
||||||
|
*
|
||||||
|
* ORDER BY `aggregationInfo`.`aggregationStartDate` ASC
|
||||||
|
*/
|
||||||
|
|
||||||
|
List<AggregationStatus> aggregationStatuses = new ArrayList<>();
|
||||||
|
|
||||||
|
return aggregationStatuses;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<AggregationStatus> getAll() throws Exception{
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SELECT *
|
||||||
|
* FROM AccountingManager
|
||||||
|
* ORDER BY `aggregationInfo`.`aggregationStartDate` ASC
|
||||||
|
*/
|
||||||
|
|
||||||
|
List<AggregationStatus> aggregationStatuses = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
|
return aggregationStatuses;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static AggregationStatus getAggregationStatus(String recordType, AggregationType aggregationType, Date aggregationStartDate) throws Exception{
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SELECT *
|
||||||
|
* FROM AccountingManager
|
||||||
|
* WHERE
|
||||||
|
* `aggregationInfo`.`recordType` = "ServiceUsageRecord" AND
|
||||||
|
* `aggregationInfo`.`aggregationType` = "DAILY" AND
|
||||||
|
* `aggregationInfo`.`aggregationStartDate` = "2017-06-24 00:00:00.000 +0000"
|
||||||
|
*/
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -144,6 +144,14 @@ public class AggregationStatus {
|
||||||
public int getAggregatedRecordsNumber() {
|
public int getAggregatedRecordsNumber() {
|
||||||
return aggregatedRecordsNumber;
|
return aggregatedRecordsNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getRecoveredRecordNumber() {
|
||||||
|
return recoveredRecordNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getPercentage() {
|
||||||
|
return percentage;
|
||||||
|
}
|
||||||
|
|
||||||
public AggregationState getAggregationState() {
|
public AggregationState getAggregationState() {
|
||||||
return aggregationState;
|
return aggregationState;
|
||||||
|
@ -160,6 +168,10 @@ public class AggregationStatus {
|
||||||
public void setContext(String context) {
|
public void setContext(String context) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AggregationStatus getPrevious() {
|
||||||
|
return previous;
|
||||||
|
}
|
||||||
|
|
||||||
public int getMalformedRecordNumber() {
|
public int getMalformedRecordNumber() {
|
||||||
return malformedRecordNumber;
|
return malformedRecordNumber;
|
||||||
|
|
|
@ -7,6 +7,7 @@ import java.util.List;
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
import org.gcube.accounting.aggregator.aggregation.AggregationInfo;
|
||||||
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
import org.gcube.accounting.aggregator.aggregation.AggregationType;
|
||||||
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
|
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
|
||||||
|
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnector;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationState;
|
import org.gcube.accounting.aggregator.status.AggregationState;
|
||||||
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
import org.gcube.accounting.aggregator.status.AggregationStatus;
|
||||||
import org.gcube.accounting.aggregator.utility.Utility;
|
import org.gcube.accounting.aggregator.utility.Utility;
|
||||||
|
@ -19,7 +20,34 @@ import org.slf4j.LoggerFactory;
|
||||||
public class CouchBaseConnectorTest extends ContextTest {
|
public class CouchBaseConnectorTest extends ContextTest {
|
||||||
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(AccountingAggregatorPluginTest.class);
|
private static Logger logger = LoggerFactory.getLogger(AccountingAggregatorPluginTest.class);
|
||||||
|
|
||||||
|
private PostgreSQLConnector postgreSQLConnector;
|
||||||
|
|
||||||
|
public CouchBaseConnectorTest() throws Exception {
|
||||||
|
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
||||||
|
postgreSQLConnector = new PostgreSQLConnector();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void analyseAggregationStatus(AggregationStatus aggregationStatus) throws Exception {
|
||||||
|
if(aggregationStatus.getPrevious()!=null) {
|
||||||
|
analyseAggregationStatus(aggregationStatus.getPrevious());
|
||||||
|
}
|
||||||
|
if(aggregationStatus.getOriginalRecordsNumber()!=0) {
|
||||||
|
postgreSQLConnector.upsertAggregationStatus(aggregationStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void insertAllInPostgreSQL() throws Exception {
|
||||||
|
ContextTest.setContextByName(ROOT_DEV_SCOPE);
|
||||||
|
List<AggregationStatus> aggregationStatuses = CouchBaseConnector.getAll();
|
||||||
|
for(AggregationStatus aggregationStatus : aggregationStatuses) {
|
||||||
|
analyseAggregationStatus(aggregationStatus);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getLastTest() throws Exception {
|
public void getLastTest() throws Exception {
|
||||||
AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
|
AggregationStatus aggregationStatus = CouchBaseConnector.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
|
||||||
|
|
Loading…
Reference in New Issue