diff --git a/pom.xml b/pom.xml index deb3ca7..3b08850 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,8 @@ org.mongodb mongo-java-driver - 2.12.4 + + [3.0.0, 3.1.0) org.slf4j diff --git a/src/main/java/org/gcube/contentmanager/storageserver/accounting/Report.java b/src/main/java/org/gcube/contentmanager/storageserver/accounting/Report.java index 46e562d..a252636 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/accounting/Report.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/accounting/Report.java @@ -14,7 +14,7 @@ public interface Report { * @param resourceScope * @return */ - public StorageUsageRecord setGenericProperties(String resourceType, String consumerId, String resourceScope, String creationTime, String lastAccess, String owner, String operation, String size); + public StorageUsageRecord setGenericProperties(StorageUsageRecord sur, String resourceType, String consumerId, String resourceScope, String creationTime, String lastAccess, String owner, String operation, String size); /** * Set end time of operation and other specific properties diff --git a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java index d04e278..bf4454d 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; public class ReportAccountingImpl implements Report { final Logger logger = LoggerFactory.getLogger(ReportAccountingImpl.class); - public StorageUsageRecord sur; +// public StorageUsageRecord sur; public AccountingPersistence accountingPersistence; private String providerUri; // public ResourceAccounting raFactory; @@ -25,11 +25,8 @@ import org.slf4j.LoggerFactory; @Override public void init(){ accountingPersistence = AccountingPersistenceFactory.getPersistence(); -// accountingPersistence.account(usageRecord) -// raFactory = null; try { -// raFactory = ResourceAccountingFactory.getResourceAccountingInstance(); - sur = new StorageUsageRecord(); +// sur = new StorageUsageRecord(); } catch (Exception e) { e.printStackTrace(); @@ -37,10 +34,12 @@ import org.slf4j.LoggerFactory; } @Override - public StorageUsageRecord setGenericProperties(String resourceType, String consumerId, String resourceScope, String creationTime, String lastAccess, String owner, String operation, String size) { + public StorageUsageRecord setGenericProperties(StorageUsageRecord sur, String resourceType, String consumerId, String resourceScope, String creationTime, String lastAccess, String owner, String operation, String size) { logger.trace("set accounting generic properties: operation: "+operation+" resourceType: "+resourceType+" consumerId "+consumerId+" scope: "+resourceScope+ " creationTime "+creationTime+" lastAccess "+lastAccess+" owner "+ owner); if(accountingPersistence==null) init(); - if(sur == null) sur = new StorageUsageRecord(); + if (sur == null) + sur = new StorageUsageRecord(); +// logger.debug("id generated by backend "+sur.getId()); try { sur.setDataType(StorageUsageRecord.DataType.STORAGE); sur.setOperationResult(OperationResult.SUCCESS); diff --git a/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java b/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java index a61fb76..c07c77b 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java @@ -133,6 +133,7 @@ public class UserAccountingConsumer extends Thread{ ssr=mongo.updateUserVolume(ssr, record.getLastOperation()); mongo.close(); }catch(Exception e){ + mongo.close(); logger.error("Problem in updating storage status record: "+e.getMessage()); } } @@ -214,7 +215,7 @@ public class UserAccountingConsumer extends Thread{ private void report(OpLogRemoteObject record, String scope, String totVolume, String totCount) { // ACCOUNTING CALL TYPE: STORAGE USAGE - StorageUsageRecord sur=report.setGenericProperties("storage-usage", record.getLastUser(), scope, record.getCreationTime(), record.getLastAccess(), record.getOwner(), record.getLastOperation(), record.getLength()+""); + StorageUsageRecord sur=report.setGenericProperties(null, "storage-usage", record.getLastUser(), scope, record.getCreationTime(), record.getLastAccess(), record.getOwner(), record.getLastOperation(), record.getLength()+""); sur=report.setSpecificProperties(sur, record.getFilename(), "STORAGE", record.getCallerIp(), record.getId()); logger.info("[accounting call] type: storage usage "); report.printRecord(sur); diff --git a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java index 89f75bb..359d540 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java @@ -12,6 +12,9 @@ import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.Mongo; import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoCredential; +import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import org.bson.types.BSONTimestamp; import org.gcube.contentmanager.storageserver.consumer.UserAccountingConsumer; @@ -23,7 +26,7 @@ public class ReadingMongoOplog extends Thread{ final static Logger logger=LoggerFactory.getLogger(ReadingMongoOplog.class); public static String DBNAME="remotefs"; private ServerAddress[] server; - private Mongo mongoClient; + private MongoClient mongoClient; private DB local; private DBCollection oplog; private CubbyHole c1; @@ -32,6 +35,8 @@ public class ReadingMongoOplog extends Thread{ private String password; private int number; private List srvs; + protected static ReadPreference READ_PREFERENCE=ReadPreference.primary(); + protected static final String DEFAULT_DB_NAME="local"; public ReadingMongoOplog(List srvs, CubbyHole c1, CubbyHole c2, int numberT){ this.c1=c1; @@ -102,8 +107,8 @@ public class ReadingMongoOplog extends Thread{ // @SuppressWarnings("deprecation") private void initBackend() { - mongoClient = new MongoClient(Arrays.asList(server));//"146.48.123.71" - local = mongoClient.getDB("local"); + + MongoClientOptions options=MongoClientOptions.builder().readPreference(READ_PREFERENCE).build(); try { Thread.sleep(1000); } catch (InterruptedException e) { @@ -111,15 +116,25 @@ public class ReadingMongoOplog extends Thread{ e.printStackTrace(); } boolean auth =false; - if(user!=null && password !=null) - auth=local.authenticate(user.trim(), password.trim().toCharArray()); + logger.info("try to auth with "+user+" "+password); + if(user!=null && password !=null){ + MongoCredential credential = MongoCredential.createMongoCRCredential(user, "admin", password.toCharArray()); + logger.debug("try to connect to mongo with authentication... "); + mongoClient = new MongoClient(Arrays.asList(server), Arrays.asList(credential), options);//"146.48.123.71" + }else{ + logger.debug("try to connect to mongo... "); + mongoClient = new MongoClient(Arrays.asList(server)); + } + logger.debug("try to connect to local db..."); + local = mongoClient.getDB("local"); + logger.debug("db connected "); if(auth) logger.info("mongo is in authenticate mode"); else logger.info("mongo is not in authenticate mode"); oplog = local.getCollection("oplog.rs"); } private void setupServerAddress(List srvs) { - try { +// try { if(srvs.size() > 0){ server=new ServerAddress[srvs.size()]; int i=0; @@ -131,9 +146,9 @@ public class ReadingMongoOplog extends Thread{ logger.error("MongoDB server not set. Please set one or more servers"); throw new RuntimeException("MongoDB server not set. Please set one or more servers"); } - } catch (UnknownHostException e) { - e.printStackTrace(); - } +// } catch (UnknownHostException e) { +// e.printStackTrace(); +// } } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java b/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java index 10e3234..084c762 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java @@ -40,6 +40,14 @@ public class Configuration { } + public Configuration(String scope, boolean dtsFilter){ + this.activeDTSFilter=dtsFilter; + this.scope=scope; + if(!ValidationUtils.validationScope(scope)) + throw new IllegalArgumentException("invalid scope exception: "+scope); + + } + public String[] getServerAccess(){ String savedScope=null; if(scope!=null){ @@ -273,5 +281,23 @@ public class Configuration { this.activeDTSFilter = activeDTSFilter; } + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + + } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java b/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java index 27dcdfa..8a0254d 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java @@ -35,12 +35,13 @@ public class Startup { } scope=args[0]; String oplogServer= args[1]; - if(args.length == 4){ - user=args[2]; - password=args[3]; - } - Configuration cfg=new Configuration(scope, user, password, true); +// if(args.length == 4){ +// user=args[2]; +// password=args[3]; +// } + Configuration cfg=new Configuration(scope, true); String[] server=retrieveServerConfiguration(cfg); + List dtsHosts=retrieveDTSConfiguration(cfg); CubbyHole c1 = new CubbyHole(); CubbyHole c2 = null;//new CubbyHole(); @@ -52,41 +53,47 @@ public class Startup { private static void startFolderAccountingConsumer(String[] args, String[] server, CubbyHole c2) { FolderAccountingConsumer fsConsumer=null; - if(args.length == 3) - fsConsumer=new FolderAccountingConsumer(server, c2, 1); + if(user!=null && password != null) + fsConsumer=new FolderAccountingConsumer(server, user, password, c2, 1); else - fsConsumer=new FolderAccountingConsumer(server, args[1], args[2], c2, 1); + fsConsumer=new FolderAccountingConsumer(server, c2, 1); fsConsumer.start(); } private static void startUserAccountingConsumer(String[] args, String[] server, List dtsHosts, CubbyHole c1) { UserAccountingConsumer ssConsumer=null; - if(args.length == 2) + if(user!=null && password != null) + ssConsumer=new UserAccountingConsumer(server, user, password, c1, 1, dtsHosts); + + else //if(args.length == 4) ssConsumer=new UserAccountingConsumer(server, c1, 1, dtsHosts); - else if(args.length == 4) - ssConsumer=new UserAccountingConsumer(server, args[1], args[2], c1, 1, dtsHosts); - else{ - throw new IllegalArgumentException("input parameter are incorrect"); - } +// else{ +// throw new IllegalArgumentException("input parameter are incorrect"); +// } ssConsumer.start(); } private static void startProducer(String[] args, String oplogServer, String[] server, CubbyHole c1, CubbyHole c2) { ReadingMongoOplog producer=null; - if(args.length == 4) - producer=new ReadingMongoOplog( Arrays.asList(oplogServer), args[1], args[2], c1, c2, 1 ); - else if(args.length == 2) + if(args[2]!=null && args[3]!= null) + producer=new ReadingMongoOplog( Arrays.asList(oplogServer), args[2], args[3], c1, c2, 1 ); + else //if(args.length == 2) producer=new ReadingMongoOplog( Arrays.asList(server), c1, c2, 1 ); - else{ - throw new IllegalArgumentException("input parameter are incorrect"); - } +// else{ +// throw new IllegalArgumentException("input parameter are incorrect"); +// } producer.start(); } private static String[] retrieveServerConfiguration(Configuration c) { - return c.getServerAccess(); + String[] server= c.getServerAccess(); + if(user == null) + user=c.getUsername(); + if(password == null) + password=c.getPassword(); + return server; } private static List retrieveDTSConfiguration(Configuration c){ diff --git a/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java b/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java index 7ed6fe1..4eac0be 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/store/MongoDB.java @@ -1,15 +1,26 @@ package org.gcube.contentmanager.storageserver.store; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mongodb.DB; import com.mongodb.DBCollection; -import com.mongodb.Mongo; -import com.mongodb.MongoOptions; +//import com.mongodb.Mongo; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoCredential; +//import com.mongodb.MongoOptions; +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; +import com.mongodb.WriteConcern; public class MongoDB { - private Mongo mongo; + private MongoClient mongo; private DB db; private String[] server; private String collectionSSName; @@ -25,6 +36,8 @@ public class MongoDB { private static final String DEFAULT_FS_COLLECTION="folderStatus"; private FolderStatusOperationManager folderOperationManager; private StorageStatusOperationManager ssOperationManager; + protected static ReadPreference READ_PREFERENCE=ReadPreference.primary(); + protected static final WriteConcern WRITE_TYPE=WriteConcern.REPLICAS_SAFE; public MongoDB(String[] server, int port, String user, String password){ this.server=server; @@ -98,56 +111,98 @@ public class MongoDB { return record; } - private DB getDB() { - if(db != null){ - // check if the old server is primary - try{ - DB db = mongo.getDB(ACCOUNTING_DB); - }catch(Exception e ){ - logger.warn("the server now is not a primary "); - db=null; - } + + protected DB getDB(){ + if(db==null){ + try{ + int i=-1; + List srvList=new ArrayList(); + for(String srv : server){ + srvList.add(new ServerAddress(srv)); } - if(db==null){ - int i=-1; - for(String srv : server){ - try { - i++; - ssCollection=null; - if(mongo!=null) - mongo.close(); - MongoOptions options=new MongoOptions(); - options.autoConnectRetry=true; - options.socketKeepAlive=true; - options.maxWaitTime=240000; - options.connectionsPerHost=35; - mongo = new Mongo(srv, options); -// mongo.getMongoOptions().autoConnectRetry=true; -// mongo.getMongoOptions().socketKeepAlive=true; - logger.debug("Istantiate MongoDB with options: "+mongo.getMongoOptions()); - db = mongo.getDB(ACCOUNTING_DB); - // check on user and password for non authenticate mode - if(user==null) user=""; - if(pwd==null) pwd=""; - boolean auth = db.authenticate(user, pwd.toCharArray()); - if(auth) logger.debug("mongo is in authenticate mode"); - else logger.debug("mongo is not in authenticate mode"); - if(ssCollection == null) - ssCollection=db.getCollection(collectionSSName); - ssCollection.findOne(); - String firstServer = server[0]; - server[0] = srv; - server[i]=firstServer; - break; - } catch (Exception e) { - logger.warn("server "+srv+" is not a primary retry "); - continue; + if(mongo==null){ + logger.debug(" open mongo connection "); + MongoClientOptions options=MongoClientOptions.builder().connectionsPerHost(10).socketTimeout(60000).connectTimeout(30000).readPreference(READ_PREFERENCE).build(); + if(((pwd != null) && (pwd.length() >0)) && ((user != null) && (user.length() > 0))){ + MongoCredential credential = MongoCredential.createMongoCRCredential(user, ACCOUNTING_DB, pwd.toCharArray()); + mongo = new MongoClient(srvList, Arrays.asList(credential), options); + }else{ + mongo = new MongoClient(srvList, options); } - } + logger.debug("Istantiate MongoDB with options: "+mongo.getMongoClientOptions()); } - return db; + db = mongo.getDB(ACCOUNTING_DB); + db.setWriteConcern(WRITE_TYPE); + } catch (Exception e) { + close(); + logger.error("Problem to open the DB connection for gridfs file "); + e.printStackTrace(); + } + logger.info("new mongo connection pool opened"); + } + return db; } + +// private DB getDB() { +// if(db != null){ +// // check if the old server is primary +// try{ +// DB db = mongo.getDB(ACCOUNTING_DB); +// }catch(Exception e ){ +// logger.warn("the server now is not a primary "); +// db=null; +// } +// } +// if(db==null){ +// List srvList=new ArrayList(); +// for(String srv : server){ +// srvList.add(new ServerAddress(srv)); +// } +// int i=-1; +// for(String srv : server){ +// try { +// i++; +// ssCollection=null; +// if(mongo!=null) +// mongo.close(); +//// MongoOptions options=new MongoOptions(); +////// options.autoConnectRetry=true; +//// options.socketKeepAlive=true; +//// options.maxWaitTime=240000; +//// options.connectionsPerHost=35; +// MongoClientOptions options=MongoClientOptions.builder().connectionsPerHost(10).socketTimeout(60000).connectTimeout(30000).build(); +//// mongo = new Mongo(srv, options); +// if(((pwd != null) && (pwd.length() >0)) && ((user != null) && (user.length() > 0))){ +// MongoCredential credential = MongoCredential.createMongoCRCredential(user, ACCOUNTING_DB, pwd.toCharArray()); +// mongo = new MongoClient(srvList, Arrays.asList(credential), options); +// }else{ +// mongo = new MongoClient(srvList, options); +// } +// logger.debug("Istantiate MongoDB with options: "+mongo.getMongoOptions()); +// db = mongo.getDB(ACCOUNTING_DB); +// // check on user and password for non authenticate mode +//// if(user==null) user=""; +//// if(pwd==null) pwd=""; +//// boolean auth = db.authenticate(user, pwd.toCharArray()); +//// if(auth) logger.debug("mongo is in authenticate mode"); +//// else logger.debug("mongo is not in authenticate mode"); +// if(ssCollection == null) +// ssCollection=db.getCollection(collectionSSName); +// ssCollection.findOne(); +// String firstServer = server[0]; +// server[0] = srv; +// server[i]=firstServer; +// break; +// } catch (Exception e) { +// logger.warn("server "+srv+" is not a primary retry "); +// continue; +// } +// } +// } +// return db; +// } + public DBCollection getStorageStatusCollection() { if(ssCollection==null) return getDB().getCollection(collectionSSName); diff --git a/src/main/java/org/gcube/contentmanager/storageserver/store/StorageStatusOperationManager.java b/src/main/java/org/gcube/contentmanager/storageserver/store/StorageStatusOperationManager.java index 2589fa0..e4d4eb9 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/store/StorageStatusOperationManager.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/store/StorageStatusOperationManager.java @@ -95,13 +95,13 @@ public class StorageStatusOperationManager { } private long volumeCalculation(long currentVolume, long partialVolume, String operation) { - logger.debug("accounting: operation "+operation+" total Volume "+partialVolume+" current volume "+currentVolume); + logger.info("accounting: operation "+operation+" total Volume "+partialVolume+" current volume "+currentVolume); if(operation.equalsIgnoreCase("UPLOAD") || operation.equalsIgnoreCase("COPY")){ partialVolume=partialVolume+currentVolume; }else if(operation.equalsIgnoreCase("DELETE")){ partialVolume=partialVolume-currentVolume; } - logger.debug("new volume "+partialVolume); + logger.info("new volume "+partialVolume); return partialVolume; } diff --git a/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java b/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java index e67519e..265cada 100644 --- a/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java +++ b/src/test/java/org/gcube/contentmanager/storageserver/store/MongoDBTest.java @@ -5,12 +5,12 @@ import org.junit.Test; public class MongoDBTest { - private static String[] server={"146.48.123.71","146.48.123.72"}; + private static String[] server={"mongo1-d-d4s.d4science.org","mongo2-d-d4s.d4science.org","mongo3-d-d4s.d4science.org"}; private static MongoDB mongo; @BeforeClass public static void init(){ - mongo=new MongoDB(server, null, null); + mongo=new MongoDB(server, "devUser", "d3v_u534"); } // @Test diff --git a/src/test/java/org/gcube/contentmanager/storageserver/store/OplogTest.java b/src/test/java/org/gcube/contentmanager/storageserver/store/OplogTest.java new file mode 100644 index 0000000..83a4b8a --- /dev/null +++ b/src/test/java/org/gcube/contentmanager/storageserver/store/OplogTest.java @@ -0,0 +1,83 @@ +package org.gcube.contentmanager.storageserver.store; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.bson.types.BSONTimestamp; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.mongodb.BasicDBObject; +import com.mongodb.Bytes; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoCredential; +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; + +public class OplogTest { + + private static String[] server={"mongo1-d-d4s.d4science.org","mongo2-d-d4s.d4science.org","mongo3-d-d4s.d4science.org"}; +// private static MongoDB mongo; + private List srvs; + protected static ReadPreference READ_PREFERENCE=ReadPreference.primary(); + protected static final String DEFAULT_DB_NAME="local"; + private MongoClient mongoClient; + DB local; + private DBCollection oplog; + private String user="oplogger"; + private String password="0pl0gg3r"; + + +// @BeforeClass +// public static void init(){ +// initBackend(); +// } + + + + @Test + public void initTest(){ + initBackend(); + DBCursor lastCursor = oplog.find().sort(new BasicDBObject("$natural", -1)).limit(1); + if (!lastCursor.hasNext()) { + System.out.println("no oplog!"); + return; + } + DBObject last = lastCursor.next(); + BSONTimestamp ts = (BSONTimestamp) last.get("ts"); + DBCursor cursor = oplog.find(new BasicDBObject("ts", new BasicDBObject("$gt", ts))); + cursor.addOption(Bytes.QUERYOPTION_TAILABLE); + cursor.addOption(Bytes.QUERYOPTION_AWAITDATA); + if (cursor.hasNext()) { + DBObject x = cursor.next(); + System.out.println("oplog current object: "+x); + } + } + + private void initBackend() { + List srvList=new ArrayList(); + for(String srv : server){ + srvList.add(new ServerAddress(srv)); + } + MongoClientOptions options=MongoClientOptions.builder().readPreference(READ_PREFERENCE).build(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + boolean auth =false; + MongoCredential credential = MongoCredential.createMongoCRCredential(user, "admin", password.toCharArray()); + mongoClient = new MongoClient(srvList, Arrays.asList(credential), options);//"146.48.123.71" + local = mongoClient.getDB("local"); + oplog = local.getCollection("oplog.rs"); + } +}