data-transfer-service/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AbstractTicketHandler.java

195 lines
6.4 KiB
Java

package org.gcube.data.transfer.service.transfers.engine.impl;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.gcube.data.transfer.model.PluginInvocation;
import org.gcube.data.transfer.model.TransferTicket;
import org.gcube.data.transfer.model.TransferTicket.Status;
import org.gcube.data.transfer.model.options.TransferOptions.TransferMethod;
import org.gcube.data.transfer.model.settings.FileUploadSettings;
import org.gcube.data.transfer.model.settings.HttpDownloadSettings;
import org.gcube.data.transfer.service.transfers.engine.PersistenceProvider;
import org.gcube.data.transfer.service.transfers.engine.PluginManager;
import org.gcube.data.transfer.service.transfers.engine.faults.ManagedException;
import org.gcube.data.transfer.service.transfers.engine.faults.NotSupportedMethodException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractTicketHandler {
private TransferTicket ticket;
PersistenceProvider persistenceProvider;
PluginManager pluginManager;
public AbstractTicketHandler(PersistenceProvider persProv,PluginManager plugMan, TransferTicket ticket) {
this.persistenceProvider=persProv;
this.pluginManager=plugMan;
this.ticket=ticket;
}
protected void onStep(String msg,double progress,Status status,long transferredBytes){
ticket.setStatus(status);
ticket.setMessage(msg);
ticket.setPercent(progress);
ticket.setTransferredBytes(transferredBytes);
try{
long elapsedTime=System.currentTimeMillis()-ticket.getSubmissionTime().getValue().getTimeInMillis();
long average=(transferredBytes/((elapsedTime==0?1:elapsedTime)))*1000;
ticket.setAverageTransferSpeed(average);
}catch(Exception e){
log.warn("Unable to evaluate average ",e);
}
}
protected void onError(String message){
onStep(message,ticket.getPercent(),Status.ERROR);
}
protected void onStep(String msg,double progress,Status status){
onStep(msg,progress,status,ticket.getTransferredBytes());
}
public TransferTicket getTicket(){
return ticket;
}
public void handle(){
InputStream is=null;
BufferedOutputStream out=null;
Boolean completedTransfer=false;
File destination=null;
try{
if(ticket.getSettings().getOptions().getMethod().equals(TransferMethod.DirectTransfer))
throw new NotSupportedMethodException("Unable to manage request [ID "+ticket.getId()+"]. Method not supported : "+ticket.getSettings().getOptions().getMethod());
log.trace("Request handling started. Ticket is "+ticket);
onStep("Checking destination",0d,Status.TRANSFERRING,0l);
destination =persistenceProvider.prepareDestination(ticket.getDestinationSettings());
ticket.setDestinationFileName(destination.getAbsolutePath());
onStep("Opening input stream",0d,Status.TRANSFERRING,0l);
is=getInputStream();
try{
out=new BufferedOutputStream(new FileOutputStream(destination));
} catch (IOException e) {
log.warn("Unable to create destination file.",e);
throw new ManagedException("Cannot save file in host");
}
transferStream(is, out);
completedTransfer=true;
// IF TRANSFER FAILS, EXCEPTIONS AR THROWN
//Plugin execution
if(ticket.getPluginInvocations()!=null){
for(PluginInvocation invocation:ticket.getPluginInvocations()){
log.debug("Execution {}",invocation);
if(invocation.getParameters().containsValue(PluginInvocation.DESTINATION_FILE_PATH)){
log.debug("Checking for param value : "+PluginInvocation.DESTINATION_FILE_PATH);
for(Entry<String,String> param:invocation.getParameters().entrySet())
if(param.getValue().equals(PluginInvocation.DESTINATION_FILE_PATH)){
log.debug("Setting {} = {} ",param.getKey(),ticket.getDestinationFileName());
param.setValue(ticket.getDestinationFileName());
}
}
onStep("Executing invocation "+invocation.getPluginId(),1d,Status.PLUGIN_EXECUTION);
pluginManager.execute(invocation);
}
}
onStep("Completed transfer",1d,Status.SUCCESS);
}catch(NotSupportedMethodException e){
onError(e.getMessage());
}catch(ManagedException e){
onError(e.getMessage());
}catch(Throwable t){
onError("Unexpected error while downloading : "+t.getMessage());
log.error("Unexpected error occurred",t);
}finally{
log.debug("Finalizing transfer, ticket ID {} ",ticket.getId());
if(out!=null)IOUtils.closeQuietly(out);
if(is!=null)IOUtils.closeQuietly(is);
if((!completedTransfer)&& (destination!=null) && (destination.exists())) {
log.debug("Removing incomplete transfer..");
try{
FileUtils.forceDelete(destination);
}catch(Exception e){
log.warn("Unable to clean {} ",destination);
}
}
}
}
private void transferStream(InputStream in, OutputStream out) throws ManagedException{
long receivedTotal=0l;
try{
byte[] internalBuf=new byte[1024];
int received=0;
while ((received=in.read(internalBuf))!=-1){
out.write(internalBuf,0,received);
receivedTotal+=received;
onStep("Transferring",0d,Status.TRANSFERRING,receivedTotal);
}
out.flush();
}catch(IOException e){
log.debug("Unable to read from source",e);
throw new ManagedException("Unable to read from source.");
}
log.debug("Completed transfer phase for ticket ID {}. Transferred {} bytes. ",ticket.getId(),receivedTotal);
}
private InputStream getInputStream() throws ManagedException{
switch(ticket.getSettings().getOptions().getMethod()){
case HTTPDownload:{
try{
HttpDownloadSettings options=(HttpDownloadSettings) (ticket.getSettings());
return new BufferedInputStream(options.getSource().openStream());
}catch(Exception e){
log.debug("Unable to open connection ",e);
throw new ManagedException("Cannot open connection to source");
}
}
case FileUpload :{
try{
FileUploadSettings options=(FileUploadSettings) (ticket.getSettings());
return new BufferedInputStream(options.getPassedStream());
}catch(Exception e){
log.debug("Unable to open connection ",e);
throw new ManagedException("Cannot open connection to source");
}
}
default:
throw new ManagedException(ticket.getSettings().getOptions().getMethod()+" cannot be managed");
}
}
}