Fixed code

This commit is contained in:
Luca Frosini 2024-02-14 17:50:22 +01:00
parent d2a67855c1
commit 9ff6bdc135
14 changed files with 125 additions and 37 deletions

View File

@ -5,12 +5,12 @@ import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
/** /**
* @author Luca Frosini (ISTI-CNR) * @author Luca Frosini (ISTI-CNR)
*/ */
public class AggregatorPersitenceConfiguration extends AccountingPersistenceConfiguration { public class AggregatorPersistenceConfiguration extends AccountingPersistenceConfiguration {
/** /**
* Default Constructor * Default Constructor
*/ */
public AggregatorPersitenceConfiguration() { public AggregatorPersistenceConfiguration() {
super(); super();
} }
@ -19,7 +19,7 @@ public class AggregatorPersitenceConfiguration extends AccountingPersistenceConf
* @throws Exception if fails * @throws Exception if fails
*/ */
@SuppressWarnings({ "rawtypes" }) @SuppressWarnings({ "rawtypes" })
public AggregatorPersitenceConfiguration(Class<?> persistence) throws Exception { public AggregatorPersistenceConfiguration(Class<?> persistence) throws Exception {
super((Class) persistence); super((Class) persistence);
} }

View File

@ -7,16 +7,33 @@ public class AggregatorPersistenceFactory {
private static AggregatorPersistenceStatus aggregatorPersistenceStatus; private static AggregatorPersistenceStatus aggregatorPersistenceStatus;
private static AggregatorPersistenceStatusSrc aggregatorPersistenceStatusSrc;
private static AggregatorPersistenceStatusDst aggregatorPersistenceStatusDst;
private static AggregatorPersistenceSrc aggregatorPersistenceSrc; private static AggregatorPersistenceSrc aggregatorPersistenceSrc;
private static AggregatorPersistenceDst aggregatorPersistenceDst; private static AggregatorPersistenceDst aggregatorPersistenceDst;
public static AggregatorPersistenceStatus getAggregatorPersistenceStatus() throws Exception { public static AggregatorPersistenceStatus getAggregatorPersistenceStatus() throws Exception {
if(aggregatorPersistenceStatus == null) { if(aggregatorPersistenceStatus == null) {
aggregatorPersistenceStatus = new PostgreSQLConnectorSrc(); aggregatorPersistenceStatus = new PostgreSQLConnectorStatus();
} }
return aggregatorPersistenceStatus; return aggregatorPersistenceStatus;
} }
protected static AggregatorPersistenceStatusSrc getAggregatorPersistenceStatusSrc() throws Exception {
if(aggregatorPersistenceStatusSrc == null) {
aggregatorPersistenceStatusSrc = new PostgreSQLConnectorStatusSrc();
}
return aggregatorPersistenceStatusSrc;
}
protected static AggregatorPersistenceStatusDst getAggregatorPersistenceStatusDst() throws Exception {
if(aggregatorPersistenceStatusDst == null) {
aggregatorPersistenceStatusDst = new PostgreSQLConnectorStatusDst();
}
return aggregatorPersistenceStatusDst;
}
public static AggregatorPersistenceSrc getAggregatorPersistenceSrc() throws Exception { public static AggregatorPersistenceSrc getAggregatorPersistenceSrc() throws Exception {
if(aggregatorPersistenceSrc == null) { if(aggregatorPersistenceSrc == null) {
aggregatorPersistenceSrc = new PostgreSQLConnectorSrc(); aggregatorPersistenceSrc = new PostgreSQLConnectorSrc();

View File

@ -3,13 +3,12 @@ package org.gcube.accounting.aggregator.persistence;
import java.sql.ResultSet; import java.sql.ResultSet;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
import org.gcube.com.fasterxml.jackson.databind.JsonNode; import org.gcube.com.fasterxml.jackson.databind.JsonNode;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
public interface AggregatorPersistenceSrc extends AggregatorPersistence, AccountingPersistenceBackendQuery { public interface AggregatorPersistenceSrc extends AggregatorPersistence {
public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception; public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception;

View File

@ -5,12 +5,11 @@ import java.util.List;
import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*/ */
public interface AggregatorPersistenceStatus extends AggregatorPersistence, AccountingPersistenceBackendQuery { public interface AggregatorPersistenceStatus {
public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType, public List<AggregationStatus> getUnterminated(String recordType, AggregationType aggregationType,
Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception; Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception;

View File

@ -0,0 +1,10 @@
package org.gcube.accounting.aggregator.persistence;
/**
* @author Luca Frosini (ISTI - CNR)
* Used only to migrate status to move database.
* Do not use for normal operation of aggregation
*/
interface AggregatorPersistenceStatusDst extends AggregatorPersistenceStatus {
}

View File

@ -0,0 +1,10 @@
package org.gcube.accounting.aggregator.persistence;
/**
* @author Luca Frosini (ISTI - CNR)
* Used only to migrate status to move database.
* Do not use for normal operation of aggregation
*/
interface AggregatorPersistenceStatusSrc extends AggregatorPersistenceStatus {
}

View File

@ -23,14 +23,13 @@ import org.gcube.accounting.aggregator.status.AggregationStateEvent;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL; import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.accounting.utility.postgresql.RecordToDBFields; import org.gcube.accounting.utility.postgresql.RecordToDBFields;
import org.gcube.accounting.utility.postgresql.RecordToDBMapping; import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
import org.gcube.com.fasterxml.jackson.databind.JsonNode; import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.Record;
@ -40,15 +39,18 @@ import org.postgresql.core.Utils;
/** /**
* @author Luca Frosini (ISTI-CNR) * @author Luca Frosini (ISTI-CNR)
*/ */
public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL implements AggregatorPersistenceSrc, AggregatorPersistenceStatus { public class PostgreSQLConnector extends PersistencePostgreSQL implements AggregatorPersistenceSrc, AggregatorPersistenceDst, AggregatorPersistenceStatus {
public static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
private static final String UTC_TIME_ZONE = "UTC"; private static final String UTC_TIME_ZONE = "UTC";
public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE); public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE);
protected AccountingPersistenceConfiguration configuration;
protected Connection connection; protected Connection connection;
protected PostgreSQLConnector(Class<? extends AccountingPersistenceBackendQuery> clazz) throws Exception { protected PostgreSQLConnector(Class<?> clazz) throws Exception {
this.configuration = new AccountingPersistenceBackendQueryConfiguration(clazz); this.configuration = new AccountingPersistenceConfiguration(clazz);
Map<String, Class<? extends AggregatedRecord<?,?>>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound(); Map<String, Class<? extends AggregatedRecord<?,?>>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound();
for(String typeName : aggregatedRecords.keySet()) { for(String typeName : aggregatedRecords.keySet()) {
try { try {
@ -428,8 +430,8 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL im
StringBuffer stringBuffer = new StringBuffer(); StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("SELECT * "); stringBuffer.append("SELECT * ");
stringBuffer.append("FROM aggregation_status"); stringBuffer.append("FROM aggregation_status ");
stringBuffer.append("ORDER BY aggregation_start_date ASC"); stringBuffer.append("ORDER BY aggregation_start_date ASC");
Connection connection = getConnection(); Connection connection = getConnection();
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();

View File

@ -1,31 +1,15 @@
package org.gcube.accounting.aggregator.persistence; package org.gcube.accounting.aggregator.persistence;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.documentstore.persistence.PersistencePostgreSQL; import org.gcube.documentstore.persistence.PersistencePostgreSQL;
import org.gcube.documentstore.records.Record;
/** /**
* @author Luca Frosini (ISTI-CNR) * @author Luca Frosini (ISTI-CNR)
*/ */
public class PostgreSQLConnectorDst implements AggregatorPersistenceDst { public class PostgreSQLConnectorDst extends PostgreSQLConnector implements AggregatorPersistenceDst {
protected PersistencePostgreSQL persistencePostgreSQL;
protected PostgreSQLConnectorDst() throws Exception { protected PostgreSQLConnectorDst() throws Exception {
//AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PostgreSQLConnectorDst.class); // super(PostgreSQLConnectorDst.class);
AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class); super(PersistencePostgreSQL.class);
persistencePostgreSQL = new PersistencePostgreSQL();
persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration);
}
@Override
public void insert(Record record) throws Exception {
persistencePostgreSQL.insert(record);
}
@Override
public void commitAndClose() throws Exception {
persistencePostgreSQL.commitAndClose();
} }
} }

View File

@ -8,6 +8,7 @@ import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersisten
public class PostgreSQLConnectorSrc extends PostgreSQLConnector implements AggregatorPersistenceSrc { public class PostgreSQLConnectorSrc extends PostgreSQLConnector implements AggregatorPersistenceSrc {
protected PostgreSQLConnectorSrc() throws Exception { protected PostgreSQLConnectorSrc() throws Exception {
// super(PostgreSQLConnectorSrc.class);
super(AccountingPersistenceQueryPostgreSQL.class); super(AccountingPersistenceQueryPostgreSQL.class);
} }

View File

@ -5,10 +5,13 @@ import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersisten
/** /**
* @author Luca Frosini (ISTI-CNR) * @author Luca Frosini (ISTI-CNR)
*/ */
public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceSrc { public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceStatus {
protected PostgreSQLConnectorStatus() throws Exception { protected PostgreSQLConnectorStatus() throws Exception {
super(AccountingPersistenceQueryPostgreSQL.class); super(AccountingPersistenceQueryPostgreSQL.class);
} }
protected PostgreSQLConnectorStatus(Class<?> clazz) throws Exception {
super(clazz);
}
} }

View File

@ -0,0 +1,15 @@
package org.gcube.accounting.aggregator.persistence;
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
/**
* @author Luca Frosini (ISTI-CNR)
*/
class PostgreSQLConnectorStatusDst extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusDst {
protected PostgreSQLConnectorStatusDst() throws Exception {
super(PersistencePostgreSQL.class);
}
}

View File

@ -0,0 +1,14 @@
package org.gcube.accounting.aggregator.persistence;
import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
/**
* @author Luca Frosini (ISTI-CNR)
*/
class PostgreSQLConnectorStatusSrc extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusSrc {
protected PostgreSQLConnectorStatusSrc() throws Exception {
super(AccountingPersistenceQueryPostgreSQL.class);
}
}

View File

@ -159,4 +159,38 @@ public class PostgreSQLConnectorTest extends ContextTest {
logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
} }
protected void analyseAggregationStatus(AggregationStatus aggregationStatus) throws Exception {
if(aggregationStatus.getPrevious()!=null) {
analyseAggregationStatus(aggregationStatus.getPrevious());
}
AggregatorPersistenceStatusDst apsDst = AggregatorPersistenceFactory.getAggregatorPersistenceStatusDst();
if(aggregationStatus.getOriginalRecordsNumber()!=0) {
apsDst.upsertAggregationStatus(aggregationStatus);
}
}
// @Ignore
@Test
public void moveStatusFromSrcToDst() throws Exception {
ContextTest.setContextByName(GCUBE);
AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
List<AggregationStatus> aggregationStatuses = apsSrc.getAll();
for(AggregationStatus aggregationStatus : aggregationStatuses) {
analyseAggregationStatus(aggregationStatus);
}
}
// @Test
// public void testAggregatorPersistenceStatusSrcAndDst() throws Exception {
// AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc();
// logger.debug("{}", apsSrc);
// AggregatorPersistenceStatusDst apsDst = AggregatorPersistenceFactory.getAggregatorPersistenceStatusDst();
// logger.debug("{}", apsDst);
// }
} }

View File

@ -9,8 +9,8 @@
</appender> </appender>
<logger name="org.gcube" level="INFO" /> <logger name="org.gcube" level="ERROR" />
<logger name="org.gcube.accounting.aggregator" level="INFO" /> <logger name="org.gcube.accounting.aggregator" level="TRACE" />
<root level="WARN"> <root level="WARN">
<appender-ref ref="STDOUT" /> <appender-ref ref="STDOUT" />