package org.gcube.contentmanager.storageserver.store; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mongodb.BasicDBObject; import com.mongodb.BasicDBObjectBuilder; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.Mongo; import com.mongodb.MongoOptions; public class MongoDB { private Mongo mongo; private DB db; private String[] server; private String collection; private int port; private String pwd; private String user; private DBCollection ssCollection; Logger logger = LoggerFactory.getLogger(MongoDB.class); private static final String ACCOUNTING_DB="accounting"; private static final String DEFAULT_COLLECTION="storageStatus"; public MongoDB(String[] server, int port, String user, String password, String collectionName){ this.server=server; this.port=port; this.pwd=password; this.user=user; this.collection=collectionName; db=getDB(); ssCollection = db.getCollection(collectionName); } public MongoDB(String[] server, String user, String password, String collectionName){ this.server=server; this.pwd=password; this.user=user; this.collection=collectionName; db=getDB(); ssCollection = db.getCollection(collectionName); } public MongoDB(String[] server, String user, String password){ this.server=server; this.pwd=password; this.user=user; this.collection=DEFAULT_COLLECTION; // db=getDB(); // ssCollection = db.getCollection(collection); } public void put(String consumer, long volume, int count){ BasicDBObject doc = new BasicDBObject("consumer", consumer) .append("volume", volume) .append("count", count); getCollection().insert(doc); close(); } public StorageStatusRecord update(String consumer, long volume, int count, String operation){ StorageStatusRecord ssr=get(consumer); if(ssr != null){ int currentCount=ssr.getCount(); count = setCount(count, currentCount, operation); ssr.setCount(count); long currentVolume=ssr.getVolume(); volume = setVolume(volume, currentVolume, operation); final BasicDBObject query = new BasicDBObject("consumer", consumer); // Creating BasicDBObjectBuilder object without arguments DBObject documentBuilder = BasicDBObjectBuilder.start() .add("volume", volume).add("count", count).get(); // get the dbobject from builder and Inserting document getCollection().update(query,new BasicDBObject("$set", documentBuilder), true, false); close(); }else{ put(consumer, volume, count); } return new StorageStatusRecord(consumer, volume, count); } public StorageStatusRecord get(String consumer){ BasicDBObject query = new BasicDBObject("consumer", consumer); DBCursor cursor=getCollection().find(query); DBObject obj=null; try{ if(cursor.hasNext()){ obj=cursor.next(); } }finally{ cursor.close(); } if(obj!=null){ String cons=null; if(obj.containsField("consumer")) cons=(String) obj.get("consumer"); else logger.error("incomplete record found. consumer field is missing"); long vol =0; if(obj.containsField("volume")) vol=(long) obj.get("volume"); else logger.error("incomplete record found. volume field is missing"); int count=0; if(obj.containsField("count")) count=(int) obj.get("count"); else logger.error("incomplete record found. count field is missing"); String id=(String)obj.get("id"); return new StorageStatusRecord(id, cons, vol, count, obj); }else{ return null; } } public void close(){ if(mongo!=null) mongo.close(); } private long setVolume(long volume, long currentVolume, String operation) { logger.info("accounting: operation "+operation+" total Volume "+currentVolume+" current volume "+volume); if(operation.equalsIgnoreCase("UPLOAD") || operation.equalsIgnoreCase("COPY")){ currentVolume=currentVolume+volume; }else if(operation.equalsIgnoreCase("DELETE")){ currentVolume=currentVolume-volume; } logger.info("new current volume "+currentVolume); return currentVolume; } private int setCount(int count, int currentCount, String operation) { logger.info("accounting: operation "+operation+" total count "+currentCount+" current count"+count); if(operation.equalsIgnoreCase("UPLOAD")) currentCount=currentCount+count; else if(operation.equalsIgnoreCase("DELETE")) currentCount=currentCount-count; logger.info("new count calculated: "+currentCount); return currentCount; } private DB getDB() { if(db != null){ // check if the old server is primary try{ DB db = mongo.getDB(ACCOUNTING_DB); }catch(Exception e ){ logger.warn("the server now is not a primary "); db=null; } } if(db==null){ int i=-1; for(String srv : server){ try { i++; ssCollection=null; if(mongo!=null) mongo.close(); MongoOptions options=new MongoOptions(); options.autoConnectRetry=true; options.socketKeepAlive=true; options.maxWaitTime=240000; options.connectionsPerHost=35; mongo = new Mongo(srv, options); // mongo.getMongoOptions().autoConnectRetry=true; // mongo.getMongoOptions().socketKeepAlive=true; logger.debug("Istantiate MongoDB with options: "+mongo.getMongoOptions()); db = mongo.getDB(ACCOUNTING_DB); // check on user and password for non authenticate mode if(user==null) user=""; if(pwd==null) pwd=""; boolean auth = db.authenticate(user, pwd.toCharArray()); if(auth) logger.info("mongo is in authenticate mode"); else logger.info("mongo is not in authenticate mode"); if(ssCollection == null) ssCollection=db.getCollection(collection); ssCollection.findOne(); String firstServer = server[0]; server[0] = srv; server[i]=firstServer; break; } catch (Exception e) { logger.warn("server "+srv+" is not a primary retry "); continue; } } } return db; } private DBCollection getCollection() { if(ssCollection==null) return getDB().getCollection(collection); else return ssCollection; } }