From 7aeb636052579f520b51bc508b09d822e7001e19 Mon Sep 17 00:00:00 2001 From: "roberto.cirillo" Date: Fri, 10 Oct 2014 14:32:23 +0000 Subject: [PATCH] added filter on caller dts endpoint git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@100610 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../storageserver/data/ReadingMongoOplog.java | 4 +- .../storageserver/parse/JsonParser.java | 18 +++- .../parse/utils/ValidationUtils.java | 28 +++++ .../storageserver/startup/Configuration.java | 101 ++++++++++++++---- .../storageserver/startup/Startup.java | 24 +++-- .../startup/ConfigurationTest.java | 32 ++++-- 6 files changed, 162 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java index feaf110..7323269 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java @@ -67,7 +67,7 @@ public class ReadingMongoOplog extends Thread{ cursor.addOption(Bytes.QUERYOPTION_AWAITDATA); while (cursor.hasNext()) { DBObject x = cursor.next(); - logger.info("oplog current object: "+x); + logger.debug("oplog current object: "+x); ts = (BSONTimestamp) x.get("ts"); String ns=(String)x.get("ns"); // check if discard or process the current DB record @@ -77,7 +77,7 @@ public class ReadingMongoOplog extends Thread{ // parser.runWithoutThread(x); logger.info("Producer #" + this.number + " put: " + x); }else{ - logger.info("operation is not accounted"); + logger.debug("operation is not accounted"); } }else{ logger.debug("record discarded: \t"+x); diff --git a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java index 7e4a620..5e19fad 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/parse/JsonParser.java @@ -1,5 +1,7 @@ package org.gcube.contentmanager.storageserver.parse; +import java.util.List; + import org.gcube.accounting.datamodel.RawUsageRecord; import org.gcube.contentmanager.storageserver.accounting.Report; import org.gcube.contentmanager.storageserver.accounting.ReportConfig; @@ -37,11 +39,13 @@ public class JsonParser extends Thread{ private String user; private String password; String[] server; + List dtsHosts; - public JsonParser(String[] srvs, CubbyHole c, int number){ + public JsonParser(String[] srvs, CubbyHole c, int number,List dtsHosts){ this.c=c; this.number=number; this.server=srvs; + this.dtsHosts=dtsHosts; // init the accounting report try { init(); @@ -50,10 +54,11 @@ public class JsonParser extends Thread{ } } - public JsonParser(String[] srvs, String user, String password, CubbyHole c, int number){ + public JsonParser(String[] srvs, String user, String password, CubbyHole c, int number, List dtsHosts){ this.c=c; this.number=number; this.server=srvs; + this.dtsHosts=dtsHosts; this.user=user; this.password=password; // init the accounting report @@ -108,6 +113,13 @@ public class JsonParser extends Thread{ }else if((lastOperation==null) || (lastUser==null)){ logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next "); } + // if the lastoperation is download and the caller is a dts host then it isn't a real user but it is a workspace accounting record + if(lastOperation != null && lastOperation.equalsIgnoreCase("DOWNLOAD")){ + for(String host: dtsHosts){ + if(host.equalsIgnoreCase(callerIp)) + lastUser="workspace.accounting"; + } + } StorageStatusRecord ssr=null; if(isNeedSSReport(lastOperation)){ try{ @@ -157,7 +169,7 @@ public class JsonParser extends Thread{ public void runWithoutThread(DBObject x){ try { - report=new ReportFactory().getReport(ReportConfig.ACCOUNTING_TYPE); + report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE); } catch (ReportException e1) { // TODO Auto-generated catch block e1.printStackTrace(); diff --git a/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java b/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java index 384fda2..cf04166 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/parse/utils/ValidationUtils.java @@ -1,13 +1,18 @@ package org.gcube.contentmanager.storageserver.parse.utils; +import java.util.ArrayList; import java.util.Set; import org.gcube.common.scope.impl.ScopeBean; import org.gcube.common.scope.impl.ScopeBean.Type; import org.gcube.common.scope.impl.ServiceMapScannerMediator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ValidationUtils { + private static final Logger logger = LoggerFactory.getLogger(ValidationUtils.class); + public static boolean validationScope(String scope){ ScopeBean scopeBean=new ScopeBean(scope); if((scopeBean.is(Type.VRE))) @@ -20,5 +25,28 @@ public class ValidationUtils { } return false; } + + public static ArrayList getVOScopes(String scope){ + ArrayList vos=new ArrayList(); + ScopeBean scopeBean=new ScopeBean(scope); + //retrieve INFRA scope + while(!scopeBean.is(Type.INFRASTRUCTURE)){ + logger.debug("the scope "+scope+" is not an INFRA scope "); + scopeBean=new ScopeBean(scopeBean.enclosingScope().toString()); + } + scope=scopeBean.toString(); + if(scopeBean.is(Type.INFRASTRUCTURE)){ + Set scopeSet=new ServiceMapScannerMediator().getScopeKeySet(); + for(String scopeItem : scopeSet){ + //retrieve all Vo scopes + System.out.println("scope scanned: "+scopeItem); + if(scopeItem.contains(scope) && (new ScopeBean(scopeItem).is(Type.VO))){ + logger.info("found vo scope: "+scopeItem); + vos.add(scopeItem); + } + } + } + return vos; + } } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java b/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java index 6ec2ed3..76fda1f 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/startup/Configuration.java @@ -2,11 +2,15 @@ package org.gcube.contentmanager.storageserver.startup; import static org.gcube.resources.discovery.icclient.ICFactory.clientFor; import static org.gcube.resources.discovery.icclient.ICFactory.queryFor; + +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import org.gcube.common.encryption.StringEncrypter; +import org.gcube.common.resources.gcore.GCoreEndpoint; +import org.gcube.common.resources.gcore.GCoreEndpoint.Profile.Endpoint; import org.gcube.common.resources.gcore.ServiceEndpoint; import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; import org.gcube.common.resources.gcore.ServiceEndpoint.Property; @@ -137,27 +141,27 @@ public class Configuration { } } - private String retrievePropertyValue(String name, String scope) { - String savedScope=null; - if(scope!=null){ - savedScope=ScopeProvider.instance.get(); - ScopeProvider.instance.set(scope); - } - SimpleQuery query = queryFor(ServiceEndpoint.class); - query.addCondition("$resource/Profile/Category/text() eq 'DataStorage' and $resource/Profile/Name eq 'StorageManager' "); - DiscoveryClient client = clientFor(ServiceEndpoint.class); - List resources = client.submit(query); - ServiceEndpoint res=resources.get(0); - Iterator it= res.profile().accessPoints().iterator(); - AccessPoint ap=(AccessPoint)it.next(); - Mapmap= ap.propertyMap(); - Property type=map.get(name); - String value=type.value(); - if(scope!=null){ - ScopeProvider.instance.set(savedScope); - } - return value; - } +// private String retrievePropertyValue(String name, String scope) { +// String savedScope=null; +// if(scope!=null){ +// savedScope=ScopeProvider.instance.get(); +// ScopeProvider.instance.set(scope); +// } +// SimpleQuery query = queryFor(ServiceEndpoint.class); +// query.addCondition("$resource/Profile/Category/text() eq 'DataStorage' and $resource/Profile/Name eq 'StorageManager' "); +// DiscoveryClient client = clientFor(ServiceEndpoint.class); +// List resources = client.submit(query); +// ServiceEndpoint res=resources.get(0); +// Iterator it= res.profile().accessPoints().iterator(); +// AccessPoint ap=(AccessPoint)it.next(); +// Mapmap= ap.propertyMap(); +// Property type=map.get(name); +// String value=type.value(); +// if(scope!=null){ +// ScopeProvider.instance.set(savedScope); +// } +// return value; +// } private String retrievePropertyValue(ServiceEndpoint res, String name) { Iterator it= res.profile().accessPoints().iterator(); @@ -169,5 +173,60 @@ public class Configuration { else return null; } + + public List retrieveDTSHosts(){ + ArrayList scopes=ValidationUtils.getVOScopes(scope); + ArrayList hosts= new ArrayList(); + for(String currentScope:scopes){ + String host=getHosts("DataTransformation", "DataTransformationService", currentScope); + System.out.println("host found: "+host+ " in scope: "+currentScope); + if(host!=null) + hosts.add(host); + + } + for(String host : hosts){ + System.out.println("DTS host: "+host); + } + return hosts; + } + + public String getHosts(String serviceClass, String serviceName) { + return getHosts(serviceClass, serviceName, scope); + } + + public String getHosts(String serviceClass, String serviceName, String scope) { + String host=null; + String currentScope=ScopeProvider.instance.get(); + ScopeProvider.instance.set(scope); + SimpleQuery query = queryFor(GCoreEndpoint.class); + query.addCondition("$resource/Profile/ServiceClass eq '"+serviceClass+"' and $resource/Profile/ServiceName eq '"+serviceName+"' "); + DiscoveryClient client = clientFor(GCoreEndpoint.class); + try{ + List resources = client.submit(query); + if(resources.size()>0){ + GCoreEndpoint res=resources.get(0); + Iterator it=res.profile().endpoints().iterator(); + if(it.hasNext()){ + Endpoint endpoint=it.next(); + host=endpoint.uri().toString(); + if(host.contains("//")){ + int begin=host.indexOf("//"); + host=host.substring(begin+2); + System.out.println("phase#1 "+ host ); + String [] uris=host.split(":"); + System.out.println("phase#2 "+uris[0]); + host=uris[0]; + } + } + ScopeProvider.instance.set(currentScope); + } + }catch(Exception e){ + logger.error("FAIL to retrieve resource from scope "+scope+" cause: "+e.getMessage()); + ScopeProvider.instance.set(currentScope); + } + + return host; + } + } diff --git a/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java b/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java index f3d5458..2e5396e 100644 --- a/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java +++ b/src/main/java/org/gcube/contentmanager/storageserver/startup/Startup.java @@ -1,15 +1,10 @@ package org.gcube.contentmanager.storageserver.startup; import java.util.Arrays; -import java.util.HashMap; - -import org.gcube.common.scope.api.ScopeProvider; +import java.util.List; import org.gcube.contentmanager.storageserver.data.CubbyHole; import org.gcube.contentmanager.storageserver.data.ReadingMongoOplog; import org.gcube.contentmanager.storageserver.parse.JsonParser; -//ClaSSPATH -import java.net.URL; -import java.net.URLClassLoader; public class Startup { @@ -43,7 +38,9 @@ public class Startup { user=args[2]; password=args[3]; } - String[] server=retrieveConfiguration(); + Configuration cfg=new Configuration(scope, user, password); + String[] server=retrieveServerConfiguration(cfg); + List dtsHosts=retrieveDTSConfiguration(cfg); CubbyHole c = new CubbyHole(); ReadingMongoOplog producer=null; if(args.length == 3) @@ -52,16 +49,21 @@ public class Startup { producer=new ReadingMongoOplog( Arrays.asList(server), c, 1 ); JsonParser consumer=null; if(args.length == 3) - consumer=new JsonParser(server, c, 1); + consumer=new JsonParser(server, c, 1, dtsHosts); else - consumer=new JsonParser(server, args[1], args[2], c, 1); + consumer=new JsonParser(server, args[1], args[2], c, 1, dtsHosts); producer.start(); consumer.start(); } - private static String[] retrieveConfiguration() { - Configuration c=new Configuration(scope, user, password); + private static String[] retrieveServerConfiguration(Configuration c) { return c.getServerAccess(); } + + private static List retrieveDTSConfiguration(Configuration c){ + return c.retrieveDTSHosts(); + } + + } diff --git a/src/test/java/org/gcube/contentmanager/storageserver/startup/ConfigurationTest.java b/src/test/java/org/gcube/contentmanager/storageserver/startup/ConfigurationTest.java index 0132b39..4705f23 100644 --- a/src/test/java/org/gcube/contentmanager/storageserver/startup/ConfigurationTest.java +++ b/src/test/java/org/gcube/contentmanager/storageserver/startup/ConfigurationTest.java @@ -1,21 +1,37 @@ package org.gcube.contentmanager.storageserver.startup; -import static org.junit.Assert.*; -import junit.framework.Assert; - +import java.util.List; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; public class ConfigurationTest { - String scope="/gcube/devsec"; - String user=null; - String password=null; + static String scope="/d4science.research-infrastructures.eu/FARM"; + static String user=null; + static String password=null; + static String serviceClass="DataTransformation"; + static String serviceName="DataTransformationService"; + static Configuration c=null; + + @BeforeClass + public static void init(){ + c=new Configuration(scope, user, password); + } @Test - public void test() { - Configuration c=new Configuration(scope, user, password); + public void serverAccess() { Assert.assertNotNull(c.getServerAccess()); } + + + @Test + public void getDTSHostsTest(){ + List hosts=c.retrieveDTSHosts(); + for (String host:hosts){ + System.out.println("host: "+host); + } + } + }