diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/REST.java b/src/main/java/org/gcube/data/transfer/service/transfers/REST.java index 1b601b9..7d7715a 100644 --- a/src/main/java/org/gcube/data/transfer/service/transfers/REST.java +++ b/src/main/java/org/gcube/data/transfer/service/transfers/REST.java @@ -34,6 +34,7 @@ import org.gcube.data.transfer.model.options.HttpDownloadOptions; import org.gcube.data.transfer.model.options.TransferOptions.TransferMethod; import org.gcube.data.transfer.model.settings.FileUploadSettings; import org.gcube.data.transfer.model.settings.HttpDownloadSettings; +import org.gcube.data.transfer.service.transfers.engine.AccountingManager; import org.gcube.data.transfer.service.transfers.engine.PersistenceProvider; import org.gcube.data.transfer.service.transfers.engine.RequestManager; import org.gcube.data.transfer.service.transfers.engine.faults.DestinationAccessException; @@ -166,17 +167,29 @@ public class REST { @QueryParam("descriptor") @DefaultValue("false") Boolean getDescriptor) { String pathString="<"+destinationID+">/"+subPath; log.info("Received GET request at {} , descriptor option is {} ",pathString,getDescriptor); + long volume=0l; + boolean success=true; + String path=pathString; + String mimeType="N/A"; try{ if(getDescriptor) return Response.ok(persistence.getDescriptor(destinationID, subPath), MediaType.APPLICATION_JSON_TYPE).build(); + + File persisted= persistence.getPersistedFile(destinationID, subPath); if(!persisted.exists()) throw new WebApplicationException("File "+pathString+" doesn't exists.",Status.NOT_FOUND); if(persisted.isDirectory()) throw new WebApplicationException("The selected path "+pathString+" is a directory.",Status.BAD_REQUEST); - String mt = new MimetypesFileTypeMap().getContentType(persisted); - return Response.ok(persisted, mt).build(); - }catch(DestinationAccessException e) { + mimeType= new MimetypesFileTypeMap().getContentType(persisted); + volume=persisted.length(); + + return Response.ok(persisted, mimeType).build(); + }catch(DestinationAccessException e) { + success=false; throw new WebApplicationException("Unable to access selected path "+pathString,e,Status.INTERNAL_SERVER_ERROR); + }finally { + if(!getDescriptor) + account(true,volume,success,path,mimeType); } } @@ -186,11 +199,37 @@ public class REST { public DeletionReport deleteFile() { String pathString="<"+destinationID+">/"+subPath; log.info("Received DELETE request at {}",pathString); + + long volume=0l; + boolean success=true; + String path=pathString; + String mimeType="N/A"; + try{ + File theFile=persistence.getPersistedFile(destinationID, subPath); + volume=theFile.length(); + mimeType= new MimetypesFileTypeMap().getContentType(theFile); + + return persistence.delete(destinationID, subPath); }catch(DestinationAccessException e) { throw new WebApplicationException("Unable to access selected path "+pathString,e,Status.INTERNAL_SERVER_ERROR); + }finally { + account(false,volume,success,path,mimeType); } } + + private static void account(boolean read,long volume,boolean success,String path,String mimetype) { + AccountingManager manager=AccountingManager.get(); + String id=manager.createNewRecord(); + if(read) manager.setRead(id); + else manager.setDelete(id); + + manager.setSuccessful(id, success); + manager.setVolumne(id, volume); + manager.setMimeType(id, mimetype); + manager.setResourceURI(id, path); + manager.account(id); + } } diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/engine/AccountingManager.java b/src/main/java/org/gcube/data/transfer/service/transfers/engine/AccountingManager.java new file mode 100644 index 0000000..674a59b --- /dev/null +++ b/src/main/java/org/gcube/data/transfer/service/transfers/engine/AccountingManager.java @@ -0,0 +1,32 @@ +package org.gcube.data.transfer.service.transfers.engine; + +import org.gcube.data.transfer.service.transfers.engine.impl.AccountingManagerImpl; + +public interface AccountingManager { + + public String createNewRecord(); +// public StorageUsageRecord getById(String id); + public void account(String toAccountRecordId); + + + public void setSuccessful(String id,boolean succesfull); + public void setRead(String id); + public void setCreate(String id); + public void setDelete(String id); + public void setUpdate(String id); + public void setResourceURI(String id,String uri); + public void setVolumne(String id, long volume); + public void setMimeType(String id,String mimeType); + + + +// usageRecord.setOperationResult(TEST_OPERATION_RESULT); + // usageRecord.setResourceURI(new URI(TEST_RESOURCE_URI)); + // usageRecord.setOperationType(AbstractStorageUsageRecord.OperationType.READ); + // usageRecord.setDataVolume(generateRandomLong(MIN_DATA_VOLUME, MAX_DATA_VOLUME)); + // usageRecord.setQualifier("image/png"); + + public static AccountingManager get() { + return AccountingManagerImpl.get(); + } +} diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AbstractTicketHandler.java b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AbstractTicketHandler.java index de19b88..771b5ab 100644 --- a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AbstractTicketHandler.java +++ b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AbstractTicketHandler.java @@ -18,6 +18,8 @@ import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.activation.MimetypesFileTypeMap; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.gcube.data.transfer.model.ExecutionReport; @@ -28,12 +30,15 @@ import org.gcube.data.transfer.model.options.TransferOptions.TransferMethod; import org.gcube.data.transfer.model.settings.FileUploadSettings; import org.gcube.data.transfer.model.settings.HttpDownloadSettings; import org.gcube.data.transfer.plugin.fails.PluginException; +import org.gcube.data.transfer.service.transfers.engine.AccountingManager; import org.gcube.data.transfer.service.transfers.engine.PersistenceProvider; import org.gcube.data.transfer.service.transfers.engine.PluginManager; import org.gcube.data.transfer.service.transfers.engine.faults.ManagedException; import org.gcube.data.transfer.service.transfers.engine.faults.NotSupportedMethodException; import org.gcube.data.transfer.service.transfers.engine.faults.PluginNotFoundException; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -47,11 +52,29 @@ public abstract class AbstractTicketHandler { private MessageDigest md; + + + //Accounting details + @Getter + @Setter + private class AccountingDetails{ + private String accountingId; + private String mimeType="N/A"; + private String uri="file.file"; + private boolean success=true; + private long volume=0l; + private boolean updatedFile=false; + } + + private AccountingDetails currentAccountingDetails=new AccountingDetails(); + + - public AbstractTicketHandler(PersistenceProvider persProv,PluginManager plugMan, TransferTicket ticket) { + public AbstractTicketHandler(PersistenceProvider persProv,PluginManager plugMan, TransferTicket ticket,String accountingId) { this.persistenceProvider=persProv; this.pluginManager=plugMan; this.ticket=ticket; + this.currentAccountingDetails.setAccountingId(accountingId); try { md = MessageDigest.getInstance("SHA1"); } catch (NoSuchAlgorithmException e) { @@ -74,6 +97,7 @@ public abstract class AbstractTicketHandler { } protected void onError(String message){ onStep(message,ticket.getPercent(),Status.ERROR); + currentAccountingDetails.setSuccess(false); } protected void onStep(String msg,double progress,Status status){ @@ -91,8 +115,6 @@ public abstract class AbstractTicketHandler { return ticket; } - - public TransferTicket handle(){ InputStream is=null; @@ -138,6 +160,12 @@ public abstract class AbstractTicketHandler { // IF TRANSFER FAILS, EXCEPTIONS AR THROWN log.debug("Completed transfer to {} [ SHA1 : {}]. moving to destination {} ",tempFile.getAbsolutePath(),checksum,destination.getAbsolutePath()); + if(Files.exists(destination.toPath())) + currentAccountingDetails.setUpdatedFile(true); + currentAccountingDetails.setVolume(tempFile.length()); + currentAccountingDetails.setUri(destination.toURI().toString()); + currentAccountingDetails.setMimeType(new MimetypesFileTypeMap().getContentType(tempFile)); + Files.copy(tempFile.toPath(), destination.toPath(),StandardCopyOption.REPLACE_EXISTING); Files.deleteIfExists(tempFile.toPath()); log.debug("Moved. Size is [temp : {} , dest : {}] ",tempFile.length(),destination.length()); @@ -167,6 +195,9 @@ public abstract class AbstractTicketHandler { log.info("Completed Transfer for ticket ID {} ",ticket.getId()); onStep("Completed transfer",1d,Status.SUCCESS); + + + }catch(PluginNotFoundException e){ log.error("Error while serving {} ",ticket,e); onError("Invalid plugin invocation "+e.getMessage()); @@ -183,6 +214,7 @@ public abstract class AbstractTicketHandler { onError("Unexpected error while downloading : "+t.getMessage()); log.error("Unexpected error occurred",t); }finally{ + account(currentAccountingDetails); log.debug("Finalizing transfer, ticket ID {} ",ticket.getId()); if(out!=null)IOUtils.closeQuietly(out); if(is!=null)IOUtils.closeQuietly(is); @@ -233,6 +265,20 @@ public abstract class AbstractTicketHandler { } } + + private void account(AccountingDetails toAccount) { + AccountingManager manager=AccountingManager.get(); + String accountingId=toAccount.getAccountingId(); + manager.setMimeType(accountingId, toAccount.getMimeType()); + manager.setResourceURI(accountingId, toAccount.getUri()); + manager.setSuccessful(accountingId, toAccount.isSuccess()); + manager.setVolumne(accountingId, toAccount.getVolume()); + if(toAccount.isUpdatedFile()) manager.setUpdate(accountingId); + else manager.setCreate(accountingId); + manager.account(accountingId); + } + + private InputStream getInputStream() throws ManagedException{ switch(ticket.getSettings().getOptions().getMethod()){ case HTTPDownload:{ diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AccountingManagerImpl.java b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AccountingManagerImpl.java new file mode 100644 index 0000000..f67b8ca --- /dev/null +++ b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/AccountingManagerImpl.java @@ -0,0 +1,178 @@ +package org.gcube.data.transfer.service.transfers.engine.impl; + +import java.net.URI; +import java.util.concurrent.ConcurrentHashMap; + +import org.gcube.accounting.datamodel.UsageRecord.OperationResult; +import org.gcube.accounting.datamodel.basetypes.AbstractStorageUsageRecord; +import org.gcube.accounting.datamodel.basetypes.AbstractStorageUsageRecord.OperationType; +import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord; +import org.gcube.accounting.persistence.AccountingPersistence; +import org.gcube.accounting.persistence.AccountingPersistenceFactory; +import org.gcube.data.transfer.service.transfers.engine.AccountingManager; +import org.gcube.documentstore.exception.InvalidValueException; +import org.gcube.smartgears.ContextProvider; +import org.gcube.smartgears.configuration.container.ContainerConfiguration; +import org.gcube.smartgears.context.application.ApplicationContext; + +import lombok.Synchronized; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class AccountingManagerImpl implements AccountingManager { + + private static AccountingManagerImpl instance=null; + + @Synchronized + public static final AccountingManagerImpl get() { + if(instance==null)instance=new AccountingManagerImpl(); + return instance; + } + + private ConcurrentHashMap records=new ConcurrentHashMap<>(); + + + @Override + public String createNewRecord(){ + StorageUsageRecord newer=initRecord(); + return records.put(newer.getId(),newer).getId(); + } + + + private StorageUsageRecord getById(String recordId) { + return records.get(recordId); + } + + @Override + public void account(String toAccountRecordId) { + StorageUsageRecord record=getById(toAccountRecordId); + try{ + AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence(); + persistence.account(record); + }catch(Exception e) { + log.warn("Unable to account record {}.",record,e); + } + records.remove(toAccountRecordId); + } + + + + @Override + public void setSuccessful(String id, boolean succesfull) { + try { + getById(id).setOperationResult(succesfull?OperationResult.SUCCESS:OperationResult.FAILED); + } catch (InvalidValueException e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + @Override + public void setRead(String id) { + try { + getById(id).setOperationType(OperationType.READ); + } catch (InvalidValueException e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + @Override + public void setCreate(String id) { + try { + getById(id).setOperationType(OperationType.CREATE); + } catch (InvalidValueException e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + @Override + public void setDelete(String id) { + try { + getById(id).setOperationType(OperationType.DELETE); + } catch (InvalidValueException e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + @Override + public void setUpdate(String id) { + try { + getById(id).setOperationType(OperationType.UPDATE); + } catch (InvalidValueException e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + @Override + public void setResourceURI(String id, String uri) { + try { + getById(id).setResourceURI(new URI(uri)); + } catch (Exception e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + @Override + public void setVolumne(String id, long volume) { + try { + getById(id).setDataVolume(volume); + } catch (InvalidValueException e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + @Override + public void setMimeType(String id,String mimeType) { + try { + getById(id).setQualifier(mimeType); + } catch (InvalidValueException e) { + log.warn("Unable to update record {}.",getById(id),e); + } + } + + + + + + private StorageUsageRecord initRecord(){ + StorageUsageRecord record=new StorageUsageRecord(); + try{ + String currentUser=TokenUtils.getCurrentUser(); + record.setConsumerId(currentUser); + record.setResourceOwner(currentUser); + + ApplicationContext context=ContextProvider.get(); + ContainerConfiguration configuration=context.container().configuration(); + + String hostName=configuration.hostname(); + + + record.setProviderURI(new URI(hostName)); + record.setDataType(AbstractStorageUsageRecord.DataType.OTHER); + }catch(Exception e) { + log.warn("Unable to create account record, returning empty one.. ",e); + } + + + return record; + + // usageRecord.setOperationResult(TEST_OPERATION_RESULT); + // usageRecord.setResourceURI(new URI(TEST_RESOURCE_URI)); + // usageRecord.setOperationType(AbstractStorageUsageRecord.OperationType.READ); + // usageRecord.setDataVolume(generateRandomLong(MIN_DATA_VOLUME, MAX_DATA_VOLUME)); + // usageRecord.setQualifier("image/png"); + + + + } + + + +} diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/LocalRequestHandler.java b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/LocalRequestHandler.java index 2fd5222..e46b281 100644 --- a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/LocalRequestHandler.java +++ b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/LocalRequestHandler.java @@ -12,8 +12,8 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class LocalRequestHandler extends AbstractTicketHandler{ - public LocalRequestHandler(PersistenceProvider persProv, PluginManager plugMan, TransferTicket ticket) { - super(persProv, plugMan, ticket); + public LocalRequestHandler(PersistenceProvider persProv, PluginManager plugMan, TransferTicket ticket,String accountingId) { + super(persProv, plugMan, ticket,accountingId); } @Override diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestHandler.java b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestHandler.java index bb89a34..41ed5cb 100644 --- a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestHandler.java +++ b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestHandler.java @@ -17,8 +17,8 @@ public class RequestHandler extends AbstractTicketHandler implements Runnable { } - public RequestHandler(TicketManager ticketManager,TransferTicket ticket,PersistenceProvider persProv, PluginManager plugMan) { - super(persProv, plugMan,ticket); + public RequestHandler(TicketManager ticketManager,TransferTicket ticket,PersistenceProvider persProv, PluginManager plugMan,String accountingId) { + super(persProv, plugMan,ticket,accountingId); this.ticketManager=ticketManager; ticketManager.insertUpdate(ticket); } diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestManagerImpl.java b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestManagerImpl.java index 0bf1f35..5e7d9d8 100644 --- a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestManagerImpl.java +++ b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/RequestManagerImpl.java @@ -11,6 +11,7 @@ import javax.inject.Singleton; import org.gcube.data.transfer.model.TransferRequest; import org.gcube.data.transfer.model.TransferTicket; import org.gcube.data.transfer.model.options.TransferOptions.TransferMethod; +import org.gcube.data.transfer.service.transfers.engine.AccountingManager; import org.gcube.data.transfer.service.transfers.engine.PersistenceProvider; import org.gcube.data.transfer.service.transfers.engine.PluginManager; import org.gcube.data.transfer.service.transfers.engine.RequestManager; @@ -27,7 +28,7 @@ public class RequestManagerImpl implements RequestManager{ TicketManager ticketManager; PersistenceProvider persistenceProvider; PluginManager pluginManager; - + AccountingManager accounting; @Inject @@ -36,6 +37,7 @@ public class RequestManagerImpl implements RequestManager{ this.persistenceProvider=persistenceProvider; this.pluginManager=PluginManager.get(); this.ticketManager=ticketManager; + this.accounting=AccountingManager.get(); } @@ -45,13 +47,18 @@ public class RequestManagerImpl implements RequestManager{ request.setId(UUID.randomUUID().toString()); log.info("Managing request {} ",request); TransferTicket toReturn=new TransferTicket(request); - + + String accountingId=accounting.createNewRecord(); + + + + if(request.getSettings().getOptions().getMethod().equals(TransferMethod.FileUpload)){ log.debug("Request is sync"); - return new LocalRequestHandler(persistenceProvider, pluginManager, toReturn).handle(); + return new LocalRequestHandler(persistenceProvider, pluginManager, toReturn,accountingId).handle(); }else{ log.debug("Request is async"); - executor.execute(new RequestHandler(ticketManager,new TransferTicket(request),persistenceProvider,pluginManager)); + executor.execute(new RequestHandler(ticketManager,new TransferTicket(request),persistenceProvider,pluginManager,accountingId)); return toReturn; } } diff --git a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/TokenUtils.java b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/TokenUtils.java index a7c8f83..ed94dfc 100644 --- a/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/TokenUtils.java +++ b/src/main/java/org/gcube/data/transfer/service/transfers/engine/impl/TokenUtils.java @@ -2,6 +2,7 @@ package org.gcube.data.transfer.service.transfers.engine.impl; import static org.gcube.common.authorization.client.Constants.authorizationService; +import org.gcube.common.authorization.client.exceptions.ObjectNotFound; import org.gcube.common.authorization.library.AuthorizationEntry; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.scope.api.ScopeProvider; @@ -43,4 +44,10 @@ public class TokenUtils { } + + public static String getCurrentUser() throws ObjectNotFound, Exception { + String token=SecurityTokenProvider.instance.get(); + return authorizationService().get(token).getClientInfo().getId(); + } + }