From 89459ce212214162bd42a99821378b39ad50a0d8 Mon Sep 17 00:00:00 2001 From: "roberto.cirillo" Date: Wed, 23 Nov 2016 17:36:31 +0000 Subject: [PATCH] bug fix on catch in in UserAccountingConsumer. Upgrade to version 1.6.0-SNAP git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@134643 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 15 +++++ .../accounting/ReportAccountingImpl.java | 12 ++-- .../consumer/UserAccountingConsumer.java | 39 ++++++++----- .../storageserver/data/CubbyHole.java | 4 +- .../storageserver/data/ReadingMongoOplog.java | 55 ++++++++++--------- .../parse/utils/ValidationUtils.java | 4 +- .../storageserver/startup/Configuration.java | 23 +------- 7 files changed, 78 insertions(+), 74 deletions(-) diff --git a/pom.xml b/pom.xml index 9393c80..de18497 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,17 @@ scm:svn:https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/${project.artifactId} http://svn.d4science.research-infrastructures.eu/public/d4science/gcube/trunk/content-management/${project.artifactId} + + + + org.gcube.distribution + gcube-bom + LATEST + pom + import + + + ${project.basedir}/distro @@ -74,6 +85,10 @@ accounting-lib [2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT) + + org.gcube.common + authorization-client + diff --git a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java index ebfaf54..42990b6 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/accounting/ReportAccountingImpl.java @@ -64,14 +64,14 @@ import org.slf4j.LoggerFactory; // TODO Auto-generated catch block e.printStackTrace(); } - logger.debug("generic fields completed "); + logger.trace("generic fields completed "); return sur; } protected StorageUsageRecord setProviderUri(StorageUsageRecord sur, String resourceScope) throws InvalidValueException, URISyntaxException { - logger.debug("retrieve root scope from resourceScope "+resourceScope); + logger.trace("retrieve root scope from resourceScope "+resourceScope); String rootScope= resourceScope.substring(1); - logger.debug("rootScope extrapolated is: "+rootScope); + logger.trace("rootScope extrapolated is: "+rootScope); if(sur == null) sur=new StorageUsageRecord(); sur.setProviderURI(new URI(buildProviderURI(resourceScope))); @@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory; if(rootScope.indexOf("/") != -1){ int i=rootScope.indexOf("/"); rootScope=rootScope.substring(0, i); - logger.debug("removed sub scopes. rootScope: "+rootScope); + logger.trace("removed sub scopes. rootScope: "+rootScope); } providerUri="data."+rootScope+".org"; logger.debug("set provider uri: "+providerUri); @@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory; @Override public StorageUsageRecord setSpecificProperties(StorageUsageRecord sur, String filePath, String dataType, String callerIP, String id) { - logger.trace("set accounting properties: remotePath: "+filePath+" dataType "+dataType+" callerIP "+callerIP+" resoruceURI "+id); + logger.trace("set accounting properties: remotePath: "+filePath+" dataType "+dataType+" callerIP "+callerIP+" resourceURI "+id); if(sur==null) sur = new StorageUsageRecord(); try { if(filePath != null){ @@ -172,7 +172,7 @@ import org.slf4j.LoggerFactory; } logger.info(" report send: \n\t"+sur); }else - logger.error("Problem on building accounting record: Factory Object is null "); + logger.error("Problem on building accounting record: Accounting Object is null "); } public String getProviderUri() { diff --git a/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java b/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java index 1de2945..16afb22 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/consumer/UserAccountingConsumer.java @@ -69,14 +69,14 @@ public class UserAccountingConsumer extends Thread{ } public void run() { - logger.debug("Consuming SU record started "); + logger.debug("Consuming SU started "); + MongoDB mongo=null; while(true){ - DBObject x=null; - MongoDB mongo=null; + DBObject x=null; try{ - logger.debug("SU waiting next record... "); + logger.debug("SU waiting next record..."); x=c.get(); - logger.info("Consumer #" + this.number + " got: " + x.get("_id")); + logger.debug("Consumer #" + this.number + " got: " + x ); //get operation op = (String) x.get("op"); @@ -85,26 +85,28 @@ public class UserAccountingConsumer extends Thread{ OpLogRemoteObject record=retrieveObjectFields(obj); // set object dimension - logger.debug("[recordCheck] operation: "+op+"\n\t name: "+record.getName()+"\n\t type: "+record.getType()+"\n\t path: "+record.getFilename()+"\n\t dir Path: "+record.getDir()+"\n\t length: "+record.getLength()+"\n\t owner: "+record.getOwner()+ "\n\t lastOperation "+record.getLastOperation()+"\n\t lastUser: "+record.getLastUser()+"\n\t lastAccess: "+record.getLastAccess()); + logger.debug("[recordCheck] operation: "+op+" name: "+record.getName()+" type: "+record.getType()+" path: "+record.getFilename()+" dir Path: "+record.getDir()+" length: "+record.getLength()+" owner: "+record.getOwner()+ " lastOperation "+record.getLastOperation()+"\n\t lastUser: "+record.getLastUser()+" lastAccess: "+record.getLastAccess()); if(((record.getLength() >0) && (((record.getFilename() !=null) && (record.getFilename().length()>0) && (record.getDir().length()>0)&& (record.getDir().contains("/"))) || (record.getLinkCount() > 0)))){ + String id=((DBObject)x.get("o")).get("_id").toString(); + logger.debug("Consumer id " + id ); //convert from byte to kb record.setLength(record.getLength()/1024); // check scope String scope=null; String pathString=(String)obj.get("onScope"); - logger.debug("[recordCheck] pathString value: "+pathString); + logger.debug("[recordCheck] it is a link to: "+pathString); if((record.getDir()!=null)&& (record.getDir().contains("/"))){ scope=retrieveScopeFromRemoteFilePath(record.getDir()); - logger.debug("[recordCheck] scope retrieved: "+scope); }else{ // field added on storage manager library for retrieve scope. Used only if it is a link delete scope=retrieveScopeFromRemoteFilePath(pathString); } + logger.debug("scope retrieved "); boolean validScope=ValidationUtils.validationScope(scope); if(validScope){ if(record.getDelete() != null){ record.setLastOperation("DELETE"); - }else if ((record.getLastOperation() != null) && (op != null) && (record.getLastOperation().equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){ + }else if ((record.getLastOperation() != null) && (op != null) && (record.getLastOperation().equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){ //add check on operation // it is an update on a link object this operation doesn't be accounted logger.info("[recordCheck] update on link object is not accounted. Skip next "); continue; @@ -122,14 +124,19 @@ public class UserAccountingConsumer extends Thread{ logger.debug("[recordCheck] the caller is dts service: caller "+record.getCallerIp()+ " dts host: "+host+" the new user is: "+record.getLastUser()); } } + }else if((record.getLastOperation() != null) && (record.getLastOperation().equalsIgnoreCase("UPLOAD")) && op.equalsIgnoreCase("u")){ + //it is only a metadata update on a previous insert so it is not need to send the record + logger.info("[recordCheck] metadata update on a previous insert. Record discarded: "+id); + continue; } } - logger.debug("[recordCheck] operation accounted "+record.getLastOperation()); + logger.debug("[recordCheck] accounting operation "+record.getLastOperation()); StorageStatusObject ssr=null; if(isNeedSSReport(record.getLastOperation())){ try{ - logger.debug("[recordCheck] update SS record"); - mongo=new MongoDB(server, user, password); + logger.debug("[recordCheck] update SS record yet"); + if (mongo== null) + mongo=new MongoDB(server, user, password); if(record.getLastOperation().equalsIgnoreCase("COPY")) record.setOwner(record.getLastUser()); ssr=new StorageStatusObject(record.getOwner(), record.getLength(), 1); @@ -159,11 +166,12 @@ public class UserAccountingConsumer extends Thread{ } }catch(Exception e){ - logger.error(" "+e.getCause().getLocalizedMessage()); +// logger.error(" CATCHED EXCEPTION "+e.getCause().getLocalizedMessage()); logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getMessage()); logger.info("skip to next record "); if(mongo!=null) mongo.close(); + } } } @@ -174,7 +182,7 @@ public class UserAccountingConsumer extends Thread{ * @return */ private boolean isNeedSSReport(String lastOperation) { - logger.debug("Last operation is "+lastOperation); + logger.trace("Last operation is "+lastOperation); if(lastOperation.equalsIgnoreCase("UPLOAD") || lastOperation.equalsIgnoreCase("COPY") || lastOperation.equalsIgnoreCase("DELETE")) return true; return false; @@ -241,9 +249,10 @@ public class UserAccountingConsumer extends Thread{ i++; while((!split[i].equals("home")) && (!split[i].equals("public"))){ scope=scope+"/"+split[i]; + logger.debug("scope building: "+scope); i++; } - logger.debug("retieved scope: "+scope); + logger.debug("[recordCheck] scope retrieved: "+scope); return scope; }else logger.error("Scope bad format: scope not retrieved from string: "+filename); return null; diff --git a/src/main/java/org/gcube/contentmanager/storageserver/data/CubbyHole.java b/src/main/java/org/gcube/contentmanager/storageserver/data/CubbyHole.java index a2a7e12..e4e4ac0 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/data/CubbyHole.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/data/CubbyHole.java @@ -26,7 +26,7 @@ public class CubbyHole { } } DBObject value=requestQueue.remove(0); - logger.debug("get element from queue: "+value.get("_id")); + logger.debug("get element from queue: "+value); available = false; notifyAll(); return value; @@ -40,7 +40,7 @@ public class CubbyHole { } catch (InterruptedException e) { } } - logger.debug("put element to queue: "+value.get("_id")); + logger.debug("put element to queue: "+value); requestQueue.addElement(value); available = true; notifyAll(); diff --git a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java index 83866ec..6e6b24d 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java @@ -71,35 +71,36 @@ public class ReadingMongoOplog extends Thread{ DBCursor cursor = oplog.find(new BasicDBObject("ts", new BasicDBObject("$gt", ts))); cursor.addOption(Bytes.QUERYOPTION_TAILABLE); cursor.addOption(Bytes.QUERYOPTION_AWAITDATA); + cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT); while (cursor.hasNext()) { - DBObject x = cursor.next(); - logger.debug("oplog current object: "+x); - ts = (BSONTimestamp) x.get("ts"); - String ns=(String)x.get("ns"); - // check if discard or process the current DB record - if((x.get("o2") != null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){ - if(x.containsField("o")){ - // c1 buffer for suer accounting - c1.put(x); - // c2 buffer for folder accounting (TODO) - if(c2 !=null) - c2.put(x); -// parser.runWithoutThread(x); - logger.debug("Producer #" + this.number + " put: " + x.get("_id")); - }else{ - logger.debug("operation is not accounted"); - } - }else{ - logger.debug("record discarded: \t"+x.get("_id")); - } + try{ + DBObject x = cursor.next(); + logger.debug("oplog current object: "+x); + ts = (BSONTimestamp) x.get("ts"); + String ns=(String)x.get("ns"); + // check if discard or process the current DB record + if((x.get("o2") != null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){ + if(x.containsField("o")){ // add check on operation type + // c1 buffer for suer accounting + c1.put(x); + // c2 buffer for folder accounting (TODO) + if(c2 !=null) + c2.put(x); + // parser.runWithoutThread(x); + + }else{ + logger.debug("operation is not accounted"); + } + }else{ + logger.debug("object discarded "); + } + }catch(Exception e){ + logger.error("Exception throws: "+e.getMessage()); + e.printStackTrace(); + } + logger.debug("...waiting new object... "); } - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - + logger.warn("out of the cycle "); } } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java b/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java index 779f7b6..b31e2e7 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java @@ -14,13 +14,13 @@ public class ValidationUtils { private static final Logger logger = LoggerFactory.getLogger(ValidationUtils.class); public static boolean validationScope(String scope){ - logger.info("scope Validation for scope "+scope); + logger.debug("scope Validation for scope "+scope); ScopeBean scopeBean=new ScopeBean(scope); if((scopeBean.is(Type.VRE))) scope=scopeBean.enclosingScope().toString(); Set scopeSet=new ServiceMapScannerMediator().getScopeKeySet(); for(String scopeItem : scopeSet){ - logger.info("scope scanned: "+scopeItem); + logger.trace("scope scanned: "+scopeItem); if(scope.equals(scopeItem)) return true; } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java b/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java index 084c762..ce0cffd 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java @@ -45,6 +45,7 @@ public class Configuration { this.scope=scope; if(!ValidationUtils.validationScope(scope)) throw new IllegalArgumentException("invalid scope exception: "+scope); + ScopeProvider.instance.set(scope); } @@ -152,28 +153,6 @@ public class Configuration { } } -// private String retrievePropertyValue(String name, String scope) { -// String savedScope=null; -// if(scope!=null){ -// savedScope=ScopeProvider.instance.get(); -// ScopeProvider.instance.set(scope); -// } -// SimpleQuery query = queryFor(ServiceEndpoint.class); -// query.addCondition("$resource/Profile/Category/text() eq 'DataStorage' and $resource/Profile/Name eq 'StorageManager' "); -// DiscoveryClient client = clientFor(ServiceEndpoint.class); -// List resources = client.submit(query); -// ServiceEndpoint res=resources.get(0); -// Iterator it= res.profile().accessPoints().iterator(); -// AccessPoint ap=(AccessPoint)it.next(); -// Mapmap= ap.propertyMap(); -// Property type=map.get(name); -// String value=type.value(); -// if(scope!=null){ -// ScopeProvider.instance.set(savedScope); -// } -// return value; -// } - private String retrievePropertyValue(ServiceEndpoint res, String name) { Iterator it= res.profile().accessPoints().iterator(); AccessPoint ap=(AccessPoint)it.next();