First Release

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@132194 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Alessandro Pieve 2016-09-30 12:44:58 +00:00
parent dc3804ae7f
commit f3446308e9
2 changed files with 26 additions and 35 deletions

View File

@ -1,7 +1,5 @@
package org.gcube.accounting.aggregator.plugin;
import java.io.File;
import java.io.Serializable;
import java.text.SimpleDateFormat;
@ -63,6 +61,17 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
private final static String LINE_SUFFIX = "}";
private final static String KEY_VALUE_PAIR_SEPARATOR = ",";
private final static String KEY_VALUE_LINKER = "=";
public static Integer countInsert=0;
public static Integer countDelete=0;
public static Integer RecoveryMode=0;
/**
 * Creates the aggregator plugin bound to its declaration.
 * @param pluginDeclaration the plugin declaration for this aggregator; forwarded to the superclass constructor
 */
public AccountingAggregatorPlugin(AccountingAggregatorPluginDeclaration pluginDeclaration) {
super(pluginDeclaration);
}
/* The environment configuration */
protected static final CouchbaseEnvironment ENV =
@ -72,32 +81,15 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
.keepAliveInterval(3600 * 1000) // 3600 Seconds in milliseconds
.build();
/**
 * Creates the aggregator plugin bound to its declaration.
 * @param pluginDeclaration the plugin declaration for this aggregator; forwarded to the superclass constructor
 */
public AccountingAggregatorPlugin(AccountingAggregatorPluginDeclaration pluginDeclaration) {
super(pluginDeclaration);
}
public static Integer countInsert=0;
public static Integer countDelete=0;
public static Integer RecoveryMode=0;
/**{@inheritDoc}*/
@Override
public void launch(Map<String, Object> inputs) throws Exception {
countInsert=0;
countDelete=0;
if(inputs == null || inputs.isEmpty()){
logger.debug("{} inputs {}", this.getClass().getSimpleName(), inputs);
throw new Exception("Inputs null");
}
//Type :HOURLY,DAILY,MONTHLY,YEARLY
//Interval: Number of hour,day,month,year
if (!inputs.containsKey("type") || !inputs.containsKey("interval"))
@ -126,12 +118,6 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
if (inputs.containsKey("recovery"))
RecoveryMode=(Integer)inputs.get("recovery");
//FOR DEBUG
//String scopeDebug="/gcube/devNext";
//ScopeProvider.instance.set(scopeDebug);
// END FOR DEBUG
logger.trace("Launch with Type:{}, Interval:{}, startTime:{}, Scope:{}, Recovery:{}",aggType.toString(),interval,inputStartTime,scope,RecoveryMode);
//Get Configuration from service end point
@ -141,8 +127,6 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
AggregatorPersistenceBackendQueryConfiguration configuration;
try{
configuration = new AggregatorPersistenceBackendQueryConfiguration(PersistenceCouchBase.class);
url = configuration.getProperty(ConfigurationServiceEndpoint.URL_PROPERTY_KEY);
password = configuration.getProperty(ConfigurationServiceEndpoint.PASSWORD_PROPERTY_KEY);
if (inputs.containsKey("bucket"))
@ -201,8 +185,6 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
* Init folder for backup file
*/
public void initFolder(){
Constant.PATH_DIR_BACKUP=System.getProperty(Constant.HOME_SYSTEM_PROPERTY)+"/"+Constant.NAME_DIR_BACKUP;
Constant.PATH_DIR_BACKUP_INSERT=Constant.PATH_DIR_BACKUP+"/insert";
Constant.PATH_DIR_BACKUP_DELETE=Constant.PATH_DIR_BACKUP+"/delete";
@ -325,6 +307,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
/**
* Elaborate row for aggregate
* elaborateRow
* @param row
* @return
* @throws Exception
@ -336,13 +319,12 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
String document=row.value().toString().replace("\":", "=").replace("\"", "");
Map<String,? extends Serializable> map = getMapFromString(document);
@SuppressWarnings("rawtypes")
AggregatedRecord record = (AggregatedRecord)RecordUtility.getRecord(map);
aggregate.aggregate(record);
//insert an elaborate row into list JsonDocument for memory document elaborate
String identifier=(String) row.document().content().get("id");
JsonDocument documentJson = JsonDocument.create(identifier, row.document().content());
documentElaborate.add(documentJson);
return true;
@ -394,6 +376,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
/**
* Delete a record not aggregate and insert a new record aggregate
* If a problem with delete record, not insert a new record and save a backupfile
* reallyFlush
* @param aggregate
* @param docs
* @param nameFile
@ -457,6 +440,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
for (JsonDocument document: notInserted){
countInsert ++;
try{
@SuppressWarnings("unused")
JsonDocument response = accountingBucket.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
catch(Exception e){

View File

@ -4,6 +4,8 @@ import java.util.HashMap;
import java.util.Map;
import org.gcube.accounting.aggregator.madeaggregation.AggregationType;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.scope.api.ScopeProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -22,19 +24,24 @@ public class Tests {
@Test
public void testLaunch() throws Exception {
//FOR DEBUG
String scopeDebug="/gcube/devNext";
ScopeProvider.instance.set(scopeDebug);
// END FOR DEBUG
Map<String, Object> inputs = new HashMap<String, Object>();
//type aggregation
inputs.put("type",AggregationType.DAILY.name());
//period to be processed
inputs.put("interval",1 );
/* OPTIONAL INPUT */
//change to time
inputs.put("startTime", 10);
inputs.put("startTime", 20);
//specify bucket
//inputs.put("bucket","accounting_service");
inputs.put("bucket","accounting_service");
//current scope
inputs.put("currentScope",true);