Migrating aggregator to PersistenceSQL

This commit is contained in:
Luca Frosini 2021-03-25 12:38:28 +01:00
parent cc6fea69fb
commit 12ca03a111
8 changed files with 37 additions and 24 deletions

View File

@ -4,7 +4,7 @@ import java.util.Calendar;
import java.util.Date;
import org.gcube.accounting.aggregator.utility.Constant;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.gcube.com.fasterxml.jackson.annotation.JsonFormat;
/**
* @author Luca Frosini (ISTI - CNR)

View File

@ -12,7 +12,7 @@ import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.documentstore.persistence.PersistenceBackendFactory;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
@ -54,7 +54,10 @@ public class InsertDocument extends DocumentElaboration {
File destinationFolder = file.getParentFile();
calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
persistencePostgreSQL = (PersistencePostgreSQL) PersistenceBackendFactory.getPersistenceBackend(Utility.getCurrentContext());
AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class);
persistencePostgreSQL = new PersistencePostgreSQL();
persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration);
count = 0;
}
@ -110,9 +113,6 @@ public class InsertDocument extends DocumentElaboration {
* bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
*/
Record record = RecordUtility.getRecord(jsonObject.toString());
if(count == 0) {
persistencePostgreSQL.newConnection();
}
persistencePostgreSQL.insert(record);
++count;

View File

@ -66,9 +66,16 @@ public class Persist {
public void recover() throws Exception{
if(aggregationStatus.getAggregatedRecordsNumber()==aggregationStatus.getOriginalRecordsNumber()){
Calendar now = Utility.getUTCCalendarInstance();
if(aggregationStatus.getAggregatedRecordsNumber()==0) {
setAggregationStateToCompleted(now);
return;
}
/*
if(originalRecordBucket.name().compareTo(aggregatedRecordBucket.name())==0 || aggregationStatus.getAggregatedRecordsNumber()==0){
Calendar now = Utility.getUTCCalendarInstance();
logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. The aggregation didn't had any effects and the Source and Destination Bucket are the same ({}) or the record number is 0. Setting {} to {}",
aggregationStatus.getAggregationInfo(),
aggregationStatus.getOriginalRecordsNumber(),

View File

@ -14,6 +14,7 @@ import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.UsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.documentstore.persistence.PersistenceBackendFactory;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.vremanagement.executor.plugin.Plugin;
@ -118,6 +119,8 @@ public class AccountingAggregatorPlugin extends Plugin {
@SuppressWarnings("unchecked")
@Override
public void launch(Map<String, Object> inputs) throws Exception {
PersistenceBackendFactory.getPersistenceBackend(Utility.getCurrentContext());
AggregationType aggregationType = null;
Date aggregationStartDate = null;
Date aggregationEndDate = null;

View File

@ -3,9 +3,8 @@ package org.gcube.accounting.aggregator.status;
import java.util.Calendar;
import org.gcube.accounting.aggregator.utility.Constant;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.gcube.com.fasterxml.jackson.annotation.JsonFormat;
import org.gcube.com.fasterxml.jackson.annotation.JsonProperty;
/**
* @author Luca Frosini (ISTI - CNR)

View File

@ -11,12 +11,11 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.com.fasterxml.jackson.annotation.JsonFormat;
import org.gcube.com.fasterxml.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* @author Luca Frosini (ISTI - CNR)
*/
@ -51,7 +50,7 @@ public class AggregationStatus {
protected AggregationStatus previous;
// Last observed status
@JsonFormat(shape= JsonFormat.Shape.STRING)
@JsonFormat(shape= JsonFormat.Shape.STRING, pattern = Constant.DATETIME_PATTERN)
@JsonProperty
protected AggregationState aggregationState;

View File

@ -3,7 +3,6 @@ package org.gcube.accounting.aggregator.plugin;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin.ElaborationType;
@ -22,8 +21,8 @@ public class AccountingAggregatorPluginTest extends ContextTest {
@Test
public void aggregate() throws Exception {
//ContextTest.setContextByName(ROOT_DEV_SCOPE);
ContextTest.setContextByName(ROOT_PROD);
ContextTest.setContextByName(ROOT_DEV_SCOPE);
//ContextTest.setContextByName(ROOT_PROD);
Map<String, Object> inputs = new HashMap<String, Object>();
@ -44,16 +43,16 @@ public class AccountingAggregatorPluginTest extends ContextTest {
inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER, false);
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, true);
inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, true);
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true);
inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, false);
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, false);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2018, Calendar.SEPTEMBER, 1);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2020, Calendar.MARCH, 1);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
// Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(AggregationType.MONTHLY, aggregationStartCalendar, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2018, Calendar.NOVEMBER, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.APRIL, 1);
/*
String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate);
@ -67,7 +66,7 @@ public class AccountingAggregatorPluginTest extends ContextTest {
while(aggregationStartCalendar.before(aggregationEndCalendar)) {
plugin.launch(inputs);
Thread.sleep(TimeUnit.MINUTES.toMillis(1));
//Thread.sleep(TimeUnit.MINUTES.toMillis(1));
aggregationStartCalendar.add(aggregationType.getCalendarField(), 1);
aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
@ -98,15 +97,17 @@ public class AccountingAggregatorPluginTest extends ContextTest {
inputs.put(AccountingAggregatorPlugin.PERSIST_START_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(8, 0));
inputs.put(AccountingAggregatorPlugin.PERSIST_END_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(20, 30));
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2020, Calendar.MARCH, 1);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2020, Calendar.FEBRUARY, 1);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, true);
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true);
// Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(AggregationType.MONTHLY, aggregationStartCalendar, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2020, Calendar.APRIL, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2020, Calendar.MARCH, 1);
String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_END_DATE_INPUT_PARAMETER, aggregationEndDate);

View File

@ -0,0 +1,4 @@
groupId=org.gcube.accounting
artifactId=accounting-aggregator-se-plugin
version=2.0.0-SNAPSHOT
description=Accounting Aggregator Smart Executor Plugin provides lossless accounting records aggregation.