refactory code for consumers

added new consumer for folder records


git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@100857 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
roberto.cirillo 2014-10-22 10:38:13 +00:00
parent 165ad4ee13
commit d2ef6cf128
12 changed files with 780 additions and 172 deletions

View File

@ -0,0 +1,253 @@
package org.gcube.contentmanager.storageserver.consumer;
import org.gcube.accounting.datamodel.RawUsageRecord;
import org.gcube.contentmanager.storageserver.accounting.Report;
import org.gcube.contentmanager.storageserver.accounting.ReportConfig;
import org.gcube.contentmanager.storageserver.accounting.ReportException;
import org.gcube.contentmanager.storageserver.accounting.ReportFactory;
import org.gcube.contentmanager.storageserver.data.CubbyHole;
import org.gcube.contentmanager.storageserver.parse.utils.ValidationUtils;
import org.gcube.contentmanager.storageserver.store.FolderStatusOperationManager;
import org.gcube.contentmanager.storageserver.store.FolderStatusRecord;
import org.gcube.contentmanager.storageserver.store.MongoDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.DBObject;
public class FolderAccountingConsumer extends Thread{
final static Logger logger=LoggerFactory.getLogger(FolderAccountingConsumer.class);
final static int MINUTE_DECREMENT=-2;
private CubbyHole c;
private int number;
private Report report;
// object fields
private String op;
private String filename;
private String type;
private String name;
private String owner;
private String creationTime;
private String lastAccess;
private String lastOperation;
private String lastUser;
private int linkCount;
private String delete;
private String callerIp;
private String user;
private String password;
String[] server;
private String from;
public FolderAccountingConsumer(String[] srvs, CubbyHole c, int number){
this.c=c;
this.number=number;
this.server=srvs;
// init the accounting report
try {
init();
} catch (ReportException e) {
throw new RuntimeException("Accounting report Exception initialization");
}
}
public FolderAccountingConsumer(String[] srvs, String user, String password, CubbyHole c, int number){
this.c=c;
this.number=number;
this.server=srvs;
this.user=user;
this.password=password;
// init the accounting report
try {
init();
} catch (ReportException e) {
throw new RuntimeException("Accounting report Exception initialization");
}
}
private void init() throws ReportException{
report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE);
report.init();
}
public void run() {
while(true){
DBObject x=null;
MongoDB mongo=null;
try{
x=c.get();
logger.debug("Consumer #" + this.number + " got: " + x);
op = (String) x.get("op");
// retrieve object fields
DBObject obj=(DBObject)x.get("o");
retrieveObjectFields(obj);
// set object dimension
long length=-1;
if(obj.get("length")!=null) length=(long)obj.get("length");
logger.debug("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess);
if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){
//convert from byte to kb
length=length/1024;
// check scope
String scope=null;
if((filename!=null)&& (filename.contains("/"))){
scope=retrieveScopeFromRemoteFilePath(filename);
}else{
// field added on storage manager library for retrieve scope. Used only if it is a link delete
String pathString=(String)obj.get("onScope");
scope=retrieveScopeFromRemoteFilePath(pathString);
}
logger.info("update folders: "+filename);
logger.info("\n[FolderAccountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp);
boolean validScope=ValidationUtils.validationScope(scope);
if(validScope){
if(delete!=null){
lastOperation="DELETE";
}else if ((lastOperation != null) && (op != null) && (lastOperation.equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){
// it is an update on a link object this operation doesn't be accounted
logger.info("update on link object is not accounted. Skip next ");
continue;
}else if((lastOperation==null) || (lastUser==null)){
logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next ");
continue;
}else{
// the record is a valid record for folder accounting
logger.info("this is a valid record for folder accounting. build folder record ");
FolderStatusRecord fsr=null;
if(isNeedFSReport(lastOperation)){
try{
mongo=new MongoDB(server, user, password);
if(lastOperation.equalsIgnoreCase("COPY"))
owner=lastUser;
fsr=new FolderStatusRecord(filename, length, 1, lastAccess, from);
fsr=mongo.updateFolderVolume(fsr, lastOperation);
// if it is a Move operation it is need to update the original folder.
if(lastOperation.equalsIgnoreCase("MOVE")){
if((from != null)){
fsr=new FolderStatusRecord(from, length, 1, lastAccess, null);
fsr= mongo.updateFolderVolume(fsr, "DELETE");
}else{
logger.error("this is a move operation but the original path folder is missing. Maybe the storage-manager library is not update on the client. skip next");
}
}
mongo.close();
}catch(Exception e){
logger.error("Problem in updating storage status record: "+e.getMessage());
}
}
}
}
}else{
logger.info("operation "+lastOperation+" is not accounted");
}
}catch(Exception e){
e.printStackTrace();
logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getStackTrace());
logger.info("skip to next record ");
if(mongo!=null)
mongo.close();
}
}
}
private boolean isNeedSSReport(String lastOperation) {
if(lastOperation.equalsIgnoreCase("UPLOAD") || lastOperation.equalsIgnoreCase("COPY") || lastOperation.equalsIgnoreCase("DELETE"))
return true;
return false;
}
public void runWithoutThread(DBObject x){
try {
report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE);
} catch (ReportException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
report.init();
run();
}
private void retrieveObjectFields(DBObject obj) {
filename = (String) obj.get("filename");
type = (String) obj.get("type");
name = (String) obj.get("name");
owner = (String) obj.get("owner");
creationTime = (String) obj.get("creationTime");
lastAccess = null;
// only for move operation is needded this field: from
from=(String)obj.get("from");
if(obj.get("lastAccess") != null) lastAccess=(String)obj.get("lastAccess");
callerIp = null;
if(obj.get("callerIP") != null) callerIp=(String)obj.get("callerIP");
lastOperation = null;
if(obj.get("lastOperation") != null) lastOperation=(String)obj.get("lastOperation");
lastUser = null;
if(obj.get("lastUser") != null) lastUser=(String)obj.get("lastUser");
linkCount = 0;
if(obj.get("linkCount") != null) linkCount=(int)obj.get("linkCount");
delete = null;
if(obj.get("onDeleting") != null) delete=(String)obj.get("onDeleting");
// ObjectId objectId=(ObjectId)obj.get("_id");
// id = objectId.toString();
}
private void report(String filename, String owner,
String creationTime, long length,
String scope, String operation, String callerIP, String lastAccess, String lastUser, String totVolume, String totCount) {
// ACCOUNTING CALL TYPE: STORAGE USAGE
RawUsageRecord sur=report.setGenericProperties("storage-usage",lastUser, scope, creationTime, lastAccess, owner);
sur=report.setSpecificProperties(sur, operation, length+"", filename, callerIP, "STORAGE", "1");
logger.info("[accounting call] type: storage usage ");
report.printRecord(sur);
report.send(sur);
// ACCOUNTING CALL TYPE: STORAGE STATUS
logger.debug("set properties: totVolume: "+totVolume+" totCount "+totCount);
if((totVolume!=null) && (totCount!=null)){
RawUsageRecord ssr=report.setGenericProperties("storage-status",lastUser, scope, creationTime, lastAccess, owner);
ssr=report.setSpecificProperties(ssr, operation, totVolume, filename, callerIP, "STORAGE", totCount);
logger.info("[accounting call] type: storage status ");
report.printRecord(ssr);
report.send(ssr);
}else{
logger.info("StorageStatus record not send for operation "+lastOperation);
}
}
private String retrieveScopeFromRemoteFilePath(String filename) {
String[] split=filename.split("/");
if(split.length>0){
String scope=null;
int i=1;
if(split[1].equals("VOLATILE")){
i=2;
}
scope="/"+split[i];
i++;
while((!split[i].equals("home")) && (!split[i].equals("public"))){
scope=scope+"/"+split[i];
i++;
}
logger.info("retieved scope: "+scope);
return scope;
}else logger.error("Scope bad format: scope not retrieved from string: "+filename);
return null;
}
/**
* check if the lastOperation is a valid operation for folder status information record
* @param lastOperation
* @return
*/
private boolean isNeedFSReport(String lastOperation) {
if(lastOperation.equalsIgnoreCase("UPLOAD") || lastOperation.equalsIgnoreCase("COPY") || lastOperation.equalsIgnoreCase("DELETE") || lastOperation.equalsIgnoreCase("MOVE"))
return true;
return false;
}
}

