Fabio Sinibaldi 2017-05-29 15:45:29 +00:00
parent 437cd856de
commit 6772ecf112
5 changed files with 37 additions and 35 deletions

View File

@ -2,9 +2,8 @@ package org.gcube.data.transfer.service.transfers;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Set;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
@ -16,11 +15,11 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.gcube.data.transfer.model.Destination;
import org.gcube.data.transfer.model.DestinationClashPolicy;
import org.gcube.data.transfer.model.PluginInvocation;
import org.gcube.data.transfer.model.ServiceConstants;
import org.gcube.data.transfer.model.TransferRequest;
import org.gcube.data.transfer.model.TransferTicket;
@ -44,43 +43,43 @@ public class REST {
RequestManager requests;
// @Inject
// PersistenceProvider persistenceProvider;
// @Inject
// PluginManager pluginManager;
//
@QueryParam(ServiceConstants.DESTINATION_FILE_NAME) String destinationFileName;
@QueryParam(ServiceConstants.CREATE_DIRS) @DefaultValue("true") Boolean createDirs;
@QueryParam(ServiceConstants.ON_EXISTING_FILE) @DefaultValue("ADD_SUFFIX") DestinationClashPolicy onExistingFile;
@QueryParam(ServiceConstants.ON_EXISTING_DIR) @DefaultValue("APPEND") DestinationClashPolicy onExistingDirectory;
@QueryParam(ServiceConstants.SOURCE_ID) String sourceID;
@FormDataParam(ServiceConstants.MULTIPART_FILE) InputStream uploadedFile;
@FormDataParam(ServiceConstants.MULTIPART_FILE) FormDataContentDisposition uploadedFileDetails;
@FormDataParam("plugin-invocations") Set<PluginInvocation> pluginInvocations;
@POST
@Path("/{method}/{destinationId}/{subPath: .*}")
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
public Object serveFileUpload(@PathParam("method") String methodString,
@PathParam("destinationId") String destinationID, @PathParam("subPath") String subPath){
@PathParam("destinationId") String destinationID, @PathParam("subPath") String subPath ){
try{
TransferRequest request=formRequestFromREST(methodString, destinationID, subPath);
log.info("Received REST Request {} ",request);
log.debug("Plugin invocation set : {} ",pluginInvocations);
TransferRequest request=formRequestFromREST(methodString, destinationID, subPath,pluginInvocations);
log.info("Received REST Request {} ",request);
TransferTicket ticket=requests.put(request);
TransferTicket ticket=requests.put(request);
if(ticket.getSettings().getOptions().getMethod().equals(TransferMethod.FileUpload))
{
log.debug("Resulting sync ticket {} ",ticket);
return ticket;
if(ticket.getSettings().getOptions().getMethod().equals(TransferMethod.FileUpload))
try {
return Response.created(new URI(ticket.getDestinationFileName())).build();
} catch (URISyntaxException e) {
throw new WebApplicationException("Internal ERROR "+e.getMessage(),e);
}
else{
return ticket;
}
else{
return ticket;
}
}catch(WebApplicationException e){
log.error("Unable to serve request",e);
throw e;
@ -89,7 +88,7 @@ public class REST {
private TransferRequest formRequestFromREST(String methodString,String destinationID,String subPath){
private TransferRequest formRequestFromREST(String methodString,String destinationID,String subPath, Set<PluginInvocation> pluginInvocations){
log.info("Creating TransferRequest from REST invocation method : {}, dest ID {}, sub Path {} ",methodString,destinationID,subPath);
TransferMethod method=null;
try{
@ -97,6 +96,9 @@ public class REST {
}catch (Throwable t) {
throw new WebApplicationException("Invalid selected method "+methodString,Status.BAD_REQUEST);}
Destination destination=new Destination();
destination.setCreateSubfolders(createDirs);
destination.setPersistenceId(destinationID);
@ -104,11 +106,11 @@ public class REST {
destination.setOnExistingSubFolder(onExistingDirectory);
TransferRequest resultingRequest=new TransferRequest();
resultingRequest.setDestinationSettings(destination);
resultingRequest.setDestinationSettings(destination);
resultingRequest.setPluginInvocations(pluginInvocations);
switch(method){
case FileUpload : {
// if(destinationFileName==null) throw new WebApplicationException("Parameter "+ServiceConstants.DESTINATION_FILE_NAME+" is mandatory.",Status.BAD_REQUEST);
// if(destinationFileName==null) throw new WebApplicationException("Parameter "+ServiceConstants.DESTINATION_FILE_NAME+" is mandatory.",Status.BAD_REQUEST);
if(uploadedFileDetails==null) throw new WebApplicationException("Missing multipart "+ServiceConstants.MULTIPART_FILE+" details.",Status.BAD_REQUEST);
if(uploadedFile==null) throw new WebApplicationException("Missing multipart "+ServiceConstants.MULTIPART_FILE+" stream.",Status.BAD_REQUEST);
destination.setDestinationFileName(destinationFileName!=null?destinationFileName:uploadedFileDetails.getFileName());

View File

@ -11,6 +11,6 @@ import org.gcube.data.transfer.service.transfers.engine.faults.PluginNotFoundExc
public interface PluginManager {
public Map<String,PluginDescription> getInstalledPlugins();
public ExecutionReport execute(PluginInvocation invocation)throws PluginException, PluginNotFoundException;
public ExecutionReport execute(PluginInvocation invocation,String transferredFile)throws PluginException, PluginNotFoundException;
public void shutdown();
}

View File

@ -81,7 +81,7 @@ public abstract class AbstractTicketHandler {
public void handle(){
public TransferTicket handle(){
InputStream is=null;
BufferedOutputStream out=null;
@ -129,7 +129,7 @@ public abstract class AbstractTicketHandler {
if(ticket.getPluginInvocations()!=null){
for(PluginInvocation invocation:ticket.getPluginInvocations()){
log.debug("Execution {}",invocation);
if(invocation.getParameters().containsValue(PluginInvocation.DESTINATION_FILE_PATH)){
if(invocation.getParameters()!=null && invocation.getParameters().containsValue(PluginInvocation.DESTINATION_FILE_PATH)){
log.debug("Checking for param value : "+PluginInvocation.DESTINATION_FILE_PATH);
for(Entry<String,String> param:invocation.getParameters().entrySet())
if(param.getValue().equals(PluginInvocation.DESTINATION_FILE_PATH)){
@ -140,7 +140,7 @@ public abstract class AbstractTicketHandler {
}
log.debug("Executing invocation {} ",invocation);
onStep("Executing invocation "+invocation.getPluginId(),1d,Status.PLUGIN_EXECUTION);
ExecutionReport report=pluginManager.execute(invocation);
ExecutionReport report=pluginManager.execute(invocation,destination.getAbsolutePath());
log.debug("Adding plugin execution report {} to ticket {} ",report,ticket.getId());
addExecutionReport(report);
}
@ -179,6 +179,7 @@ public abstract class AbstractTicketHandler {
}
}
return getTicket();
}

View File

@ -58,7 +58,7 @@ public class PluginManagerImpl implements PluginManager {
}
@Override
public ExecutionReport execute(PluginInvocation invocation)throws PluginException, PluginNotFoundException {
public ExecutionReport execute(PluginInvocation invocation,String transferredFile)throws PluginException, PluginNotFoundException {
log.debug("Executing invocation {} ",invocation);
if(!getInstalledPlugins().containsKey(invocation.getPluginId())) throw new PluginNotFoundException("Plugin with ID "+invocation.getPluginId()+" is not available.");
@ -67,8 +67,8 @@ public class PluginManagerImpl implements PluginManager {
AbstractPlugin plugin=null;
try{
log.debug("Checking invocation {} ",invocation);
factory.checkInvocation(invocation);
plugin=factory.createWorker(invocation);
PluginInvocation modifiedInvocation=factory.checkInvocation(invocation,transferredFile);
plugin=factory.createWorker(modifiedInvocation);
ExecutionReport report=plugin.execute();
log.debug("Plugin execution report is {} ",report);
switch(report.getFlag()){

View File

@ -48,8 +48,7 @@ public class RequestManagerImpl implements RequestManager{
if(request.getSettings().getOptions().getMethod().equals(TransferMethod.FileUpload)){
log.debug("Request is sync");
new LocalRequestHandler(persistenceProvider, pluginManager, toReturn).handle();
return toReturn;
return new LocalRequestHandler(persistenceProvider, pluginManager, toReturn).handle();
}else{
log.debug("Request is async");
executor.execute(new RequestHandler(ticketManager,new TransferTicket(request),persistenceProvider,pluginManager));