196 lines
7.4 KiB
Java
196 lines
7.4 KiB
Java
package org.gcube.contentmanager.storageserver.parse;
|
|
|
|
import java.util.List;
|
|
|
|
import org.bson.types.ObjectId;
|
|
import org.gcube.accounting.datamodel.RawUsageRecord;
|
|
import org.gcube.contentmanager.storageserver.accounting.Report;
|
|
import org.gcube.contentmanager.storageserver.accounting.ReportConfig;
|
|
import org.gcube.contentmanager.storageserver.accounting.ReportException;
|
|
import org.gcube.contentmanager.storageserver.accounting.ReportFactory;
|
|
import org.gcube.contentmanager.storageserver.data.CubbyHole;
|
|
import org.gcube.contentmanager.storageserver.parse.utils.ValidationUtils;
|
|
import org.gcube.contentmanager.storageserver.store.MongoDB;
|
|
import org.gcube.contentmanager.storageserver.store.StorageStatusRecord;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import com.mongodb.DBObject;
|
|
|
|
public class JsonParser extends Thread{
|
|
|
|
final static Logger logger=LoggerFactory.getLogger(JsonParser.class);
|
|
final static int MINUTE_DECREMENT=-2;
|
|
private CubbyHole c;
|
|
private int number;
|
|
private Report report;
|
|
// object fields
|
|
private String op;
|
|
private String filename;
|
|
private String type;
|
|
private String name;
|
|
private String owner;
|
|
private String creationTime;
|
|
private String lastAccess;
|
|
private String lastOperation;
|
|
private String lastUser;
|
|
private int linkCount;
|
|
private String delete;
|
|
// private String id;
|
|
private String callerIp;
|
|
private String user;
|
|
private String password;
|
|
String[] server;
|
|
// private String previousInsert;
|
|
|
|
/**
 * Creates a consumer that connects to MongoDB without credentials
 * (the {@code user} and {@code password} fields stay {@code null}).
 *
 * @param srvs   server list later passed to the {@code MongoDB} constructor
 * @param c      the {@code CubbyHole} this thread reads records from
 * @param number identifier of this consumer, used in log messages
 * @throws RuntimeException if the accounting report cannot be initialized;
 *                          the original {@code ReportException} is attached as the cause
 */
public JsonParser(String[] srvs, CubbyHole c, int number) {
    this.c = c;
    this.number = number;
    this.server = srvs;
    // init the accounting report
    try {
        init();
    } catch (ReportException e) {
        // Keep the cause so the real initialization failure is not lost.
        throw new RuntimeException("Accounting report Exception initialization", e);
    }
}
|
|
|
|
/**
 * Creates a consumer that connects to MongoDB with the given credentials.
 *
 * @param srvs     server list later passed to the {@code MongoDB} constructor
 * @param user     user name later passed to the {@code MongoDB} constructor
 * @param password password later passed to the {@code MongoDB} constructor
 * @param c        the {@code CubbyHole} this thread reads records from
 * @param number   identifier of this consumer, used in log messages
 * @throws RuntimeException if the accounting report cannot be initialized;
 *                          the original {@code ReportException} is attached as the cause
 */
public JsonParser(String[] srvs, String user, String password, CubbyHole c, int number) {
    this.c = c;
    this.number = number;
    this.server = srvs;
    this.user = user;
    this.password = password;
    // init the accounting report
    try {
        init();
    } catch (ReportException e) {
        // Keep the cause so the real initialization failure is not lost.
        throw new RuntimeException("Accounting report Exception initialization", e);
    }
}
|
|
|
|
private void init() throws ReportException{
|
|
report=new ReportFactory().getReport(ReportConfig.ACCOUNTING_TYPE);
|
|
report.init();
|
|
}
|
|
|
|
public void run() {
|
|
while(true){
|
|
DBObject x=null;
|
|
MongoDB mongo=null;
|
|
try{
|
|
x=c.get();
|
|
logger.info("Consumer #" + this.number + " got: " + x);
|
|
op = (String) x.get("op");
|
|
// retrieve object fields
|
|
DBObject obj=(DBObject)x.get("o");
|
|
retrieveObjectFields(obj);
|
|
long length=-1;
|
|
if(obj.get("length")!=null) length=(long)obj.get("length");
|
|
logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess);
|
|
if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){
|
|
//convert from byte to kb
|
|
length=length/1024;
|
|
String scope=null;
|
|
if((filename!=null)&& (filename.contains("/")))
|
|
scope=retrieveScopeFromRemoteFilePath(filename);
|
|
else{
|
|
// field added on storage manager library for retrieve scope. Used only if it is a link delete
|
|
String pathString=(String)obj.get("onScope");
|
|
scope=retrieveScopeFromRemoteFilePath(pathString);
|
|
}
|
|
boolean validScope=ValidationUtils.validationScope(scope);
|
|
if(validScope){
|
|
if(delete!=null){
|
|
lastOperation="DELETE";
|
|
}else if ((lastOperation != null) && (op != null) && (lastOperation.equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){
|
|
// it is an update on a link object this operation doesn't be accounted
|
|
logger.info("update on link object is not accounted. Skip next ");
|
|
continue;
|
|
}
|
|
// operation=mappingOperationField(op, id, delete, lastAccess);
|
|
mongo=new MongoDB(server, user, password);
|
|
StorageStatusRecord ssr=mongo.update(lastUser, length, 1, lastOperation);
|
|
mongo.close();
|
|
//call to the accounting library
|
|
report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+"");
|
|
logger.info(" operation accounted "+lastOperation);
|
|
logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp);
|
|
}else{
|
|
logger.info("operation is not accounted: invalid scope: "+scope);
|
|
}
|
|
}else{
|
|
logger.info("operation is not accounted");
|
|
}
|
|
|
|
}catch(Exception e){
|
|
e.printStackTrace();
|
|
logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getStackTrace());
|
|
logger.info("skip to next record ");
|
|
if(mongo!=null)
|
|
mongo.close();
|
|
}
|
|
}
|
|
}
|
|
|
|
private void retrieveObjectFields(DBObject obj) {
|
|
filename = (String) obj.get("filename");
|
|
type = (String) obj.get("type");
|
|
name = (String) obj.get("name");
|
|
owner = (String) obj.get("owner");
|
|
creationTime = (String) obj.get("creationTime");
|
|
lastAccess = null;
|
|
if(obj.get("lastAccess") != null) lastAccess=(String)obj.get("lastAccess");
|
|
callerIp = null;
|
|
if(obj.get("callerIP") != null) callerIp=(String)obj.get("callerIP");
|
|
lastOperation = null;
|
|
if(obj.get("lastOperation") != null) lastOperation=(String)obj.get("lastOperation");
|
|
lastUser = null;
|
|
if(obj.get("lastUser") != null) lastUser=(String)obj.get("lastUser");
|
|
linkCount = 0;
|
|
if(obj.get("linkCount") != null) linkCount=(int)obj.get("linkCount");
|
|
delete = null;
|
|
if(obj.get("onDeleting") != null) delete=(String)obj.get("onDeleting");
|
|
// ObjectId objectId=(ObjectId)obj.get("_id");
|
|
// id = objectId.toString();
|
|
}
|
|
|
|
private void report(String filename, String owner,
|
|
String creationTime, long length,
|
|
String scope, String operation, String callerIP, String lastAccess, String lastUser, String totVolume, String totCount) {
|
|
// ACCOUNTING CALL TYPE: STORAGE USAGE
|
|
RawUsageRecord sur=report.setGenericProperties("storage-usage",lastUser, scope, creationTime, lastAccess, owner);
|
|
sur=report.setSpecificProperties(sur, operation, length+"", filename, callerIP, "STORAGE", "1");
|
|
report.send(sur);
|
|
// ACCOUNTING CALL TYPE: STORAGE STATUS
|
|
RawUsageRecord ssr=report.setGenericProperties("storage-status",lastUser, scope, creationTime, lastAccess, owner);
|
|
sur=report.setSpecificProperties(ssr, operation, totVolume, filename, callerIP, "STORAGE", totCount);
|
|
report.send(ssr);
|
|
|
|
}
|
|
|
|
|
|
private String retrieveScopeFromRemoteFilePath(String filename) {
|
|
String[] split=filename.split("/");
|
|
if(split.length>0){
|
|
String scope=null;
|
|
int i=1;
|
|
if(split[1].equals("VOLATILE")){
|
|
i=2;
|
|
}
|
|
scope="/"+split[i];
|
|
i++;
|
|
while((!split[i].equals("home")) && (!split[i].equals("public"))){
|
|
scope=scope+"/"+split[i];
|
|
i++;
|
|
}
|
|
logger.info("retieved scope: "+scope);
|
|
return scope;
|
|
}else logger.error("Scope bad format: scope not retrieved from string: "+filename);
|
|
return null;
|
|
}
|
|
|
|
|
|
|
|
}
|