Fabio Sinibaldi 2018-02-23 09:45:03 +00:00
parent 815f1bf665
commit e2ae6180c8
8 changed files with 323 additions and 14 deletions

View File

@ -34,6 +34,7 @@ import org.gcube.data.transfer.model.options.HttpDownloadOptions;
import org.gcube.data.transfer.model.options.TransferOptions.TransferMethod;
import org.gcube.data.transfer.model.settings.FileUploadSettings;
import org.gcube.data.transfer.model.settings.HttpDownloadSettings;
import org.gcube.data.transfer.service.transfers.engine.AccountingManager;
import org.gcube.data.transfer.service.transfers.engine.PersistenceProvider;
import org.gcube.data.transfer.service.transfers.engine.RequestManager;
import org.gcube.data.transfer.service.transfers.engine.faults.DestinationAccessException;
@ -166,17 +167,29 @@ public class REST {
@QueryParam("descriptor") @DefaultValue("false") Boolean getDescriptor) {
String pathString="<"+destinationID+">/"+subPath;
log.info("Received GET request at {} , descriptor option is {} ",pathString,getDescriptor);
long volume=0l;
boolean success=true;
String path=pathString;
String mimeType="N/A";
try{
if(getDescriptor) return Response.ok(persistence.getDescriptor(destinationID, subPath), MediaType.APPLICATION_JSON_TYPE).build();
File persisted= persistence.getPersistedFile(destinationID, subPath);
if(!persisted.exists()) throw new WebApplicationException("File "+pathString+" doesn't exists.",Status.NOT_FOUND);
if(persisted.isDirectory()) throw new WebApplicationException("The selected path "+pathString+" is a directory.",Status.BAD_REQUEST);
String mt = new MimetypesFileTypeMap().getContentType(persisted);
return Response.ok(persisted, mt).build();
}catch(DestinationAccessException e) {
mimeType= new MimetypesFileTypeMap().getContentType(persisted);
volume=persisted.length();
return Response.ok(persisted, mimeType).build();
}catch(DestinationAccessException e) {
success=false;
throw new WebApplicationException("Unable to access selected path "+pathString,e,Status.INTERNAL_SERVER_ERROR);
}finally {
if(!getDescriptor)
account(true,volume,success,path,mimeType);
}
}
@ -186,11 +199,37 @@ public class REST {
public DeletionReport deleteFile() {
String pathString="<"+destinationID+">/"+subPath;
log.info("Received DELETE request at {}",pathString);
long volume=0l;
boolean success=true;
String path=pathString;
String mimeType="N/A";
try{
File theFile=persistence.getPersistedFile(destinationID, subPath);
volume=theFile.length();
mimeType= new MimetypesFileTypeMap().getContentType(theFile);
return persistence.delete(destinationID, subPath);
}catch(DestinationAccessException e) {
throw new WebApplicationException("Unable to access selected path "+pathString,e,Status.INTERNAL_SERVER_ERROR);
}finally {
account(false,volume,success,path,mimeType);
}
}
private static void account(boolean read,long volume,boolean success,String path,String mimetype) {
AccountingManager manager=AccountingManager.get();
String id=manager.createNewRecord();
if(read) manager.setRead(id);
else manager.setDelete(id);
manager.setSuccessful(id, success);
manager.setVolumne(id, volume);
manager.setMimeType(id, mimetype);
manager.setResourceURI(id, path);
manager.account(id);
}
}

View File

@ -0,0 +1,32 @@
package org.gcube.data.transfer.service.transfers.engine;
import org.gcube.data.transfer.service.transfers.engine.impl.AccountingManagerImpl;
public interface AccountingManager {
public String createNewRecord();
// public StorageUsageRecord getById(String id);
public void account(String toAccountRecordId);
public void setSuccessful(String id,boolean succesfull);
public void setRead(String id);
public void setCreate(String id);
public void setDelete(String id);
public void setUpdate(String id);
public void setResourceURI(String id,String uri);
public void setVolumne(String id, long volume);
public void setMimeType(String id,String mimeType);
// usageRecord.setOperationResult(TEST_OPERATION_RESULT);
// usageRecord.setResourceURI(new URI(TEST_RESOURCE_URI));
// usageRecord.setOperationType(AbstractStorageUsageRecord.OperationType.READ);
// usageRecord.setDataVolume(generateRandomLong(MIN_DATA_VOLUME, MAX_DATA_VOLUME));
// usageRecord.setQualifier("image/png");
public static AccountingManager get() {
return AccountingManagerImpl.get();
}
}

