From 12ca03a111ab581f01cbc68bb230b59f49f5fdbf Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Thu, 25 Mar 2021 12:38:28 +0100 Subject: [PATCH] Migrating aggregator to PersistenceSQL --- .../aggregation/AggregationInfo.java | 2 +- .../aggregator/persist/InsertDocument.java | 10 ++++----- .../aggregator/persist/Persist.java | 9 +++++++- .../plugin/AccountingAggregatorPlugin.java | 3 +++ .../status/AggregationStateEvent.java | 5 ++--- .../aggregator/status/AggregationStatus.java | 7 +++---- .../AccountingAggregatorPluginTest.java | 21 ++++++++++--------- src/test/resources/META-INF/plugin.properties | 4 ++++ 8 files changed, 37 insertions(+), 24 deletions(-) create mode 100644 src/test/resources/META-INF/plugin.properties diff --git a/src/main/java/org/gcube/accounting/aggregator/aggregation/AggregationInfo.java b/src/main/java/org/gcube/accounting/aggregator/aggregation/AggregationInfo.java index 4b4e6ab..a4e273b 100644 --- a/src/main/java/org/gcube/accounting/aggregator/aggregation/AggregationInfo.java +++ b/src/main/java/org/gcube/accounting/aggregator/aggregation/AggregationInfo.java @@ -4,7 +4,7 @@ import java.util.Calendar; import java.util.Date; import org.gcube.accounting.aggregator.utility.Constant; -import com.fasterxml.jackson.annotation.JsonFormat; +import org.gcube.com.fasterxml.jackson.annotation.JsonFormat; /** * @author Luca Frosini (ISTI - CNR) diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java index 13d609a..f25d5a4 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java @@ -12,7 +12,7 @@ import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; -import org.gcube.documentstore.persistence.PersistenceBackendFactory; +import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.documentstore.persistence.PersistencePostgreSQL; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; @@ -54,7 +54,10 @@ public class InsertDocument extends DocumentElaboration { File destinationFolder = file.getParentFile(); calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX)); - persistencePostgreSQL = (PersistencePostgreSQL) PersistenceBackendFactory.getPersistenceBackend(Utility.getCurrentContext()); + AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class); + persistencePostgreSQL = new PersistencePostgreSQL(); + persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration); + count = 0; } @@ -110,9 +113,6 @@ public class InsertDocument extends DocumentElaboration { * bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); */ Record record = RecordUtility.getRecord(jsonObject.toString()); - if(count == 0) { - persistencePostgreSQL.newConnection(); - } persistencePostgreSQL.insert(record); ++count; diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java index 4ee2cc1..9d7803e 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java @@ -66,9 +66,16 @@ public class Persist { public void recover() throws Exception{ if(aggregationStatus.getAggregatedRecordsNumber()==aggregationStatus.getOriginalRecordsNumber()){ + Calendar now = Utility.getUTCCalendarInstance(); + + if(aggregationStatus.getAggregatedRecordsNumber()==0) { + setAggregationStateToCompleted(now); + return; + } + /* if(originalRecordBucket.name().compareTo(aggregatedRecordBucket.name())==0 || aggregationStatus.getAggregatedRecordsNumber()==0){ - Calendar now = Utility.getUTCCalendarInstance(); + logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. The aggregation didn't had any effects and the Source and Destination Bucket are the same ({}) or the record number is 0. Setting {} to {}", aggregationStatus.getAggregationInfo(), aggregationStatus.getOriginalRecordsNumber(), diff --git a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java index 27ccd74..de616eb 100644 --- a/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java +++ b/src/main/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPlugin.java @@ -14,6 +14,7 @@ import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.UsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; +import org.gcube.documentstore.persistence.PersistenceBackendFactory; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; import org.gcube.vremanagement.executor.plugin.Plugin; @@ -118,6 +119,8 @@ public class AccountingAggregatorPlugin extends Plugin { @SuppressWarnings("unchecked") @Override public void launch(Map inputs) throws Exception { + PersistenceBackendFactory.getPersistenceBackend(Utility.getCurrentContext()); + AggregationType aggregationType = null; Date aggregationStartDate = null; Date aggregationEndDate = null; diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStateEvent.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStateEvent.java index d892ab9..d8e0d4b 100644 --- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStateEvent.java +++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStateEvent.java @@ -3,9 +3,8 @@ package org.gcube.accounting.aggregator.status; import java.util.Calendar; import org.gcube.accounting.aggregator.utility.Constant; - -import com.fasterxml.jackson.annotation.JsonFormat; -import com.fasterxml.jackson.annotation.JsonProperty; +import org.gcube.com.fasterxml.jackson.annotation.JsonFormat; +import org.gcube.com.fasterxml.jackson.annotation.JsonProperty; /** * @author Luca Frosini (ISTI - CNR) diff --git a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java index 4c10a05..52861a8 100644 --- a/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java +++ b/src/main/java/org/gcube/accounting/aggregator/status/AggregationStatus.java @@ -11,12 +11,11 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; +import org.gcube.com.fasterxml.jackson.annotation.JsonFormat; +import org.gcube.com.fasterxml.jackson.annotation.JsonProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.annotation.JsonFormat; -import com.fasterxml.jackson.annotation.JsonProperty; - /** * @author Luca Frosini (ISTI - CNR) */ @@ -51,7 +50,7 @@ public class AggregationStatus { protected AggregationStatus previous; // Last observed status - @JsonFormat(shape= JsonFormat.Shape.STRING) + @JsonFormat(shape= JsonFormat.Shape.STRING, pattern = Constant.DATETIME_PATTERN) @JsonProperty protected AggregationState aggregationState; diff --git a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java index 7c88dd7..d79b4f0 100644 --- a/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java +++ b/src/test/java/org/gcube/accounting/aggregator/plugin/AccountingAggregatorPluginTest.java @@ -3,7 +3,6 @@ package org.gcube.accounting.aggregator.plugin; import java.util.Calendar; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin.ElaborationType; @@ -22,8 +21,8 @@ public class AccountingAggregatorPluginTest extends ContextTest { @Test public void aggregate() throws Exception { - //ContextTest.setContextByName(ROOT_DEV_SCOPE); - ContextTest.setContextByName(ROOT_PROD); + ContextTest.setContextByName(ROOT_DEV_SCOPE); + //ContextTest.setContextByName(ROOT_PROD); Map inputs = new HashMap(); @@ -44,16 +43,16 @@ public class AccountingAggregatorPluginTest extends ContextTest { inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER, false); inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, true); - inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, true); - inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true); + inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, false); + inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, false); - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2018, Calendar.SEPTEMBER, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2020, Calendar.MARCH, 1); String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime()); logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); // Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(AggregationType.MONTHLY, aggregationStartCalendar, 1); - Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2018, Calendar.NOVEMBER, 1); + Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.APRIL, 1); /* String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime()); logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate); @@ -67,7 +66,7 @@ public class AccountingAggregatorPluginTest extends ContextTest { while(aggregationStartCalendar.before(aggregationEndCalendar)) { plugin.launch(inputs); - Thread.sleep(TimeUnit.MINUTES.toMillis(1)); + //Thread.sleep(TimeUnit.MINUTES.toMillis(1)); aggregationStartCalendar.add(aggregationType.getCalendarField(), 1); aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime()); inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); @@ -98,15 +97,17 @@ public class AccountingAggregatorPluginTest extends ContextTest { inputs.put(AccountingAggregatorPlugin.PERSIST_START_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(8, 0)); inputs.put(AccountingAggregatorPlugin.PERSIST_END_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(20, 30)); - Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2020, Calendar.MARCH, 1); + Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2020, Calendar.FEBRUARY, 1); String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime()); logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate); + inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, true); + inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true); // Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(AggregationType.MONTHLY, aggregationStartCalendar, 1); - Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2020, Calendar.APRIL, 1); + Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2020, Calendar.MARCH, 1); String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime()); logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate); inputs.put(AccountingAggregatorPlugin.AGGREGATION_END_DATE_INPUT_PARAMETER, aggregationEndDate); diff --git a/src/test/resources/META-INF/plugin.properties b/src/test/resources/META-INF/plugin.properties new file mode 100644 index 0000000..7370ef5 --- /dev/null +++ b/src/test/resources/META-INF/plugin.properties @@ -0,0 +1,4 @@ +groupId=org.gcube.accounting +artifactId=accounting-aggregator-se-plugin +version=2.0.0-SNAPSHOT +description=Accounting Aggregator Smart Executor Plugin provides lossless accounting records aggregation. \ No newline at end of file