diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceConfiguration.java similarity index 64% rename from src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java rename to src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceConfiguration.java index dba6486..227f949 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersitenceConfiguration.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceConfiguration.java @@ -5,12 +5,12 @@ import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; /** * @author Luca Frosini (ISTI-CNR) */ -public class AggregatorPersitenceConfiguration extends AccountingPersistenceConfiguration { +public class AggregatorPersistenceConfiguration extends AccountingPersistenceConfiguration { /** * Default Constructor */ - public AggregatorPersitenceConfiguration() { + public AggregatorPersistenceConfiguration() { super(); } @@ -19,7 +19,7 @@ public class AggregatorPersitenceConfiguration extends AccountingPersistenceConf * @throws Exception if fails */ @SuppressWarnings({ "rawtypes" }) - public AggregatorPersitenceConfiguration(Class persistence) throws Exception { + public AggregatorPersistenceConfiguration(Class persistence) throws Exception { super((Class) persistence); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java index e64edba..882e2cc 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceFactory.java @@ -7,16 +7,33 @@ public class AggregatorPersistenceFactory { private static AggregatorPersistenceStatus aggregatorPersistenceStatus; + private static AggregatorPersistenceStatusSrc aggregatorPersistenceStatusSrc; + private static AggregatorPersistenceStatusDst aggregatorPersistenceStatusDst; + private static AggregatorPersistenceSrc aggregatorPersistenceSrc; private static AggregatorPersistenceDst aggregatorPersistenceDst; public static AggregatorPersistenceStatus getAggregatorPersistenceStatus() throws Exception { if(aggregatorPersistenceStatus == null) { - aggregatorPersistenceStatus = new PostgreSQLConnectorSrc(); + aggregatorPersistenceStatus = new PostgreSQLConnectorStatus(); } return aggregatorPersistenceStatus; } + protected static AggregatorPersistenceStatusSrc getAggregatorPersistenceStatusSrc() throws Exception { + if(aggregatorPersistenceStatusSrc == null) { + aggregatorPersistenceStatusSrc = new PostgreSQLConnectorStatusSrc(); + } + return aggregatorPersistenceStatusSrc; + } + + protected static AggregatorPersistenceStatusDst getAggregatorPersistenceStatusDst() throws Exception { + if(aggregatorPersistenceStatusDst == null) { + aggregatorPersistenceStatusDst = new PostgreSQLConnectorStatusDst(); + } + return aggregatorPersistenceStatusDst; + } + public static AggregatorPersistenceSrc getAggregatorPersistenceSrc() throws Exception { if(aggregatorPersistenceSrc == null) { aggregatorPersistenceSrc = new PostgreSQLConnectorSrc(); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java index 1b09bcb..84802cb 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java @@ -3,13 +3,12 @@ package org.gcube.accounting.aggregator.persistence; import java.sql.ResultSet; import org.gcube.accounting.aggregator.status.AggregationStatus; -import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery; import org.gcube.com.fasterxml.jackson.databind.JsonNode; /** * @author Luca Frosini (ISTI - CNR) */ -public interface AggregatorPersistenceSrc extends AggregatorPersistence, AccountingPersistenceBackendQuery { +public interface AggregatorPersistenceSrc extends AggregatorPersistence { public ResultSet getResultSetOfRecordToBeAggregated(AggregationStatus aggregationStatus) throws Exception; diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java index 6f206f2..529fb56 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatus.java @@ -5,12 +5,11 @@ import java.util.List; import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.status.AggregationStatus; -import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery; /** * @author Luca Frosini (ISTI - CNR) */ -public interface AggregatorPersistenceStatus extends AggregatorPersistence, AccountingPersistenceBackendQuery { +public interface AggregatorPersistenceStatus { public List getUnterminated(String recordType, AggregationType aggregationType, Date aggregationStartDate, Date aggregationEndDate, boolean forceRestart) throws Exception; diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatusDst.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatusDst.java new file mode 100644 index 0000000..66a95b9 --- /dev/null +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatusDst.java @@ -0,0 +1,10 @@ +package org.gcube.accounting.aggregator.persistence; + +/** + * @author Luca Frosini (ISTI - CNR) + * Used only to migrate status to move database. + * Do not use for normal operation of aggregation + */ +interface AggregatorPersistenceStatusDst extends AggregatorPersistenceStatus { + +} diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatusSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatusSrc.java new file mode 100644 index 0000000..ac44ba2 --- /dev/null +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceStatusSrc.java @@ -0,0 +1,10 @@ +package org.gcube.accounting.aggregator.persistence; + +/** + * @author Luca Frosini (ISTI - CNR) + * Used only to migrate status to move database. + * Do not use for normal operation of aggregation + */ +interface AggregatorPersistenceStatusSrc extends AggregatorPersistenceStatus { + +} diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index b0a6165..f977f77 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -23,14 +23,13 @@ import org.gcube.accounting.aggregator.status.AggregationStateEvent; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; -import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQuery; -import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration; import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL; import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.utility.postgresql.RecordToDBFields; import org.gcube.accounting.utility.postgresql.RecordToDBMapping; import org.gcube.com.fasterxml.jackson.databind.JsonNode; +import org.gcube.documentstore.persistence.PersistencePostgreSQL; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; @@ -40,15 +39,18 @@ import org.postgresql.core.Utils; /** * @author Luca Frosini (ISTI-CNR) */ -public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL implements AggregatorPersistenceSrc, AggregatorPersistenceStatus { +public class PostgreSQLConnector extends PersistencePostgreSQL implements AggregatorPersistenceSrc, AggregatorPersistenceDst, AggregatorPersistenceStatus { + public static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z"; + private static final String UTC_TIME_ZONE = "UTC"; public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone(UTC_TIME_ZONE); + protected AccountingPersistenceConfiguration configuration; protected Connection connection; - protected PostgreSQLConnector(Class clazz) throws Exception { - this.configuration = new AccountingPersistenceBackendQueryConfiguration(clazz); + protected PostgreSQLConnector(Class clazz) throws Exception { + this.configuration = new AccountingPersistenceConfiguration(clazz); Map>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound(); for(String typeName : aggregatedRecords.keySet()) { try { @@ -428,8 +430,8 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL im StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("SELECT * "); - stringBuffer.append("FROM aggregation_status"); - stringBuffer.append("ORDER BY aggregation_start_date ASC"); + stringBuffer.append("FROM aggregation_status "); + stringBuffer.append("ORDER BY aggregation_start_date ASC"); Connection connection = getConnection(); Statement statement = connection.createStatement(); diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java index 1b1f3fb..7e8ee4d 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorDst.java @@ -1,31 +1,15 @@ package org.gcube.accounting.aggregator.persistence; -import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.documentstore.persistence.PersistencePostgreSQL; -import org.gcube.documentstore.records.Record; /** * @author Luca Frosini (ISTI-CNR) */ -public class PostgreSQLConnectorDst implements AggregatorPersistenceDst { +public class PostgreSQLConnectorDst extends PostgreSQLConnector implements AggregatorPersistenceDst { - protected PersistencePostgreSQL persistencePostgreSQL; - protected PostgreSQLConnectorDst() throws Exception { - //AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PostgreSQLConnectorDst.class); - AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class); - persistencePostgreSQL = new PersistencePostgreSQL(); - persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration); - } - - @Override - public void insert(Record record) throws Exception { - persistencePostgreSQL.insert(record); - } - - @Override - public void commitAndClose() throws Exception { - persistencePostgreSQL.commitAndClose(); +// super(PostgreSQLConnectorDst.class); + super(PersistencePostgreSQL.class); } } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java index 077949b..d8e6e24 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorSrc.java @@ -8,6 +8,7 @@ import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersisten public class PostgreSQLConnectorSrc extends PostgreSQLConnector implements AggregatorPersistenceSrc { protected PostgreSQLConnectorSrc() throws Exception { +// super(PostgreSQLConnectorSrc.class); super(AccountingPersistenceQueryPostgreSQL.class); } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java index 2f663d7..2db60dd 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatus.java @@ -5,10 +5,13 @@ import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersisten /** * @author Luca Frosini (ISTI-CNR) */ -public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceSrc { +public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceStatus { protected PostgreSQLConnectorStatus() throws Exception { super(AccountingPersistenceQueryPostgreSQL.class); } + protected PostgreSQLConnectorStatus(Class clazz) throws Exception { + super(clazz); + } } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java new file mode 100644 index 0000000..1201ae5 --- /dev/null +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusDst.java @@ -0,0 +1,15 @@ +package org.gcube.accounting.aggregator.persistence; + +import org.gcube.documentstore.persistence.PersistencePostgreSQL; + +/** + * @author Luca Frosini (ISTI-CNR) + */ +class PostgreSQLConnectorStatusDst extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusDst { + + protected PostgreSQLConnectorStatusDst() throws Exception { + super(PersistencePostgreSQL.class); + } + + +} diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java new file mode 100644 index 0000000..dafbae2 --- /dev/null +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorStatusSrc.java @@ -0,0 +1,14 @@ +package org.gcube.accounting.aggregator.persistence; + +import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL; + +/** + * @author Luca Frosini (ISTI-CNR) + */ +class PostgreSQLConnectorStatusSrc extends PostgreSQLConnectorStatus implements AggregatorPersistenceStatusSrc { + + protected PostgreSQLConnectorStatusSrc() throws Exception { + super(AccountingPersistenceQueryPostgreSQL.class); + } + +} diff --git a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java index b50953c..3002c66 100644 --- a/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnectorTest.java @@ -159,4 +159,38 @@ public class PostgreSQLConnectorTest extends ContextTest { logger.debug("{} : {}", AggregationStatus.class.getSimpleName(), DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); } + + + protected void analyseAggregationStatus(AggregationStatus aggregationStatus) throws Exception { + if(aggregationStatus.getPrevious()!=null) { + analyseAggregationStatus(aggregationStatus.getPrevious()); + } + + AggregatorPersistenceStatusDst apsDst = AggregatorPersistenceFactory.getAggregatorPersistenceStatusDst(); + + if(aggregationStatus.getOriginalRecordsNumber()!=0) { + apsDst.upsertAggregationStatus(aggregationStatus); + } + + } + + // @Ignore + @Test + public void moveStatusFromSrcToDst() throws Exception { + ContextTest.setContextByName(GCUBE); + AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc(); + List aggregationStatuses = apsSrc.getAll(); + for(AggregationStatus aggregationStatus : aggregationStatuses) { + analyseAggregationStatus(aggregationStatus); + } + } + +// @Test +// public void testAggregatorPersistenceStatusSrcAndDst() throws Exception { +// AggregatorPersistenceStatusSrc apsSrc = AggregatorPersistenceFactory.getAggregatorPersistenceStatusSrc(); +// logger.debug("{}", apsSrc); +// AggregatorPersistenceStatusDst apsDst = AggregatorPersistenceFactory.getAggregatorPersistenceStatusDst(); +// logger.debug("{}", apsDst); +// } + } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index f9ab3bf..0e3d02c 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -9,8 +9,8 @@ - - + +