added filter on caller dts endpoint
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@100610 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
23561c3a80
commit
7aeb636052
|
@ -67,7 +67,7 @@ public class ReadingMongoOplog extends Thread{
|
||||||
cursor.addOption(Bytes.QUERYOPTION_AWAITDATA);
|
cursor.addOption(Bytes.QUERYOPTION_AWAITDATA);
|
||||||
while (cursor.hasNext()) {
|
while (cursor.hasNext()) {
|
||||||
DBObject x = cursor.next();
|
DBObject x = cursor.next();
|
||||||
logger.info("oplog current object: "+x);
|
logger.debug("oplog current object: "+x);
|
||||||
ts = (BSONTimestamp) x.get("ts");
|
ts = (BSONTimestamp) x.get("ts");
|
||||||
String ns=(String)x.get("ns");
|
String ns=(String)x.get("ns");
|
||||||
// check if discard or process the current DB record
|
// check if discard or process the current DB record
|
||||||
|
@ -77,7 +77,7 @@ public class ReadingMongoOplog extends Thread{
|
||||||
// parser.runWithoutThread(x);
|
// parser.runWithoutThread(x);
|
||||||
logger.info("Producer #" + this.number + " put: " + x);
|
logger.info("Producer #" + this.number + " put: " + x);
|
||||||
}else{
|
}else{
|
||||||
logger.info("operation is not accounted");
|
logger.debug("operation is not accounted");
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
logger.debug("record discarded: \t"+x);
|
logger.debug("record discarded: \t"+x);
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package org.gcube.contentmanager.storageserver.parse;
|
package org.gcube.contentmanager.storageserver.parse;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.gcube.accounting.datamodel.RawUsageRecord;
|
import org.gcube.accounting.datamodel.RawUsageRecord;
|
||||||
import org.gcube.contentmanager.storageserver.accounting.Report;
|
import org.gcube.contentmanager.storageserver.accounting.Report;
|
||||||
import org.gcube.contentmanager.storageserver.accounting.ReportConfig;
|
import org.gcube.contentmanager.storageserver.accounting.ReportConfig;
|
||||||
|
@ -37,11 +39,13 @@ public class JsonParser extends Thread{
|
||||||
private String user;
|
private String user;
|
||||||
private String password;
|
private String password;
|
||||||
String[] server;
|
String[] server;
|
||||||
|
List<String> dtsHosts;
|
||||||
|
|
||||||
public JsonParser(String[] srvs, CubbyHole c, int number){
|
public JsonParser(String[] srvs, CubbyHole c, int number,List<String> dtsHosts){
|
||||||
this.c=c;
|
this.c=c;
|
||||||
this.number=number;
|
this.number=number;
|
||||||
this.server=srvs;
|
this.server=srvs;
|
||||||
|
this.dtsHosts=dtsHosts;
|
||||||
// init the accounting report
|
// init the accounting report
|
||||||
try {
|
try {
|
||||||
init();
|
init();
|
||||||
|
@ -50,10 +54,11 @@ public class JsonParser extends Thread{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public JsonParser(String[] srvs, String user, String password, CubbyHole c, int number){
|
public JsonParser(String[] srvs, String user, String password, CubbyHole c, int number, List<String> dtsHosts){
|
||||||
this.c=c;
|
this.c=c;
|
||||||
this.number=number;
|
this.number=number;
|
||||||
this.server=srvs;
|
this.server=srvs;
|
||||||
|
this.dtsHosts=dtsHosts;
|
||||||
this.user=user;
|
this.user=user;
|
||||||
this.password=password;
|
this.password=password;
|
||||||
// init the accounting report
|
// init the accounting report
|
||||||
|
@ -108,6 +113,13 @@ public class JsonParser extends Thread{
|
||||||
}else if((lastOperation==null) || (lastUser==null)){
|
}else if((lastOperation==null) || (lastUser==null)){
|
||||||
logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next ");
|
logger.warn("lastOperation: "+lastOperation+" lastUser: "+lastUser+". These values cannot be null. Skip next ");
|
||||||
}
|
}
|
||||||
|
// if the lastoperation is download and the caller is a dts host then it isn't a real user but it is a workspace accounting record
|
||||||
|
if(lastOperation != null && lastOperation.equalsIgnoreCase("DOWNLOAD")){
|
||||||
|
for(String host: dtsHosts){
|
||||||
|
if(host.equalsIgnoreCase(callerIp))
|
||||||
|
lastUser="workspace.accounting";
|
||||||
|
}
|
||||||
|
}
|
||||||
StorageStatusRecord ssr=null;
|
StorageStatusRecord ssr=null;
|
||||||
if(isNeedSSReport(lastOperation)){
|
if(isNeedSSReport(lastOperation)){
|
||||||
try{
|
try{
|
||||||
|
@ -157,7 +169,7 @@ public class JsonParser extends Thread{
|
||||||
|
|
||||||
public void runWithoutThread(DBObject x){
|
public void runWithoutThread(DBObject x){
|
||||||
try {
|
try {
|
||||||
report=new ReportFactory().getReport(ReportConfig.ACCOUNTING_TYPE);
|
report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE);
|
||||||
} catch (ReportException e1) {
|
} catch (ReportException e1) {
|
||||||
// TODO Auto-generated catch block
|
// TODO Auto-generated catch block
|
||||||
e1.printStackTrace();
|
e1.printStackTrace();
|
||||||
|
|
|
@ -1,13 +1,18 @@
|
||||||
package org.gcube.contentmanager.storageserver.parse.utils;
|
package org.gcube.contentmanager.storageserver.parse.utils;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.gcube.common.scope.impl.ScopeBean;
|
import org.gcube.common.scope.impl.ScopeBean;
|
||||||
import org.gcube.common.scope.impl.ScopeBean.Type;
|
import org.gcube.common.scope.impl.ScopeBean.Type;
|
||||||
import org.gcube.common.scope.impl.ServiceMapScannerMediator;
|
import org.gcube.common.scope.impl.ServiceMapScannerMediator;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class ValidationUtils {
|
public class ValidationUtils {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ValidationUtils.class);
|
||||||
|
|
||||||
public static boolean validationScope(String scope){
|
public static boolean validationScope(String scope){
|
||||||
ScopeBean scopeBean=new ScopeBean(scope);
|
ScopeBean scopeBean=new ScopeBean(scope);
|
||||||
if((scopeBean.is(Type.VRE)))
|
if((scopeBean.is(Type.VRE)))
|
||||||
|
@ -20,5 +25,28 @@ public class ValidationUtils {
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ArrayList<String> getVOScopes(String scope){
|
||||||
|
ArrayList<String> vos=new ArrayList<String>();
|
||||||
|
ScopeBean scopeBean=new ScopeBean(scope);
|
||||||
|
//retrieve INFRA scope
|
||||||
|
while(!scopeBean.is(Type.INFRASTRUCTURE)){
|
||||||
|
logger.debug("the scope "+scope+" is not an INFRA scope ");
|
||||||
|
scopeBean=new ScopeBean(scopeBean.enclosingScope().toString());
|
||||||
|
}
|
||||||
|
scope=scopeBean.toString();
|
||||||
|
if(scopeBean.is(Type.INFRASTRUCTURE)){
|
||||||
|
Set<String> scopeSet=new ServiceMapScannerMediator().getScopeKeySet();
|
||||||
|
for(String scopeItem : scopeSet){
|
||||||
|
//retrieve all Vo scopes
|
||||||
|
System.out.println("scope scanned: "+scopeItem);
|
||||||
|
if(scopeItem.contains(scope) && (new ScopeBean(scopeItem).is(Type.VO))){
|
||||||
|
logger.info("found vo scope: "+scopeItem);
|
||||||
|
vos.add(scopeItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return vos;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,11 +2,15 @@ package org.gcube.contentmanager.storageserver.startup;
|
||||||
|
|
||||||
import static org.gcube.resources.discovery.icclient.ICFactory.clientFor;
|
import static org.gcube.resources.discovery.icclient.ICFactory.clientFor;
|
||||||
import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;
|
import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.gcube.common.encryption.StringEncrypter;
|
import org.gcube.common.encryption.StringEncrypter;
|
||||||
|
import org.gcube.common.resources.gcore.GCoreEndpoint;
|
||||||
|
import org.gcube.common.resources.gcore.GCoreEndpoint.Profile.Endpoint;
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint;
|
import org.gcube.common.resources.gcore.ServiceEndpoint;
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
|
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
|
import org.gcube.common.resources.gcore.ServiceEndpoint.Property;
|
||||||
|
@ -137,27 +141,27 @@ public class Configuration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String retrievePropertyValue(String name, String scope) {
|
// private String retrievePropertyValue(String name, String scope) {
|
||||||
String savedScope=null;
|
// String savedScope=null;
|
||||||
if(scope!=null){
|
// if(scope!=null){
|
||||||
savedScope=ScopeProvider.instance.get();
|
// savedScope=ScopeProvider.instance.get();
|
||||||
ScopeProvider.instance.set(scope);
|
// ScopeProvider.instance.set(scope);
|
||||||
}
|
// }
|
||||||
SimpleQuery query = queryFor(ServiceEndpoint.class);
|
// SimpleQuery query = queryFor(ServiceEndpoint.class);
|
||||||
query.addCondition("$resource/Profile/Category/text() eq 'DataStorage' and $resource/Profile/Name eq 'StorageManager' ");
|
// query.addCondition("$resource/Profile/Category/text() eq 'DataStorage' and $resource/Profile/Name eq 'StorageManager' ");
|
||||||
DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
|
// DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
|
||||||
List<ServiceEndpoint> resources = client.submit(query);
|
// List<ServiceEndpoint> resources = client.submit(query);
|
||||||
ServiceEndpoint res=resources.get(0);
|
// ServiceEndpoint res=resources.get(0);
|
||||||
Iterator<AccessPoint> it= res.profile().accessPoints().iterator();
|
// Iterator<AccessPoint> it= res.profile().accessPoints().iterator();
|
||||||
AccessPoint ap=(AccessPoint)it.next();
|
// AccessPoint ap=(AccessPoint)it.next();
|
||||||
Map<String, Property>map= ap.propertyMap();
|
// Map<String, Property>map= ap.propertyMap();
|
||||||
Property type=map.get(name);
|
// Property type=map.get(name);
|
||||||
String value=type.value();
|
// String value=type.value();
|
||||||
if(scope!=null){
|
// if(scope!=null){
|
||||||
ScopeProvider.instance.set(savedScope);
|
// ScopeProvider.instance.set(savedScope);
|
||||||
}
|
// }
|
||||||
return value;
|
// return value;
|
||||||
}
|
// }
|
||||||
|
|
||||||
private String retrievePropertyValue(ServiceEndpoint res, String name) {
|
private String retrievePropertyValue(ServiceEndpoint res, String name) {
|
||||||
Iterator<AccessPoint> it= res.profile().accessPoints().iterator();
|
Iterator<AccessPoint> it= res.profile().accessPoints().iterator();
|
||||||
|
@ -169,5 +173,60 @@ public class Configuration {
|
||||||
else
|
else
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<String> retrieveDTSHosts(){
|
||||||
|
ArrayList<String> scopes=ValidationUtils.getVOScopes(scope);
|
||||||
|
ArrayList<String> hosts= new ArrayList<String>();
|
||||||
|
for(String currentScope:scopes){
|
||||||
|
String host=getHosts("DataTransformation", "DataTransformationService", currentScope);
|
||||||
|
System.out.println("host found: "+host+ " in scope: "+currentScope);
|
||||||
|
if(host!=null)
|
||||||
|
hosts.add(host);
|
||||||
|
|
||||||
|
}
|
||||||
|
for(String host : hosts){
|
||||||
|
System.out.println("DTS host: "+host);
|
||||||
|
}
|
||||||
|
return hosts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHosts(String serviceClass, String serviceName) {
|
||||||
|
return getHosts(serviceClass, serviceName, scope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHosts(String serviceClass, String serviceName, String scope) {
|
||||||
|
String host=null;
|
||||||
|
String currentScope=ScopeProvider.instance.get();
|
||||||
|
ScopeProvider.instance.set(scope);
|
||||||
|
SimpleQuery query = queryFor(GCoreEndpoint.class);
|
||||||
|
query.addCondition("$resource/Profile/ServiceClass eq '"+serviceClass+"' and $resource/Profile/ServiceName eq '"+serviceName+"' ");
|
||||||
|
DiscoveryClient<GCoreEndpoint> client = clientFor(GCoreEndpoint.class);
|
||||||
|
try{
|
||||||
|
List<GCoreEndpoint> resources = client.submit(query);
|
||||||
|
if(resources.size()>0){
|
||||||
|
GCoreEndpoint res=resources.get(0);
|
||||||
|
Iterator<Endpoint> it=res.profile().endpoints().iterator();
|
||||||
|
if(it.hasNext()){
|
||||||
|
Endpoint endpoint=it.next();
|
||||||
|
host=endpoint.uri().toString();
|
||||||
|
if(host.contains("//")){
|
||||||
|
int begin=host.indexOf("//");
|
||||||
|
host=host.substring(begin+2);
|
||||||
|
System.out.println("phase#1 "+ host );
|
||||||
|
String [] uris=host.split(":");
|
||||||
|
System.out.println("phase#2 "+uris[0]);
|
||||||
|
host=uris[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ScopeProvider.instance.set(currentScope);
|
||||||
|
}
|
||||||
|
}catch(Exception e){
|
||||||
|
logger.error("FAIL to retrieve resource from scope "+scope+" cause: "+e.getMessage());
|
||||||
|
ScopeProvider.instance.set(currentScope);
|
||||||
|
}
|
||||||
|
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,10 @@
|
||||||
package org.gcube.contentmanager.storageserver.startup;
|
package org.gcube.contentmanager.storageserver.startup;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.List;
|
||||||
|
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
|
||||||
import org.gcube.contentmanager.storageserver.data.CubbyHole;
|
import org.gcube.contentmanager.storageserver.data.CubbyHole;
|
||||||
import org.gcube.contentmanager.storageserver.data.ReadingMongoOplog;
|
import org.gcube.contentmanager.storageserver.data.ReadingMongoOplog;
|
||||||
import org.gcube.contentmanager.storageserver.parse.JsonParser;
|
import org.gcube.contentmanager.storageserver.parse.JsonParser;
|
||||||
//ClaSSPATH
|
|
||||||
import java.net.URL;
|
|
||||||
import java.net.URLClassLoader;
|
|
||||||
|
|
||||||
public class Startup {
|
public class Startup {
|
||||||
|
|
||||||
|
@ -43,7 +38,9 @@ public class Startup {
|
||||||
user=args[2];
|
user=args[2];
|
||||||
password=args[3];
|
password=args[3];
|
||||||
}
|
}
|
||||||
String[] server=retrieveConfiguration();
|
Configuration cfg=new Configuration(scope, user, password);
|
||||||
|
String[] server=retrieveServerConfiguration(cfg);
|
||||||
|
List<String> dtsHosts=retrieveDTSConfiguration(cfg);
|
||||||
CubbyHole c = new CubbyHole();
|
CubbyHole c = new CubbyHole();
|
||||||
ReadingMongoOplog producer=null;
|
ReadingMongoOplog producer=null;
|
||||||
if(args.length == 3)
|
if(args.length == 3)
|
||||||
|
@ -52,16 +49,21 @@ public class Startup {
|
||||||
producer=new ReadingMongoOplog( Arrays.asList(server), c, 1 );
|
producer=new ReadingMongoOplog( Arrays.asList(server), c, 1 );
|
||||||
JsonParser consumer=null;
|
JsonParser consumer=null;
|
||||||
if(args.length == 3)
|
if(args.length == 3)
|
||||||
consumer=new JsonParser(server, c, 1);
|
consumer=new JsonParser(server, c, 1, dtsHosts);
|
||||||
else
|
else
|
||||||
consumer=new JsonParser(server, args[1], args[2], c, 1);
|
consumer=new JsonParser(server, args[1], args[2], c, 1, dtsHosts);
|
||||||
producer.start();
|
producer.start();
|
||||||
consumer.start();
|
consumer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String[] retrieveConfiguration() {
|
private static String[] retrieveServerConfiguration(Configuration c) {
|
||||||
Configuration c=new Configuration(scope, user, password);
|
|
||||||
return c.getServerAccess();
|
return c.getServerAccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static List<String> retrieveDTSConfiguration(Configuration c){
|
||||||
|
return c.retrieveDTSHosts();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,21 +1,37 @@
|
||||||
package org.gcube.contentmanager.storageserver.startup;
|
package org.gcube.contentmanager.storageserver.startup;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import java.util.List;
|
||||||
import junit.framework.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class ConfigurationTest {
|
public class ConfigurationTest {
|
||||||
|
|
||||||
String scope="/gcube/devsec";
|
static String scope="/d4science.research-infrastructures.eu/FARM";
|
||||||
String user=null;
|
static String user=null;
|
||||||
String password=null;
|
static String password=null;
|
||||||
|
static String serviceClass="DataTransformation";
|
||||||
|
static String serviceName="DataTransformationService";
|
||||||
|
static Configuration c=null;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void init(){
|
||||||
|
c=new Configuration(scope, user, password);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() {
|
public void serverAccess() {
|
||||||
Configuration c=new Configuration(scope, user, password);
|
|
||||||
Assert.assertNotNull(c.getServerAccess());
|
Assert.assertNotNull(c.getServerAccess());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getDTSHostsTest(){
|
||||||
|
List<String> hosts=c.retrieveDTSHosts();
|
||||||
|
for (String host:hosts){
|
||||||
|
System.out.println("host: "+host);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue