Improved aggregator

This commit is contained in:
Luca Frosini 2020-04-03 17:59:39 +02:00
parent 8812f66acc
commit 1f560d0b01
4 changed files with 42 additions and 26 deletions

View File

@ -7,12 +7,9 @@ public enum DesignID {
accounting_storage("StorageUsageRecordAggregated","all"),
StorageUsageRecord("StorageUsageRecordAggregated","all"),
accounting_service("ServiceUsageRecordAggregated","all"),
ServiceUsageRecord("ServiceUsageRecordAggregated","all"),
MonthlyAggregatedServiceUsageRecord("ServiceUsageRecordAggregated","all"),
accounting_portlet("PortletUsageRecordAggregated","all"),
accounting_job("JobUsageRecordAggregated","all"),
accounting_task("TaskUsageRecordAggregated","all");
AggregatedServiceUsageRecord("ServiceUsageRecordAggregated","all"),
JobUsageRecord("JobUsageRecordAggregated","all");
private String designName;
private String viewName;

View File

@ -4,9 +4,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.elaboration.Elaborator;
@ -14,6 +12,7 @@ import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import com.couchbase.client.java.Bucket;
@ -37,7 +36,7 @@ public class InsertDocument extends DocumentElaboration {
*
* There is an entry for each triple
*/
protected Map<String,Set<String>> serviceClassName_calledMethods;
protected Map<String,Map<String,Integer>> serviceClassName_calledMethods;
protected List<String> unparsableLines;
protected boolean serviceUsageRecordElaboration;
@ -47,7 +46,7 @@ public class InsertDocument extends DocumentElaboration {
public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
serviceUsageRecordElaboration = false;
serviceClassName_calledMethods = new TreeMap<String,Set<String>>();
serviceClassName_calledMethods = new TreeMap<String,Map<String,Integer>>();
unparsableLines = new ArrayList<>();
File destinationFolder = file.getParentFile();
calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
@ -61,13 +60,27 @@ public class InsertDocument extends DocumentElaboration {
protected void addServiceClassName_calledMethods(JsonObject jsonObject) {
String key = getKey(jsonObject);
Set<String> setOfCalledMethods = serviceClassName_calledMethods.get(key);
if(setOfCalledMethods==null) {
setOfCalledMethods = new TreeSet<>();
serviceClassName_calledMethods.put(key, setOfCalledMethods);
}
String calledMethod = jsonObject.getString(ServiceUsageRecord.CALLED_METHOD);
setOfCalledMethods.add(calledMethod);
int operationCount = 0;
try {
operationCount = jsonObject.getInt(AggregatedServiceUsageRecord.OPERATION_COUNT);
}catch (Exception e) {
logger.error("", e);
// the record was not an Aggregated ServiceUsageRecord
}
Map<String,Integer> calledMethodsMap = serviceClassName_calledMethods.get(key);
if(calledMethodsMap==null) {
calledMethodsMap = new TreeMap<>();
serviceClassName_calledMethods.put(key, calledMethodsMap);
}
if(!calledMethodsMap.containsKey(calledMethod)) {
calledMethodsMap.put(calledMethod, operationCount);
}else {
int operationCountOnMap = calledMethodsMap.get(calledMethod);
int sum = operationCountOnMap + operationCount;
calledMethodsMap.put(calledMethod, sum);
}
}
protected JsonObject analyseLine(String line) {
@ -97,10 +110,11 @@ public class InsertDocument extends DocumentElaboration {
calledMethodCSVFile.delete();
}
for(String key : serviceClassName_calledMethods.keySet()) {
Set<String> setOfCalledMethods = serviceClassName_calledMethods.get(key);
for(String calledMethod : setOfCalledMethods) {
Map<String,Integer> calledMethodsMap = serviceClassName_calledMethods.get(key);
for(String calledMethod : calledMethodsMap.keySet()) {
try {
Utility.printLine(calledMethodCSVFile, key + "," + calledMethod);
int operationCount = calledMethodsMap.get(calledMethod);
Utility.printLine(calledMethodCSVFile, "\"" + key.replace(",", "\",\"") + "\",\"" + calledMethod + "\",\"" + String.valueOf(operationCount) + "\"");
} catch(Throwable e) {
logger.error("Unable to print CSV line : {},{}", key, calledMethod);
}

View File

@ -26,8 +26,10 @@ public class AccountingAggregatorPluginTest extends ContextTest {
Map<String, Object> inputs = new HashMap<String, Object>();
AggregationType aggregationType = AggregationType.YEARLY;
//type aggregation
inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, AggregationType.MONTHLY.name());
inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name());
inputs.put(AccountingAggregatorPlugin.ELABORATION_TYPE_INPUT_PARAMETER, ElaborationType.AGGREGATE.name());
@ -40,11 +42,11 @@ public class AccountingAggregatorPluginTest extends ContextTest {
inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER, false);
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, false);
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, true);
inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, true);
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, true);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JULY, 1);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
@ -60,11 +62,12 @@ public class AccountingAggregatorPluginTest extends ContextTest {
logger.debug("Going to launch {} with inputs {}", AccountingAggregatorPluginDeclaration.NAME, inputs);
// Calendar end = Utility.getEndCalendarFromStartCalendar(AggregationType.MONTHLY, aggregationStartCalendar, 1);
Calendar end = Utility.getAggregationStartCalendar(2017, Calendar.AUGUST, 1);
Calendar end = Utility.getAggregationStartCalendar(2020, Calendar.JANUARY, 1);
while(aggregationStartCalendar.before(end)) {
plugin.launch(inputs);
aggregationStartCalendar.add(Calendar.MONTH, 1);
aggregationStartCalendar.add(aggregationType.getCalendarField(), 1);
aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
}
@ -89,14 +92,16 @@ public class AccountingAggregatorPluginTest extends ContextTest {
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put(AccountingAggregatorPlugin.ELABORATION_TYPE_INPUT_PARAMETER, ElaborationType.RECOVERY.name());
inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, AggregationType.MONTHLY.name());
inputs.put(AccountingAggregatorPlugin.PERSIST_START_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(8, 0));
inputs.put(AccountingAggregatorPlugin.PERSIST_END_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(20, 30));
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 1);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.DECEMBER, 1);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, true);
/*
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2017, Calendar.SEPTEMBER, 30);
@ -109,7 +114,7 @@ public class AccountingAggregatorPluginTest extends ContextTest {
logger.debug("Going to launch {} with inputs {}", AccountingAggregatorPluginDeclaration.NAME, inputs);
// Calendar end = Utility.getEndCalendarFromStartCalendar(AggregationType.MONTHLY, aggregationStartCalendar, 1);
Calendar end = Utility.getAggregationStartCalendar(2018, Calendar.JANUARY, 1);
//Calendar end = Utility.getAggregationStartCalendar(2018, Calendar.JANUARY, 1);
/*
while(aggregationStartCalendar.before(end)) {

View File

@ -10,7 +10,7 @@
<logger name="org.gcube" level="INFO" />
<logger name="org.gcube.accounting.aggregator" level="TRACE" />
<logger name="org.gcube.accounting.aggregator" level="INFO" />
<root level="WARN">
<appender-ref ref="STDOUT" />