bug fix on catch in in UserAccountingConsumer. Upgrade to version 1.6.0-SNAP

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@134643 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
roberto.cirillo 2016-11-23 17:36:31 +00:00
parent a003a73bc3
commit 89459ce212
7 changed files with 78 additions and 74 deletions

15
pom.xml
View File

@ -14,6 +14,17 @@
<developerConnection>scm:svn:https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/${project.artifactId}</developerConnection>
<url>http://svn.d4science.research-infrastructures.eu/public/d4science/gcube/trunk/content-management/${project.artifactId}</url>
</scm>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.gcube.distribution</groupId>
<artifactId>gcube-bom</artifactId>
<version>LATEST</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<distroDirectory>${project.basedir}/distro</distroDirectory>
</properties>
@ -74,6 +85,10 @@
<artifactId>accounting-lib</artifactId>
<version>[2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT)</version>
</dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>authorization-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -64,14 +64,14 @@ import org.slf4j.LoggerFactory;
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.debug("generic fields completed ");
logger.trace("generic fields completed ");
return sur;
}
protected StorageUsageRecord setProviderUri(StorageUsageRecord sur, String resourceScope) throws InvalidValueException, URISyntaxException {
logger.debug("retrieve root scope from resourceScope "+resourceScope);
logger.trace("retrieve root scope from resourceScope "+resourceScope);
String rootScope= resourceScope.substring(1);
logger.debug("rootScope extrapolated is: "+rootScope);
logger.trace("rootScope extrapolated is: "+rootScope);
if(sur == null)
sur=new StorageUsageRecord();
sur.setProviderURI(new URI(buildProviderURI(resourceScope)));
@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
if(rootScope.indexOf("/") != -1){
int i=rootScope.indexOf("/");
rootScope=rootScope.substring(0, i);
logger.debug("removed sub scopes. rootScope: "+rootScope);
logger.trace("removed sub scopes. rootScope: "+rootScope);
}
providerUri="data."+rootScope+".org";
logger.debug("set provider uri: "+providerUri);
@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
@Override
public StorageUsageRecord setSpecificProperties(StorageUsageRecord sur, String filePath,
String dataType, String callerIP, String id) {
logger.trace("set accounting properties: remotePath: "+filePath+" dataType "+dataType+" callerIP "+callerIP+" resoruceURI "+id);
logger.trace("set accounting properties: remotePath: "+filePath+" dataType "+dataType+" callerIP "+callerIP+" resourceURI "+id);
if(sur==null) sur = new StorageUsageRecord();
try {
if(filePath != null){
@ -172,7 +172,7 @@ import org.slf4j.LoggerFactory;
}
logger.info(" report send: \n\t"+sur);
}else
logger.error("Problem on building accounting record: Factory Object is null ");
logger.error("Problem on building accounting record: Accounting Object is null ");
}
public String getProviderUri() {

View File

@ -69,14 +69,14 @@ public class UserAccountingConsumer extends Thread{
}
public void run() {
logger.debug("Consuming SU record started ");
logger.debug("Consuming SU started ");
MongoDB mongo=null;
while(true){
DBObject x=null;
MongoDB mongo=null;
DBObject x=null;
try{
logger.debug("SU waiting next record... ");
logger.debug("SU waiting next record...");
x=c.get();
logger.info("Consumer #" + this.number + " got: " + x.get("_id"));
logger.debug("Consumer #" + this.number + " got: " + x );
//get operation
op = (String) x.get("op");
@ -85,26 +85,28 @@ public class UserAccountingConsumer extends Thread{
OpLogRemoteObject record=retrieveObjectFields(obj);
// set object dimension
logger.debug("[recordCheck] operation: "+op+"\n\t name: "+record.getName()+"\n\t type: "+record.getType()+"\n\t path: "+record.getFilename()+"\n\t dir Path: "+record.getDir()+"\n\t length: "+record.getLength()+"\n\t owner: "+record.getOwner()+ "\n\t lastOperation "+record.getLastOperation()+"\n\t lastUser: "+record.getLastUser()+"\n\t lastAccess: "+record.getLastAccess());
logger.debug("[recordCheck] operation: "+op+" name: "+record.getName()+" type: "+record.getType()+" path: "+record.getFilename()+" dir Path: "+record.getDir()+" length: "+record.getLength()+" owner: "+record.getOwner()+ " lastOperation "+record.getLastOperation()+"\n\t lastUser: "+record.getLastUser()+" lastAccess: "+record.getLastAccess());
if(((record.getLength() >0) && (((record.getFilename() !=null) && (record.getFilename().length()>0) && (record.getDir().length()>0)&& (record.getDir().contains("/"))) || (record.getLinkCount() > 0)))){
String id=((DBObject)x.get("o")).get("_id").toString();
logger.debug("Consumer id " + id );
//convert from byte to kb
record.setLength(record.getLength()/1024);
// check scope
String scope=null;
String pathString=(String)obj.get("onScope");
logger.debug("[recordCheck] pathString value: "+pathString);
logger.debug("[recordCheck] it is a link to: "+pathString);
if((record.getDir()!=null)&& (record.getDir().contains("/"))){
scope=retrieveScopeFromRemoteFilePath(record.getDir());
logger.debug("[recordCheck] scope retrieved: "+scope);
}else{
// field added on storage manager library for retrieve scope. Used only if it is a link delete
scope=retrieveScopeFromRemoteFilePath(pathString);
}
logger.debug("scope retrieved ");
boolean validScope=ValidationUtils.validationScope(scope);
if(validScope){
if(record.getDelete() != null){
record.setLastOperation("DELETE");
}else if ((record.getLastOperation() != null) && (op != null) && (record.getLastOperation().equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){
}else if ((record.getLastOperation() != null) && (op != null) && (record.getLastOperation().equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){ //add check on operation
// it is an update on a link object this operation doesn't be accounted
logger.info("[recordCheck] update on link object is not accounted. Skip next ");
continue;
@ -122,14 +124,19 @@ public class UserAccountingConsumer extends Thread{
logger.debug("[recordCheck] the caller is dts service: caller "+record.getCallerIp()+ " dts host: "+host+" the new user is: "+record.getLastUser());
}
}
}else if((record.getLastOperation() != null) && (record.getLastOperation().equalsIgnoreCase("UPLOAD")) && op.equalsIgnoreCase("u")){
//it is only a metadata update on a previous insert so it is not need to send the record
logger.info("[recordCheck] metadata update on a previous insert. Record discarded: "+id);
continue;
}
}
logger.debug("[recordCheck] operation accounted "+record.getLastOperation());
logger.debug("[recordCheck] accounting operation "+record.getLastOperation());
StorageStatusObject ssr=null;
if(isNeedSSReport(record.getLastOperation())){
try{
logger.debug("[recordCheck] update SS record");
mongo=new MongoDB(server, user, password);
logger.debug("[recordCheck] update SS record yet");
if (mongo== null)
mongo=new MongoDB(server, user, password);
if(record.getLastOperation().equalsIgnoreCase("COPY"))
record.setOwner(record.getLastUser());
ssr=new StorageStatusObject(record.getOwner(), record.getLength(), 1);
@ -159,11 +166,12 @@ public class UserAccountingConsumer extends Thread{
}
}catch(Exception e){
logger.error(" "+e.getCause().getLocalizedMessage());
// logger.error(" CATCHED EXCEPTION "+e.getCause().getLocalizedMessage());
logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getMessage());
logger.info("skip to next record ");
if(mongo!=null)
mongo.close();
}
}
}
@ -174,7 +182,7 @@ public class UserAccountingConsumer extends Thread{
* @return
*/
private boolean isNeedSSReport(String lastOperation) {
logger.debug("Last operation is "+lastOperation);
logger.trace("Last operation is "+lastOperation);
if(lastOperation.equalsIgnoreCase("UPLOAD") || lastOperation.equalsIgnoreCase("COPY") || lastOperation.equalsIgnoreCase("DELETE"))
return true;
return false;
@ -241,9 +249,10 @@ public class UserAccountingConsumer extends Thread{
i++;
while((!split[i].equals("home")) && (!split[i].equals("public"))){
scope=scope+"/"+split[i];
logger.debug("scope building: "+scope);
i++;
}
logger.debug("retieved scope: "+scope);
logger.debug("[recordCheck] scope retrieved: "+scope);
return scope;
}else logger.error("Scope bad format: scope not retrieved from string: "+filename);
return null;

View File

@ -26,7 +26,7 @@ public class CubbyHole {
}
}
DBObject value=requestQueue.remove(0);
logger.debug("get element from queue: "+value.get("_id"));
logger.debug("get element from queue: "+value);
available = false;
notifyAll();
return value;
@ -40,7 +40,7 @@ public class CubbyHole {
} catch (InterruptedException e) {
}
}
logger.debug("put element to queue: "+value.get("_id"));
logger.debug("put element to queue: "+value);
requestQueue.addElement(value);
available = true;
notifyAll();

View File

@ -71,35 +71,36 @@ public class ReadingMongoOplog extends Thread{
DBCursor cursor = oplog.find(new BasicDBObject("ts", new BasicDBObject("$gt", ts)));
cursor.addOption(Bytes.QUERYOPTION_TAILABLE);
cursor.addOption(Bytes.QUERYOPTION_AWAITDATA);
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT);
while (cursor.hasNext()) {
DBObject x = cursor.next();
logger.debug("oplog current object: "+x);
ts = (BSONTimestamp) x.get("ts");
String ns=(String)x.get("ns");
// check if discard or process the current DB record
if((x.get("o2") != null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){
if(x.containsField("o")){
// c1 buffer for suer accounting
c1.put(x);
// c2 buffer for folder accounting (TODO)
if(c2 !=null)
c2.put(x);
// parser.runWithoutThread(x);
logger.debug("Producer #" + this.number + " put: " + x.get("_id"));
}else{
logger.debug("operation is not accounted");
}
}else{
logger.debug("record discarded: \t"+x.get("_id"));
}
try{
DBObject x = cursor.next();
logger.debug("oplog current object: "+x);
ts = (BSONTimestamp) x.get("ts");
String ns=(String)x.get("ns");
// check if discard or process the current DB record
if((x.get("o2") != null) || (ns.equalsIgnoreCase(DBNAME+".fs.files"))){
if(x.containsField("o")){ // add check on operation type
// c1 buffer for suer accounting
c1.put(x);
// c2 buffer for folder accounting (TODO)
if(c2 !=null)
c2.put(x);
// parser.runWithoutThread(x);
}else{
logger.debug("operation is not accounted");
}
}else{
logger.debug("object discarded ");
}
}catch(Exception e){
logger.error("Exception throws: "+e.getMessage());
e.printStackTrace();
}
logger.debug("...waiting new object... ");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
logger.warn("out of the cycle ");
}
}

View File

@ -14,13 +14,13 @@ public class ValidationUtils {
private static final Logger logger = LoggerFactory.getLogger(ValidationUtils.class);
public static boolean validationScope(String scope){
logger.info("scope Validation for scope "+scope);
logger.debug("scope Validation for scope "+scope);
ScopeBean scopeBean=new ScopeBean(scope);
if((scopeBean.is(Type.VRE)))
scope=scopeBean.enclosingScope().toString();
Set<String> scopeSet=new ServiceMapScannerMediator().getScopeKeySet();
for(String scopeItem : scopeSet){
logger.info("scope scanned: "+scopeItem);
logger.trace("scope scanned: "+scopeItem);
if(scope.equals(scopeItem))
return true;
}

View File

@ -45,6 +45,7 @@ public class Configuration {
this.scope=scope;
if(!ValidationUtils.validationScope(scope))
throw new IllegalArgumentException("invalid scope exception: "+scope);
ScopeProvider.instance.set(scope);
}
@ -152,28 +153,6 @@ public class Configuration {
}
}
// private String retrievePropertyValue(String name, String scope) {
// String savedScope=null;
// if(scope!=null){
// savedScope=ScopeProvider.instance.get();
// ScopeProvider.instance.set(scope);
// }
// SimpleQuery query = queryFor(ServiceEndpoint.class);
// query.addCondition("$resource/Profile/Category/text() eq 'DataStorage' and $resource/Profile/Name eq 'StorageManager' ");
// DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
// List<ServiceEndpoint> resources = client.submit(query);
// ServiceEndpoint res=resources.get(0);
// Iterator<AccessPoint> it= res.profile().accessPoints().iterator();
// AccessPoint ap=(AccessPoint)it.next();
// Map<String, Property>map= ap.propertyMap();
// Property type=map.get(name);
// String value=type.value();
// if(scope!=null){
// ScopeProvider.instance.set(savedScope);
// }
// return value;
// }
private String retrievePropertyValue(ServiceEndpoint res, String name) {
Iterator<AccessPoint> it= res.profile().accessPoints().iterator();
AccessPoint ap=(AccessPoint)it.next();