clean code and add more log
add new log4j properties git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@134274 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
d6ef341506
commit
a003a73bc3
|
@ -144,7 +144,7 @@ import org.slf4j.LoggerFactory;
|
|||
}
|
||||
|
||||
public void printRecord(StorageUsageRecord record){
|
||||
logger.info(" accounting properties: " +
|
||||
logger.info(" Record properties: " +
|
||||
"\n\t owner: "+record.getResourceOwner()+
|
||||
"\n\t scope "+record.getResourceScope()+
|
||||
"\n\t type "+record.getOperationType()+ //ResourceType()+
|
||||
|
|
|
@ -69,31 +69,33 @@ public class UserAccountingConsumer extends Thread{
|
|||
}
|
||||
|
||||
public void run() {
|
||||
logger.info("Sniffing is started ");
|
||||
logger.debug("Consuming SU record started ");
|
||||
while(true){
|
||||
DBObject x=null;
|
||||
MongoDB mongo=null;
|
||||
try{
|
||||
logger.info("Waiting next record... ");
|
||||
logger.debug("SU waiting next record... ");
|
||||
x=c.get();
|
||||
logger.info("Consumer #" + this.number + " got: " + x);
|
||||
logger.info("Consumer #" + this.number + " got: " + x.get("_id"));
|
||||
|
||||
//get operation
|
||||
op = (String) x.get("op");
|
||||
// retrieve object fields
|
||||
DBObject obj=(DBObject)x.get("o");
|
||||
OpLogRemoteObject record=retrieveObjectFields(obj);
|
||||
// set object dimension
|
||||
|
||||
logger.info("[recordCheck] operation: "+op+" name: "+record.getName()+" type: "+record.getType()+" path: "+record.getFilename()+" dir Path: "+record.getDir()+" length: "+record.getLength()+" owner: "+record.getOwner()+ " lastOperation "+record.getLastOperation()+" lastUser: "+record.getLastUser()+" lastAccess: "+record.getLastAccess());
|
||||
logger.debug("[recordCheck] operation: "+op+"\n\t name: "+record.getName()+"\n\t type: "+record.getType()+"\n\t path: "+record.getFilename()+"\n\t dir Path: "+record.getDir()+"\n\t length: "+record.getLength()+"\n\t owner: "+record.getOwner()+ "\n\t lastOperation "+record.getLastOperation()+"\n\t lastUser: "+record.getLastUser()+"\n\t lastAccess: "+record.getLastAccess());
|
||||
if(((record.getLength() >0) && (((record.getFilename() !=null) && (record.getFilename().length()>0) && (record.getDir().length()>0)&& (record.getDir().contains("/"))) || (record.getLinkCount() > 0)))){
|
||||
//convert from byte to kb
|
||||
record.setLength(record.getLength()/1024);
|
||||
// check scope
|
||||
String scope=null;
|
||||
String pathString=(String)obj.get("onScope");
|
||||
logger.debug("pathString value: "+pathString);
|
||||
logger.debug("[recordCheck] pathString value: "+pathString);
|
||||
if((record.getDir()!=null)&& (record.getDir().contains("/"))){
|
||||
scope=retrieveScopeFromRemoteFilePath(record.getDir());
|
||||
logger.debug("scope retrieved: "+scope);
|
||||
logger.debug("[recordCheck] scope retrieved: "+scope);
|
||||
}else{
|
||||
// field added on storage manager library for retrieve scope. Used only if it is a link delete
|
||||
scope=retrieveScopeFromRemoteFilePath(pathString);
|
||||
|
@ -104,7 +106,7 @@ public class UserAccountingConsumer extends Thread{
|
|||
record.setLastOperation("DELETE");
|
||||
}else if ((record.getLastOperation() != null) && (op != null) && (record.getLastOperation().equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){
|
||||
// it is an update on a link object this operation doesn't be accounted
|
||||
logger.info("update on link object is not accounted. Skip next ");
|
||||
logger.info("[recordCheck] update on link object is not accounted. Skip next ");
|
||||
continue;
|
||||
}else if((record.getLastOperation()==null) || (record.getLastUser()==null)){
|
||||
logger.warn("lastOperation: "+record.getLastOperation()+" lastUser: "+record.getLastUser()+". These values cannot be null. Skip next ");
|
||||
|
@ -112,21 +114,21 @@ public class UserAccountingConsumer extends Thread{
|
|||
}else{
|
||||
// if the lastoperation is download and the caller contains a dts host then it isn't a real user but it is a workspace accounting record
|
||||
if((dtsHosts != null) && (record.getLastOperation() != null) && (record.getLastOperation().equalsIgnoreCase("DOWNLOAD"))){
|
||||
logger.debug("check if the caller is from dts. CallerIP: "+record.getCallerIp());
|
||||
logger.debug("[recordCheck] check if the caller is from dts. CallerIP: "+record.getCallerIp()+" dts value: "+dtsHosts);
|
||||
for(String host: dtsHosts){
|
||||
logger.debug("scan "+host);
|
||||
if(record.getCallerIp().contains(host)){
|
||||
record.setLastOperation("workspace.accounting");
|
||||
logger.info("the caller is dts service: caller "+record.getCallerIp()+ " dts host: "+host+" the new user is: "+record.getLastUser());
|
||||
logger.debug("[recordCheck] the caller is dts service: caller "+record.getCallerIp()+ " dts host: "+host+" the new user is: "+record.getLastUser());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.debug(" operation accounted "+record.getLastOperation());
|
||||
logger.debug("[recordCheck] operation accounted "+record.getLastOperation());
|
||||
StorageStatusObject ssr=null;
|
||||
if(isNeedSSReport(record.getLastOperation())){
|
||||
try{
|
||||
logger.info("build ss record");
|
||||
logger.debug("[recordCheck] update SS record");
|
||||
mongo=new MongoDB(server, user, password);
|
||||
if(record.getLastOperation().equalsIgnoreCase("COPY"))
|
||||
record.setOwner(record.getLastUser());
|
||||
|
@ -135,7 +137,7 @@ public class UserAccountingConsumer extends Thread{
|
|||
mongo.close();
|
||||
}catch(Exception e){
|
||||
mongo.close();
|
||||
logger.error("Problem in updating storage status record: "+e.getMessage());
|
||||
logger.error("[recordCheck] Problem in updating storage status record: "+e.getMessage());
|
||||
}
|
||||
}
|
||||
try{
|
||||
|
@ -147,10 +149,10 @@ public class UserAccountingConsumer extends Thread{
|
|||
else
|
||||
report( record, scope, null, null);
|
||||
}catch(Exception e){
|
||||
logger.error("Problem sending accounting report. Exception message: "+e.getMessage());
|
||||
logger.error("[recordCheck] Problem sending accounting report. Exception message: "+e.getMessage());
|
||||
}
|
||||
}else{
|
||||
logger.info("operation "+record.getLastOperation()+" is not accounted: invalid scope: "+scope);
|
||||
logger.info("[recordCheck] operation "+record.getLastOperation()+" is not accounted: invalid scope: "+scope);
|
||||
}
|
||||
}else{
|
||||
logger.info("operation "+record.getLastOperation()+" is not accounted");
|
||||
|
@ -172,6 +174,7 @@ public class UserAccountingConsumer extends Thread{
|
|||
* @return
|
||||
*/
|
||||
private boolean isNeedSSReport(String lastOperation) {
|
||||
logger.debug("Last operation is "+lastOperation);
|
||||
if(lastOperation.equalsIgnoreCase("UPLOAD") || lastOperation.equalsIgnoreCase("COPY") || lastOperation.equalsIgnoreCase("DELETE"))
|
||||
return true;
|
||||
return false;
|
||||
|
|
|
@ -18,6 +18,7 @@ public class CubbyHole {
|
|||
public synchronized DBObject get() {
|
||||
while (requestQueue.size() == 0){
|
||||
try {
|
||||
logger.debug("waiting in get");
|
||||
wait();
|
||||
}
|
||||
catch (InterruptedException e){
|
||||
|
@ -25,7 +26,7 @@ public class CubbyHole {
|
|||
}
|
||||
}
|
||||
DBObject value=requestQueue.remove(0);
|
||||
logger.debug("get element from queue: "+value);
|
||||
logger.debug("get element from queue: "+value.get("_id"));
|
||||
available = false;
|
||||
notifyAll();
|
||||
return value;
|
||||
|
@ -34,11 +35,12 @@ public class CubbyHole {
|
|||
public synchronized void put(DBObject value) {
|
||||
while (available == true) {
|
||||
try {
|
||||
logger.debug("waiting in put");
|
||||
wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
logger.debug("put element to queue: "+value);
|
||||
logger.debug("put element to queue: "+value.get("_id"));
|
||||
requestQueue.addElement(value);
|
||||
available = true;
|
||||
notifyAll();
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package org.gcube.contentmanager.storageserver.data;
|
||||
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import com.mongodb.BasicDBObject;
|
||||
|
@ -10,21 +9,22 @@ import com.mongodb.DB;
|
|||
import com.mongodb.DBCollection;
|
||||
import com.mongodb.DBCursor;
|
||||
import com.mongodb.DBObject;
|
||||
import com.mongodb.Mongo;
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientOptions;
|
||||
import com.mongodb.MongoCredential;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.ServerAddress;
|
||||
import org.bson.types.BSONTimestamp;
|
||||
import org.gcube.contentmanager.storageserver.consumer.UserAccountingConsumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ReadingMongoOplog extends Thread{
|
||||
|
||||
public static final String DBNAME="remotefs";
|
||||
public static final String LOCAL_COLLECTION="oplog.rs";
|
||||
public static final String LOCAL_DB="local";
|
||||
protected static ReadPreference READ_PREFERENCE=ReadPreference.secondaryPreferred();
|
||||
final static Logger logger=LoggerFactory.getLogger(ReadingMongoOplog.class);
|
||||
public static String DBNAME="remotefs";
|
||||
private ServerAddress[] server;
|
||||
private MongoClient mongoClient;
|
||||
private DB local;
|
||||
|
@ -35,8 +35,6 @@ public class ReadingMongoOplog extends Thread{
|
|||
private String password;
|
||||
private int number;
|
||||
private List<String> srvs;
|
||||
protected static ReadPreference READ_PREFERENCE=ReadPreference.secondaryPreferred();
|
||||
protected static final String DEFAULT_DB_NAME="local";
|
||||
|
||||
public ReadingMongoOplog(List<String> srvs, CubbyHole c1, CubbyHole c2, int numberT){
|
||||
this.c1=c1;
|
||||
|
@ -69,7 +67,7 @@ public class ReadingMongoOplog extends Thread{
|
|||
DBObject last = lastCursor.next();
|
||||
BSONTimestamp ts = (BSONTimestamp) last.get("ts");
|
||||
while (true) {
|
||||
logger.debug("starting at ts: " + ts);
|
||||
logger.debug("Sniffing is starting at ts: " + ts);
|
||||
DBCursor cursor = oplog.find(new BasicDBObject("ts", new BasicDBObject("$gt", ts)));
|
||||
cursor.addOption(Bytes.QUERYOPTION_TAILABLE);
|
||||
cursor.addOption(Bytes.QUERYOPTION_AWAITDATA);
|
||||
|
@ -79,7 +77,7 @@ public class ReadingMongoOplog extends Thread{
|
|||
ts = (BSONTimestamp) x.get("ts");
|
||||
String ns=(String)x.get("ns");
|
||||
// check if discard or process the current DB record
|
||||
if((x.get("o2")!=null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){
|
||||
if((x.get("o2") != null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){
|
||||
if(x.containsField("o")){
|
||||
// c1 buffer for suer accounting
|
||||
c1.put(x);
|
||||
|
@ -87,12 +85,12 @@ public class ReadingMongoOplog extends Thread{
|
|||
if(c2 !=null)
|
||||
c2.put(x);
|
||||
// parser.runWithoutThread(x);
|
||||
logger.debug("Producer #" + this.number + " put: " + x);
|
||||
logger.debug("Producer #" + this.number + " put: " + x.get("_id"));
|
||||
}else{
|
||||
logger.debug("operation is not accounted");
|
||||
}
|
||||
}else{
|
||||
logger.debug("record discarded: \t"+x);
|
||||
logger.debug("record discarded: \t"+x.get("_id"));
|
||||
}
|
||||
}
|
||||
try {
|
||||
|
@ -107,7 +105,6 @@ public class ReadingMongoOplog extends Thread{
|
|||
|
||||
// @SuppressWarnings("deprecation")
|
||||
private void initBackend() {
|
||||
|
||||
MongoClientOptions options=MongoClientOptions.builder().readPreference(READ_PREFERENCE).build();
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
|
@ -121,20 +118,19 @@ public class ReadingMongoOplog extends Thread{
|
|||
MongoCredential credential = MongoCredential.createMongoCRCredential(user, "admin", password.toCharArray());
|
||||
logger.debug("try to connect to mongo with authentication... ");
|
||||
mongoClient = new MongoClient(Arrays.asList(server), Arrays.asList(credential), options);//"146.48.123.71"
|
||||
|
||||
}else{
|
||||
logger.debug("try to connect to mongo... ");
|
||||
mongoClient = new MongoClient(Arrays.asList(server));
|
||||
}
|
||||
logger.debug("authenticated. ");
|
||||
logger.debug("try to connect to local db...");
|
||||
local = mongoClient.getDB("local");
|
||||
local = mongoClient.getDB(LOCAL_DB);
|
||||
logger.debug("db connected ");
|
||||
if(auth) logger.info("mongo is in authenticate mode");
|
||||
else logger.info("mongo is not in authenticate mode");
|
||||
oplog = local.getCollection("oplog.rs");
|
||||
oplog = local.getCollection(LOCAL_COLLECTION);
|
||||
}
|
||||
|
||||
private void setupServerAddress(List<String> srvs) {
|
||||
// try {
|
||||
if(srvs.size() > 0){
|
||||
server=new ServerAddress[srvs.size()];
|
||||
int i=0;
|
||||
|
@ -146,9 +142,6 @@ public class ReadingMongoOplog extends Thread{
|
|||
logger.error("MongoDB server not set. Please set one or more servers");
|
||||
throw new RuntimeException("MongoDB server not set. Please set one or more servers");
|
||||
}
|
||||
// } catch (UnknownHostException e) {
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,8 +42,7 @@ public class StorageStatusOperationManager {
|
|||
|
||||
final BasicDBObject query = new BasicDBObject("consumer", ssRecord.getConsumer());
|
||||
// Creating BasicDBObjectBuilder object without arguments
|
||||
DBObject documentBuilder = BasicDBObjectBuilder.start()
|
||||
.add("volume", volume).add("count", count).get();
|
||||
DBObject documentBuilder = BasicDBObjectBuilder.start().add("volume", volume).add("count", count).get();
|
||||
// get the dbobject from builder and Inserting document
|
||||
getSsCollection().update(query,new BasicDBObject("$set", documentBuilder), true, false);
|
||||
// close();
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
log4j.rootLogger=DEBUG, A1
|
||||
log4j.appender.A1=org.apache.log4j.RollingFileAppender
|
||||
log4j.appender.A1.File=mongoTrigger.fullog
|
||||
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
||||
log4j.appender.A1.MaxFileSize=10MB
|
||||
log4j.appender.A1.MaxBackupIndex=10
|
||||
|
||||
|
||||
log4j.logger.org.gcube.documentstore=WARN, file
|
||||
log4j.logger.org.gcube.data.publishing=WARN, file
|
||||
log4j.logger.com.couchbase.client=WARN, file
|
||||
log4j.logger.org.gcube.contentmanager.storageserver=DEBUG, file1
|
||||
|
||||
|
||||
# accounting log file
|
||||
log4j.appender.file=org.apache.log4j.RollingFileAppender
|
||||
log4j.appender.file.File=accounting.log
|
||||
log4j.appender.file.MaxFileSize=10MB
|
||||
log4j.appender.file.MaxBackupIndex=10
|
||||
log4j.appender.file.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
|
||||
|
||||
# Storage manager trigger log file
|
||||
log4j.appender.file1=org.apache.log4j.RollingFileAppender
|
||||
log4j.appender.file1.File=mongoTrigger.log
|
||||
log4j.appender.file1.MaxFileSize=10MB
|
||||
log4j.appender.file1.MaxBackupIndex=10
|
||||
log4j.appender.file1.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.file1.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
|
||||
|
||||
|
||||
#CONSOLE
|
||||
#log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
#log4j.appender.stdout.Threshold=FATAL
|
||||
#log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
#log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %c %d{dd MMM yyyy ;HH:mm:ss.SSS} - %m%n
|
Loading…
Reference in New Issue