View File

@ -1,4 +1,4 @@
package org.gcube.contentmanager.storageserver.parse;
package org.gcube.contentmanager.storageserver.consumer;
import java.util.List;
@ -16,9 +16,9 @@ import org.slf4j.LoggerFactory;
import com.mongodb.DBObject;
public class JsonParser extends Thread{
public class UserAccountingConsumer extends Thread{
final static Logger logger=LoggerFactory.getLogger(JsonParser.class);
final static Logger logger=LoggerFactory.getLogger(UserAccountingConsumer.class);
final static int MINUTE_DECREMENT=-2;
private CubbyHole c;
private int number;
@ -41,7 +41,7 @@ public class JsonParser extends Thread{
String[] server;
List<String> dtsHosts;
public JsonParser(String[] srvs, CubbyHole c, int number,List<String> dtsHosts){
public UserAccountingConsumer(String[] srvs, CubbyHole c, int number,List<String> dtsHosts){
this.c=c;
this.number=number;
this.server=srvs;
@ -54,7 +54,7 @@ public class JsonParser extends Thread{
}
}
public JsonParser(String[] srvs, String user, String password, CubbyHole c, int number, List<String> dtsHosts){
public UserAccountingConsumer(String[] srvs, String user, String password, CubbyHole c, int number, List<String> dtsHosts){
this.c=c;
this.number=number;
this.server=srvs;
@ -112,17 +112,19 @@ public class JsonParser extends Thread{
continue;
}else if((lastOperation==null) || (lastUser==null)){
logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next ");
}
// if the lastoperation is download and the caller contains a dts host then it isn't a real user but it is a workspace accounting record
if(lastOperation != null && lastOperation.equalsIgnoreCase("DOWNLOAD")){
logger.debug("check if the caller is from dts. CallerIP: "+callerIp);
for(String host: dtsHosts){
logger.debug("scan "+host);
if(callerIp.contains(host)){
lastUser="workspace.accounting";
logger.info("the caller is dts service: caller "+callerIp+ " dts host: "+host+" the new user is: "+lastUser);
}
}
continue;
}else{
// if the lastoperation is download and the caller contains a dts host then it isn't a real user but it is a workspace accounting record
if((dtsHosts != null) && (lastOperation != null) && (lastOperation.equalsIgnoreCase("DOWNLOAD"))){
logger.debug("check if the caller is from dts. CallerIP: "+callerIp);
for(String host: dtsHosts){
logger.debug("scan "+host);
if(callerIp.contains(host)){
lastUser="workspace.accounting";
logger.info("the caller is dts service: caller "+callerIp+ " dts host: "+host+" the new user is: "+lastUser);
}
}
}
}
logger.debug(" operation accounted "+lastOperation);
StorageStatusRecord ssr=null;
@ -131,7 +133,8 @@ public class JsonParser extends Thread{
mongo=new MongoDB(server, user, password);
if(lastOperation.equalsIgnoreCase("COPY"))
owner=lastUser;
ssr=mongo.update(owner, length, 1, lastOperation);
ssr=new StorageStatusRecord(owner, length, 1);
ssr=mongo.updateUserVolume(ssr, lastOperation);
mongo.close();
}catch(Exception e){
logger.error("Problem in updating storage status record: "+e.getMessage());
@ -165,12 +168,17 @@ public class JsonParser extends Thread{
}
}
/**
* check if the lastOperation is a valid operation for storage status information record
* @param lastOperation
* @return
*/
private boolean isNeedSSReport(String lastOperation) {
if(lastOperation.equalsIgnoreCase("UPLOAD") || lastOperation.equalsIgnoreCase("COPY") || lastOperation.equalsIgnoreCase("DELETE"))
return true;
return false;
}
public void runWithoutThread(DBObject x){
try {
@ -249,18 +257,5 @@ public class JsonParser extends Thread{
}else logger.error("Scope bad format: scope not retrieved from string: "+filename);
return null;
}
// private StorageStatusRecord getStorageStatusRecord(String[] server, String user, String password, String owner, String lastUser, long length, String lastOperation, MongoDB mongo){
// StorageStatusRecord ssr=null;
// try{
// mongo=new MongoDB(server, user, password);
// ssr=mongo.update(lastUser, length, 1, lastOperation);
// mongo.close();
// }catch(Exception e){
// logger.error("Problem in updating storage status record: "+e.getMessage());
// }
// return ssr;
// }
}

View File

@ -14,7 +14,7 @@ import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import org.bson.types.BSONTimestamp;
import org.gcube.contentmanager.storageserver.parse.JsonParser;
import org.gcube.contentmanager.storageserver.consumer.UserAccountingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,14 +26,16 @@ public class ReadingMongoOplog extends Thread{
private Mongo mongoClient;
private DB local;
private DBCollection oplog;
private CubbyHole c;
private CubbyHole c1;
private CubbyHole c2;
private String user;
private String password;
private int number;
private List<String> srvs;
public ReadingMongoOplog(List<String> srvs, CubbyHole c, int numberT){
this.c=c;
public ReadingMongoOplog(List<String> srvs, CubbyHole c1, CubbyHole c2, int numberT){
this.c1=c1;
this.c2=c2;
this.number=numberT;
this.srvs=srvs;
setupServerAddress(srvs);
@ -42,8 +44,9 @@ public class ReadingMongoOplog extends Thread{
public ReadingMongoOplog(List<String> srvs, String user,
String password, CubbyHole c, int numberT) {
this.c=c;
String password, CubbyHole c1, CubbyHole c2, int numberT) {
this.c1=c1;
this.c2=c2;
this.number=numberT;
this.user=user;
this.password=password;
@ -73,7 +76,10 @@ public class ReadingMongoOplog extends Thread{
// check if discard or process the current DB record
if((x.get("o2")!=null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){
if(x.containsField("o")){
c.put(x);
// c1 buffer for suer accounting
c1.put(x);
// c2 buffer for folder accounting (TODO)
c2.put(x);
// parser.runWithoutThread(x);
logger.debug("Producer #" + this.number + " put: " + x);
}else{

View File

@ -0,0 +1,7 @@
package org.gcube.contentmanager.storageserver.parse.utils;
public class ParserUtils {
}

View File

@ -28,9 +28,12 @@ public class Configuration {
private String username;
private String password;
private String backendType;
private ArrayList<String> dtsHosts;
private boolean activeDTSFilter;
Logger logger= LoggerFactory.getLogger(Configuration.class);
public Configuration(String scope, String user, String password){
public Configuration(String scope, String user, String password, boolean dtsFilter){
this.activeDTSFilter=dtsFilter;
this.scope=scope;
if(!ValidationUtils.validationScope(scope))
throw new IllegalArgumentException("invalid scope exception: "+scope);
@ -175,19 +178,21 @@ public class Configuration {
}
public List<String> retrieveDTSHosts(){
ArrayList<String> scopes=ValidationUtils.getVOScopes(scope);
ArrayList<String> hosts= new ArrayList<String>();
for(String currentScope:scopes){
String host=getHosts("DataTransformation", "DataTransformationService", currentScope);
logger.debug("host found: "+host+ " in scope: "+currentScope);
if(host!=null)
hosts.add(host);
if(activeDTSFilter){
ArrayList<String> scopes=ValidationUtils.getVOScopes(scope);
dtsHosts= new ArrayList<String>();
for(String currentScope:scopes){
String host=getHosts("DataTransformation", "DataTransformationService", currentScope);
logger.debug("host found: "+host+ " in scope: "+currentScope);
if(host!=null)
dtsHosts.add(host);
}
for(String host : hosts){
logger.debug("DTS host: "+host);
}
return hosts;
}
for(String host : dtsHosts){
logger.debug("DTS host: "+host);
}
return dtsHosts;
}else return null;
}
public String getHosts(String serviceClass, String serviceName) {
@ -228,5 +233,45 @@ public class Configuration {
return host;
}
public String getScope() {
return scope;
}
public void setScope(String scope) {
this.scope = scope;
}
public String[] getServer() {
return server;
}
public void setServer(String[] server) {
this.server = server;
}
public String getBackendType() {
return backendType;
}
public void setBackendType(String backendType) {
this.backendType = backendType;
}
public ArrayList<String> getDtsHosts() {
return dtsHosts;
}
public void setDtsHosts(ArrayList<String> dtsHosts) {
this.dtsHosts = dtsHosts;
}
public boolean isActiveDTSFilter() {
return activeDTSFilter;
}
public void setActiveDTSFilter(boolean activeDTSFilter) {
this.activeDTSFilter = activeDTSFilter;
}
}

View File

@ -2,9 +2,11 @@ package org.gcube.contentmanager.storageserver.startup;
import java.util.Arrays;
import java.util.List;
import org.gcube.contentmanager.storageserver.consumer.FolderAccountingConsumer;
import org.gcube.contentmanager.storageserver.consumer.UserAccountingConsumer;
import org.gcube.contentmanager.storageserver.data.CubbyHole;
import org.gcube.contentmanager.storageserver.data.ReadingMongoOplog;
import org.gcube.contentmanager.storageserver.parse.JsonParser;
public class Startup {
@ -38,22 +40,29 @@ public class Startup {
user=args[2];
password=args[3];
}
Configuration cfg=new Configuration(scope, user, password);
Configuration cfg=new Configuration(scope, user, password, true);
String[] server=retrieveServerConfiguration(cfg);
List<String> dtsHosts=retrieveDTSConfiguration(cfg);
CubbyHole c = new CubbyHole();
CubbyHole c1 = new CubbyHole();
CubbyHole c2 = new CubbyHole();
ReadingMongoOplog producer=null;
if(args.length == 3)
producer=new ReadingMongoOplog( Arrays.asList(oplogServer), args[1], args[2], c, 1 );
producer=new ReadingMongoOplog( Arrays.asList(oplogServer), args[1], args[2], c1, c2, 1 );
else
producer=new ReadingMongoOplog( Arrays.asList(server), c, 1 );
JsonParser consumer=null;
producer=new ReadingMongoOplog( Arrays.asList(server), c1, c2, 1 );
UserAccountingConsumer ssConsumer=null;
if(args.length == 3)
consumer=new JsonParser(server, c, 1, dtsHosts);
ssConsumer=new UserAccountingConsumer(server, c1, 1, dtsHosts);
else
consumer=new JsonParser(server, args[1], args[2], c, 1, dtsHosts);
ssConsumer=new UserAccountingConsumer(server, args[1], args[2], c1, 1, dtsHosts);
FolderAccountingConsumer fsConsumer=null;
if(args.length == 3)
fsConsumer=new FolderAccountingConsumer(server, c1, 1);
else
fsConsumer=new FolderAccountingConsumer(server, args[1], args[2], c1, 1);
producer.start();
consumer.start();
ssConsumer.start();
fsConsumer.start();
}
private static String[] retrieveServerConfiguration(Configuration c) {

View File

@ -0,0 +1,128 @@
package org.gcube.contentmanager.storageserver.store;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
public class FolderStatusOperationManager {
Logger logger=LoggerFactory.getLogger(FolderStatusOperationManager.class);
DBCollection fsCollection;
public FolderStatusOperationManager(DBCollection fsCollection){
setFsCollection(fsCollection);
}
public void putFSRecord(String folder, long volume, int count, String lastUpdate){
BasicDBObject doc = new BasicDBObject("folder", folder)
.append("volume", volume)
.append("count", count)
.append("lastupdate", lastUpdate);
getFsCollection().insert(doc);
}
public FolderStatusRecord updateFolder(FolderStatusRecord fsRecord, String lastOperation){
FolderStatusRecord oldFsr=getFSRecord(fsRecord.getFolder());
if(oldFsr != null){
int partialCount=oldFsr.getCount();
int count = countCalculation(fsRecord.getCount(), partialCount, lastOperation);
fsRecord.setCount(count);
long partialVolume=oldFsr.getVolume();
long volume = volumeCalculation(fsRecord.getVolume(), partialVolume, lastOperation);
fsRecord.setVolume(volume);
final BasicDBObject query = new BasicDBObject("folder", fsRecord.getFolder());
// Creating BasicDBObjectBuilder object without arguments
DBObject documentBuilder = BasicDBObjectBuilder.start()
.add("volume", volume).add("count", count).add("lastUpdate", fsRecord.getLastUpdate()).get();
// get the dbobject from builder and Inserting document
getFsCollection().update(query,new BasicDBObject("$set", documentBuilder), true, false);
// // if is a move operation then will be update also the source folder
// if(lastOperation.equalsIgnoreCase("MOVE")){
// FolderStatusRecord originalFolderRecord = getFSRecord(folder)
// }
}else{
putFSRecord(fsRecord.getFolder(), fsRecord.getVolume(), fsRecord.getCount(), fsRecord.getLastUpdate());
}
return fsRecord;
}
public FolderStatusRecord getFSRecord(String folder){
BasicDBObject query = new BasicDBObject("folder", folder);
DBCursor cursor=getFsCollection().find(query);
DBObject obj=null;
try{
if(cursor.hasNext()){
obj=cursor.next();
}
}finally{
cursor.close();
}
if(obj!=null){
String cons=null;
if(obj.containsField("consumer")) cons=(String) obj.get("consumer");
else logger.error("incomplete record found. consumer field is missing");
long vol =0;
if(obj.containsField("volume")) vol=(long) obj.get("volume");
else logger.error("incomplete record found. volume field is missing");
int count=0;
if(obj.containsField("count")) count=(int) obj.get("count");
else logger.error("incomplete record found. count field is missing");
String lastUpdate=null;
if(obj.containsField("lastUpdate")) lastUpdate=(String) obj.get("lastUpdate");
else logger.error("incomplete record found. lastUpdate field is missing");
String originalFolder=null;
if(obj.containsField("from")) originalFolder=(String) obj.get("from");
else logger.info(" originalFolder field is missing");
String id=(String)obj.get("id");
return new FolderStatusRecord(id, cons, vol, count, lastUpdate, originalFolder, obj);
}else{
return null;
}
}
private long volumeCalculation(long currentVolume, long partialVolume, String operation) {
logger.info("accounting: operation "+operation+" total Volume "+partialVolume+" current volume "+currentVolume);
if(operation.equalsIgnoreCase("UPLOAD") || operation.equalsIgnoreCase("COPY")){
partialVolume=partialVolume+currentVolume;
}else if(operation.equalsIgnoreCase("DELETE")){
partialVolume=partialVolume-currentVolume;
}
logger.info("new volume "+partialVolume);
return partialVolume;
}
private int countCalculation(int currentCount, int partialCount, String operation) {
logger.info("accounting: operation "+operation+" total count "+partialCount+" current count"+currentCount);
if(operation.equalsIgnoreCase("UPLOAD")|| operation.equalsIgnoreCase("COPY"))
partialCount=partialCount+currentCount;
else if(operation.equalsIgnoreCase("DELETE"))
partialCount=partialCount-currentCount;
logger.info("new count: "+partialCount);
return partialCount;
}
public DBCollection getFsCollection() {
return fsCollection;
}
public void setFsCollection(DBCollection fsCollection) {
this.fsCollection = fsCollection;
}
}

View File

@ -0,0 +1,89 @@
package org.gcube.contentmanager.storageserver.store;
import java.util.Date;
import com.mongodb.DBObject;
public class FolderStatusRecord {
private String folder;
private String originalFolder;
private long volume;
private int count;
private String lastUpdate;
private String id;
private DBObject dbo;
public FolderStatusRecord(String folderPath, long volume, int count, String lastUpdate, String from){
this.folder=folderPath;
this.volume=volume;
this.count=count;
this.lastUpdate=lastUpdate;
this.originalFolder=from;
}
public FolderStatusRecord(String id, String folderPath, long volume, int count, String lastUpdate, String from, DBObject obj){
this.id=id;
this.folder=folderPath;
this.volume=volume;
this.count=count;
this.lastUpdate=lastUpdate;
this.originalFolder=from;
this.dbo=obj;
}
public String getFolder() {
return folder;
}
public void setFolder(String folder) {
this.folder = folder;
}
public long getVolume() {
return volume;
}
public void setVolume(long volume) {
this.volume = volume;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public String getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(String lastUpdate) {
this.lastUpdate = lastUpdate;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getOriginalFolder() {
return originalFolder;
}
public void setOriginalFolder(String originalFolder) {
this.originalFolder = originalFolder;
}
}

View File

@ -2,12 +2,8 @@ package org.gcube.contentmanager.storageserver.store;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoOptions;
@ -16,130 +12,83 @@ public class MongoDB {
private Mongo mongo;
private DB db;
private String[] server;
private String collection;
private String collectionSSName;
private String collectionFSName;
private int port;
private String pwd;
private String user;
private DBCollection ssCollection;
private DBCollection fsCollection;
Logger logger = LoggerFactory.getLogger(MongoDB.class);
private static final String ACCOUNTING_DB="accounting";
private static final String DEFAULT_COLLECTION="storageStatus";
private static final String DEFAULT_SS_COLLECTION="storageStatus";
private static final String DEFAULT_FS_COLLECTION="folderStatus";
private FolderStatusOperationManager folderOperationManager;
private StorageStatusOperationManager ssOperationManager;
public MongoDB(String[] server, int port, String user, String password, String collectionName){
public MongoDB(String[] server, int port, String user, String password){
this.server=server;
this.port=port;
this.pwd=password;
this.user=user;
this.collection=collectionName;
db=getDB();
ssCollection = db.getCollection(collectionName);
this.collectionSSName=DEFAULT_SS_COLLECTION;
this.collectionFSName=DEFAULT_FS_COLLECTION;
folderOperationManager=new FolderStatusOperationManager(getFolderStatusCollection());
ssOperationManager=new StorageStatusOperationManager(getStorageStatusCollection());
}
public MongoDB(String[] server, String user, String password, String collectionName){
this.server=server;
this.pwd=password;
this.user=user;
this.collection=collectionName;
db=getDB();
ssCollection = db.getCollection(collectionName);
}
public MongoDB(String[] server, String user, String password){
this.server=server;
this.pwd=password;
this.user=user;
this.collection=DEFAULT_COLLECTION;
// db=getDB();
// ssCollection = db.getCollection(collection);
}
this.collectionSSName=DEFAULT_SS_COLLECTION;
this.collectionFSName=DEFAULT_FS_COLLECTION;
folderOperationManager=new FolderStatusOperationManager(getFolderStatusCollection());
ssOperationManager=new StorageStatusOperationManager(getStorageStatusCollection());
}
public void put(String consumer, long volume, int count){
BasicDBObject doc = new BasicDBObject("consumer", consumer)
.append("volume", volume)
.append("count", count);
getCollection().insert(doc);
public MongoDB(String[] server, String user, String password, String ssCollection, String fsCollection){
this.server=server;
this.pwd=password;
this.user=user;
if(ssCollection!=null)
this.collectionSSName=ssCollection;
else
this.collectionSSName=DEFAULT_SS_COLLECTION;
if(fsCollection!=null)
this.collectionFSName=fsCollection;
else
this.collectionFSName=DEFAULT_FS_COLLECTION;
folderOperationManager=new FolderStatusOperationManager(getFolderStatusCollection());
ssOperationManager=new StorageStatusOperationManager(getStorageStatusCollection());
}
public StorageStatusRecord updateUserVolume(StorageStatusRecord ssRecord, String operation){
ssRecord= ssOperationManager.updateUser(ssRecord, operation);
close();
}
public StorageStatusRecord update(String consumer, long volume, int count, String operation){
StorageStatusRecord ssr=get(consumer);
if(ssr != null){
int currentCount=ssr.getCount();
count = setCount(count, currentCount, operation);
ssr.setCount(count);
long currentVolume=ssr.getVolume();
volume = setVolume(volume, currentVolume, operation);
final BasicDBObject query = new BasicDBObject("consumer", consumer);
// Creating BasicDBObjectBuilder object without arguments
DBObject documentBuilder = BasicDBObjectBuilder.start()
.add("volume", volume).add("count", count).get();
// get the dbobject from builder and Inserting document
getCollection().update(query,new BasicDBObject("$set", documentBuilder), true, false);
close();
}else{
put(consumer, volume, count);
}
return new StorageStatusRecord(consumer, volume, count);
}
public StorageStatusRecord get(String consumer){
BasicDBObject query = new BasicDBObject("consumer", consumer);
DBCursor cursor=getCollection().find(query);
DBObject obj=null;
try{
if(cursor.hasNext()){
obj=cursor.next();
}
}finally{
cursor.close();
}
if(obj!=null){
String cons=null;
if(obj.containsField("consumer")) cons=(String) obj.get("consumer");
else logger.error("incomplete record found. consumer field is missing");
long vol =0;
if(obj.containsField("volume")) vol=(long) obj.get("volume");
else logger.error("incomplete record found. volume field is missing");
int count=0;
if(obj.containsField("count")) count=(int) obj.get("count");
else logger.error("incomplete record found. count field is missing");
String id=(String)obj.get("id");
return new StorageStatusRecord(id, cons, vol, count, obj);
}else{
return null;
}
return ssRecord;
}
public void close(){
if(mongo!=null)
mongo.close();
public FolderStatusRecord updateFolderVolume(FolderStatusRecord fsRecord, String operation){
fsRecord= folderOperationManager.updateFolder(fsRecord, operation);
close();
return fsRecord;
}
private long setVolume(long volume, long currentVolume, String operation) {
logger.info("accounting: operation "+operation+" total Volume "+currentVolume+" current volume "+volume);
if(operation.equalsIgnoreCase("UPLOAD") || operation.equalsIgnoreCase("COPY")){
currentVolume=currentVolume+volume;
}else if(operation.equalsIgnoreCase("DELETE")){
currentVolume=currentVolume-volume;
}
logger.info("new volume "+currentVolume);
return currentVolume;
public StorageStatusRecord getSSRecord(String consumer){
StorageStatusRecord record=ssOperationManager.getSSRecord(consumer);
close();
return record;
}
private int setCount(int count, int currentCount, String operation) {
logger.info("accounting: operation "+operation+" total count "+currentCount+" current count"+count);
if(operation.equalsIgnoreCase("UPLOAD")|| operation.equalsIgnoreCase("COPY"))
currentCount=currentCount+count;
else if(operation.equalsIgnoreCase("DELETE"))
currentCount=currentCount-count;
logger.info("new count: "+currentCount);
return currentCount;
public FolderStatusRecord getFSRecord(String folder){
FolderStatusRecord record=folderOperationManager.getFSRecord(folder);
close();
return record;
}
private DB getDB() {
if(db != null){
@ -176,7 +125,7 @@ public class MongoDB {
if(auth) logger.debug("mongo is in authenticate mode");
else logger.debug("mongo is not in authenticate mode");
if(ssCollection == null)
ssCollection=db.getCollection(collection);
ssCollection=db.getCollection(collectionSSName);
ssCollection.findOne();
String firstServer = server[0];
server[0] = srv;
@ -191,11 +140,25 @@ public class MongoDB {
return db;
}
private DBCollection getCollection() {
public DBCollection getStorageStatusCollection() {
if(ssCollection==null)
return getDB().getCollection(collection);
return getDB().getCollection(collectionSSName);
else
return ssCollection;
}
public DBCollection getFolderStatusCollection() {
if(fsCollection==null)
return getDB().getCollection(collectionFSName);
else
return fsCollection;
}
public void close(){
if(mongo!=null)
mongo.close();
}
}

View File

@ -0,0 +1,115 @@
package org.gcube.contentmanager.storageserver.store;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
public class StorageStatusOperationManager {
DBCollection ssCollection;
Logger logger=LoggerFactory.getLogger(StorageStatusOperationManager.class);
public StorageStatusOperationManager(DBCollection ssCollection){
setSsCollection(ssCollection);
}
public void putSSRecord(String consumer, long volume, int count){
BasicDBObject doc = new BasicDBObject("consumer", consumer)
.append("volume", volume)
.append("count", count);
getSsCollection().insert(doc);
}
public StorageStatusRecord updateUser(StorageStatusRecord ssRecord, String lastOperation){
StorageStatusRecord oldSsr=getSSRecord(ssRecord.getConsumer());
if(oldSsr != null){
int partialCount=oldSsr.getCount();
int count = countCalculation(ssRecord.getCount(), partialCount, lastOperation);
ssRecord.setCount(count);
long partialVolume=oldSsr.getVolume();
long volume = volumeCalculation(ssRecord.getVolume(), partialVolume, lastOperation);
ssRecord.setVolume(volume);
final BasicDBObject query = new BasicDBObject("consumer", ssRecord.getConsumer());
// Creating BasicDBObjectBuilder object without arguments
DBObject documentBuilder = BasicDBObjectBuilder.start()
.add("volume", volume).add("count", count).get();
// get the dbobject from builder and Inserting document
getSsCollection().update(query,new BasicDBObject("$set", documentBuilder), true, false);
// close();
}else{
putSSRecord(ssRecord.getConsumer(), ssRecord.getVolume(), ssRecord.getCount());
}
return ssRecord;
}
public StorageStatusRecord getSSRecord(String consumer){
BasicDBObject query = new BasicDBObject("consumer", consumer);
DBCursor cursor=getSsCollection().find(query);
DBObject obj=null;
try{
if(cursor.hasNext()){
obj=cursor.next();
}
}finally{
cursor.close();
}
if(obj!=null){
String cons=null;
if(obj.containsField("consumer")) cons=(String) obj.get("consumer");
else logger.error("incomplete record found. consumer field is missing");
long vol =0;
if(obj.containsField("volume")) vol=(long) obj.get("volume");
else logger.error("incomplete record found. volume field is missing");
int count=0;
if(obj.containsField("count")) count=(int) obj.get("count");
else logger.error("incomplete record found. count field is missing");
String id=(String)obj.get("id");
return new StorageStatusRecord(id, cons, vol, count, obj);
}else{
return null;
}
}
private long volumeCalculation(long currentVolume, long partialVolume, String operation) {
logger.info("accounting: operation "+operation+" total Volume "+partialVolume+" current volume "+currentVolume);
if(operation.equalsIgnoreCase("UPLOAD") || operation.equalsIgnoreCase("COPY")){
partialVolume=partialVolume+currentVolume;
}else if(operation.equalsIgnoreCase("DELETE")){
partialVolume=partialVolume-currentVolume;
}
logger.info("new volume "+partialVolume);
return partialVolume;
}
private int countCalculation(int currentCount, int partialCount, String operation) {
logger.info("accounting: operation "+operation+" total count "+partialCount+" current count"+currentCount);
if(operation.equalsIgnoreCase("UPLOAD")|| operation.equalsIgnoreCase("COPY"))
partialCount=partialCount+currentCount;
else if(operation.equalsIgnoreCase("DELETE"))
partialCount=partialCount-currentCount;
logger.info("new count: "+partialCount);
return partialCount;
}
public DBCollection getSsCollection() {
return ssCollection;
}
public void setSsCollection(DBCollection ssCollection) {
this.ssCollection = ssCollection;
}
}

View File

@ -16,7 +16,7 @@ public class ConfigurationTest {
@BeforeClass
public static void init(){
c=new Configuration(scope, user, password);
c=new Configuration(scope, user, password, true);
}
@Test

View File

@ -1,7 +1,5 @@
package org.gcube.contentmanager.storageserver.store;
import static org.junit.Assert.*;
import org.junit.BeforeClass;
import org.junit.Test;
@ -15,15 +13,15 @@ public class MongoDBTest {
mongo=new MongoDB(server, null, null);
}
// @Test
public void update(){
mongo.update("test.consumer", 100, 1, "UPLOAD");
StorageStatusRecord ssr=new StorageStatusRecord("test.consumer", 100, 1);
mongo.updateUserVolume(ssr, "UPLOAD");
}
@Test
public void put(){
mongo.put("test.consumer2", 100, 1 );
new StorageStatusOperationManager(mongo.getStorageStatusCollection()).putSSRecord( "test.consumer2", 100, 1 );
}
}