Lucio Lelii 2019-05-22 14:26:23 +00:00
parent 0b917990a9
commit bfb4fabd69
6 changed files with 72 additions and 21 deletions

View File

@ -75,7 +75,6 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -53,7 +53,7 @@ public class FSInputStream extends InputStream{
}
@Override
public int available() throws IOException {
public int available() {
return q.size();
}
@ -63,6 +63,8 @@ public class FSInputStream extends InputStream{
super.close();
}
public boolean isClosed() {
return this.closed;
}
}

View File

@ -31,7 +31,8 @@ public class FileDownload implements SHFile{
logger.trace("FILE-DOWNLOAD initialized with {} , {}", fileItem.getName(), fileItem.getContent().getSize());
}
public int read(Pointer buf, @size_t long size, @off_t long offset) {
public int read(Pointer buf, long size, long offset) {
logger.trace("{} read called with size {} and offset {} ", fileItem.getName(), size, offset);
while (this.offset!=offset) {

View File

@ -63,6 +63,9 @@ public class FileUpload implements SHFile {
return 0;
}
public boolean uploadFinished() {
return stream.isClosed() && stream.available()==0;
}
}

View File

@ -1,14 +1,12 @@
package org.gcube.data.access.storagehub.fs;
import jnr.ffi.Pointer;
import jnr.ffi.types.off_t;
import jnr.ffi.types.size_t;
import ru.serce.jnrfuse.ErrorCodes;
import ru.serce.jnrfuse.struct.FileStat;
public interface SHFile {
default int read(Pointer buf, @size_t long size, @off_t long offset) {
default int read(Pointer buf, long size, long offset) {
return -ErrorCodes.ENOSYS();
}

View File

@ -35,6 +35,7 @@ import ru.serce.jnrfuse.FuseFillDir;
import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo;
import ru.serce.jnrfuse.struct.Timespec;
public class StorageHubFS extends FuseStubFS {
@ -87,10 +88,15 @@ public class StorageHubFS extends FuseStubFS {
}
@Override
public synchronized int flush(String path, FuseFileInfo fi) {
public int flush(String path, FuseFileInfo fi) {
logger.trace("called flush for "+path);
tempFiles.get(path).flush();
logger.trace("file have been removed? {}", (tempFiles.remove(path)!=null));
SHFile file = tempFiles.get(path);
file.flush();
if (!(file instanceof FileUpload)) {
logger.trace("file have been removed? {}", (tempFiles.remove(path)!=null));
cache.remove(pathUtils.getParentPath(path));
cache.remove(path);
}
return 0;
}
@ -110,7 +116,6 @@ public class StorageHubFS extends FuseStubFS {
}
uploadFile(path);
return 0;
}
@ -134,16 +139,18 @@ public class StorageHubFS extends FuseStubFS {
try {
((FolderContainer) parentContainer).uploadFile(stream, pathUtils.getLastComponent(path), "");
}catch(Throwable t) {
t.printStackTrace();
tempFiles.get(path).flush();
}
logger.trace("file have been removed? {}", (tempFiles.remove(path)!=null));
cache.remove(pathUtils.getParentPath(path));
cache.remove(path);
}
})).start();
}
@Override
public int getattr(String path, FileStat stat) {
public synchronized int getattr(String path, FileStat stat) {
ScopeProvider.instance.set(scope);
SecurityTokenProvider.instance.set(token);
@ -196,7 +203,7 @@ public class StorageHubFS extends FuseStubFS {
if (item.isShared()) {
stat.st_mode.set(type | FileStat.S_IROTH);
}else {
stat.st_mode.set(type | 0755);
stat.st_mode.set(type | 0777);
}
stat.st_mtim.tv_sec.set(item.getLastModificationTime().toInstant().getEpochSecond());
stat.st_mtim.tv_nsec.set(item.getLastModificationTime().toInstant().getNano());
@ -248,10 +255,32 @@ public class StorageHubFS extends FuseStubFS {
logger.trace("!!! read called in path {} with size {} and offset {} and pointer address {}",path, size, offset, buf.address());
SHFile fileDownload;
SHFile fileDownload = null;
boolean loop =false;
do {
synchronized (tempFiles) {
if (tempFiles.containsKey(path) && tempFiles.get(path) instanceof FileUpload) {
loop = true;
logger.trace("upload not finished yet for {}",path);
}
else {
loop=false;
}
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}while(loop);
synchronized (tempFiles) {
if (tempFiles.containsKey(path)) {
if (tempFiles.containsKey(path) && tempFiles.get(path) instanceof FileDownload) {
logger.trace("path {} found in tmpFiles");
fileDownload = tempFiles.get(path);
} else {
ItemContainer<? extends Item> item = pathUtils.getPath(path);
@ -272,13 +301,18 @@ public class StorageHubFS extends FuseStubFS {
tempFiles.put(path, fileDownload);
}
}
return fileDownload.read(buf, size, offset);
int toReturn = fileDownload.read(buf, size, offset);
logger.trace("!!! read ---- returning {}",toReturn);
return toReturn;
}
/*
* list dir
* @see ru.serce.jnrfuse.FuseStubFS#write(java.lang.String, jnr.ffi.Pointer, long, long, ru.serce.jnrfuse.struct.FuseFileInfo)
* @see ru.serce.jnrfuse.FuseStubFS#readdir()
*/
@Override
public int readdir(String path, Pointer buf, FuseFillDir filter, @off_t long offset, FuseFileInfo fi) {
@ -477,6 +511,12 @@ public class StorageHubFS extends FuseStubFS {
}
@Override
public int readlink(String path, Pointer buf, @size_t long size) {
logger.info("readlink called {}",path);
return 0;
}
@Override
public int open(String path, FuseFileInfo fi) {
logger.info("open called {} {}",path, fi.fh.getMemory().address());
@ -500,11 +540,19 @@ public class StorageHubFS extends FuseStubFS {
return 0;
}
/*
@Override
public int access(String path, int mask) {
logger.trace("access function called "+path+" "+mask);
return super.access(path, mask);
return 0;
}
*/
@Override
public int utimens(String path, Timespec[] timespec) {
logger.trace("utimens called "+path);
return 0;
}
}