Merged from branch

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@155053 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-10-10 10:25:13 +00:00
parent b2e0eb62cb
commit 26c9cf5cf2
6 changed files with 204 additions and 9 deletions

View File

@ -10,6 +10,7 @@
<Change>Aggregation get last aggregation day from management bucket #9424</Change>
<Change>Recovery restart aggregation which didn't terminate getting the informations from management bucket #9425</Change>
<Change>Added support to read original records from a bucket and to publish aggregated in a different one #9556</Change>
<Change>Added possibility to specify a range for aggregation and recovery</Change>
</Changeset>
<Changeset component="org.gcube.accounting.accounting-aggregator-se-plugin.1.0.1" date="2017-03-16">
<Change>Enhached logging</Change>

View File

@ -146,9 +146,6 @@ public class Aggregator {
String record = content.toString();
// Backup the Record on local file
Utility.printLine(originalRecordsbackupFile, record);
// Aggregate the Record
aggregateRow(aggregatorBuffer, record);
@ -161,6 +158,8 @@ public class Aggregator {
aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage);
}
Utility.printLine(originalRecordsbackupFile, record);
return originalRecordsCounter;
}catch (Exception e) {
throw e;

View File

@ -8,6 +8,7 @@ import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin.ElaborationType;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -16,8 +17,10 @@ public class AccountingAggregatorPluginTest extends ScopedTest {
private static Logger logger = LoggerFactory.getLogger(AccountingAggregatorPluginTest.class);
//@Test
public void serviceJanuary2017Aggregation() throws Exception {
@Test
public void aggregate() throws Exception {
setContext(ROOT);
Map<String, Object> inputs = new HashMap<String, Object>();
//type aggregation
@ -34,15 +37,17 @@ public class AccountingAggregatorPluginTest extends ScopedTest {
inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER, true);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 1);
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.OCTOBER, 2);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JANUARY, 31);
/*
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2017, Calendar.AUGUST, 3);
String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_END_DATE_INPUT_PARAMETER, aggregationEndDate);
*/
AccountingAggregatorPlugin plugin = new AccountingAggregatorPlugin(null);
logger.debug("Going to launch {} with inputs {}", AccountingAggregatorPluginDeclaration.NAME, inputs);
@ -51,14 +56,25 @@ public class AccountingAggregatorPluginTest extends ScopedTest {
}
//@Test
@Test
public void testRecovery() throws Exception {
setContext(ROOT);
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put(AccountingAggregatorPlugin.ELABORATION_TYPE_INPUT_PARAMETER, ElaborationType.RECOVERY.name());
inputs.put(AccountingAggregatorPlugin.PERSIST_START_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(8, 0));
inputs.put(AccountingAggregatorPlugin.PERSIST_END_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(19, 0));
inputs.put(AccountingAggregatorPlugin.PERSIST_END_TIME_INPUT_PARAMETER, Utility.getPersistTimeParameter(20, 30));
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JULY, 26);
String aggregationStartDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationStartCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationStartDate);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2017, Calendar.JULY, 27);
String aggregationEndDate = AccountingAggregatorPlugin.AGGREGATION_START_DATE_DATE_FORMAT.format(aggregationEndCalendar.getTime());
logger.trace("{} : {}", AccountingAggregatorPlugin.AGGREGATION_START_DATE_INPUT_PARAMETER, aggregationEndDate);
inputs.put(AccountingAggregatorPlugin.AGGREGATION_END_DATE_INPUT_PARAMETER, aggregationEndDate);
AccountingAggregatorPlugin plugin = new AccountingAggregatorPlugin(null);

View File

