diff --git a/pom.xml b/pom.xml index 3dacc4a..8fe59b9 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ org.mongodb - mongo-java-driver + mongo-java-driver [2.6.2,) diff --git a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java index 827b70e..944abcb 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java @@ -113,7 +113,7 @@ import org.slf4j.LoggerFactory; Calendar endTime = new GregorianCalendar(); Date time=endTime.getTime(); SimpleDateFormat sdf=new SimpleDateFormat(); - sdf.applyPattern("dd MM yyyy 'at' hh:mm:ss z");//format(time); + sdf.applyPattern("dd MM yyyy 'at' hh:mm:ss z"); sdf.format(time); logger.info("set end time: "+time); try { diff --git a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java index 0caabd5..96f2797 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java @@ -13,7 +13,8 @@ import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; -import com.mongodb.MongoClient; +import com.mongodb.Mongo; +//import com.mongodb.MongoClient; import com.mongodb.ServerAddress; import org.bson.types.BSONTimestamp; import org.gcube.contentmanager.storageserver.parse.JsonParser; @@ -23,43 +24,35 @@ import org.slf4j.LoggerFactory; public class ReadingMongoOplog extends Thread{ final static Logger logger=LoggerFactory.getLogger(ReadingMongoOplog.class); - final static String DBNAME="remotefs"; + public String DBNAME="remotefs"; private ServerAddress[] server; - private MongoClient mongoClient; + private Mongo mongoClient; private DB local; private DBCollection oplog; private CubbyHole c; + private String user; + private String password; private int number; - public ReadingMongoOplog(List srvs, CubbyHole c, int number){ + public ReadingMongoOplog(List srvs, CubbyHole c, int numberT){ this.c=c; - this.number=number; - try { - if(srvs.size() > 0){ - server=new ServerAddress[srvs.size()]; - int i=0; - for(String s : srvs){ - server[i]=new ServerAddress(s); - i++; - } - }else{ - logger.error("MongoDB server not Setted. Please set one or more servers"); - throw new RuntimeException("MongoDB server not Setted. Please set one or more servers"); - } - } catch (UnknownHostException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - init(); + this.number=numberT; + setupServerAddress(srvs); + initBackend(); } - private void init() { - mongoClient = new MongoClient(Arrays.asList(server));//"146.48.123.71" - local = mongoClient.getDB("local"); - oplog = local.getCollection("oplog.rs"); + + public ReadingMongoOplog(List srvs, String user, + String password, CubbyHole c, int numberT) { + this.c=c; + this.number=numberT; + this.user=user; + this.password=password; + setupServerAddress(srvs); + initBackend(); } - public void run() { + public void run() { DBCursor lastCursor = oplog.find().sort(new BasicDBObject("$natural", -1)).limit(1); if (!lastCursor.hasNext()) { logger.error("no oplog!"); @@ -80,7 +73,7 @@ public class ReadingMongoOplog extends Thread{ // check if discard or process the current DB record if((x.get("o2")!=null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){ if(x.containsField("o")){ -// parser.jsonRecordParser(x); +// parser.jsonRecordParser(x); c.put(x); logger.info("Producer #" + this.number + " put: " + x); }else{ @@ -100,6 +93,43 @@ public class ReadingMongoOplog extends Thread{ } } + @SuppressWarnings("deprecation") + private void initBackend() { + mongoClient = new Mongo(Arrays.asList(server));//"146.48.123.71" + local = mongoClient.getDB("local"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + boolean auth =false; + if(user!=null && password !=null) + auth=local.authenticate(user.trim(), password.trim().toCharArray()); + if(auth) logger.info("mongo is in authenticate mode"); + else logger.info("mongo is not in authenticate mode"); + oplog = local.getCollection("oplog.rs"); + } + + private void setupServerAddress(List srvs) { + try { + if(srvs.size() > 0){ + server=new ServerAddress[srvs.size()]; + int i=0; + for(String s : srvs){ + server[i]=new ServerAddress(s); + i++; + } + }else{ + logger.error("MongoDB server not Setted. Please set one or more servers"); + throw new RuntimeException("MongoDB server not Setted. Please set one or more servers"); + } + } catch (UnknownHostException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java index c7ea8c4..c0633ba 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java @@ -1,5 +1,11 @@ package org.gcube.contentmanager.storageserver.parse; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; + import org.bson.types.ObjectId; import org.gcube.contentmanager.storageserver.accounting.Report; import org.gcube.contentmanager.storageserver.accounting.ReportConfig; @@ -21,8 +27,7 @@ public class JsonParser extends Thread{ public JsonParser(CubbyHole c, int number){ this.c=c; this.number=number; - // int the accounting report - + // init the accounting report try { init(); } catch (ReportException e) { @@ -38,6 +43,7 @@ public class JsonParser extends Thread{ while(true){ DBObject x=c.get(); logger.info("Consumer #" + this.number + " got: " + x); + // retrieve object fields DBObject obj=(DBObject)x.get("o"); String op=(String) x.get("op"); String filename=(String) obj.get("filename"); @@ -45,24 +51,30 @@ public class JsonParser extends Thread{ String name=(String) obj.get("name"); String owner=(String) obj.get("owner"); String creationTime=(String) obj.get("creationTime"); + String lastReadTime=null; + if(obj.get("lastRead") != null) lastReadTime=(String)obj.get("lastRead"); + logger.debug("last Read field: "+lastReadTime); String delete=null; - logger.info("obj.get(OnDeleting) "+obj.get("onDeleting")); + logger.debug("obj.get(OnDeleting) "+obj.get("onDeleting")); if(obj.get("onDeleting") != null) delete=(String)obj.get("onDeleting"); - logger.info("delete field: "+delete); + logger.debug("delete field: "+delete); ObjectId objectId=(ObjectId)obj.get("_id"); String id = objectId.toString(); long length=-1; if(obj.get("length")!=null) length=(long)obj.get("length"); logger.debug("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+ " id: "+id); - if((length >0) && (filename!=null)){ + if(((length >0) && (filename!=null))){ //call to the accounting library - String scope=retrieveScopeFromFilename(filename); - report.init(owner, scope, creationTime); -// report.timeUpdate(); - String operation=mappingOperationField(op, id, delete); - report.ultimate(owner, null, operation, length+"", filename, id); - report.send(); - logger.debug("[accountingCall] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner); + String scope=retrieveScopeFromRemoteFilePath(filename); + String operation=null; + try { + operation=mappingOperationField(op, id, delete, lastReadTime); + } catch (ParseException e) { + e.printStackTrace(); + } +// report(op, filename, owner, creationTime, delete, id, length,scope, operation); + logger.info(" operation accounted "+operation); + logger.info("[accountingCall] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+" id: "+id+" scope: "+scope); // }else if(op.equals("i")){ // previousInsert=id; }else{ @@ -72,12 +84,42 @@ public class JsonParser extends Thread{ } } - private String mappingOperationField(String op, String id, String onDeleting) { + private void report(String op, String filename, String owner, + String creationTime, String delete, String id, long length, + String scope, String operation) { + report.init(owner, scope, creationTime); +// report.timeUpdate(); + report.ultimate(owner, null, operation, length+"", filename, id); + report.send(); + } + + private String mappingOperationField(String op, String id, String onDeleting, String lastRead) throws ParseException { logger.info("delete field: "+onDeleting); if(op.equals("u")){ if((onDeleting != null) && (onDeleting.equals("true"))){ logger.info("found delete field"); return "DELETE"; + }else if(lastRead !=null){ + SimpleDateFormat sdf = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z"); + Date dateLastRead = sdf.parse(lastRead); +// Calendar calendarLastRead = Calendar.getInstance(); +// calendarLastRead.setTime(dateLastRead); +// calendarLastRead.set(Calendar.MINUTE, -5); + Calendar now=Calendar.getInstance(); + logger.info("now: "+sdf.format(now.getTime())); + now.add(Calendar.MINUTE, -5); +// logger.info("now decreased -5 min: "+sdf.format(dateNow)); +// logger.info("lastRead is "+sdf.format(dateLastRead)+" now,5 min decreased is"+sdf.format(dateNow)); + logger.info("now decreased -5 min: "+sdf.format(now.getTime())); + logger.info("lastRead is "+sdf.format(dateLastRead)+" now,5 min decreased is"+sdf.format(now.getTime())); + if(now.getTime().compareTo(dateLastRead) < 0){ + logger.info("It is a read"); + return "READ"; + }else{ + logger.info("It isn't a read"); + return "UPDATE"; + } + }else return "UPDATE"; }else if(op.equals("i")){ @@ -88,7 +130,7 @@ public class JsonParser extends Thread{ return op; } - private String retrieveScopeFromFilename(String filename) { + private String retrieveScopeFromRemoteFilePath(String filename) { String[] split=filename.split("/"); if(split.length>0){ String scope=null; diff --git a/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java b/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java index a0fea86..bd16ba0 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java @@ -8,18 +8,32 @@ import org.gcube.contentmanager.storageserver.parse.JsonParser; public class Startup { public static void main(String[] args) { - for (int i=0; i 1){ + for (int i=0; i