diff --git a/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java b/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java index 51da2f5..ebda8bb 100644 --- a/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java +++ b/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java @@ -20,42 +20,66 @@ public class FileDownload implements SHFile{ InputStream stream; AbstractFileItem fileItem; - + + Object monitor = new Object(); + + long offset = 0; + public FileDownload(FileContainer fileContainer) throws Exception { stream = fileContainer.download().getStream(); fileItem = fileContainer.get(); logger.trace("FILE-DOWNLOAD initialized with {} , {}", fileItem.getName(), fileItem.getContent().getSize()); } - public synchronized int read(Pointer buf, @size_t long size, @off_t long offset) { + public int read(Pointer buf, @size_t long size, @off_t long offset) { logger.trace("read called with size {} and offset {} ", size, offset); - + + while (this.offset!=offset) { + logger.trace("going in wait ({},{})",this.offset, offset); + synchronized (monitor) { + try { + monitor.wait(); + logger.trace("waking up!!!"); + } catch (InterruptedException e2) { + logger.warn("interrupt exception",e2); + } + } + } + int bytesToRead = (int) (size); byte[] mybuf = new byte[bytesToRead]; - int readTotal= 0;; + int readTotal= 0; try { int read =0; + logger.trace("BEFORE: bytes to read {} and read total {} and last read {}", bytesToRead, readTotal, read); while ((read= stream.read(mybuf, 0 , bytesToRead-readTotal))!=-1 && bytesToRead>readTotal) { - buf.put(0, mybuf, 0, read); + buf.put(readTotal, mybuf, 0, read); readTotal+= read; + logger.trace("INSIDE: bytes to read {} and read total {} and last read {}", bytesToRead, readTotal, read); } + logger.trace("AFTER: bytes to read {} and read total {} and last read {}", bytesToRead, readTotal, read); - logger.trace("bytes to read {} and read total {} and last read {}", bytesToRead, readTotal, read); }catch (Exception e) { logger.error("error in read",e); try { stream.close(); } catch (IOException e1) {} return -ErrorCodes.ENOENT(); + } finally { + this.offset = readTotal+offset; + logger.trace("setting offset to {}",this.offset); + synchronized (monitor) { + monitor.notifyAll(); + } } - + logger.trace("work finished!!! {}", readTotal); return readTotal; } - - - + + + public synchronized int flush() { logger.trace("called flush"); @@ -67,7 +91,7 @@ public class FileDownload implements SHFile{ } return 0; } - + public int getAttr(FileStat stat) { logger.trace("is in download"); @@ -81,5 +105,5 @@ public class FileDownload implements SHFile{ stat.st_atim.tv_nsec.set(fileItem.getLastModificationTime().toInstant().getNano()); return 0; } - + } diff --git a/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java b/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java index 35e4556..700c7df 100644 --- a/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java +++ b/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java @@ -53,7 +53,7 @@ public class StorageHubFS extends FuseStubFS { Cache> cache; PathUtils pathUtils; - + private FolderContainer rootDirectory; public StorageHubFS(String token, String scope) { @@ -240,29 +240,31 @@ public class StorageHubFS extends FuseStubFS { SecurityTokenProvider.instance.set(token); logger.trace("!!! read called in path {} with size {} and offset {} ",path, size, offset); - + SHFile fileDownload; - if (tempFiles.containsKey(path)) { - fileDownload = tempFiles.get(path); - } else { - ItemContainer item = pathUtils.getPath(path); - if (item == null) { - return -ErrorCodes.ENOENT(); - } - if (item.getType()!=ContainerType.FILE) { - return -ErrorCodes.EISDIR(); - } + synchronized (tempFiles) { - try { - fileDownload = new FileDownload((FileContainer)item); - } catch (Exception e) { - logger.error("error reading remote file",e); - return -ErrorCodes.ENOENT(); - } + if (tempFiles.containsKey(path)) { + fileDownload = tempFiles.get(path); + } else { + ItemContainer item = pathUtils.getPath(path); + if (item == null) { + return -ErrorCodes.ENOENT(); + } + if (item.getType()!=ContainerType.FILE) { + return -ErrorCodes.EISDIR(); + } - tempFiles.put(path, fileDownload); + try { + fileDownload = new FileDownload((FileContainer)item); + } catch (Exception e) { + logger.error("error reading remote file",e); + return -ErrorCodes.ENOENT(); + } + + tempFiles.put(path, fileDownload); + } } - return fileDownload.read(buf, size, offset); } @@ -330,7 +332,7 @@ public class StorageHubFS extends FuseStubFS { } logger.trace("tempFiles.entrySet() is empty ? {}",(tempFiles.entrySet().isEmpty())); - + for(Entry entry: tempFiles.entrySet()) { logger.trace("entry in temp map {}", entry.getKey()); if (entry.getValue() instanceof FileUpload || pathUtils.getParentPath(entry.getKey()).equals(path)) { @@ -338,7 +340,7 @@ public class StorageHubFS extends FuseStubFS { logger.trace("last temp entry added {}", entry.getKey()); } } - + if (path.equals("/")) filter.apply(buf, VREFOLDERS_NAME , null, 0); return 0; @@ -403,10 +405,10 @@ public class StorageHubFS extends FuseStubFS { public int rmdir(String path) { if (path.equals("/"+VREFOLDERS_NAME)) return -ErrorCodes.EACCES(); - + ScopeProvider.instance.set(scope); SecurityTokenProvider.instance.set(token); - + ItemContainer folder = pathUtils.getPath(path); if (folder == null) { return -ErrorCodes.ENOENT(); @@ -418,10 +420,10 @@ public class StorageHubFS extends FuseStubFS { SecurityTokenProvider.instance.set(token); try { checkSpecialFolderRemove(path); - + if (folder.get() instanceof SharedFolder && ((SharedFolder) folder.get()).isVreFolder()) return -ErrorCodes.EACCES(); - + folder.delete(); cache.remove(path); }catch(UserNotAuthorizedException una) { @@ -435,8 +437,8 @@ public class StorageHubFS extends FuseStubFS { public void checkSpecialFolderRemove(String path) throws UserNotAuthorizedException{ if (path.equals(String.format("/%s", VREFOLDERS_NAME))) throw new UserNotAuthorizedException(VREFOLDERS_NAME+" cannot be deleted"); } - - + + /* * delete file * @see ru.serce.jnrfuse.FuseStubFS#write(java.lang.String, jnr.ffi.Pointer, long, long, ru.serce.jnrfuse.struct.FuseFileInfo)