added user and pwd as input parameter
added check on read operation (under test) git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@93340 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
07395c202f
commit
f022f34a35
|
@ -113,7 +113,7 @@ import org.slf4j.LoggerFactory;
|
|||
Calendar endTime = new GregorianCalendar();
|
||||
Date time=endTime.getTime();
|
||||
SimpleDateFormat sdf=new SimpleDateFormat();
|
||||
sdf.applyPattern("dd MM yyyy 'at' hh:mm:ss z");//format(time);
|
||||
sdf.applyPattern("dd MM yyyy 'at' hh:mm:ss z");
|
||||
sdf.format(time);
|
||||
logger.info("set end time: "+time);
|
||||
try {
|
||||
|
|
|
@ -13,7 +13,8 @@ import com.mongodb.DB;
|
|||
import com.mongodb.DBCollection;
|
||||
import com.mongodb.DBCursor;
|
||||
import com.mongodb.DBObject;
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.Mongo;
|
||||
//import com.mongodb.MongoClient;
|
||||
import com.mongodb.ServerAddress;
|
||||
import org.bson.types.BSONTimestamp;
|
||||
import org.gcube.contentmanager.storageserver.parse.JsonParser;
|
||||
|
@ -23,43 +24,35 @@ import org.slf4j.LoggerFactory;
|
|||
public class ReadingMongoOplog extends Thread{
|
||||
|
||||
final static Logger logger=LoggerFactory.getLogger(ReadingMongoOplog.class);
|
||||
final static String DBNAME="remotefs";
|
||||
public String DBNAME="remotefs";
|
||||
private ServerAddress[] server;
|
||||
private MongoClient mongoClient;
|
||||
private Mongo mongoClient;
|
||||
private DB local;
|
||||
private DBCollection oplog;
|
||||
private CubbyHole c;
|
||||
private String user;
|
||||
private String password;
|
||||
private int number;
|
||||
|
||||
public ReadingMongoOplog(List<String> srvs, CubbyHole c, int number){
|
||||
public ReadingMongoOplog(List<String> srvs, CubbyHole c, int numberT){
|
||||
this.c=c;
|
||||
this.number=number;
|
||||
try {
|
||||
if(srvs.size() > 0){
|
||||
server=new ServerAddress[srvs.size()];
|
||||
int i=0;
|
||||
for(String s : srvs){
|
||||
server[i]=new ServerAddress(s);
|
||||
i++;
|
||||
}
|
||||
}else{
|
||||
logger.error("MongoDB server not Setted. Please set one or more servers");
|
||||
throw new RuntimeException("MongoDB server not Setted. Please set one or more servers");
|
||||
}
|
||||
} catch (UnknownHostException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
init();
|
||||
this.number=numberT;
|
||||
setupServerAddress(srvs);
|
||||
initBackend();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
mongoClient = new MongoClient(Arrays.asList(server));//"146.48.123.71"
|
||||
local = mongoClient.getDB("local");
|
||||
oplog = local.getCollection("oplog.rs");
|
||||
|
||||
public ReadingMongoOplog(List<String> srvs, String user,
|
||||
String password, CubbyHole c, int numberT) {
|
||||
this.c=c;
|
||||
this.number=numberT;
|
||||
this.user=user;
|
||||
this.password=password;
|
||||
setupServerAddress(srvs);
|
||||
initBackend();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
public void run() {
|
||||
DBCursor lastCursor = oplog.find().sort(new BasicDBObject("$natural", -1)).limit(1);
|
||||
if (!lastCursor.hasNext()) {
|
||||
logger.error("no oplog!");
|
||||
|
@ -80,7 +73,7 @@ public class ReadingMongoOplog extends Thread{
|
|||
// check if discard or process the current DB record
|
||||
if((x.get("o2")!=null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){
|
||||
if(x.containsField("o")){
|
||||
// parser.jsonRecordParser(x);
|
||||
// parser.jsonRecordParser(x);
|
||||
c.put(x);
|
||||
logger.info("Producer #" + this.number + " put: " + x);
|
||||
}else{
|
||||
|
@ -100,6 +93,43 @@ public class ReadingMongoOplog extends Thread{
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void initBackend() {
|
||||
mongoClient = new Mongo(Arrays.asList(server));//"146.48.123.71"
|
||||
local = mongoClient.getDB("local");
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
boolean auth =false;
|
||||
if(user!=null && password !=null)
|
||||
auth=local.authenticate(user.trim(), password.trim().toCharArray());
|
||||
if(auth) logger.info("mongo is in authenticate mode");
|
||||
else logger.info("mongo is not in authenticate mode");
|
||||
oplog = local.getCollection("oplog.rs");
|
||||
}
|
||||
|
||||
private void setupServerAddress(List<String> srvs) {
|
||||
try {
|
||||
if(srvs.size() > 0){
|
||||
server=new ServerAddress[srvs.size()];
|
||||
int i=0;
|
||||
for(String s : srvs){
|
||||
server[i]=new ServerAddress(s);
|
||||
i++;
|
||||
}
|
||||
}else{
|
||||
logger.error("MongoDB server not Setted. Please set one or more servers");
|
||||
throw new RuntimeException("MongoDB server not Setted. Please set one or more servers");
|
||||
}
|
||||
} catch (UnknownHostException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,11 @@
|
|||
package org.gcube.contentmanager.storageserver.parse;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.GregorianCalendar;
|
||||
|
||||
import org.bson.types.ObjectId;
|
||||
import org.gcube.contentmanager.storageserver.accounting.Report;
|
||||
import org.gcube.contentmanager.storageserver.accounting.ReportConfig;
|
||||
|
@ -21,8 +27,7 @@ public class JsonParser extends Thread{
|
|||
public JsonParser(CubbyHole c, int number){
|
||||
this.c=c;
|
||||
this.number=number;
|
||||
// int the accounting report
|
||||
|
||||
// init the accounting report
|
||||
try {
|
||||
init();
|
||||
} catch (ReportException e) {
|
||||
|
@ -38,6 +43,7 @@ public class JsonParser extends Thread{
|
|||
while(true){
|
||||
DBObject x=c.get();
|
||||
logger.info("Consumer #" + this.number + " got: " + x);
|
||||
// retrieve object fields
|
||||
DBObject obj=(DBObject)x.get("o");
|
||||
String op=(String) x.get("op");
|
||||
String filename=(String) obj.get("filename");
|
||||
|
@ -45,24 +51,30 @@ public class JsonParser extends Thread{
|
|||
String name=(String) obj.get("name");
|
||||
String owner=(String) obj.get("owner");
|
||||
String creationTime=(String) obj.get("creationTime");
|
||||
String lastReadTime=null;
|
||||
if(obj.get("lastRead") != null) lastReadTime=(String)obj.get("lastRead");
|
||||
logger.debug("last Read field: "+lastReadTime);
|
||||
String delete=null;
|
||||
logger.info("obj.get(OnDeleting) "+obj.get("onDeleting"));
|
||||
logger.debug("obj.get(OnDeleting) "+obj.get("onDeleting"));
|
||||
if(obj.get("onDeleting") != null) delete=(String)obj.get("onDeleting");
|
||||
logger.info("delete field: "+delete);
|
||||
logger.debug("delete field: "+delete);
|
||||
ObjectId objectId=(ObjectId)obj.get("_id");
|
||||
String id = objectId.toString();
|
||||
long length=-1;
|
||||
if(obj.get("length")!=null) length=(long)obj.get("length");
|
||||
logger.debug("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+ " id: "+id);
|
||||
if((length >0) && (filename!=null)){
|
||||
if(((length >0) && (filename!=null))){
|
||||
//call to the accounting library
|
||||
String scope=retrieveScopeFromFilename(filename);
|
||||
report.init(owner, scope, creationTime);
|
||||
// report.timeUpdate();
|
||||
String operation=mappingOperationField(op, id, delete);
|
||||
report.ultimate(owner, null, operation, length+"", filename, id);
|
||||
report.send();
|
||||
logger.debug("[accountingCall] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner);
|
||||
String scope=retrieveScopeFromRemoteFilePath(filename);
|
||||
String operation=null;
|
||||
try {
|
||||
operation=mappingOperationField(op, id, delete, lastReadTime);
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// report(op, filename, owner, creationTime, delete, id, length,scope, operation);
|
||||
logger.info(" operation accounted "+operation);
|
||||
logger.info("[accountingCall] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+" id: "+id+" scope: "+scope);
|
||||
// }else if(op.equals("i")){
|
||||
// previousInsert=id;
|
||||
}else{
|
||||
|
@ -72,12 +84,42 @@ public class JsonParser extends Thread{
|
|||
}
|
||||
}
|
||||
|
||||
private String mappingOperationField(String op, String id, String onDeleting) {
|
||||
private void report(String op, String filename, String owner,
|
||||
String creationTime, String delete, String id, long length,
|
||||
String scope, String operation) {
|
||||
report.init(owner, scope, creationTime);
|
||||
// report.timeUpdate();
|
||||
report.ultimate(owner, null, operation, length+"", filename, id);
|
||||
report.send();
|
||||
}
|
||||
|
||||
private String mappingOperationField(String op, String id, String onDeleting, String lastRead) throws ParseException {
|
||||
logger.info("delete field: "+onDeleting);
|
||||
if(op.equals("u")){
|
||||
if((onDeleting != null) && (onDeleting.equals("true"))){
|
||||
logger.info("found delete field");
|
||||
return "DELETE";
|
||||
}else if(lastRead !=null){
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z");
|
||||
Date dateLastRead = sdf.parse(lastRead);
|
||||
// Calendar calendarLastRead = Calendar.getInstance();
|
||||
// calendarLastRead.setTime(dateLastRead);
|
||||
// calendarLastRead.set(Calendar.MINUTE, -5);
|
||||
Calendar now=Calendar.getInstance();
|
||||
logger.info("now: "+sdf.format(now.getTime()));
|
||||
now.add(Calendar.MINUTE, -5);
|
||||
// logger.info("now decreased -5 min: "+sdf.format(dateNow));
|
||||
// logger.info("lastRead is "+sdf.format(dateLastRead)+" now,5 min decreased is"+sdf.format(dateNow));
|
||||
logger.info("now decreased -5 min: "+sdf.format(now.getTime()));
|
||||
logger.info("lastRead is "+sdf.format(dateLastRead)+" now,5 min decreased is"+sdf.format(now.getTime()));
|
||||
if(now.getTime().compareTo(dateLastRead) < 0){
|
||||
logger.info("It is a read");
|
||||
return "READ";
|
||||
}else{
|
||||
logger.info("It isn't a read");
|
||||
return "UPDATE";
|
||||
}
|
||||
|
||||
}else
|
||||
return "UPDATE";
|
||||
}else if(op.equals("i")){
|
||||
|
@ -88,7 +130,7 @@ public class JsonParser extends Thread{
|
|||
return op;
|
||||
}
|
||||
|
||||
private String retrieveScopeFromFilename(String filename) {
|
||||
private String retrieveScopeFromRemoteFilePath(String filename) {
|
||||
String[] split=filename.split("/");
|
||||
if(split.length>0){
|
||||
String scope=null;
|
||||
|
|
|
@ -8,18 +8,32 @@ import org.gcube.contentmanager.storageserver.parse.JsonParser;
|
|||
public class Startup {
|
||||
|
||||
public static void main(String[] args) {
|
||||
for (int i=0; i<args.length;i++)
|
||||
System.out.println("param N." +i + ": " + args[i]);
|
||||
|
||||
if(args.length < 1 || args.length > 1){
|
||||
for (int i=0; i<args.length;i++){
|
||||
//NO show password
|
||||
if(i==2)
|
||||
System.out.println("param N." +i + ": *********");
|
||||
else
|
||||
System.out.println("param N." +i + ": " + args[i]);
|
||||
}
|
||||
if(args.length != 1 && args.length != 3){
|
||||
System.out.println("Usage:");
|
||||
System.out.println("\tjava Startup ip user password\n\n");
|
||||
System.out.println("Example:");
|
||||
System.out.println("\tjava Startup 127.0.0.1 pippo pluT0\n");
|
||||
System.out.println("or ");
|
||||
System.out.println("Usage:");
|
||||
System.out.println("\tjava Startup ip \n\n");
|
||||
System.out.println("Example:");
|
||||
System.out.println("\tjava Startup 127.0.0.1 \n\n");
|
||||
|
||||
return;
|
||||
}
|
||||
CubbyHole c = new CubbyHole();
|
||||
ReadingMongoOplog producer=new ReadingMongoOplog( Arrays.asList(args[0]), c, 1 );
|
||||
ReadingMongoOplog producer=null;
|
||||
if(args.length == 3)
|
||||
producer=new ReadingMongoOplog( Arrays.asList(args[0]), args[1], args[2], c, 1 );
|
||||
else
|
||||
producer=new ReadingMongoOplog( Arrays.asList(args[0]), c, 1 );
|
||||
JsonParser consumer=new JsonParser(c, 1);
|
||||
producer.start();
|
||||
consumer.start();
|
||||
|
|
Loading…
Reference in New Issue