Version 1.1.0

add input endScriptTime 

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@148447 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Alessandro Pieve 2017-05-10 13:01:46 +00:00
parent 00490b54ad
commit 2b16e5a527
11 changed files with 119 additions and 91 deletions

View File

@ -20,12 +20,12 @@ Please see the file named "changelog.xml" in this directory for the release note
Authors
--------------------------------------------------
* Luca Frosini (luca.frosini-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
* Alessandro Pieve (alessandro.pieve-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
Maintainers
-----------
* Luca Frosini (luca.frosini-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
* Alessandro Pieve (alessandro.pieve-AT-isti.cnr.it), Istituto di Scienza e Tecnologie dell'Informazione "A. Faedo" - CNR, Pisa (Italy).
Download information

17
pom.xml
View File

@ -8,7 +8,7 @@
</parent>
<groupId>org.gcube.accounting</groupId>
<artifactId>accounting-aggregator-se-plugin</artifactId>
<version>1.0.1-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<name>Accounting Aggregator</name>
<description>Accounting Aggregator Smart Executor Plugin</description>
@ -42,21 +42,14 @@
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>common-authorization</artifactId>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>authorization-client</artifactId>
</dependency>
<dependency>
<groupId>org.gcube.core</groupId>
<artifactId>common-scope</artifactId>
@ -68,7 +61,6 @@
<artifactId>home-library-jcr</artifactId>
<version>[2.0.0-SNAPSHOT,3.0.0-SNAPSHOT)</version>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>home-library</artifactId>
@ -76,18 +68,15 @@
<scope>compile</scope>
</dependency>
<!-- END Home Library -->
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>common-authorization</artifactId>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>authorization-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -114,15 +103,13 @@
<version>[1.0.1-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
</dependency>
<dependency>
<artifactId>document-store-lib</artifactId>
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.gcube.accounting</groupId>
<artifactId>accounting-lib</artifactId>
<version>[2.2.0-SNAPSHOT,3.0.0-SNAPSHOT)</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@ -35,9 +35,6 @@ public class Aggregation {
//list Aggregate record
protected Map<String, List<AggregatedRecord<?,?>>> bufferedRecords = new HashMap<String, List<AggregatedRecord<?,?>>>();
public Aggregation() {
super();
}
@ -80,15 +77,12 @@ public class Aggregation {
}
AggregationUtility util = new AggregationUtility(bufferedRecord);
//verify a record is aggregable
//logger.debug("record: {}",record.toString());
if (util.isAggregable(record)){
try {
AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord) bufferedRecord;
// TODO check compatibility using getAggregable
//logger.debug("if -- madeAggregation aggregate");
bufferedAggregatedRecord.aggregate((AggregatedRecord) record);
//patch for not changed a creation time
//bufferedAggregatedRecord.setCreationTime(bufferedAggregatedRecord.getStartTime());
bufferedAggregatedRecord.setCreationTime(record.getCreationTime());
found = true;
break;
@ -97,20 +91,15 @@ public class Aggregation {
}
}
}
if(!found){
//logger.debug("Aggregated Record not found with execption");
//logger.debug("if -- madeAggregation not found with execption add");
records.add(record);
totalBufferedRecords++;
return;
}
}else{
//logger.debug("else if record contains "+recordType);
records = new ArrayList<AggregatedRecord<?,?>>();
try {
//logger.debug("else -- add getAggregatedRecord");
records.add(getAggregatedRecord(record));
} catch (Exception e) {
logger.debug("pre Exception but records");
@ -122,8 +111,6 @@ public class Aggregation {
}
}
/**
* Reset buffer records
*/
@ -132,7 +119,6 @@ public class Aggregation {
bufferedRecords.clear();
}
/**
*
* @return
@ -177,12 +163,5 @@ public class Aggregation {
madeAggregation(record);
}
}
}

View File

@ -91,7 +91,6 @@ public class AggregationUtility<T extends AggregatedRecord<T,?>> {
Serializable recordValue = record.getResourceProperty(field);
Serializable thisValue = t.getResourceProperty(field);
//logger.error("isAggregable-field:{} ,recordValue:{}, thisValue:{}",field,recordValue,thisValue);
if(recordValue instanceof Comparable && thisValue instanceof Comparable){
@SuppressWarnings("rawtypes")
@ -103,14 +102,7 @@ public class AggregationUtility<T extends AggregatedRecord<T,?>> {
return false;
}
}else{
/*
if (recordValue==null){
//logger.trace("{} != {}", recordValue, thisValue);
return false;
}
*/
if(recordValue.hashCode()!=this.hashCode()){
//logger.trace("{} != {}", recordValue, thisValue);
return false;
}

View File

@ -9,7 +9,6 @@ package org.gcube.accounting.aggregator.persistence;
public interface AggregatorPersistenceBackendQuery {
public static final int KEY_VALUES_LIMIT = 25;
public void prepareConnection(
AggregatorPersistenceBackendQueryConfiguration configuration)
throws Exception;

View File

@ -17,7 +17,7 @@ public class AggregatorPersistenceBackendQueryConfiguration extends AccountingPe
}
/**
* @param class1 The class of the persistence to instantiate
* @param class The class of the persistence to instantiate
* @throws Exception if fails
*/
@SuppressWarnings({ "unchecked", "rawtypes" })

View File

@ -1,10 +1,16 @@
package org.gcube.accounting.aggregator.plugin;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.PrintStream;
import java.io.Serializable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -18,13 +24,20 @@ import org.gcube.accounting.aggregator.madeaggregation.Aggregation;
import org.gcube.accounting.aggregator.madeaggregation.AggregationType;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceBackendQueryConfiguration;
import org.gcube.accounting.aggregator.recovery.RecoveryRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedJobUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedPortletUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedTaskUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.PortletUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.TaskUsageRecord;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.persistence.PersistenceCouchBase;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.slf4j.Logger;
@ -105,15 +118,37 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
throw new IllegalArgumentException("Interval and type must be defined");
AggregationType aggType =AggregationType.valueOf((String)inputs.get("type"));
Integer interval=(Integer)inputs.get("interval")* aggType.getMultiplierFactor();
Integer intervaTot=(Integer)inputs.get("interval");
Integer interval=intervaTot* aggType.getMultiplierFactor();
//new feature for not elaborate the full range but a set of small intervals
if (inputs.containsKey("intervalStep"))
interval=(Integer)inputs.get("intervalStep");
Integer inputStartTime=null;
String pathFile = null;
if (inputs.containsKey("startTime"))
inputStartTime=(Integer)inputs.get("startTime");
else{
//get start time with file
logger.debug("Attention get start Time from file");
if (inputs.containsKey("pathFile")){
//get start time from file
pathFile=(String) inputs.get("pathFile");
logger.error("open file:{}",pathFile);
BufferedReader reader = new BufferedReader(new FileReader(pathFile));
String line;
while ((line = reader.readLine()) != null)
{
line=line.trim();
inputStartTime=Integer.valueOf(line);
logger.debug("Start Time:{}",inputStartTime);
}
reader.close();
}
}
Boolean currentScope =false;
String scope=null;
@ -183,12 +218,34 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
Cluster cluster = CouchbaseCluster.create(ENV, url);
//Define a type for aggregate
RecordUtility.addRecordPackage(PortletUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedPortletUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(JobUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedJobUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(TaskUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedTaskUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(StorageUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedStorageUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage());
RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage());
//end define
Date today = new Date();
Date endScriptTime = new Date();
if (inputs.containsKey("endScriptTime")){
DateFormat df = new SimpleDateFormat ("MM/dd/yyyy HH:mm");
endScriptTime = df.parse ((today.getMonth()+1)+"/"+today.getDate()+"/"+(today.getYear()+1900) +" "+(String)inputs.get("endScriptTime"));
logger.debug("Script Run until :{}"+endScriptTime);
}
do {
logger.debug("--Start Time Loop:{}"+inputStartTime);
initFolder();
if ((recoveryMode==2)||(recoveryMode==0)){
logger.debug("Recovery mode enabled");
RecoveryRecord.searchFile(cluster,configuration);
@ -201,10 +258,20 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
//elaborate bucket, with scope, type aggregation and interval
elaborateBucket(bucket,scope, inputStartTime, interval, aggType);
}
if (inputs.containsKey("pathFile")){
//update a file for new start time
FileOutputStream file = new FileOutputStream(pathFile);
PrintStream output = new PrintStream(file);
logger.debug("Update pathfile:{} with new start time:{}",pathFile,inputStartTime-intervaTot);
output.println(inputStartTime-intervaTot);
inputStartTime=inputStartTime-intervaTot;
today = new Date();
}
logger.debug("Complete countInsert{}, countDelete{}",countInsert,countDelete);
}
} while(today.compareTo(endScriptTime)<0);
logger.debug("Plugin Terminated");
}
@ -281,7 +348,6 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
if (backup){
logger.debug("Start Backup");
WorkSpaceManagement.onSaveBackupFile(accountingBucket,bucket,scope,startAllKeyString, endAllKeyString,aggType);
//logger.debug("Backup complete startKeyString{}, endKeyString{}",startAllKeyString,endAllKeyString);
}
else
logger.debug("No Backup required");
@ -317,7 +383,6 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
} catch (Exception e) {
logger.error("Exception error VIEW",e.getLocalizedMessage(),e);
//throw e;
}
// Iterate through the returned ViewRows
@ -328,13 +393,15 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
for (ViewRow row : viewResult)
resultElaborate=elaborateRow(row,documentElaborate);
logger.debug("End elaborate row");
//File backup have a name with scope e
//Backup File saved
String nameFileBackup="";
if (scope!=null)
nameFileBackup=scope.replace("/", "")+"-"+startKeyString+"-"+endKeyString;
else
nameFileBackup=startKeyString+"-"+endKeyString;
//save into db (delete no aggregate record and insert a record aggregate)
reallyFlush(aggregate,documentElaborate,nameFileBackup);
endKeyString = startKeyString;
@ -450,6 +517,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
Integer index=0;
boolean succesfulDelete=false;
logger.trace("Start a delete document:{}",docs.size());
//before elaborate a record, create a backup file
List<JsonDocument> notDeleted = docs;
List<JsonDocument> notInserted = aggregate.reallyFlush();
@ -506,6 +574,7 @@ public class AccountingAggregatorPlugin extends Plugin<AccountingAggregatorPlugi
logger.debug("notDeletedTemp size:{} notDeleted:{}",notDeletedTemp.size(),notDeleted.size());
logger.debug("Delete complete:{}, Start a insert aggregated document:{}",countDelete,notInserted.size());
// delete all record and ready for insert a new aggregated record
if (succesfulDelete){
//if successful record delete, delete backup file

View File

@ -12,8 +12,6 @@ public enum DesignID {
accounting_job("accounting_job","JobUsageRecordAggregated","all","scope"),
accounting_task("accounting_task","TaskUsageRecordAggregated","all","scope");
private String nameBucket;
private String nameDesign;
private String nameView;

View File

@ -38,7 +38,7 @@ public class WorkSpaceManagement {
public static Logger logger = LoggerFactory.getLogger(Aggregation.class);
/**
* Save a backup file compressed into workspace
* Save a compressed backup file into workspace
* @param bucket
* @param startKeyString
* @param endKeyString
@ -90,7 +90,7 @@ public class WorkSpaceManagement {
throw e;
}
//manage error
BufferedWriter filebackup =null;
File logFile = new File(namePathFile);
logFile.delete();
@ -235,7 +235,6 @@ public class WorkSpaceManagement {
ws.remove(name, folderId);
Thread.sleep(2000);
ws.createExternalFile(name, description, null, inputStream, folderId);
//ws.updateItem(projectItem.getId(), inputStream);
}
return;
} catch (Exception e) {

View File

@ -55,7 +55,6 @@ public class RecoveryRecord {
*/
protected static void prepareConnection(Cluster cluster,AggregatorPersistenceBackendQueryConfiguration configuration) throws Exception {
String url = configuration.getProperty(ConfigurationServiceEndpoint.URL_PROPERTY_KEY);
String password = configuration.getProperty(ConfigurationServiceEndpoint.PASSWORD_PROPERTY_KEY);
try {
@ -89,7 +88,7 @@ public class RecoveryRecord {
}
@SuppressWarnings("null")
public static void searchFile(Cluster cluster,AggregatorPersistenceBackendQueryConfiguration configuration) throws Exception{
try{
@ -136,7 +135,6 @@ public class RecoveryRecord {
logger.error("Error for list file:{}",e);
}
//cluster.disconnect();
}
public static boolean ElaborateDeleteFile(String nameFile) throws IOException{
HashMap<String, Object> mapper = new Gson().fromJson(new FileReader(new File(nameFile)), HashMap.class);
@ -208,27 +206,28 @@ public class RecoveryRecord {
usageRecordType="";
if (recordType==null)
recordType="";
JsonDocument response = null;
if ((recordType.equals("ServiceUsageRecord")) || (usageRecordType.equals("ServiceUsageRecord"))){
JsonDocument document = JsonDocument.create(identifier, accounting);
JsonDocument response = bucketService.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
response = bucketService.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
if ((recordType.equals("StorageUsageRecord")) || (usageRecordType.equals("StorageUsageRecord"))){
JsonDocument document = JsonDocument.create(identifier, accounting);
JsonDocument response = bucketStorage.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
response = bucketStorage.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
if ((recordType.equals("JobUsageRecord")) || (usageRecordType.equals("JobUsageRecord"))){
JsonDocument document = JsonDocument.create(identifier, accounting);
JsonDocument response = bucketJob.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
response = bucketJob.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
if ((recordType.equals("TaskUsageRecord")) || (usageRecordType.equals("TaskUsageRecord"))){
JsonDocument document = JsonDocument.create(identifier, accounting);
JsonDocument response = bucketTask.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
response = bucketTask.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
if ((recordType.equals("PortletUsageRecord")) || (usageRecordType.equals("PortletUsageRecord"))){
JsonDocument document = JsonDocument.create(identifier, accounting);
JsonDocument response = bucketPortlet.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
response = bucketPortlet.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
}
logger.trace("Elaborate Insert fileJsondocument response:{}",response);
}catch(Exception e){
logger.error("Problem with recovery file and insert record excepiton:{}",e.getLocalizedMessage());
throw e;

View File

@ -37,7 +37,10 @@ public class Tests {
inputs.put("interval",1 );
/* OPTIONAL INPUT */
//change to time
inputs.put("startTime", 6);
//inputs.put("startTime", 6);
//inputs.put("pathFile","/home/pieve/startTime");
//inputs.put("endScriptTime","16:00");
//specify bucket
inputs.put("bucket","accounting_service");
@ -45,6 +48,9 @@ public class Tests {
//current scope
inputs.put("currentScope",false);
//specify user for save to workspace
/*OPTIONAL INPUT for work a partial interval */
//inputs.put("intervalStep",6);
//specify a recovery 0 default recovery and aggregate, 1 only aggregate, 2 only recovery
inputs.put("recovery",0);