View File

@ -18,6 +18,8 @@ import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.activation.MimetypesFileTypeMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.gcube.data.transfer.model.ExecutionReport;
@ -28,12 +30,15 @@ import org.gcube.data.transfer.model.options.TransferOptions.TransferMethod;
import org.gcube.data.transfer.model.settings.FileUploadSettings;
import org.gcube.data.transfer.model.settings.HttpDownloadSettings;
import org.gcube.data.transfer.plugin.fails.PluginException;
import org.gcube.data.transfer.service.transfers.engine.AccountingManager;
import org.gcube.data.transfer.service.transfers.engine.PersistenceProvider;
import org.gcube.data.transfer.service.transfers.engine.PluginManager;
import org.gcube.data.transfer.service.transfers.engine.faults.ManagedException;
import org.gcube.data.transfer.service.transfers.engine.faults.NotSupportedMethodException;
import org.gcube.data.transfer.service.transfers.engine.faults.PluginNotFoundException;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -47,11 +52,29 @@ public abstract class AbstractTicketHandler {
private MessageDigest md;
//Accounting details
@Getter
@Setter
private class AccountingDetails{
private String accountingId;
private String mimeType="N/A";
private String uri="file.file";
private boolean success=true;
private long volume=0l;
private boolean updatedFile=false;
}
private AccountingDetails currentAccountingDetails=new AccountingDetails();
public AbstractTicketHandler(PersistenceProvider persProv,PluginManager plugMan, TransferTicket ticket) {
public AbstractTicketHandler(PersistenceProvider persProv,PluginManager plugMan, TransferTicket ticket,String accountingId) {
this.persistenceProvider=persProv;
this.pluginManager=plugMan;
this.ticket=ticket;
this.currentAccountingDetails.setAccountingId(accountingId);
try {
md = MessageDigest.getInstance("SHA1");
} catch (NoSuchAlgorithmException e) {
@ -74,6 +97,7 @@ public abstract class AbstractTicketHandler {
}
protected void onError(String message){
onStep(message,ticket.getPercent(),Status.ERROR);
currentAccountingDetails.setSuccess(false);
}
protected void onStep(String msg,double progress,Status status){
@ -91,8 +115,6 @@ public abstract class AbstractTicketHandler {
return ticket;
}
public TransferTicket handle(){
InputStream is=null;
@ -138,6 +160,12 @@ public abstract class AbstractTicketHandler {
// IF TRANSFER FAILS, EXCEPTIONS AR THROWN
log.debug("Completed transfer to {} [ SHA1 : {}]. moving to destination {} ",tempFile.getAbsolutePath(),checksum,destination.getAbsolutePath());
if(Files.exists(destination.toPath()))
currentAccountingDetails.setUpdatedFile(true);
currentAccountingDetails.setVolume(tempFile.length());
currentAccountingDetails.setUri(destination.toURI().toString());
currentAccountingDetails.setMimeType(new MimetypesFileTypeMap().getContentType(tempFile));
Files.copy(tempFile.toPath(), destination.toPath(),StandardCopyOption.REPLACE_EXISTING);
Files.deleteIfExists(tempFile.toPath());
log.debug("Moved. Size is [temp : {} , dest : {}] ",tempFile.length(),destination.length());
@ -167,6 +195,9 @@ public abstract class AbstractTicketHandler {
log.info("Completed Transfer for ticket ID {} ",ticket.getId());
onStep("Completed transfer",1d,Status.SUCCESS);
}catch(PluginNotFoundException e){
log.error("Error while serving {} ",ticket,e);
onError("Invalid plugin invocation "+e.getMessage());
@ -183,6 +214,7 @@ public abstract class AbstractTicketHandler {
onError("Unexpected error while downloading : "+t.getMessage());
log.error("Unexpected error occurred",t);
}finally{
account(currentAccountingDetails);
log.debug("Finalizing transfer, ticket ID {} ",ticket.getId());
if(out!=null)IOUtils.closeQuietly(out);
if(is!=null)IOUtils.closeQuietly(is);
@ -233,6 +265,20 @@ public abstract class AbstractTicketHandler {
}
}
private void account(AccountingDetails toAccount) {
AccountingManager manager=AccountingManager.get();
String accountingId=toAccount.getAccountingId();
manager.setMimeType(accountingId, toAccount.getMimeType());
manager.setResourceURI(accountingId, toAccount.getUri());
manager.setSuccessful(accountingId, toAccount.isSuccess());
manager.setVolumne(accountingId, toAccount.getVolume());
if(toAccount.isUpdatedFile()) manager.setUpdate(accountingId);
else manager.setCreate(accountingId);
manager.account(accountingId);
}
private InputStream getInputStream() throws ManagedException{
switch(ticket.getSettings().getOptions().getMethod()){
case HTTPDownload:{

View File

@ -0,0 +1,178 @@
package org.gcube.data.transfer.service.transfers.engine.impl;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import org.gcube.accounting.datamodel.UsageRecord.OperationResult;
import org.gcube.accounting.datamodel.basetypes.AbstractStorageUsageRecord;
import org.gcube.accounting.datamodel.basetypes.AbstractStorageUsageRecord.OperationType;
import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistence;
import org.gcube.accounting.persistence.AccountingPersistenceFactory;
import org.gcube.data.transfer.service.transfers.engine.AccountingManager;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.smartgears.ContextProvider;
import org.gcube.smartgears.configuration.container.ContainerConfiguration;
import org.gcube.smartgears.context.application.ApplicationContext;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AccountingManagerImpl implements AccountingManager {
private static AccountingManagerImpl instance=null;
@Synchronized
public static final AccountingManagerImpl get() {
if(instance==null)instance=new AccountingManagerImpl();
return instance;
}
private ConcurrentHashMap<String,StorageUsageRecord> records=new ConcurrentHashMap<>();
@Override
public String createNewRecord(){
StorageUsageRecord newer=initRecord();
return records.put(newer.getId(),newer).getId();
}
private StorageUsageRecord getById(String recordId) {
return records.get(recordId);
}
@Override
public void account(String toAccountRecordId) {
StorageUsageRecord record=getById(toAccountRecordId);
try{
AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence();
persistence.account(record);
}catch(Exception e) {
log.warn("Unable to account record {}.",record,e);
}
records.remove(toAccountRecordId);
}
@Override
public void setSuccessful(String id, boolean succesfull) {
try {
getById(id).setOperationResult(succesfull?OperationResult.SUCCESS:OperationResult.FAILED);
} catch (InvalidValueException e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
@Override
public void setRead(String id) {
try {
getById(id).setOperationType(OperationType.READ);
} catch (InvalidValueException e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
@Override
public void setCreate(String id) {
try {
getById(id).setOperationType(OperationType.CREATE);
} catch (InvalidValueException e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
@Override
public void setDelete(String id) {
try {
getById(id).setOperationType(OperationType.DELETE);
} catch (InvalidValueException e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
@Override
public void setUpdate(String id) {
try {
getById(id).setOperationType(OperationType.UPDATE);
} catch (InvalidValueException e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
@Override
public void setResourceURI(String id, String uri) {
try {
getById(id).setResourceURI(new URI(uri));
} catch (Exception e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
@Override
public void setVolumne(String id, long volume) {
try {
getById(id).setDataVolume(volume);
} catch (InvalidValueException e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
@Override
public void setMimeType(String id,String mimeType) {
try {
getById(id).setQualifier(mimeType);
} catch (InvalidValueException e) {
log.warn("Unable to update record {}.",getById(id),e);
}
}
private StorageUsageRecord initRecord(){
StorageUsageRecord record=new StorageUsageRecord();
try{
String currentUser=TokenUtils.getCurrentUser();
record.setConsumerId(currentUser);
record.setResourceOwner(currentUser);
ApplicationContext context=ContextProvider.get();
ContainerConfiguration configuration=context.container().configuration();
String hostName=configuration.hostname();
record.setProviderURI(new URI(hostName));
record.setDataType(AbstractStorageUsageRecord.DataType.OTHER);
}catch(Exception e) {
log.warn("Unable to create account record, returning empty one.. ",e);
}
return record;
// usageRecord.setOperationResult(TEST_OPERATION_RESULT);
// usageRecord.setResourceURI(new URI(TEST_RESOURCE_URI));
// usageRecord.setOperationType(AbstractStorageUsageRecord.OperationType.READ);
// usageRecord.setDataVolume(generateRandomLong(MIN_DATA_VOLUME, MAX_DATA_VOLUME));
// usageRecord.setQualifier("image/png");
}
}

View File

@ -12,8 +12,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LocalRequestHandler extends AbstractTicketHandler{
public LocalRequestHandler(PersistenceProvider persProv, PluginManager plugMan, TransferTicket ticket) {
super(persProv, plugMan, ticket);
public LocalRequestHandler(PersistenceProvider persProv, PluginManager plugMan, TransferTicket ticket,String accountingId) {
super(persProv, plugMan, ticket,accountingId);
}
@Override

View File

@ -17,8 +17,8 @@ public class RequestHandler extends AbstractTicketHandler implements Runnable {
}
public RequestHandler(TicketManager ticketManager,TransferTicket ticket,PersistenceProvider persProv, PluginManager plugMan) {
super(persProv, plugMan,ticket);
public RequestHandler(TicketManager ticketManager,TransferTicket ticket,PersistenceProvider persProv, PluginManager plugMan,String accountingId) {
super(persProv, plugMan,ticket,accountingId);
this.ticketManager=ticketManager;
ticketManager.insertUpdate(ticket);
}

View File

@ -11,6 +11,7 @@ import javax.inject.Singleton;
import org.gcube.data.transfer.model.TransferRequest;
import org.gcube.data.transfer.model.TransferTicket;
import org.gcube.data.transfer.model.options.TransferOptions.TransferMethod;
import org.gcube.data.transfer.service.transfers.engine.AccountingManager;
import org.gcube.data.transfer.service.transfers.engine.PersistenceProvider;
import org.gcube.data.transfer.service.transfers.engine.PluginManager;
import org.gcube.data.transfer.service.transfers.engine.RequestManager;
@ -27,7 +28,7 @@ public class RequestManagerImpl implements RequestManager{
TicketManager ticketManager;
PersistenceProvider persistenceProvider;
PluginManager pluginManager;
AccountingManager accounting;
@Inject
@ -36,6 +37,7 @@ public class RequestManagerImpl implements RequestManager{
this.persistenceProvider=persistenceProvider;
this.pluginManager=PluginManager.get();
this.ticketManager=ticketManager;
this.accounting=AccountingManager.get();
}
@ -45,13 +47,18 @@ public class RequestManagerImpl implements RequestManager{
request.setId(UUID.randomUUID().toString());
log.info("Managing request {} ",request);
TransferTicket toReturn=new TransferTicket(request);
String accountingId=accounting.createNewRecord();
if(request.getSettings().getOptions().getMethod().equals(TransferMethod.FileUpload)){
log.debug("Request is sync");
return new LocalRequestHandler(persistenceProvider, pluginManager, toReturn).handle();
return new LocalRequestHandler(persistenceProvider, pluginManager, toReturn,accountingId).handle();
}else{
log.debug("Request is async");
executor.execute(new RequestHandler(ticketManager,new TransferTicket(request),persistenceProvider,pluginManager));
executor.execute(new RequestHandler(ticketManager,new TransferTicket(request),persistenceProvider,pluginManager,accountingId));
return toReturn;
}
}

View File

@ -2,6 +2,7 @@ package org.gcube.data.transfer.service.transfers.engine.impl;
import static org.gcube.common.authorization.client.Constants.authorizationService;
import org.gcube.common.authorization.client.exceptions.ObjectNotFound;
import org.gcube.common.authorization.library.AuthorizationEntry;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.scope.api.ScopeProvider;
@ -43,4 +44,10 @@ public class TokenUtils {
}
public static String getCurrentUser() throws ObjectNotFound, Exception {
String token=SecurityTokenProvider.instance.get();
return authorizationService().get(token).getClientInfo().getId();
}
}