add StorageStatusRecord for report

update to version 1.1.0
clean code

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@99650 82a268e6-3cf1-43bd-a215-b396298e98cf
roberto.cirillo 2014-09-10 08:23:30 +00:00
parent 6f220a84a1
commit 56d18fa6ae
6 changed files with 124 additions and 50 deletions
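The change in a nutshell: for every qualifying oplog record, JsonParser now updates the per-user storage status in MongoDB and feeds the returned totals, together with the usage figures, into the accounting report. A rough sketch of that flow, assuming the MongoDB, StorageStatusRecord and report(...) members keep the signatures shown in the JsonParser hunks below (variable names are illustrative, not the committed code verbatim):

    // sketch only: condenses the new code added to JsonParser.run()
    StorageStatusRecord status = null;
    MongoDB mongo = null;
    try {
        mongo = new MongoDB(server, user, password);
        // add "length" KB and one object to the user's running totals
        status = mongo.update(lastUser, length, 1, lastOperation);
    } catch (Exception e) {
        logger.error("Problem in updating storage status record: " + e.getMessage());
    } finally {
        if (mongo != null) mongo.close();
    }
    if (status != null) {
        // one storage-usage record for the operation plus one storage-status record with the totals
        report(filename, owner, creationTime, length, scope, lastOperation, callerIp,
               lastAccess, lastUser, String.valueOf(status.getVolume()), String.valueOf(status.getCount()));
    }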

pom.xml

@ -8,7 +8,7 @@
</parent>
<groupId>org.gcube.contentmanagement</groupId>
<artifactId>storage-manager-trigger</artifactId>
<version>1.0.2-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<scm>
<connection>scm:svn:http://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/${project.artifactId}</connection>
<developerConnection>scm:svn:https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/${project.artifactId}</developerConnection>

ReportAccountingImpl.java

@ -2,16 +2,10 @@ package org.gcube.contentmanager.storageserver.accounting;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import org.gcube.accounting.datamodel.RawUsageRecord;
import org.gcube.accounting.exception.InvalidValueException;
import org.gcube.accounting.messaging.ResourceAccounting;
import org.gcube.accounting.messaging.ResourceAccountingFactory;
import org.slf4j.Logger;
@ -19,9 +13,6 @@ import org.slf4j.LoggerFactory;
public class ReportAccountingImpl implements Report {
final Logger logger = LoggerFactory.getLogger(ReportAccountingImpl.class);
// storage usage record
// public RawUsageRecord sur;
// storage status record
public RawUsageRecord ssr;
public ResourceAccounting raFactory;
@ -45,11 +36,8 @@ import org.slf4j.LoggerFactory;
logger.info("set accounting generic properties: resourceType: "+resourceType+" consumerId "+consumerId+" scope: "+resourceScope+ " creationTime "+creationTime+" lastAccess "+lastAccess+" owner "+ owner);
if(raFactory==null) init();
RawUsageRecord sr = new RawUsageRecord();
// generic properties
// sur.setResourceType("storage-usage");
sr.setResourceType(resourceType);
if(consumerId!=null) sr.setConsumerId(consumerId);
// ur.setResourceOwner("paolo.fabriani");
if(resourceScope !=null) sr.setResourceScope(resourceScope);
//set creation time
if(creationTime!=null){
@ -61,7 +49,6 @@ import org.slf4j.LoggerFactory;
logger.error("Error in parsing date: "+creationTime+" exc msg: "+e.getMessage());
}
sr.setCreateTime(date);
// set the mandatory fields
try {
date = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z").parse(lastAccess);
sr.setStartTime(date);
@ -73,23 +60,19 @@ import org.slf4j.LoggerFactory;
if(owner != null) sr.setResourceOwner(owner);
// end mandatory fields
}
logger.debug("generic fields completed ");
return sr;
}
@Override
public RawUsageRecord setSpecificProperties( RawUsageRecord sur, String operation, String size, String filePath, String callerIP, String dataType, String dataCount) {
logger.info("set accounting properties: operation: "+operation+" size: "+size+ " remotePath: "+filePath+" callerIP "+callerIP+" dataType "+dataType+" dataCount "+dataCount);
if(sur==null) sur = new RawUsageRecord();
logger.info("set accounting properties: operation: "+operation+" size: "+size+ " remotePath: "+filePath+" callerIP "+callerIP+" dataType "+dataType+" dataCount "+dataCount);
if(sur==null) sur = new RawUsageRecord();
if (operation!=null) sur.setResourceSpecificProperty("operationType",operation);
if(size!= null) sur.setResourceSpecificProperty("dataVolume", size);
if(filePath != null) sur.setResourceSpecificProperty("remotePath", filePath);
// if(id!= null) sur.setResourceSpecificProperty("id", id);
if(callerIP!=null) sur.setResourceSpecificProperty("callerIP", callerIP);
// if(lastAccess!=null){
// sur.setResourceSpecificProperty("lastAccess", lastAccess);
//
// }
sur.setResourceSpecificProperty("dataType",dataType);
sur.setResourceSpecificProperty("dataCount", dataCount);
return sur;
@ -97,10 +80,16 @@ import org.slf4j.LoggerFactory;
@Override
public void send(RawUsageRecord sur) {
logger.info("report sending...");
if(raFactory!=null)
try {
raFactory = ResourceAccountingFactory.getResourceAccountingInstance();
} catch (Exception e) {
e.printStackTrace();
}
if(raFactory!=null){
logger.info("report sending...");
raFactory.sendAccountingMessage(sur);
else
logger.info(" report send ");
}else
logger.error("Problem on building accounting record: Factory Object is null ");
}
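For context, the calling pattern that drives this class after the change sends two records per operation. It mirrors the JsonParser hunk further down; lastUser, scope, totVolume, totCount and the other names are fields of that class, so this is an illustration rather than the committed code (getReport(...) may also throw ReportException):

    Report report = ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE);
    report.init();

    // storage-usage record: the single operation of "length" KB
    RawUsageRecord usage = report.setGenericProperties("storage-usage", lastUser, scope, creationTime, lastAccess, owner);
    usage = report.setSpecificProperties(usage, operation, length + "", filename, callerIP, "STORAGE", "1");
    report.send(usage);

    // storage-status record: the user's running totals returned by MongoDB.update(...)
    RawUsageRecord status = report.setGenericProperties("storage-status", lastUser, scope, creationTime, lastAccess, owner);
    status = report.setSpecificProperties(status, operation, totVolume, filename, callerIP, "STORAGE", totCount);
    report.send(status);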

ReadingMongoOplog.java

@ -11,15 +11,17 @@ import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import org.bson.types.BSONTimestamp;
import org.gcube.contentmanager.storageserver.parse.JsonParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReadingMongoOplog extends Thread{
final static Logger logger=LoggerFactory.getLogger(ReadingMongoOplog.class);
public String DBNAME="remotefs";
public static String DBNAME="remotefs";
private ServerAddress[] server;
private Mongo mongoClient;
private DB local;
@ -28,10 +30,12 @@ public class ReadingMongoOplog extends Thread{
private String user;
private String password;
private int number;
private List<String> srvs;
public ReadingMongoOplog(List<String> srvs, CubbyHole c, int numberT){
this.c=c;
this.number=numberT;
this.srvs=srvs;
setupServerAddress(srvs);
initBackend();
}
@ -48,6 +52,7 @@ public class ReadingMongoOplog extends Thread{
}
public void run() {
// check oplog collection
DBCursor lastCursor = oplog.find().sort(new BasicDBObject("$natural", -1)).limit(1);
if (!lastCursor.hasNext()) {
logger.error("no oplog!");
@ -68,8 +73,8 @@ public class ReadingMongoOplog extends Thread{
// check whether to discard or process the current DB record
if((x.get("o2")!=null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){
if(x.containsField("o")){
// parser.jsonRecordParser(x);
c.put(x);
// parser.runWithoutThread(x);
logger.info("Producer #" + this.number + " put: " + x);
}else{
logger.info("operation is not accounted");
@ -82,15 +87,13 @@ public class ReadingMongoOplog extends Thread{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@SuppressWarnings("deprecation")
// @SuppressWarnings("deprecation")
private void initBackend() {
mongoClient = new Mongo(Arrays.asList(server));//"146.48.123.71"
mongoClient = new MongoClient(Arrays.asList(server));//"146.48.123.71"
local = mongoClient.getDB("local");
try {
Thread.sleep(1000);
@ -116,11 +119,10 @@ public class ReadingMongoOplog extends Thread{
i++;
}
}else{
logger.error("MongoDB server not Setted. Please set one or more servers");
throw new RuntimeException("MongoDB server not Setted. Please set one or more servers");
logger.error("MongoDB server not set. Please set one or more servers");
throw new RuntimeException("MongoDB server not set. Please set one or more servers");
}
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

JsonParser.java

@ -1,8 +1,5 @@
package org.gcube.contentmanager.storageserver.parse;
import java.util.List;
import org.bson.types.ObjectId;
import org.gcube.accounting.datamodel.RawUsageRecord;
import org.gcube.contentmanager.storageserver.accounting.Report;
import org.gcube.contentmanager.storageserver.accounting.ReportConfig;
@ -36,12 +33,10 @@ public class JsonParser extends Thread{
private String lastUser;
private int linkCount;
private String delete;
// private String id;
private String callerIp;
private String user;
private String password;
String[] server;
// private String previousInsert;
public JsonParser(String[] srvs, CubbyHole c, int number){
this.c=c;
@ -69,8 +64,9 @@ public class JsonParser extends Thread{
}
}
private void init() throws ReportException{
report=new ReportFactory().getReport(ReportConfig.ACCOUNTING_TYPE);
report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE);
report.init();
}
@ -85,12 +81,14 @@ public class JsonParser extends Thread{
// retrieve object fields
DBObject obj=(DBObject)x.get("o");
retrieveObjectFields(obj);
// set object dimension
long length=-1;
if(obj.get("length")!=null) length=(long)obj.get("length");
logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess);
if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){
//convert from byte to kb
length=length/1024;
// check scope
String scope=null;
if((filename!=null)&& (filename.contains("/")))
scope=retrieveScopeFromRemoteFilePath(filename);
@ -107,14 +105,24 @@ public class JsonParser extends Thread{
// it is an update on a link object; this operation is not accounted
logger.info("update on link object is not accounted. Skip next ");
continue;
}else if((lastOperation==null) || (lastUser==null)){
logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next ");
}
StorageStatusRecord ssr=null;
try{
mongo=new MongoDB(server, user, password);
ssr=mongo.update(lastUser, length, 1, lastOperation);
mongo.close();
}catch(Exception e){
logger.error("Problem in updating storage status record: "+e.getMessage());
}
try{
logger.info(" operation accounted "+lastOperation);
//call to the accounting library
report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+"");
}catch(Exception e){
logger.error("Problem sending accounting report. Exception message: "+e.getMessage());
}
// operation=mappingOperationField(op, id, delete, lastAccess);
mongo=new MongoDB(server, user, password);
StorageStatusRecord ssr=mongo.update(lastUser, length, 1, lastOperation);
mongo.close();
//call to the accounting library
report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+"");
logger.info(" operation accounted "+lastOperation);
logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp);
}else{
logger.info("operation is not accounted: invalid scope: "+scope);
@ -132,6 +140,78 @@ public class JsonParser extends Thread{
}
}
}
public void runWithoutThread(DBObject x){
try {
report=new ReportFactory().getReport(ReportConfig.ACCOUNTING_TYPE);
} catch (ReportException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
report.init();
// DBObject x=null;
MongoDB mongo=null;
try{
// x=c.get();
logger.info("Consumer #" + this.number + " got: " + x);
op = (String) x.get("op");
// retrieve object fields
DBObject obj=(DBObject)x.get("o");
retrieveObjectFields(obj);
long length=-1;
if(obj.get("length")!=null) length=(long)obj.get("length");
logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess);
if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){
//convert from byte to kb
length=length/1024;
String scope=null;
if((filename!=null)&& (filename.contains("/")))
scope=retrieveScopeFromRemoteFilePath(filename);
else{
// field added by the storage-manager library to retrieve the scope. Used only for a link delete
String pathString=(String)obj.get("onScope");
scope=retrieveScopeFromRemoteFilePath(pathString);
}
boolean validScope=ValidationUtils.validationScope(scope);
if(validScope){
if(delete!=null){
lastOperation="DELETE";
}else if ((lastOperation != null) && (op != null) && (lastOperation.equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){
// it is an update on a link object; this operation is not accounted
logger.info("update on link object is not accounted. Skip next ");
// continue;
}else if((lastOperation==null) || (lastUser==null)){
logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next ");
}
// operation=mappingOperationField(op, id, delete, lastAccess);
StorageStatusRecord ssr=null;
try{
mongo=new MongoDB(server, user, password);
ssr=mongo.update(lastUser, length, 1, lastOperation);
mongo.close();
logger.info(" operation accounted "+lastOperation);
logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp);
//call to the accounting library
report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+"");
}catch(Exception e){
logger.error("Problem sending accounting report. Exception message: "+e.getMessage());
}
}else{
logger.info("operation is not accounted: invalid scope: "+scope);
}
}else{
logger.info("operation is not accounted");
}
}catch(Exception e){
e.printStackTrace();
logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getStackTrace());
logger.info("skip to next record ");
if(mongo!=null)
mongo.close();
}
}
private void retrieveObjectFields(DBObject obj) {
filename = (String) obj.get("filename");
@ -161,10 +241,13 @@ public class JsonParser extends Thread{
// ACCOUNTING CALL TYPE: STORAGE USAGE
RawUsageRecord sur=report.setGenericProperties("storage-usage",lastUser, scope, creationTime, lastAccess, owner);
sur=report.setSpecificProperties(sur, operation, length+"", filename, callerIP, "STORAGE", "1");
logger.info("[accounting call] type: storage usage ");
report.send(sur);
// ACCOUNTING CALL TYPE: STORAGE STATUS
logger.debug("set properties: totVolume: "+totVolume+" totCount "+totCount);
RawUsageRecord ssr=report.setGenericProperties("storage-status",lastUser, scope, creationTime, lastAccess, owner);
sur=report.setSpecificProperties(ssr, operation, totVolume, filename, callerIP, "STORAGE", totCount);
ssr=report.setSpecificProperties(ssr, operation, totVolume, filename, callerIP, "STORAGE", totCount);
logger.info("[accounting call] type: storage status ");
report.send(ssr);
}

ValidationUtils.java

@ -14,8 +14,8 @@ public class ValidationUtils {
scope=scopeBean.enclosingScope().toString();
Set<String> scopeSet=new ServiceMapScannerMediator().getScopeKeySet();
for(String scopeItem : scopeSet){
// System.out.println("scope scanned: "+scopeItem);
if(scope.contains(scopeItem))
System.out.println("scope scanned: "+scopeItem);
if(scope.equals(scopeItem))
return true;
}
return false;
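Reading the hunk above as replacing contains(...) with equals(...), the scope check becomes an exact match against the service-map keys instead of a substring test. A minimal self-contained sketch of the difference (the scope values are made up):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class ScopeCheckSketch {
        public static void main(String[] args) {
            Set<String> knownScopes = new HashSet<String>(Arrays.asList("/gcube", "/d4science.research-infrastructures.eu"));
            String scope = "/gcubeApps";
            boolean substring = false, exact = false;
            for (String item : knownScopes) {
                if (scope.contains(item)) substring = true; // old check: "/gcubeApps" wrongly matches "/gcube"
                if (scope.equals(item)) exact = true;       // new check: only an exact scope name passes
            }
            System.out.println("contains: " + substring + ", equals: " + exact); // prints: contains: true, equals: false
        }
    }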

ValidationScopeTest.java

@ -8,7 +8,7 @@ import org.junit.Test;
public class ValidationScopeTest {
private String scope="/d4science.research-infrastructures.eu/gCubeApps";
private String scope="/d4science.research-infrastructures.eu/EUBrazilOpenBio";
@Test
public void test() {