diff --git a/pom.xml b/pom.xml index 0e7e50c..8158bf4 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ org.gcube.contentmanagement storage-manager-trigger - 1.0.2-SNAPSHOT + 1.1.0-SNAPSHOT scm:svn:http://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/${project.artifactId} scm:svn:https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/${project.artifactId} diff --git a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java index 1517bb4..07d4527 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java @@ -2,16 +2,10 @@ package org.gcube.contentmanager.storageserver.accounting; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Calendar; import java.util.Date; -import java.util.GregorianCalendar; - import org.gcube.accounting.datamodel.RawUsageRecord; -import org.gcube.accounting.exception.InvalidValueException; import org.gcube.accounting.messaging.ResourceAccounting; import org.gcube.accounting.messaging.ResourceAccountingFactory; import org.slf4j.Logger; @@ -19,9 +13,6 @@ import org.slf4j.LoggerFactory; public class ReportAccountingImpl implements Report { final Logger logger = LoggerFactory.getLogger(ReportAccountingImpl.class); -// storage usage record -// public RawUsageRecord sur; -// storage status record public RawUsageRecord ssr; public ResourceAccounting raFactory; @@ -45,11 +36,8 @@ import org.slf4j.LoggerFactory; logger.info("set accounting generic properties: resourceType: "+resourceType+" consumerId "+consumerId+" scope: "+resourceScope+ " creationTime "+creationTime+" lastAccess "+lastAccess+" owner "+ owner); if(raFactory==null) init(); RawUsageRecord sr = new RawUsageRecord(); - // generic properties -// sur.setResourceType("storage-usage"); sr.setResourceType(resourceType); if(consumerId!=null) sr.setConsumerId(consumerId); - // ur.setResourceOwner("paolo.fabriani"); if(resourceScope !=null) sr.setResourceScope(resourceScope); //set creation time if(creationTime!=null){ @@ -61,7 +49,6 @@ import org.slf4j.LoggerFactory; logger.error("Error in parsing date: "+creationTime+" exc msg: "+e.getMessage()); } sr.setCreateTime(date); - // set the mandatory fields try { date = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z").parse(lastAccess); sr.setStartTime(date); @@ -73,23 +60,19 @@ import org.slf4j.LoggerFactory; if(owner != null) sr.setResourceOwner(owner); // end mandatory files } + logger.debug("generic fields completed "); return sr; } @Override public RawUsageRecord setSpecificProperties( RawUsageRecord sur, String operation, String size, String filePath, String callerIP, String dataType, String dataCount) { - logger.info("set accounting properties: operation: "+operation+" size: "+size+ " remotePath: "+filePath+" callerIP "+callerIP+" dataType "+dataType+" dataCount "+dataCount); - if(sur==null) sur = new RawUsageRecord(); + logger.info("set accounting properties: operation: "+operation+" size: "+size+ " remotePath: "+filePath+" callerIP "+callerIP+" dataType "+dataType+" dataCount "+dataCount); + if(sur==null) sur = new RawUsageRecord(); if (operation!=null) sur.setResourceSpecificProperty("operationType",operation); if(size!= null) sur.setResourceSpecificProperty("dataVolume", size); if(filePath != null) sur.setResourceSpecificProperty("remotePath", filePath); -// if(id!= null) sur.setResourceSpecificProperty("id", id); if(callerIP!=null) sur.setResourceSpecificProperty("callerIP", callerIP); -// if(lastAccess!=null){ -// sur.setResourceSpecificProperty("lastAccess", lastAccess); -// -// } sur.setResourceSpecificProperty("dataType",dataType); sur.setResourceSpecificProperty("dataCount", dataCount); return sur; @@ -97,10 +80,16 @@ import org.slf4j.LoggerFactory; @Override public void send(RawUsageRecord sur) { - logger.info("report sending..."); - if(raFactory!=null) + try { + raFactory = ResourceAccountingFactory.getResourceAccountingInstance(); + } catch (Exception e) { + e.printStackTrace(); + } + if(raFactory!=null){ + logger.info("report sending..."); raFactory.sendAccountingMessage(sur); - else + logger.info(" report send "); + }else logger.error("Problem on building accounting record: Factory Object is null "); } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java index c645626..feaf110 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java @@ -11,15 +11,17 @@ import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.Mongo; +import com.mongodb.MongoClient; import com.mongodb.ServerAddress; import org.bson.types.BSONTimestamp; +import org.gcube.contentmanager.storageserver.parse.JsonParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ReadingMongoOplog extends Thread{ final static Logger logger=LoggerFactory.getLogger(ReadingMongoOplog.class); - public String DBNAME="remotefs"; + public static String DBNAME="remotefs"; private ServerAddress[] server; private Mongo mongoClient; private DB local; @@ -28,10 +30,12 @@ public class ReadingMongoOplog extends Thread{ private String user; private String password; private int number; + private List srvs; public ReadingMongoOplog(List srvs, CubbyHole c, int numberT){ this.c=c; this.number=numberT; + this.srvs=srvs; setupServerAddress(srvs); initBackend(); } @@ -48,6 +52,7 @@ public class ReadingMongoOplog extends Thread{ } public void run() { + // check oplog collection DBCursor lastCursor = oplog.find().sort(new BasicDBObject("$natural", -1)).limit(1); if (!lastCursor.hasNext()) { logger.error("no oplog!"); @@ -68,8 +73,8 @@ public class ReadingMongoOplog extends Thread{ // check if discard or process the current DB record if((x.get("o2")!=null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){ if(x.containsField("o")){ -// parser.jsonRecordParser(x); c.put(x); +// parser.runWithoutThread(x); logger.info("Producer #" + this.number + " put: " + x); }else{ logger.info("operation is not accounted"); @@ -82,15 +87,13 @@ public class ReadingMongoOplog extends Thread{ try { Thread.sleep(1000); } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } } } - @SuppressWarnings("deprecation") +// @SuppressWarnings("deprecation") private void initBackend() { - mongoClient = new Mongo(Arrays.asList(server));//"146.48.123.71" + mongoClient = new MongoClient(Arrays.asList(server));//"146.48.123.71" local = mongoClient.getDB("local"); try { Thread.sleep(1000); @@ -116,11 +119,10 @@ public class ReadingMongoOplog extends Thread{ i++; } }else{ - logger.error("MongoDB server not Setted. Please set one or more servers"); - throw new RuntimeException("MongoDB server not Setted. Please set one or more servers"); + logger.error("MongoDB server not set. Please set one or more servers"); + throw new RuntimeException("MongoDB server not set. Please set one or more servers"); } } catch (UnknownHostException e) { - // TODO Auto-generated catch block e.printStackTrace(); } } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java index 5a261bc..ee089bb 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java @@ -1,8 +1,5 @@ package org.gcube.contentmanager.storageserver.parse; -import java.util.List; - -import org.bson.types.ObjectId; import org.gcube.accounting.datamodel.RawUsageRecord; import org.gcube.contentmanager.storageserver.accounting.Report; import org.gcube.contentmanager.storageserver.accounting.ReportConfig; @@ -36,12 +33,10 @@ public class JsonParser extends Thread{ private String lastUser; private int linkCount; private String delete; -// private String id; private String callerIp; private String user; private String password; String[] server; -// private String previousInsert; public JsonParser(String[] srvs, CubbyHole c, int number){ this.c=c; @@ -69,8 +64,9 @@ public class JsonParser extends Thread{ } } + private void init() throws ReportException{ - report=new ReportFactory().getReport(ReportConfig.ACCOUNTING_TYPE); + report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE); report.init(); } @@ -85,12 +81,14 @@ public class JsonParser extends Thread{ // retrieve object fields DBObject obj=(DBObject)x.get("o"); retrieveObjectFields(obj); + // set object dimension long length=-1; if(obj.get("length")!=null) length=(long)obj.get("length"); logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess); if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){ //convert from byte to kb length=length/1024; + // check scope String scope=null; if((filename!=null)&& (filename.contains("/"))) scope=retrieveScopeFromRemoteFilePath(filename); @@ -107,14 +105,24 @@ public class JsonParser extends Thread{ // it is an update on a link object this operation doesn't be accounted logger.info("update on link object is not accounted. Skip next "); continue; + }else if((lastOperation==null) || (lastUser==null)){ + logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next "); + } + StorageStatusRecord ssr=null; + try{ + mongo=new MongoDB(server, user, password); + ssr=mongo.update(lastUser, length, 1, lastOperation); + mongo.close(); + }catch(Exception e){ + logger.error("Problem in updating storage status record: "+e.getMessage()); + } + try{ + logger.info(" operation accounted "+lastOperation); + //call to the accounting library + report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+""); + }catch(Exception e){ + logger.error("Problem sending accounting report. Exception message: "+e.getMessage()); } -// operation=mappingOperationField(op, id, delete, lastAccess); - mongo=new MongoDB(server, user, password); - StorageStatusRecord ssr=mongo.update(lastUser, length, 1, lastOperation); - mongo.close(); - //call to the accounting library - report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+""); - logger.info(" operation accounted "+lastOperation); logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp); }else{ logger.info("operation is not accounted: invalid scope: "+scope); @@ -132,6 +140,78 @@ public class JsonParser extends Thread{ } } } + + + public void runWithoutThread(DBObject x){ + try { + report=new ReportFactory().getReport(ReportConfig.ACCOUNTING_TYPE); + } catch (ReportException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + report.init(); +// DBObject x=null; + MongoDB mongo=null; + try{ +// x=c.get(); + logger.info("Consumer #" + this.number + " got: " + x); + op = (String) x.get("op"); + // retrieve object fields + DBObject obj=(DBObject)x.get("o"); + retrieveObjectFields(obj); + long length=-1; + if(obj.get("length")!=null) length=(long)obj.get("length"); + logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess); + if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){ + //convert from byte to kb + length=length/1024; + String scope=null; + if((filename!=null)&& (filename.contains("/"))) + scope=retrieveScopeFromRemoteFilePath(filename); + else{ + // field added on storage manager library for retrieve scope. Used only if it is a link delete + String pathString=(String)obj.get("onScope"); + scope=retrieveScopeFromRemoteFilePath(pathString); + } + boolean validScope=ValidationUtils.validationScope(scope); + if(validScope){ + if(delete!=null){ + lastOperation="DELETE"; + }else if ((lastOperation != null) && (op != null) && (lastOperation.equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){ + // it is an update on a link object this operation doesn't be accounted + logger.info("update on link object is not accounted. Skip next "); +// continue; + }else if((lastOperation==null) || (lastUser==null)){ + logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next "); + } +// operation=mappingOperationField(op, id, delete, lastAccess); + StorageStatusRecord ssr=null; + try{ + mongo=new MongoDB(server, user, password); + ssr=mongo.update(lastUser, length, 1, lastOperation); + mongo.close(); + logger.info(" operation accounted "+lastOperation); + logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp); + //call to the accounting library + report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+""); + }catch(Exception e){ + logger.error("Problem sending accounting report. Exception message: "+e.getMessage()); + } + }else{ + logger.info("operation is not accounted: invalid scope: "+scope); + } + }else{ + logger.info("operation is not accounted"); + } + + }catch(Exception e){ + e.printStackTrace(); + logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getStackTrace()); + logger.info("skip to next record "); + if(mongo!=null) + mongo.close(); + } + } private void retrieveObjectFields(DBObject obj) { filename = (String) obj.get("filename"); @@ -161,10 +241,13 @@ public class JsonParser extends Thread{ // ACCOUNTING CALL TYPE: STORAGE USAGE RawUsageRecord sur=report.setGenericProperties("storage-usage",lastUser, scope, creationTime, lastAccess, owner); sur=report.setSpecificProperties(sur, operation, length+"", filename, callerIP, "STORAGE", "1"); + logger.info("[accounting call] type: storage usage "); report.send(sur); // ACCOUNTING CALL TYPE: STORAGE STATUS + logger.debug("set properties: totVolume: "+totVolume+" totCount "+totCount); RawUsageRecord ssr=report.setGenericProperties("storage-status",lastUser, scope, creationTime, lastAccess, owner); - sur=report.setSpecificProperties(ssr, operation, totVolume, filename, callerIP, "STORAGE", totCount); + ssr=report.setSpecificProperties(ssr, operation, totVolume, filename, callerIP, "STORAGE", totCount); + logger.info("[accounting call] type: storage status "); report.send(ssr); } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java b/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java index 6474171..384fda2 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java @@ -14,8 +14,8 @@ public class ValidationUtils { scope=scopeBean.enclosingScope().toString(); Set scopeSet=new ServiceMapScannerMediator().getScopeKeySet(); for(String scopeItem : scopeSet){ -// System.out.println("scope scanned: "+scopeItem); - if(scope.contains(scopeItem)) + System.out.println("scope scanned: "+scopeItem); + if(scope.equals(scopeItem)) return true; } return false; diff --git a/src/test/java/org/gcube/contentmanager/storageserver/test/ValidationScopeTest.java b/src/test/java/org/gcube/contentmanager/storageserver/test/ValidationScopeTest.java index 048aca5..4f4f76f 100644 --- a/src/test/java/org/gcube/contentmanager/storageserver/test/ValidationScopeTest.java +++ b/src/test/java/org/gcube/contentmanager/storageserver/test/ValidationScopeTest.java @@ -8,7 +8,7 @@ import org.junit.Test; public class ValidationScopeTest { - private String scope="/d4science.research-infrastructures.eu/gCubeApps"; + private String scope="/d4science.research-infrastructures.eu/EUBrazilOpenBio"; @Test public void test() {