From bfb4fabd69df71f9fb3ef528830b0be6efd7ff24 Mon Sep 17 00:00:00 2001 From: Lucio Lelii Date: Wed, 22 May 2019 14:26:23 +0000 Subject: [PATCH] git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/branches/data-access/sh-fuse-integration/1.0@179516 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 1 - .../access/storagehub/fs/FSInputStream.java | 6 +- .../access/storagehub/fs/FileDownload.java | 3 +- .../data/access/storagehub/fs/FileUpload.java | 3 + .../data/access/storagehub/fs/SHFile.java | 4 +- .../access/storagehub/fs/StorageHubFS.java | 76 +++++++++++++++---- 6 files changed, 72 insertions(+), 21 deletions(-) diff --git a/pom.xml b/pom.xml index 88b2641..013352d 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,6 @@ ch.qos.logback logback-classic 1.0.13 - test diff --git a/src/main/java/org/gcube/data/access/storagehub/fs/FSInputStream.java b/src/main/java/org/gcube/data/access/storagehub/fs/FSInputStream.java index 05b5ec6..4292199 100644 --- a/src/main/java/org/gcube/data/access/storagehub/fs/FSInputStream.java +++ b/src/main/java/org/gcube/data/access/storagehub/fs/FSInputStream.java @@ -53,7 +53,7 @@ public class FSInputStream extends InputStream{ } @Override - public int available() throws IOException { + public int available() { return q.size(); } @@ -63,6 +63,8 @@ public class FSInputStream extends InputStream{ super.close(); } - + public boolean isClosed() { + return this.closed; + } } diff --git a/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java b/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java index 0499074..bf66f2a 100644 --- a/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java +++ b/src/main/java/org/gcube/data/access/storagehub/fs/FileDownload.java @@ -31,7 +31,8 @@ public class FileDownload implements SHFile{ logger.trace("FILE-DOWNLOAD initialized with {} , {}", fileItem.getName(), fileItem.getContent().getSize()); } - public int read(Pointer buf, @size_t long size, @off_t long offset) { + + public int read(Pointer buf, long size, long offset) { logger.trace("{} read called with size {} and offset {} ", fileItem.getName(), size, offset); while (this.offset!=offset) { diff --git a/src/main/java/org/gcube/data/access/storagehub/fs/FileUpload.java b/src/main/java/org/gcube/data/access/storagehub/fs/FileUpload.java index a85ffa8..41f5349 100644 --- a/src/main/java/org/gcube/data/access/storagehub/fs/FileUpload.java +++ b/src/main/java/org/gcube/data/access/storagehub/fs/FileUpload.java @@ -63,6 +63,9 @@ public class FileUpload implements SHFile { return 0; } + public boolean uploadFinished() { + return stream.isClosed() && stream.available()==0; + } } diff --git a/src/main/java/org/gcube/data/access/storagehub/fs/SHFile.java b/src/main/java/org/gcube/data/access/storagehub/fs/SHFile.java index 0f60201..d8b5ab4 100644 --- a/src/main/java/org/gcube/data/access/storagehub/fs/SHFile.java +++ b/src/main/java/org/gcube/data/access/storagehub/fs/SHFile.java @@ -1,14 +1,12 @@ package org.gcube.data.access.storagehub.fs; import jnr.ffi.Pointer; -import jnr.ffi.types.off_t; -import jnr.ffi.types.size_t; import ru.serce.jnrfuse.ErrorCodes; import ru.serce.jnrfuse.struct.FileStat; public interface SHFile { - default int read(Pointer buf, @size_t long size, @off_t long offset) { + default int read(Pointer buf, long size, long offset) { return -ErrorCodes.ENOSYS(); } diff --git a/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java b/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java index 9b662b3..1239764 100644 --- a/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java +++ b/src/main/java/org/gcube/data/access/storagehub/fs/StorageHubFS.java @@ -35,6 +35,7 @@ import ru.serce.jnrfuse.FuseFillDir; import ru.serce.jnrfuse.FuseStubFS; import ru.serce.jnrfuse.struct.FileStat; import ru.serce.jnrfuse.struct.FuseFileInfo; +import ru.serce.jnrfuse.struct.Timespec; public class StorageHubFS extends FuseStubFS { @@ -87,10 +88,15 @@ public class StorageHubFS extends FuseStubFS { } @Override - public synchronized int flush(String path, FuseFileInfo fi) { + public int flush(String path, FuseFileInfo fi) { logger.trace("called flush for "+path); - tempFiles.get(path).flush(); - logger.trace("file have been removed? {}", (tempFiles.remove(path)!=null)); + SHFile file = tempFiles.get(path); + file.flush(); + if (!(file instanceof FileUpload)) { + logger.trace("file have been removed? {}", (tempFiles.remove(path)!=null)); + cache.remove(pathUtils.getParentPath(path)); + cache.remove(path); + } return 0; } @@ -110,7 +116,6 @@ public class StorageHubFS extends FuseStubFS { } uploadFile(path); - return 0; } @@ -134,16 +139,18 @@ public class StorageHubFS extends FuseStubFS { try { ((FolderContainer) parentContainer).uploadFile(stream, pathUtils.getLastComponent(path), ""); }catch(Throwable t) { - t.printStackTrace(); tempFiles.get(path).flush(); } + logger.trace("file have been removed? {}", (tempFiles.remove(path)!=null)); + cache.remove(pathUtils.getParentPath(path)); + cache.remove(path); } })).start(); } @Override - public int getattr(String path, FileStat stat) { + public synchronized int getattr(String path, FileStat stat) { ScopeProvider.instance.set(scope); SecurityTokenProvider.instance.set(token); @@ -196,7 +203,7 @@ public class StorageHubFS extends FuseStubFS { if (item.isShared()) { stat.st_mode.set(type | FileStat.S_IROTH); }else { - stat.st_mode.set(type | 0755); + stat.st_mode.set(type | 0777); } stat.st_mtim.tv_sec.set(item.getLastModificationTime().toInstant().getEpochSecond()); stat.st_mtim.tv_nsec.set(item.getLastModificationTime().toInstant().getNano()); @@ -248,10 +255,32 @@ public class StorageHubFS extends FuseStubFS { logger.trace("!!! read called in path {} with size {} and offset {} and pointer address {}",path, size, offset, buf.address()); - SHFile fileDownload; + SHFile fileDownload = null; + + boolean loop =false; + do { + synchronized (tempFiles) { + if (tempFiles.containsKey(path) && tempFiles.get(path) instanceof FileUpload) { + loop = true; + logger.trace("upload not finished yet for {}",path); + } + else { + loop=false; + } + } + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + + } + }while(loop); + + synchronized (tempFiles) { - if (tempFiles.containsKey(path)) { + if (tempFiles.containsKey(path) && tempFiles.get(path) instanceof FileDownload) { + logger.trace("path {} found in tmpFiles"); fileDownload = tempFiles.get(path); } else { ItemContainer item = pathUtils.getPath(path); @@ -272,13 +301,18 @@ public class StorageHubFS extends FuseStubFS { tempFiles.put(path, fileDownload); } } - return fileDownload.read(buf, size, offset); + + int toReturn = fileDownload.read(buf, size, offset); + + logger.trace("!!! read ---- returning {}",toReturn); + + return toReturn; } /* * list dir - * @see ru.serce.jnrfuse.FuseStubFS#write(java.lang.String, jnr.ffi.Pointer, long, long, ru.serce.jnrfuse.struct.FuseFileInfo) + * @see ru.serce.jnrfuse.FuseStubFS#readdir() */ @Override public int readdir(String path, Pointer buf, FuseFillDir filter, @off_t long offset, FuseFileInfo fi) { @@ -477,6 +511,12 @@ public class StorageHubFS extends FuseStubFS { } + @Override + public int readlink(String path, Pointer buf, @size_t long size) { + logger.info("readlink called {}",path); + return 0; + } + @Override public int open(String path, FuseFileInfo fi) { logger.info("open called {} {}",path, fi.fh.getMemory().address()); @@ -500,11 +540,19 @@ public class StorageHubFS extends FuseStubFS { return 0; } - /* + @Override public int access(String path, int mask) { logger.trace("access function called "+path+" "+mask); - return super.access(path, mask); + return 0; } - */ + + @Override + public int utimens(String path, Timespec[] timespec) { + logger.trace("utimens called "+path); + return 0; + } + + + }