work in progress: storageStatus resource for accounting

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@96701 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
roberto.cirillo 2014-06-04 17:18:07 +00:00
parent 53a73184c2
commit 3b40dced22
9 changed files with 311 additions and 34 deletions

View File

@ -42,7 +42,7 @@
<dependency>
<groupId>org.gcube.core</groupId>
<artifactId>common-scope</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>[1.2.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
</dependency>
</dependencies>
<build>

View File

@ -19,7 +19,10 @@ import org.slf4j.LoggerFactory;
public class ReportAccountingImpl implements Report {
final Logger logger = LoggerFactory.getLogger(ReportAccountingImpl.class);
public RawUsageRecord ur;
// storage usage record
public RawUsageRecord sur;
// storage status record
public RawUsageRecord ssr;
public ResourceAccounting raFactory;
@ -41,12 +44,12 @@ import org.slf4j.LoggerFactory;
public void start(String consumerId, String resourceScope, String creationTime) {
logger.info("set accounting properties: consumerId "+consumerId+" scope: "+resourceScope+ " creationTime "+creationTime);
if(raFactory==null) init();
this.ur = new RawUsageRecord();
this.sur = new RawUsageRecord();
// generic properties
ur.setResourceType("storage-usage");
if(consumerId!=null) ur.setConsumerId(consumerId);
sur.setResourceType("storage-usage");
if(consumerId!=null) sur.setConsumerId(consumerId);
// ur.setResourceOwner("paolo.fabriani");
if(resourceScope !=null) ur.setResourceScope(resourceScope);
if(resourceScope !=null) sur.setResourceScope(resourceScope);
//set creation time
if(creationTime!=null){
SimpleDateFormat formatter = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z");
@ -56,7 +59,7 @@ import org.slf4j.LoggerFactory;
} catch (ParseException e) {
logger.error("Error in parsing date: "+creationTime+" exc msg: "+e.getMessage());
}
ur.setCreateTime(date);
sur.setCreateTime(date);
}
}
@ -68,24 +71,36 @@ import org.slf4j.LoggerFactory;
@Override
public void ultimate(String owner, String operation, String size, String filePath, String id, String callerIP, String lastAccess) {
logger.info("set accounting properties: owner "+owner+" operation: "+operation+" size: "+size+ " remotePath: "+filePath+" id: "+id+"callerIP "+callerIP+" lastAccess"+lastAccess);
if(ur==null) this.ur = new RawUsageRecord();
if(sur==null) this.sur = new RawUsageRecord();
//specific properties
if(owner != null) ur.setResourceOwner(owner);
if (operation!=null) ur.setResourceSpecificProperty("operationType",operation);
if(size!= null) ur.setResourceSpecificProperty("dataVolume", size);
if(filePath != null) ur.setResourceSpecificProperty("remotePath", filePath);
if(id!= null) ur.setResourceSpecificProperty("id", id);
if(callerIP!=null) ur.setResourceSpecificProperty("callerIP", "etics.eng.it");
if(lastAccess!=null)ur.setResourceSpecificProperty("lastAccess", lastAccess);
if(owner != null) sur.setResourceOwner(owner);
if (operation!=null) sur.setResourceSpecificProperty("operationType",operation);
if(size!= null) sur.setResourceSpecificProperty("dataVolume", size);
if(filePath != null) sur.setResourceSpecificProperty("remotePath", filePath);
if(id!= null) sur.setResourceSpecificProperty("id", id);
if(callerIP!=null) sur.setResourceSpecificProperty("callerIP", callerIP);
if(lastAccess!=null){
sur.setResourceSpecificProperty("lastAccess", lastAccess);
// set the mandatory fields
Date date=null;
try {
date = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z").parse(lastAccess);
sur.setStartTime(date);
sur.setEndTime(date);
} catch (Exception e) {
e.printStackTrace();
}
// end mandatory files
}
// set static properties
ur.setResourceSpecificProperty("dataType","STORAGE");
ur.setResourceSpecificProperty("dataCount", "1");
sur.setResourceSpecificProperty("dataType","STORAGE");
sur.setResourceSpecificProperty("dataCount", "1");
}
@Override
public void send() {
logger.info("report sending...");
if(raFactory!=null)
raFactory.sendAccountingMessage(ur);
raFactory.sendAccountingMessage(sur);
else
logger.error("Problem on building accounting record: Factory Object is null ");
}
@ -99,7 +114,7 @@ import org.slf4j.LoggerFactory;
}
logger.info("caller ip: "+address);
if (address!=null) ur.setResourceSpecificProperty("callerIP", address);;
if (address!=null) sur.setResourceSpecificProperty("callerIP", address);;
}
private void setStartTime() {
@ -108,7 +123,7 @@ import org.slf4j.LoggerFactory;
Date time=startTime.getTime();
logger.info("set start time: "+time);
try {
ur.setStartTime(time);
sur.setStartTime(time);
}
catch (InvalidValueException e) {
e.printStackTrace();
@ -123,7 +138,7 @@ import org.slf4j.LoggerFactory;
sdf.format(time);
logger.info("set end time: "+time);
try {
ur.setEndTime(time);
sur.setEndTime(time);
}
catch (InvalidValueException e) {
e.printStackTrace();

View File

@ -119,6 +119,8 @@ public class JsonParser extends Thread{
private void report(String filename, String owner,
String creationTime, String id, long length,
String scope, String operation, String callerIP, String lastAccess, String lastUser) {
//convert from byte to kb
length=length/1024;
report.start(lastUser, scope, creationTime);
report.ultimate(owner, operation, length+"", filename, id, callerIP, lastAccess);
report.send();

View File

@ -9,17 +9,16 @@ import org.gcube.common.scope.impl.ServiceMapScannerMediator;
public class ValidationUtils {
public static boolean validationScope(String scope){
ScopeBean scopeBean=new ScopeBean(scope);
if((scopeBean.is(Type.VRE)))
scope=scopeBean.enclosingScope().toString();
Set<String> scopeSet=new ServiceMapScannerMediator().getScopeKeySet();
for(String scopeItem : scopeSet){
// System.out.println("scope scanned: "+scopeItem);
if(scope.equalsIgnoreCase(scopeItem))
return true;
}
return false;
// ScopeBean scopeBean=new ScopeBean(scope);
// System.out.println("scope type "+scopeBean.type());
// if((scopeBean.is(Type.INFRASTRUCTURE)) || (scopeBean.is(Type.VO)) || (scopeBean.is(Type.VRE)))
// return true;
// return false;
}
}

View File

@ -4,17 +4,20 @@ import java.util.Arrays;
import org.gcube.contentmanager.storageserver.data.CubbyHole;
import org.gcube.contentmanager.storageserver.data.ReadingMongoOplog;
import org.gcube.contentmanager.storageserver.parse.JsonParser;
//ClaSSPATH
import java.net.URL;
import java.net.URLClassLoader;
public class Startup {
public static void main(String[] args) {
for (int i=0; i<args.length;i++){
//NO show password
if(i==2)
System.out.println("param N." +i + ": *********");
else
System.out.println("param N." +i + ": " + args[i]);
}
// for (int i=0; i<args.length;i++){
// //NO show password
// if(i==2)
// System.out.println("param N." +i + ": *********");
// else
// System.out.println("param N." +i + ": " + args[i]);
// }
if(args.length != 1 && args.length != 3){
System.out.println("Usage:");
System.out.println("\tjava Startup ip user password\n\n");
@ -28,6 +31,16 @@ public class Startup {
return;
}
/*CLASSPATH*/
// System.out.println("show classpath: ");
//// ClassLoader cl = ClassLoader.getSystemClassLoader();
//// URL[] urls = ((URLClassLoader)cl).getURLs();
//// for(URL url: urls){
//// System.out.println(url.getFile());
//// }
// String classpath = System.getProperty("java.class.path");
// System.out.println("classpath:\n"+classpath);
/*END CLASSPATH*/
CubbyHole c = new CubbyHole();
ReadingMongoOplog producer=null;
if(args.length == 3)

View File

@ -0,0 +1,165 @@
package org.gcube.contentmanager.storageserver.store;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoOptions;
import com.mongodb.gridfs.GridFS;
public class MongoDB {
private Mongo mongo;
private DB db;
private String[] server;
private String collection;
private int port;
private String pwd;
private String user;
private DBCollection ssCollection;
Logger logger = LoggerFactory.getLogger(MongoDB.class);
private static final String ACCOUNTING_DB="accounting";
private static final String DEFAULT_COLLECTION="storageStatus";
public MongoDB(String[] server, int port, String user, String password, String collectionName){
this.server=server;
this.port=port;
this.pwd=password;
this.user=user;
this.collection=collectionName;
db=getDB();
ssCollection = db.getCollection(collectionName);
}
public MongoDB(String[] server, String user, String password, String collectionName){
this.server=server;
this.pwd=password;
this.user=user;
this.collection=collectionName;
db=getDB();
ssCollection = db.getCollection(collectionName);
}
public MongoDB(String[] server, String user, String password){
this.server=server;
this.pwd=password;
this.user=user;
this.collection=DEFAULT_COLLECTION;
db=getDB();
ssCollection = db.getCollection(collection);
}
private DB getDB() {
if(db != null){
// check if the old server is primary
try{
DB db = mongo.getDB(ACCOUNTING_DB);
}catch(Exception e ){
logger.warn("the server now is not a primary ");
db=null;
}
}
if(db==null){
int i=-1;
for(String srv : server){
try {
i++;
if(mongo!=null)
mongo.close();
MongoOptions options=new MongoOptions();
options.autoConnectRetry=true;
options.socketKeepAlive=true;
options.maxWaitTime=240000;
options.connectionsPerHost=35;
mongo = new Mongo(srv, options);
// mongo.getMongoOptions().autoConnectRetry=true;
// mongo.getMongoOptions().socketKeepAlive=true;
logger.debug("Istantiate MongoDB with options: "+mongo.getMongoOptions());
db = mongo.getDB(ACCOUNTING_DB);
// check on user and password for non authenticate mode
if(user==null) user="";
if(pwd==null) pwd="";
boolean auth = db.authenticate(user, pwd.toCharArray());
if(auth) logger.info("mongo is in authenticate mode");
else logger.info("mongo is not in authenticate mode");
// GridFS gfs = new GridFS(db);
String firstServer = server[0];
server[0] = srv;
server[i]=firstServer;
break;
} catch (Exception e) {
logger.warn("server "+srv+" is not a primary retry ");
continue;
}
}
}
return db;
}
public void put(String consumer, long volume, int count){
// StorageStatusRecord ssr=new StorageStatusRecord(consumer, volume, count);
BasicDBObject doc = new BasicDBObject("consumer", consumer)
.append("volume", volume)
.append("count", count);
// .append("info", new BasicDBObject("x", 203).append("y", 102));
ssCollection.insert(doc);
}
public void update(String consumer, long volume, int count){
StorageStatusRecord ssr=get(consumer);
if(ssr != null){
int currentCount=ssr.getCount();
currentCount=currentCount+count;
ssr.setCount(currentCount);
long currentVolume=ssr.getVolume();
currentVolume=currentVolume+volume;
String id=ssr.getId();
}else{
put(consumer, volume, count);
}
}
public StorageStatusRecord get(String consumer){
BasicDBObject query = new BasicDBObject("consumer", consumer);
DBCursor cursor=ssCollection.find(query);
// if(cursor.length()>1){
// logger.error("found more than one mongodb objects with consumer: "+consumer);
// return null;
// }else if(cursor== null){
// logger.info("record with consumer: "+consumer+" not found");
// return null;
// }else{
DBObject obj=null;
try{
if(cursor.hasNext()){
obj=cursor.next();
}
}finally{
cursor.close();
}
if(obj!=null){
String cons=null;
if(obj.containsField("consumer")) cons=(String) obj.get("consumer");
else logger.error("incomplete record found. consumer field is missing");
long vol =0;
if(obj.containsField("volume")) vol=(long) obj.get("volume");
else logger.error("incomplete record found. volume field is missing");
int count=0;
if(obj.containsField("count")) count=(int) obj.get("count");
else logger.error("incomplete record found. count field is missing");
String id=(String)obj.get("id");
return new StorageStatusRecord(id, cons, vol, count);
}else return null;
// }
}
}

View File

@ -0,0 +1,53 @@
package org.gcube.contentmanager.storageserver.store;
public class StorageStatusRecord {
private String consumer;
private long volume;
private int count;
private String id;
public StorageStatusRecord(String id, String consumer, long volume, int count){
this.id=id;
this.consumer=consumer;
this.volume=volume;
this.count=count;
}
public String getConsumer() {
return consumer;
}
public void setConsumer(String consumer) {
this.consumer = consumer;
}
public long getVolume() {
return volume;
}
public void setVolume(long volume) {
this.volume = volume;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}

View File

@ -0,0 +1,30 @@
package org.gcube.contentmanager.storageserver.store;
import static org.junit.Assert.*;
import org.junit.BeforeClass;
import org.junit.Test;
public class MongoDBTest {
private static String[] server={"146.48.123.72","146.48.123.71"};
private static MongoDB mongo;
// @BeforeClass
public static void init(){
mongo=new MongoDB(server, null, null);
}
// @Test
public void putAndRetrieve(){
mongo.update("test.consumer", 100, -1);
}
// @Test
public void test() {
fail("Not yet implemented");
}
}

View File

@ -8,7 +8,7 @@ import org.junit.Test;
public class ValidationScopeTest {
private String scope="/gcube/devsec";
private String scope="/d4science.research-infrastructures.eu/gCubeApps";
@Test
public void test() {