diff --git a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java index 8f58954..ff54fe3 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java @@ -76,48 +76,56 @@ public class JsonParser extends Thread{ public void run() { while(true){ - DBObject x=c.get(); - logger.info("Consumer #" + this.number + " got: " + x); - op = (String) x.get("op"); - // retrieve object fields - DBObject obj=(DBObject)x.get("o"); - retrieveObjectFields(obj); - long length=-1; - if(obj.get("length")!=null) length=(long)obj.get("length"); - logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess); - if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){ - //convert from byte to kb - length=length/1024; - String scope=null; - if((filename!=null)&& (filename.contains("/"))) - scope=retrieveScopeFromRemoteFilePath(filename); - else{ - // field added on storage manager library for retrieve scope. Used only if it is a link delete - String pathString=(String)obj.get("onScope"); - scope=retrieveScopeFromRemoteFilePath(pathString); - } - boolean validScope=ValidationUtils.validationScope(scope); - if(validScope){ - if(delete!=null){ - lastOperation="DELETE"; - }else if ((lastOperation != null) && (op != null) && (lastOperation.equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){ - // it is an update on a link object this operation doesn't be accounted - logger.info("update on link object is not accounted. Skip next "); - continue; + DBObject x=null; + try{ + x=c.get(); + logger.info("Consumer #" + this.number + " got: " + x); + op = (String) x.get("op"); + // retrieve object fields + DBObject obj=(DBObject)x.get("o"); + retrieveObjectFields(obj); + long length=-1; + if(obj.get("length")!=null) length=(long)obj.get("length"); + logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess); + if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){ + //convert from byte to kb + length=length/1024; + String scope=null; + if((filename!=null)&& (filename.contains("/"))) + scope=retrieveScopeFromRemoteFilePath(filename); + else{ + // field added on storage manager library for retrieve scope. Used only if it is a link delete + String pathString=(String)obj.get("onScope"); + scope=retrieveScopeFromRemoteFilePath(pathString); } -// operation=mappingOperationField(op, id, delete, lastAccess); - MongoDB mongo=new MongoDB(server, user, password); - StorageStatusRecord ssr=mongo.update(lastUser, length, 1, lastOperation); - //call to the accounting library - report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+""); - logger.info(" operation accounted "+lastOperation); - logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp); - }else{ - logger.info("operation is not accounted: invalid scope: "+scope); - } - }else{ - logger.info("operation is not accounted"); - } + boolean validScope=ValidationUtils.validationScope(scope); + if(validScope){ + if(delete!=null){ + lastOperation="DELETE"; + }else if ((lastOperation != null) && (op != null) && (lastOperation.equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){ + // it is an update on a link object this operation doesn't be accounted + logger.info("update on link object is not accounted. Skip next "); + continue; + } +// operation=mappingOperationField(op, id, delete, lastAccess); + MongoDB mongo=new MongoDB(server, user, password); + StorageStatusRecord ssr=mongo.update(lastUser, length, 1, lastOperation); + mongo.close(); + //call to the accounting library + report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+""); + logger.info(" operation accounted "+lastOperation); + logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp); + }else{ + logger.info("operation is not accounted: invalid scope: "+scope); + } + }else{ + logger.info("operation is not accounted"); + } + + }catch(Exception e){ + logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getMessage()); + logger.info("skip to next record "); + } } } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java b/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java index b6712d0..c2268ad 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java @@ -49,65 +49,17 @@ public class MongoDB { this.pwd=password; this.user=user; this.collection=DEFAULT_COLLECTION; - db=getDB(); - ssCollection = db.getCollection(collection); +// db=getDB(); +// ssCollection = db.getCollection(collection); } - private DB getDB() { - if(db != null){ - // check if the old server is primary - try{ - DB db = mongo.getDB(ACCOUNTING_DB); - }catch(Exception e ){ - logger.warn("the server now is not a primary "); - db=null; - } - } - if(db==null){ - int i=-1; - for(String srv : server){ - try { - i++; - if(mongo!=null) - mongo.close(); - MongoOptions options=new MongoOptions(); - options.autoConnectRetry=true; - options.socketKeepAlive=true; - options.maxWaitTime=240000; - options.connectionsPerHost=35; - mongo = new Mongo(srv, options); -// mongo.getMongoOptions().autoConnectRetry=true; -// mongo.getMongoOptions().socketKeepAlive=true; - logger.debug("Istantiate MongoDB with options: "+mongo.getMongoOptions()); - db = mongo.getDB(ACCOUNTING_DB); - // check on user and password for non authenticate mode - if(user==null) user=""; - if(pwd==null) pwd=""; - boolean auth = db.authenticate(user, pwd.toCharArray()); - if(auth) logger.info("mongo is in authenticate mode"); - else logger.info("mongo is not in authenticate mode"); - ssCollection = db.getCollection(collection); - ssCollection.findOne(); -// GridFS gfs = new GridFS(db); - String firstServer = server[0]; - server[0] = srv; - server[i]=firstServer; - break; - } catch (Exception e) { - logger.warn("server "+srv+" is not a primary retry "); - continue; - } - - } - } - return db; - } public void put(String consumer, long volume, int count){ BasicDBObject doc = new BasicDBObject("consumer", consumer) .append("volume", volume) .append("count", count); - ssCollection.insert(doc); + getCollection().insert(doc); + close(); } public StorageStatusRecord update(String consumer, long volume, int count, String operation){ @@ -123,13 +75,50 @@ public class MongoDB { DBObject documentBuilder = BasicDBObjectBuilder.start() .add("volume", volume).add("count", count).get(); // get the dbobject from builder and Inserting document - ssCollection.update(query,new BasicDBObject("$set", documentBuilder), true, false); + getCollection().update(query,new BasicDBObject("$set", documentBuilder), true, false); + close(); }else{ put(consumer, volume, count); } return new StorageStatusRecord(consumer, volume, count); } + + + public StorageStatusRecord get(String consumer){ + BasicDBObject query = new BasicDBObject("consumer", consumer); + DBCursor cursor=getCollection().find(query); + DBObject obj=null; + try{ + if(cursor.hasNext()){ + obj=cursor.next(); + + } + }finally{ + cursor.close(); + } + if(obj!=null){ + String cons=null; + if(obj.containsField("consumer")) cons=(String) obj.get("consumer"); + else logger.error("incomplete record found. consumer field is missing"); + long vol =0; + if(obj.containsField("volume")) vol=(long) obj.get("volume"); + else logger.error("incomplete record found. volume field is missing"); + int count=0; + if(obj.containsField("count")) count=(int) obj.get("count"); + else logger.error("incomplete record found. count field is missing"); + String id=(String)obj.get("id"); + return new StorageStatusRecord(id, cons, vol, count, obj); + }else{ + return null; + } + } + + public void close(){ + if(mongo!=null) + mongo.close(); + } + private long setVolume(long volume, long currentVolume, String operation) { logger.info("accounting: operation "+operation+" total Volume "+currentVolume+" current volume "+volume); if(operation.equalsIgnoreCase("UPLOAD") || operation.equalsIgnoreCase("COPY")){ @@ -152,33 +141,61 @@ public class MongoDB { } - - public StorageStatusRecord get(String consumer){ - BasicDBObject query = new BasicDBObject("consumer", consumer); - DBCursor cursor=ssCollection.find(query); - DBObject obj=null; - try{ - if(cursor.hasNext()){ - obj=cursor.next(); - + private DB getDB() { + if(db != null){ + // check if the old server is primary + try{ + DB db = mongo.getDB(ACCOUNTING_DB); + }catch(Exception e ){ + logger.warn("the server now is not a primary "); + db=null; + } } - }finally{ - cursor.close(); - } - if(obj!=null){ - String cons=null; - if(obj.containsField("consumer")) cons=(String) obj.get("consumer"); - else logger.error("incomplete record found. consumer field is missing"); - long vol =0; - if(obj.containsField("volume")) vol=(long) obj.get("volume"); - else logger.error("incomplete record found. volume field is missing"); - int count=0; - if(obj.containsField("count")) count=(int) obj.get("count"); - else logger.error("incomplete record found. count field is missing"); - String id=(String)obj.get("id"); - return new StorageStatusRecord(id, cons, vol, count, obj); - }else - return null; + if(db==null){ + int i=-1; + for(String srv : server){ + try { + i++; + ssCollection=null; + if(mongo!=null) + mongo.close(); + MongoOptions options=new MongoOptions(); + options.autoConnectRetry=true; + options.socketKeepAlive=true; + options.maxWaitTime=240000; + options.connectionsPerHost=35; + mongo = new Mongo(srv, options); +// mongo.getMongoOptions().autoConnectRetry=true; +// mongo.getMongoOptions().socketKeepAlive=true; + logger.debug("Istantiate MongoDB with options: "+mongo.getMongoOptions()); + db = mongo.getDB(ACCOUNTING_DB); + // check on user and password for non authenticate mode + if(user==null) user=""; + if(pwd==null) pwd=""; + boolean auth = db.authenticate(user, pwd.toCharArray()); + if(auth) logger.info("mongo is in authenticate mode"); + else logger.info("mongo is not in authenticate mode"); + if(ssCollection == null) + ssCollection=db.getCollection(collection); + ssCollection.findOne(); + String firstServer = server[0]; + server[0] = srv; + server[i]=firstServer; + break; + } catch (Exception e) { + logger.warn("server "+srv+" is not a primary retry "); + continue; + } + } + } + return db; } - + + private DBCollection getCollection() { + if(ssCollection==null) + return getDB().getCollection(collection); + else + return ssCollection; + } + } diff --git a/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java b/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java index 36beb2f..eeb4465 100644 --- a/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java +++ b/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java @@ -16,10 +16,14 @@ public class MongoDBTest { } - @Test - public void putAndRetrieve(){ - mongo.update("test.consumer", 100, -1, "UPLOAD"); +// @Test + public void update(){ + mongo.update("test.consumer", 100, 1, "UPLOAD"); } + @Test + public void put(){ + mongo.put("test.consumer2", 100, 1 ); + } }