git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/data-transfer/data-transfer-library@131051 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
7f8b2880fd
commit
60b717d1f6
|
@ -16,6 +16,7 @@ import org.gcube.data.transfer.library.faults.SourceNotSetException;
|
|||
import org.gcube.data.transfer.library.faults.UnreachableNodeException;
|
||||
import org.gcube.data.transfer.library.transferers.Transferer;
|
||||
import org.gcube.data.transfer.library.transferers.TransfererBuilder;
|
||||
import org.gcube.data.transfer.model.TransferCapabilities;
|
||||
|
||||
@Slf4j
|
||||
public class DataTransferClient {
|
||||
|
@ -36,6 +37,11 @@ public class DataTransferClient {
|
|||
return new DataTransferClient(TransfererBuilder.getTransfererByhostingNodeId(id));
|
||||
}
|
||||
|
||||
|
||||
public TransferCapabilities getDestinationCapabilities() throws InitializationException{
|
||||
return this.transferer.getDestinationCapabilities();
|
||||
}
|
||||
|
||||
@Synchronized("transferer")
|
||||
public TransferResult localFile(String path) throws InvalidSourceException, SourceNotSetException, FailedTransferException, InitializationException{
|
||||
if(transferer==null) throw new RuntimeException("Transferer not set, please set destination before trying to transfer");
|
||||
|
|
|
@ -11,11 +11,11 @@ public class CapabilitiesCache extends TTLCache<TransferCapabilities> {
|
|||
|
||||
private static CapabilitiesCache instance=null;
|
||||
|
||||
@Synchronized
|
||||
public static CapabilitiesCache getInstance(){
|
||||
if(instance==null)instance=new CapabilitiesCache();
|
||||
return instance;
|
||||
}
|
||||
// @Synchronized
|
||||
// public static CapabilitiesCache getInstance(){
|
||||
// if(instance==null)instance=new CapabilitiesCache();
|
||||
// return instance;
|
||||
// }
|
||||
|
||||
private CapabilitiesCache(){
|
||||
super(5*60*1000l,2*60*1000l,"Capabilities");
|
||||
|
@ -23,7 +23,7 @@ public class CapabilitiesCache extends TTLCache<TransferCapabilities> {
|
|||
|
||||
|
||||
@Override
|
||||
protected TransferCapabilities getNew(String id) {
|
||||
protected TransferCapabilities getNew(String id) throws Exception{
|
||||
log.debug("Getting capabilties for host "+id);
|
||||
return new Client(id).getCapabilties();
|
||||
}
|
||||
|
|
|
@ -5,11 +5,9 @@ import java.util.Map.Entry;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.gcube.data.transfer.model.TransferCapabilities;
|
||||
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Synchronized;
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -21,26 +19,32 @@ public abstract class TTLCache<T> {
|
|||
|
||||
// STATIC
|
||||
|
||||
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
// private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,new ThreadFactory(){
|
||||
// public Thread newThread(Runnable r) {
|
||||
// Thread t = Executors.defaultThreadFactory().newThread(r);
|
||||
// t.setDaemon(true);
|
||||
// return t;
|
||||
// }
|
||||
// });
|
||||
private static final HashSet<TTLCache<?>> createdMaps=new HashSet<TTLCache<?>>();
|
||||
|
||||
|
||||
static {
|
||||
scheduler.scheduleAtFixedRate(new Runnable() {
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
log.debug("Running Maps Cleaner, maps count : "+createdMaps.size());
|
||||
int removed=0;
|
||||
for(TTLCache<?> theMap:createdMaps)
|
||||
theMap.purgeItems();
|
||||
log.debug("Removed "+removed+" old tickets");
|
||||
|
||||
}
|
||||
}, 3, 3, TimeUnit.MINUTES);
|
||||
|
||||
// scheduler.scheduleAtFixedRate(new Runnable() {
|
||||
//
|
||||
//
|
||||
//
|
||||
// @Override
|
||||
// public void run() {
|
||||
// log.debug("Running Maps Cleaner, maps count : "+createdMaps.size());
|
||||
// int removed=0;
|
||||
// for(TTLCache<?> theMap:createdMaps)
|
||||
// theMap.purgeItems();
|
||||
// log.debug("Removed "+removed+" old tickets");
|
||||
//
|
||||
// }
|
||||
// }, 3, 3, TimeUnit.MINUTES);
|
||||
//
|
||||
}
|
||||
|
||||
|
||||
|
@ -65,7 +69,7 @@ public abstract class TTLCache<T> {
|
|||
private String cacheName;
|
||||
|
||||
@Synchronized
|
||||
public T getObject(String id){
|
||||
public T getObject(String id) throws Exception{
|
||||
if(!theMap.contains(id)||System.currentTimeMillis()-theMap.get(id).getCreationTime()>objectTTL)
|
||||
theMap.put(id, new TTLContainer<T>(getNew(id)));
|
||||
return theMap.get(id).getTheObject();
|
||||
|
@ -85,6 +89,6 @@ public abstract class TTLCache<T> {
|
|||
return removed;
|
||||
}
|
||||
|
||||
protected abstract T getNew(String id);
|
||||
protected abstract T getNew(String id) throws Exception;
|
||||
|
||||
}
|
||||
|
|
|
@ -4,56 +4,93 @@ import javax.ws.rs.client.ClientBuilder;
|
|||
import javax.ws.rs.client.Entity;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.xml.bind.JAXBElement;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.gcube.data.transfer.library.faults.CommunicationException;
|
||||
import org.gcube.data.transfer.library.faults.DataTransferException;
|
||||
import org.gcube.data.transfer.library.faults.RemoteServiceException;
|
||||
import org.gcube.data.transfer.library.faults.ServiceNotFoundException;
|
||||
import org.gcube.data.transfer.model.ServiceConstants;
|
||||
import org.gcube.data.transfer.model.TransferCapabilities;
|
||||
import org.gcube.data.transfer.model.TransferRequest;
|
||||
import org.gcube.data.transfer.model.TransferTicket;
|
||||
import org.glassfish.jersey.client.ClientConfig;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class Client {
|
||||
|
||||
private static ClientConfig config=null;
|
||||
|
||||
|
||||
static{
|
||||
log.debug("Creating configuration ..");
|
||||
config=new ClientConfig();
|
||||
config.register(AuthorizationFilter.class);
|
||||
config.register(JAXBElement.class);
|
||||
}
|
||||
|
||||
|
||||
private String endpoint;
|
||||
|
||||
private WebTarget rootTarget;
|
||||
|
||||
public Client(String endpoint){
|
||||
log.debug("Creating client for base "+endpoint);
|
||||
this.endpoint=endpoint+"";
|
||||
rootTarget= ClientBuilder.newClient(config).target(endpoint).path("/data-transfer-service/").path(ServiceConstants.APPLICATION_PATH);
|
||||
|
||||
public Client(String endpoint) throws ServiceNotFoundException{
|
||||
try{
|
||||
log.debug("Creating client for base "+endpoint);
|
||||
this.endpoint=endpoint+"";
|
||||
rootTarget= ClientBuilder.newClient(config).target(endpoint).path("data-transfer-service").path(ServiceConstants.APPLICATION_PATH);
|
||||
// checkResponse(rootTarget.request().get());
|
||||
|
||||
log.debug("Root Taget IS {} ",rootTarget.getUri());
|
||||
}catch(Exception e){
|
||||
throw new ServiceNotFoundException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public String getEndpoint() {
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
public TransferCapabilities getCapabilties(){
|
||||
log.debug("Getting capabilities to {} ",endpoint);
|
||||
return rootTarget.path(ServiceConstants.CAPABILTIES_SERVLET_NAME).request(MediaType.APPLICATION_XML_TYPE).get(TransferCapabilities.class);
|
||||
|
||||
public TransferCapabilities getCapabilties() throws CommunicationException{
|
||||
WebTarget capabilitiesTarget=rootTarget.path(ServiceConstants.CAPABILTIES_SERVLET_NAME);
|
||||
log.debug("Getting capabilities from {}, path is {} ",endpoint,capabilitiesTarget.getUri());
|
||||
try{
|
||||
Response resp=capabilitiesTarget.request().accept(MediaType.APPLICATION_XML_TYPE).get();
|
||||
checkResponse(resp);
|
||||
return resp.readEntity(TransferCapabilities.class);
|
||||
}catch(Exception e){
|
||||
throw new CommunicationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public TransferTicket submit(TransferRequest request){
|
||||
|
||||
|
||||
public TransferTicket submit(TransferRequest request) throws RemoteServiceException{
|
||||
log.debug("Sending request {} to {}",request,endpoint);
|
||||
return rootTarget.path(ServiceConstants.REQUESTS_SERVLET_NAME).request(MediaType.APPLICATION_XML_TYPE).post(Entity.entity(request,MediaType.APPLICATION_XML),TransferTicket.class);
|
||||
try{
|
||||
Response resp=rootTarget.path(ServiceConstants.REQUESTS_SERVLET_NAME).request(MediaType.APPLICATION_XML_TYPE).post(Entity.entity(request,MediaType.APPLICATION_XML));
|
||||
checkResponse(resp);
|
||||
return resp.readEntity(TransferTicket.class);
|
||||
}catch(Exception e){
|
||||
throw new RemoteServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public TransferTicket getTransferStatus(String transferId){
|
||||
|
||||
public TransferTicket getTransferStatus(String transferId) throws RemoteServiceException{
|
||||
log.debug("Requesting transfer status [id = {}, endpoint={}]",transferId,endpoint);
|
||||
return rootTarget.path(ServiceConstants.STATUS_SERVLET_NAME).path(transferId).request(MediaType.APPLICATION_XML).get(TransferTicket.class);
|
||||
try{
|
||||
Response resp=rootTarget.path(ServiceConstants.STATUS_SERVLET_NAME).path(transferId).request(MediaType.APPLICATION_XML).get();
|
||||
checkResponse(resp);
|
||||
return resp.readEntity(TransferTicket.class);
|
||||
}catch(Exception e){
|
||||
throw new RemoteServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void checkResponse(Response toCheck) throws Exception{
|
||||
switch(toCheck.getStatusInfo().getFamily()){
|
||||
case SUCCESSFUL : break;
|
||||
default : throw new Exception("Unexpected Response code : "+toCheck.getStatus());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package org.gcube.data.transfer.library.faults;
|
||||
|
||||
public class CommunicationException extends DataTransferException {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public CommunicationException() {
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public CommunicationException(String message) {
|
||||
super(message);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public CommunicationException(Throwable cause) {
|
||||
super(cause);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public CommunicationException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public CommunicationException(String message, Throwable cause, boolean enableSuppression,
|
||||
boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package org.gcube.data.transfer.library.faults;
|
||||
|
||||
public class RemoteServiceException extends DataTransferException {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = 5320978791087129997L;
|
||||
|
||||
public RemoteServiceException() {
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public RemoteServiceException(String message) {
|
||||
super(message);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public RemoteServiceException(Throwable cause) {
|
||||
super(cause);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public RemoteServiceException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
public RemoteServiceException(String message, Throwable cause, boolean enableSuppression,
|
||||
boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
// TODO Auto-generated constructor stub
|
||||
}
|
||||
|
||||
}
|
|
@ -19,7 +19,11 @@ public class StorageSource extends Source<String> {
|
|||
|
||||
@Override
|
||||
public boolean validate() throws InvalidSourceException {
|
||||
if(!StorageUtils.checkStorageId(id)) throw new InvalidSourceException("Invalid storage ID "+id);
|
||||
try{
|
||||
if(!StorageUtils.checkStorageId(id)) throw new Exception("Not valid");
|
||||
}catch(Exception e){
|
||||
throw new InvalidSourceException("Invalid storage ID "+id);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -7,9 +7,11 @@ import lombok.extern.slf4j.Slf4j;
|
|||
|
||||
import org.gcube.data.transfer.library.TransferResult;
|
||||
import org.gcube.data.transfer.library.client.Client;
|
||||
import org.gcube.data.transfer.library.faults.CommunicationException;
|
||||
import org.gcube.data.transfer.library.faults.FailedTransferException;
|
||||
import org.gcube.data.transfer.library.faults.InitializationException;
|
||||
import org.gcube.data.transfer.library.faults.InvalidSourceException;
|
||||
import org.gcube.data.transfer.library.faults.RemoteServiceException;
|
||||
import org.gcube.data.transfer.library.faults.SourceNotSetException;
|
||||
import org.gcube.data.transfer.library.model.LocalSource;
|
||||
import org.gcube.data.transfer.library.model.Source;
|
||||
|
@ -29,9 +31,9 @@ public abstract class Transferer {
|
|||
this.client=client;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
protected Source source=null;
|
||||
protected boolean prepared=false;
|
||||
|
||||
|
@ -75,21 +77,25 @@ public abstract class Transferer {
|
|||
}
|
||||
|
||||
protected TransferResult doTheTransfer(TransferRequest request) throws FailedTransferException{
|
||||
TransferTicket submissionResponse= client.submit(request);
|
||||
boolean continuePolling=true;
|
||||
TransferTicket ticket=null;
|
||||
do{
|
||||
ticket=client.getTransferStatus(submissionResponse.getId());
|
||||
System.out.println("Status : "+ticket);
|
||||
continuePolling=ticket.getStatus().equals(Status.PENDING)||ticket.getStatus().equals(Status.TRANSFERRING)||ticket.getStatus().equals(Status.WAITING);
|
||||
try{
|
||||
Thread.sleep(500);
|
||||
}catch(InterruptedException e){}
|
||||
}while(continuePolling);
|
||||
if(ticket.getStatus().equals(Status.ERROR)) throw new FailedTransferException("Remote Message : "+ticket.getMessage());
|
||||
if(ticket.getStatus().equals(Status.STOPPED)) throw new FailedTransferException("Stopped transfer : "+ticket.getMessage());
|
||||
long elapsedTime=System.currentTimeMillis()-ticket.getSubmissionTime().value.getTimeInMillis();
|
||||
return new TransferResult(source, client.getEndpoint(), elapsedTime, ticket.getTransferredBytes(), ticket.getDestinationFileName());
|
||||
try{
|
||||
TransferTicket submissionResponse= client.submit(request);
|
||||
boolean continuePolling=true;
|
||||
TransferTicket ticket=null;
|
||||
do{
|
||||
ticket=client.getTransferStatus(submissionResponse.getId());
|
||||
System.out.println("Status : "+ticket);
|
||||
continuePolling=ticket.getStatus().equals(Status.PENDING)||ticket.getStatus().equals(Status.TRANSFERRING)||ticket.getStatus().equals(Status.WAITING);
|
||||
try{
|
||||
Thread.sleep(500);
|
||||
}catch(InterruptedException e){}
|
||||
}while(continuePolling);
|
||||
if(ticket.getStatus().equals(Status.ERROR)) throw new FailedTransferException("Remote Message : "+ticket.getMessage());
|
||||
if(ticket.getStatus().equals(Status.STOPPED)) throw new FailedTransferException("Stopped transfer : "+ticket.getMessage());
|
||||
long elapsedTime=System.currentTimeMillis()-ticket.getSubmissionTime().value.getTimeInMillis();
|
||||
return new TransferResult(source, client.getEndpoint(), elapsedTime, ticket.getTransferredBytes(), ticket.getDestinationFileName());
|
||||
}catch(RemoteServiceException e){
|
||||
throw new FailedTransferException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void checkSource() throws SourceNotSetException, InvalidSourceException{
|
||||
|
@ -104,12 +110,16 @@ public abstract class Transferer {
|
|||
prepared=true;
|
||||
}
|
||||
protected void clean(){
|
||||
|
||||
|
||||
}
|
||||
|
||||
public TransferCapabilities getDestinationCapabilities(){
|
||||
return client.getCapabilties();
|
||||
public TransferCapabilities getDestinationCapabilities() throws InitializationException {
|
||||
try{
|
||||
return client.getCapabilties();
|
||||
}catch(Exception e){
|
||||
throw new InitializationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -44,12 +44,12 @@ public class TransfererBuilder {
|
|||
//
|
||||
// if(!Utils.pingURL(finalHost, timeout)) throw new ServiceNotFoundException("No DT Service found @ "+finalHost);
|
||||
// log.debug("Host is ok, getting targetCapabilities");
|
||||
TransferCapabilities cap=CapabilitiesCache.getInstance().getObject(baseUrl);
|
||||
// TransferCapabilities cap=CapabilitiesCache.getInstance().getObject(baseUrl);
|
||||
|
||||
// TODO determine method by capabilities checking
|
||||
|
||||
return new HTTPTransferer(new Client(baseUrl));
|
||||
}catch(MalformedURLException e){
|
||||
}catch(Exception e){
|
||||
throw new ServiceNotFoundException(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import java.io.FileNotFoundException;
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.bson.types.ObjectId;
|
||||
import org.gcube.common.scope.api.ScopeProvider;
|
||||
import org.gcube.contentmanagement.blobstorage.service.IClient;
|
||||
import org.gcube.contentmanagement.blobstorage.transport.backend.RemoteBackendException;
|
||||
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
|
||||
|
@ -29,7 +30,8 @@ public class StorageUtils {
|
|||
}
|
||||
|
||||
public static final boolean checkStorageId(String id){
|
||||
return ObjectId.isValid(id);
|
||||
ScopeProvider.instance.get();
|
||||
return getClient().getHttpUrl().RFile(id)!=null;
|
||||
}
|
||||
|
||||
public static final String getUrlById(String id){
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package org.gcube.data.transfer.library;
|
||||
|
||||
import org.gcube.data.transfer.model.TransferCapabilities;
|
||||
|
||||
public class ErrorReport extends TransferReport {
|
||||
|
||||
String id;
|
||||
|
||||
|
||||
public ErrorReport(String id) {
|
||||
super(null);
|
||||
this.id=id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String print() {
|
||||
return "ERROR : "+id;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
package org.gcube.data.transfer.library;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.gcube.common.scope.api.ScopeProvider;
|
||||
import org.gcube.data.transfer.library.TransferReport.ReportType;
|
||||
import org.gcube.data.transfer.library.faults.FailedTransferException;
|
||||
import org.gcube.data.transfer.library.faults.InitializationException;
|
||||
import org.gcube.data.transfer.library.faults.InvalidSourceException;
|
||||
import org.gcube.data.transfer.library.faults.SourceNotSetException;
|
||||
|
||||
import ch.qos.logback.core.util.ExecutorServiceUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class MultipleTransferBenchmark {
|
||||
|
||||
|
||||
static List<String> endpoints;
|
||||
static List<String> nodeIds;
|
||||
|
||||
|
||||
static List<String> files;
|
||||
static List<String> uris;
|
||||
static List<String> storageIds;
|
||||
|
||||
static ExecutorService executor=ExecutorServiceUtil.newExecutorService();
|
||||
|
||||
|
||||
static CountDownLatch doneSignal;
|
||||
|
||||
static ArrayList<TransferReport> reports=new ArrayList<>();
|
||||
|
||||
|
||||
static String scope="/gcube/devsec";
|
||||
|
||||
static{
|
||||
endpoints=Arrays.asList(new String[]{
|
||||
"http://node4-d-d4s.d4science.org:80/data-transfer-service/gcube/service",
|
||||
"http://node3-d-d4s.d4science.org:80/data-transfer-service/gcube/service"
|
||||
});
|
||||
doneSignal=new CountDownLatch(endpoints.size());
|
||||
|
||||
|
||||
|
||||
nodeIds=new ArrayList<>();
|
||||
files=new ArrayList<>();
|
||||
files.add("/home/fabio/Documents/Pictures/web_trend_map.png");
|
||||
files.add("/home/fabio/Documents/Pictures/web_trend_map.png");
|
||||
files.add("/home/fabio/Documents/Pictures/web_trend_map.png");
|
||||
files.add("/home/fabio/Documents/Pictures/web_trend_map.png");
|
||||
uris=new ArrayList<>();
|
||||
uris.add("http://goo.gl/r5jFZ9");
|
||||
storageIds=new ArrayList<>();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public static void main(String[] args) throws InitializationException{
|
||||
ScopeProvider.instance.set(scope);
|
||||
Map<String,DataTransferClient> clients=new HashMap<>();
|
||||
for(String endpoint:endpoints)
|
||||
clients.put(endpoint,DataTransferClient.getInstanceByEndpoint(endpoint));
|
||||
for(String id:nodeIds)
|
||||
clients.put(id,DataTransferClient.getInstanceByEndpoint(id));
|
||||
|
||||
|
||||
|
||||
|
||||
HashSet<DataTransferClient> startedTests=new HashSet<>();
|
||||
for(final Entry<String,DataTransferClient> entry:clients.entrySet()){
|
||||
if(!startedTests.contains(entry.getValue())){
|
||||
startedTests.add(entry.getValue());
|
||||
|
||||
executor.execute(new Runnable(){
|
||||
|
||||
final DataTransferClient dt=entry.getValue();
|
||||
final String id=entry.getKey();
|
||||
@Override
|
||||
public void run() {
|
||||
try{
|
||||
TransferReport report=new TransferReport(dt.getDestinationCapabilities());
|
||||
ScopeProvider.instance.set(scope);
|
||||
|
||||
log.debug("Sending files to {} ",dt.getDestinationCapabilities());
|
||||
for(String f:files){
|
||||
try {
|
||||
TransferResult res=dt.localFile(f);
|
||||
report.addReport(ReportType.local,f, res.getTransferedBytes(), res.getElapsedTime());
|
||||
} catch (InvalidSourceException | SourceNotSetException
|
||||
| FailedTransferException
|
||||
| InitializationException e) {
|
||||
log.error("Unable to send file {} to {}, error message : {}",f,dt.getDestinationCapabilities().getHostName(),e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("Sending uris to {}",dt.getDestinationCapabilities());
|
||||
for(String f:uris){
|
||||
try {
|
||||
TransferResult res=dt.httpSource(f);
|
||||
report.addReport(ReportType.uri,f, res.getTransferedBytes(), res.getElapsedTime());
|
||||
} catch (InvalidSourceException | SourceNotSetException
|
||||
| FailedTransferException
|
||||
| InitializationException e) {
|
||||
log.error("Unable to send uri {} to {}, error message : {}",f,dt.getDestinationCapabilities().getHostName(),e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("Sending storageIds to {}",dt.getDestinationCapabilities());
|
||||
for(String f:storageIds){
|
||||
try {
|
||||
TransferResult res=dt.storageId(f);
|
||||
report.addReport(ReportType.storage,f, res.getTransferedBytes(), res.getElapsedTime());
|
||||
} catch (InvalidSourceException | SourceNotSetException
|
||||
| FailedTransferException
|
||||
| InitializationException e) {
|
||||
log.error("Unable to send storageId {} to {}, error message : {}",f,dt.getDestinationCapabilities().getHostName(),e.getMessage());
|
||||
}
|
||||
}
|
||||
reports.add(report);
|
||||
}catch(Exception e){
|
||||
reports.add(new ErrorReport(id));
|
||||
}finally{
|
||||
doneSignal.countDown();
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
doneSignal.await();
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
}
|
||||
System.out.println("*****************************");
|
||||
for(TransferReport r:reports)System.out.println(r.print());
|
||||
|
||||
ExecutorServiceUtil.shutdown(executor);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package org.gcube.data.transfer.library;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
|
||||
import org.gcube.common.scope.api.ScopeProvider;
|
||||
import org.gcube.contentmanagement.blobstorage.transport.backend.RemoteBackendException;
|
||||
import org.gcube.data.transfer.library.utils.StorageUtils;
|
||||
|
||||
public class StorageTest {
|
||||
|
||||
static String scope="/gcube/devsec";
|
||||
|
||||
public static void main(String[] args) throws RemoteBackendException, FileNotFoundException {
|
||||
ScopeProvider.instance.set(scope);
|
||||
String toUpload="/home/fabio/Documents/Personal/DND/Incantesimi 3.5 - Mago e Stregone.pdf";
|
||||
String id=StorageUtils.putOntoStorage(new File(toUpload));
|
||||
System.out.println(StorageUtils.getUrlById(id));
|
||||
}
|
||||
|
||||
}
|
|
@ -4,11 +4,17 @@ import java.net.MalformedURLException;
|
|||
import java.net.URL;
|
||||
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.gcube.common.scope.api.ScopeProvider;
|
||||
import org.gcube.data.transfer.library.client.AuthorizationFilter;
|
||||
import org.gcube.data.transfer.library.client.Client;
|
||||
import org.gcube.data.transfer.library.faults.CommunicationException;
|
||||
import org.gcube.data.transfer.library.faults.RemoteServiceException;
|
||||
import org.gcube.data.transfer.library.faults.ServiceNotFoundException;
|
||||
import org.gcube.data.transfer.model.ServiceConstants;
|
||||
import org.gcube.data.transfer.model.TransferCapabilities;
|
||||
import org.gcube.data.transfer.model.TransferRequest;
|
||||
import org.gcube.data.transfer.model.TransferTicket;
|
||||
import org.gcube.data.transfer.model.TransferTicket.Status;
|
||||
|
@ -21,26 +27,27 @@ import org.junit.Test;
|
|||
|
||||
public class TestClientCalls {
|
||||
|
||||
static String hostname="http://pc-fabio.isti.cnr.it:8080";
|
||||
static String hostname="http://node3-d-d4s.d4science.org:80";
|
||||
// static String hostname="http://pc-fabio.isti.cnr.it:8080";
|
||||
static String scope="/gcube/devNext";
|
||||
static Client client;
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void init(){
|
||||
public static void init() throws ServiceNotFoundException{
|
||||
ScopeProvider.instance.set(scope);
|
||||
client=new Client(hostname);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void getCapabilties(){
|
||||
public void getCapabilties() throws CommunicationException{
|
||||
System.out.println(client.getCapabilties());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void doTheTransfer() throws MalformedURLException{
|
||||
public void doTheTransfer() throws MalformedURLException, RemoteServiceException{
|
||||
TransferRequest request= new TransferRequest("", new HttpDownloadSettings(new URL("http://goo.gl/oLP7zG"), HttpDownloadOptions.DEFAULT));
|
||||
System.out.println("Submitting "+request);
|
||||
TransferTicket ticket=client.submit(request);
|
||||
|
@ -62,8 +69,11 @@ public class TestClientCalls {
|
|||
@Test
|
||||
public void directCall(){
|
||||
javax.ws.rs.client.Client client = ClientBuilder.newClient(new ClientConfig().register(AuthorizationFilter.class));
|
||||
System.out.println(client.target("http://pc-fabio.isti.cnr.it:8080/data-transfer-service/gcube/service/Capabilities").
|
||||
request(MediaType.APPLICATION_XML).get());
|
||||
WebTarget target=client.target(hostname+"/data-transfer-service"+ServiceConstants.APPLICATION_PATH+"Capabilities");
|
||||
// WebTarget target=client.target(hostname+"/data-transfer-service/gcube/service/Capabilities");
|
||||
System.out.println("Asking capabilities to target : "+target.getUri());
|
||||
System.out.println(target.
|
||||
request(MediaType.APPLICATION_XML).get(TransferCapabilities.class));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
package org.gcube.data.transfer.library;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.gcube.data.transfer.library.model.Source;
|
||||
import org.gcube.data.transfer.model.TransferCapabilities;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
|
||||
public class TransferReport {
|
||||
|
||||
public static enum ReportType{
|
||||
local,uri,storage
|
||||
}
|
||||
|
||||
|
||||
private static final char end='\n';
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
@ToString
|
||||
private static class ReportItem{
|
||||
String source;
|
||||
long size;
|
||||
long elapsed;
|
||||
|
||||
|
||||
// bytes/msec
|
||||
public long getAvgSpeed(){
|
||||
return size/elapsed;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected TransferCapabilities host;
|
||||
|
||||
private Map<ReportType,ArrayList<ReportItem>> reports=new HashMap<>();
|
||||
|
||||
|
||||
|
||||
public TransferReport(TransferCapabilities host) {
|
||||
super();
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
|
||||
public void addReport(ReportType sourceType,String theSource,long size,long elapsed){
|
||||
if(!reports.containsKey(sourceType)) reports.put(sourceType, new ArrayList<ReportItem>());
|
||||
reports.get(sourceType).add(new ReportItem(theSource, size, elapsed));
|
||||
}
|
||||
|
||||
|
||||
public String print(){
|
||||
StringBuilder builder=new StringBuilder(" Report for : "+host+end);
|
||||
for(Entry<ReportType,ArrayList<ReportItem>> entry:reports.entrySet()){
|
||||
builder.append("Source type "+entry.getKey()+end);
|
||||
ReportItem maxSizeItem=null;
|
||||
ReportItem maxAvgSpeedItem=null;
|
||||
ReportItem minSizeItem=null;
|
||||
ReportItem minAvgSpeedItem=null;
|
||||
long currentAvgSpeedCounter=0l;
|
||||
|
||||
for(ReportItem item:entry.getValue()){
|
||||
if(maxSizeItem==null||maxSizeItem.getSize()<item.getSize()) maxSizeItem=item;
|
||||
if(maxAvgSpeedItem==null||maxAvgSpeedItem.getAvgSpeed()<item.getAvgSpeed()) maxAvgSpeedItem=item;
|
||||
if(minSizeItem==null||minSizeItem.getSize()>item.getSize()) minSizeItem=item;
|
||||
if(minAvgSpeedItem==null||minAvgSpeedItem.getAvgSpeed()>item.getAvgSpeed()) minAvgSpeedItem=item;
|
||||
currentAvgSpeedCounter+=item.getAvgSpeed();
|
||||
}
|
||||
|
||||
builder.append("Max Size Item : "+maxSizeItem+end);
|
||||
builder.append("Max Avg Speed Item : "+maxAvgSpeedItem+end);
|
||||
builder.append("Min Size Item : "+minSizeItem+end);
|
||||
builder.append("Min Avg Speed Item : "+minAvgSpeedItem+end);
|
||||
builder.append("Total avg speed : "+currentAvgSpeedCounter/entry.getValue().size()+end);
|
||||
}
|
||||
|
||||
return builder.toString();
|
||||
};
|
||||
}
|
|
@ -19,22 +19,22 @@ import org.junit.Test;
|
|||
|
||||
public class TransfererTest {
|
||||
|
||||
static String hostname="http://pc-fabio.isti.cnr.it:8080/data-transfer-service/gcube/service";
|
||||
static String hostname="http://node3-d-d4s.d4science.org:80";
|
||||
static String nodeId="462b68c5-463f-4295-86da-37d6c0abc7ea";
|
||||
static String scope="/gcube/devNext";
|
||||
static String scope="/gcube/devsec";
|
||||
|
||||
static DataTransferClient client;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws UnreachableNodeException, ServiceNotFoundException, HostingNodeNotFoundException{
|
||||
ScopeProvider.instance.set(scope);
|
||||
// client=DataTransferClient.getInstanceByEndpoint(hostname);
|
||||
client=DataTransferClient.getInstanceByNodeId(nodeId);
|
||||
client=DataTransferClient.getInstanceByEndpoint(hostname);
|
||||
// client=DataTransferClient.getInstanceByNodeId(nodeId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void localFile() throws InvalidSourceException, SourceNotSetException, FailedTransferException, InitializationException{
|
||||
String localFile="/home/fabio/Downloads/Puntata 3.mp3";
|
||||
String localFile="/home/fabio/Dropbox/Mindless/01- COMA - Mindless.mp3";
|
||||
System.out.println(client.localFile(localFile));
|
||||
}
|
||||
|
||||
|
@ -48,9 +48,11 @@ public class TransfererTest {
|
|||
|
||||
@Test
|
||||
public void storage() throws InvalidSourceException, SourceNotSetException, FailedTransferException, InitializationException, RemoteBackendException, FileNotFoundException{
|
||||
String toUpload="/home/fabio/Downloads/Incantesimi3_5.doc";
|
||||
ScopeProvider.instance.set(scope);
|
||||
String toUpload="/home/fabio/Documents/Personal/DND/Incantesimi 3.5 - Mago e Stregone.pdf";
|
||||
String id=StorageUtils.putOntoStorage(new File(toUpload));
|
||||
System.out.println(client.storageId(id));
|
||||
System.out.println(client.storageId(id));
|
||||
// System.out.println(id);
|
||||
}
|
||||
|
||||
@Test(expected=InvalidSourceException.class)
|
||||
|
|
Loading…
Reference in New Issue