@ -45,6 +45,9 @@ public class ScopedTest {
public static final String DEFAULT_TEST_SCOPE;
public static final String ALTERNATIVE_TEST_SCOPE;
public static final String ROOT_VARNAME = "ROOT";
public static final String ROOT;
static {
Properties properties = new Properties();
InputStream input = ScopedTest.class.getClassLoader().getResourceAsStream(PROPERTIES_FILENAME);
@ -64,6 +67,8 @@ public class ScopedTest {
GCUBE_DEVSEC = properties.getProperty(GCUBE_DEVSEC_VARNAME);
GCUBE_DEVSEC_DEVVRE = properties.getProperty(GCUBE_DEVSEC_DEVVRE_VARNAME);
ROOT = properties.getProperty(ROOT_VARNAME);
DEFAULT_TEST_SCOPE = GCUBE_DEVSEC;
ALTERNATIVE_TEST_SCOPE = GCUBE_DEVNEXT;
}

View File

@ -0,0 +1,43 @@
package org.gcube.accounting.aggregator.recover;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* This is used in case of multiple concurrent run launched accidentally
* which generated duplicated entries in original.json file and
* unpredictable records in aggregated
* To recover the situation. the file must be copied in src/test/resources
*/
public class LaunchGenerate {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(LaunchGenerate.class);
//@Test
public void run() throws Exception {
File src = new File("src");
File test = new File(src, "test");
File resources = new File(test, "resources");
File year = new File(resources, "2017");
List<File> files = new ArrayList<>();
File month07 = new File(year, "07");
File file26 = new File(month07, "26-ServiceUsageRecord.original.json.bad");
Assert.assertTrue(file26.exists());
files.add(file26);
for(File file : files){
RecoverOriginalRecords g = new RecoverOriginalRecords(file);
g.elaborate();
}
}
}

View File

@ -0,0 +1,131 @@
package org.gcube.accounting.aggregator.recover;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.gcube.accounting.aggregator.aggregation.AggregatorBuffer;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.couchbase.client.java.document.json.JsonObject;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class RecoverOriginalRecords {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
protected static final String ID = Record.ID;
static {
/// One Record per package is enough
RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage());
}
protected final File srcFile;
protected final File duplicatedFile;
protected final File cleanDstFile;
protected final File aggregatedFile;
protected final Map<String, Record> uniqueRecords;
protected final AggregatorBuffer aggregatorBuffer;
protected int elaborated;
protected int duplicated;
protected int aggregated;
protected int unique;
protected RecoverOriginalRecords(File srcFile){
this.srcFile = srcFile;
this.duplicatedFile = new File(srcFile.getParentFile(), srcFile.getName().replaceAll(".bad", ".duplicated"));
this.cleanDstFile = new File(srcFile.getParentFile(), srcFile.getName().replaceAll(".bad", ""));
this.aggregatedFile = new File(srcFile.getParentFile(), cleanDstFile.getName().replaceAll("original", "aggregated"));
this.uniqueRecords= new HashMap<>();
this.aggregatorBuffer = new AggregatorBuffer();
this.elaborated = 0;
this.duplicated = 0;
this.aggregated = 0;
this.unique = 0;
}
protected void readFile() throws Exception {
try {
// Open the file that is the first // command line parameter
FileInputStream fstream = new FileInputStream(srcFile);
// Get the object of DataInputStream
DataInputStream in = new DataInputStream(fstream);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
// Read File Line By Line
while ((line = br.readLine()) != null) {
elaborateLine(line);
++elaborated;
}
br.close();
in.close();
fstream.close();
} catch (Exception e) {
logger.error("Error while elaborating file {}", srcFile.getAbsolutePath(), e);
throw e;
}
}
public void elaborate() throws Exception{
logger.info("Going to elaborate {}", srcFile.getAbsolutePath());
readFile();
afterElaboration();
}
protected void elaborateLine(String line) throws Exception {
JsonObject jsonObject = JsonObject.fromJson(line);
String id = jsonObject.getString(ID);
if(uniqueRecords.containsKey(id)){
logger.trace("Duplicated Original Record with ID {}", id);
Utility.printLine(duplicatedFile, line);
duplicated++;
}else{
Record record = RecordUtility.getRecord(line);
uniqueRecords.put(id, record);
Utility.printLine(cleanDstFile, line);
@SuppressWarnings("rawtypes")
AggregatedRecord aggregatedRecord = AggregatorBuffer.getAggregatedRecord(record);
aggregatorBuffer.aggregate(aggregatedRecord);
unique++;
}
}
/**
* Perform actions at the end of line by line elaboration
* @throws Exception
*/
protected void afterElaboration() throws Exception {
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
String marshalled = DSMapper.marshal(aggregatedRecord);
JsonObject jsonObject = JsonObject.fromJson(marshalled);
Utility.printLine(aggregatedFile, jsonObject.toString());
aggregated++;
}
}
}