diff --git a/src/main/java/org/gcube/data/access/storagehub/handlers/items/ItemHandler.java b/src/main/java/org/gcube/data/access/storagehub/handlers/items/ItemHandler.java new file mode 100644 index 0000000..d947dc0 --- /dev/null +++ b/src/main/java/org/gcube/data/access/storagehub/handlers/items/ItemHandler.java @@ -0,0 +1,359 @@ +package org.gcube.data.access.storagehub.handlers.items; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.jcr.Node; +import javax.jcr.PathNotFoundException; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.lock.LockException; + +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveInputStream; +import org.apache.commons.compress.archivers.ArchiveStreamFactory; +import org.apache.tika.config.TikaConfig; +import org.apache.tika.detect.Detector; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.gcube.common.authorization.library.AuthorizedTasks; +import org.gcube.common.storagehub.model.NodeConstants; +import org.gcube.common.storagehub.model.exceptions.BackendGenericError; +import org.gcube.common.storagehub.model.exceptions.IdNotFoundException; +import org.gcube.common.storagehub.model.exceptions.InvalidCallParameters; +import org.gcube.common.storagehub.model.exceptions.InvalidItemException; +import org.gcube.common.storagehub.model.exceptions.ItemLockedException; +import org.gcube.common.storagehub.model.exceptions.UserNotAuthorizedException; +import org.gcube.common.storagehub.model.items.AbstractFileItem; +import org.gcube.common.storagehub.model.items.FolderItem; +import org.gcube.common.storagehub.model.storages.MetaInfo; +import org.gcube.common.storagehub.model.types.ItemAction; +import org.gcube.common.storagehub.model.types.NodeProperty; +import org.gcube.data.access.storagehub.AuthorizationChecker; +import org.gcube.data.access.storagehub.MultipleOutputStream; +import org.gcube.data.access.storagehub.Utils; +import org.gcube.data.access.storagehub.accounting.AccountingHandler; +import org.gcube.data.access.storagehub.handlers.StorageBackendHandler; +import org.gcube.data.access.storagehub.handlers.VersionHandler; +import org.gcube.data.access.storagehub.handlers.content.ContentHandler; +import org.gcube.data.access.storagehub.handlers.content.ContentHandlerFactory; +import org.gcube.data.access.storagehub.handlers.items.builders.ArchiveStructureCreationParameter; +import org.gcube.data.access.storagehub.handlers.items.builders.CreateParameters; +import org.gcube.data.access.storagehub.handlers.items.builders.FileCreationParameters; +import org.gcube.data.access.storagehub.handlers.items.builders.FolderCreationParameters; +import org.gcube.data.access.storagehub.handlers.items.builders.GCubeItemCreationParameters; +import org.gcube.data.access.storagehub.handlers.items.builders.URLCreationParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Singleton +public class ItemHandler { + + @Inject + AuthorizationChecker authChecker; + + @Inject + AccountingHandler accountingHandler; + + @Inject + ContentHandlerFactory contenthandlerFactory; + + @Inject + VersionHandler versionHandler; + + @Inject StorageBackendHandler storageBackend; + + private static ExecutorService executor = Executors.newFixedThreadPool(100); + + @Inject Node2ItemConverter node2Item; + @Inject Item2NodeConverter item2Node; + + private static Logger log = LoggerFactory.getLogger(ItemHandler.class); + + + public String create(T parameters) throws Exception{ + Session ses = parameters.getSession(); + + Node destination; + try { + destination = ses.getNodeByIdentifier(parameters.getParentId()); + }catch(RepositoryException inf) { + throw new IdNotFoundException(parameters.getParentId()); + } + + if (!node2Item.checkNodeType(destination, FolderItem.class)) + throw new InvalidItemException("the destination item is not a folder"); + + authChecker.checkWriteAuthorizationControl(ses, destination.getIdentifier(), true); + + + try { + Node newNode = null; + switch (parameters.getMangedType()) { + case FILE: + newNode = create((FileCreationParameters)parameters, destination); + break; + case FOLDER: + newNode = create((FolderCreationParameters)parameters, destination); + break; + case ARCHIVE: + newNode = create((ArchiveStructureCreationParameter)parameters, destination); + break; + case URL: + newNode = create((URLCreationParameters) parameters, destination); + break; + case GCUBEITEM: + newNode = create((GCubeItemCreationParameters) parameters, destination); + break; + default: + throw new InvalidCallParameters("Item not supported"); + } + log.debug("item with id {} correctly created",newNode.getIdentifier()); + return newNode.getIdentifier(); + } finally { + if (parameters.getSession().getWorkspace().getLockManager().isLocked(destination.getPath())) + parameters.getSession().getWorkspace().getLockManager().unlock(destination.getPath()); + } + + } + + + private Node create(FolderCreationParameters params, Node destination) throws Exception{ + Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); + Node newNode = Utils.createFolderInternally(params.getSession(), destination, params.getName(), params.getDescription(), params.isHidden(), params.getUser(), accountingHandler); + params.getSession().save(); + return newNode; + } + + private Node create(FileCreationParameters params, Node destination) throws Exception{ + Node newNode = createFileItemInternally(params.getSession(), destination, params.getStream(), params.getName(), params.getDescription(), params.getUser(), true); + params.getSession().save(); + + versionHandler.checkinContentNode(newNode, params.getSession()); + log.info("file with id {} correctly created",newNode.getIdentifier()); + return newNode; + } + + private Node create(URLCreationParameters params, Node destination) throws Exception{ + Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); + Node newNode = Utils.createURLInternally(params.getSession(), destination, params.getName(), params.getUrl(), params.getDescription(), params.getUser(), accountingHandler); + params.getSession().save(); + return newNode; + } + + private Node create(ArchiveStructureCreationParameter params, Node destination) throws Exception{ + Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); + Node parentDirectoryNode = Utils.createFolderInternally(params.getSession(), destination, params.getParentFolderName(), "", false, params.getUser(), accountingHandler); + try { + if (params.getSession().getWorkspace().getLockManager().isLocked(destination.getPath())) + params.getSession().getWorkspace().getLockManager().unlock(destination.getPath()); + } catch (Throwable t){ + log.warn("error unlocking {}", destination.getPath(), t); + } + + Set fileNodes = new HashSet<>(); + + + HashMap directoryNodeMap = new HashMap<>(); + + try (ArchiveInputStream input = new ArchiveStreamFactory() + .createArchiveInputStream(new BufferedInputStream(params.getStream(), 1024*64))){ + ArchiveEntry entry; + while ((entry = input.getNextEntry()) != null) { + if (entry.isDirectory()) { + String entirePath = entry.getName(); + String name = entirePath.replaceAll("(.*/)*(.*)/", "$2"); + String parentPath = entirePath.replaceAll("(.*/)*(.*)/", "$1"); + log.debug("creating directory with entire path {}, name {}, parentPath {} ", entirePath, name, parentPath); + Node createdNode; + if (parentPath.isEmpty()) { + createdNode = Utils.createFolderInternally(params.getSession(), parentDirectoryNode, name, "", false, params.getUser(), accountingHandler); + }else { + Node parentNode = directoryNodeMap.get(parentPath); + createdNode = Utils.createFolderInternally(params.getSession(), parentNode, name, "", false, params.getUser(), accountingHandler); + } + directoryNodeMap.put(entirePath, createdNode); + continue; + } else { + try { + String entirePath = entry.getName(); + String name = entirePath.replaceAll("(.*/)*(.*)", "$2"); + String parentPath = entirePath.replaceAll("(.*/)*(.*)", "$1"); + log.debug("creating file with entire path {}, name {}, parentPath {} ", entirePath, name, parentPath); + Node fileNode = null; + if (parentPath.isEmpty()) + fileNode = createFileItemInternally(params.getSession(), parentDirectoryNode, input, name, "", params.getUser(), false); + else { + Node parentNode = directoryNodeMap.get(parentPath); + fileNode = createFileItemInternally(params.getSession(), parentNode, input, name, "", params.getUser(), false); + } + fileNodes.add(fileNode); + }catch(Exception e) { + log.warn("error getting file {}",entry.getName(),e); + } + } + } + + } + + params.getSession().save(); + for (Node node : fileNodes) + versionHandler.checkinContentNode(node, params.getSession()); + return parentDirectoryNode; + } + + + private Node create(GCubeItemCreationParameters params, Node destination) throws Exception{ + Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); + Node newNode = Utils.createGcubeItemInternally(params.getSession(), destination, params.getItem().getName(), params.getItem().getDescription(), params.getUser(), params.getItem(), accountingHandler); + params.getSession().save(); + return newNode; + } + + private Node createFileItemInternally(Session ses, Node destinationNode, InputStream stream, String name, String description, String login, boolean withLock) throws RepositoryException, UserNotAuthorizedException, ItemLockedException, BackendGenericError{ + + Node newNode; + try { + newNode = ses.getNode(org.gcube.common.storagehub.model.Paths.append(org.gcube.common.storagehub.model.Paths.getPath(destinationNode.getPath()), name).toPath()); + authChecker.checkWriteAuthorizationControl(ses, newNode.getIdentifier(), false); + AbstractFileItem item = fillItemWithContent(stream, name, description, destinationNode.getPath(), login); + item.setHidden(destinationNode.getProperty(NodeProperty.HIDDEN.toString()).getBoolean()); + if (withLock) { + try { + ses.getWorkspace().getLockManager().lock(newNode.getPath(), true, true, 0,login); + }catch (LockException le) { + throw new ItemLockedException(le); + } + } + try { + versionHandler.checkoutContentNode(newNode, ses); + log.trace("replacing content of class {}",item.getContent().getClass()); + item2Node.replaceContent(newNode,item, ItemAction.UPDATED); + accountingHandler.createFileUpdated(item.getTitle(), ses, newNode, false); + ses.save(); + }finally { + if (withLock) ses.getWorkspace().getLockManager().unlock(newNode.getPath()); + } + }catch(PathNotFoundException pnf) { + authChecker.checkWriteAuthorizationControl(ses, destinationNode.getIdentifier(), true); + AbstractFileItem item = fillItemWithContent(stream, name, description, destinationNode.getPath(), login); + if (withLock) { + try { + log.debug("trying to acquire lock"); + Utils.acquireLockWithWait(ses, destinationNode.getPath(), false, login, 10); + }catch (LockException le) { + throw new ItemLockedException(le); + } + } + try { + newNode = item2Node.getNode(destinationNode, item); + accountingHandler.createEntryCreate(item.getTitle(), ses, newNode, false); + ses.save(); + }finally { + if (withLock) ses.getWorkspace().getLockManager().unlock(destinationNode.getPath()); + } + versionHandler.makeVersionableContent(newNode, ses); + accountingHandler.createFolderAddObj(name, item.getClass().getSimpleName(), item.getContent().getMimeType(), ses, destinationNode, false); + } + + return newNode; + } + + private AbstractFileItem fillItemWithContent(InputStream stream, String name, String description, String path, String login) throws BackendGenericError{ + ContentHandler handler = getContentHandler(stream , name, path, login); + AbstractFileItem item =handler.buildItem(name, description, login); + return item ; + } + + private ContentHandler getContentHandler(InputStream stream , String name, String path, String login) throws BackendGenericError { + + + final MultipleOutputStream mos; + try{ + mos = new MultipleOutputStream(stream, 2); + }catch (IOException e) { + throw new BackendGenericError(e); + } + + Callable mimeTypeDector = new Callable() { + + @Override + public ContentHandler call() throws Exception { + ContentHandler handler =null; + long start = System.currentTimeMillis(); + log.debug("TIMING: reading the mimetype - start"); + try(InputStream is1 = new BufferedInputStream(mos.get(), 1024*64)){ + org.apache.tika.mime.MediaType mediaType = null; + TikaConfig config = TikaConfig.getDefaultConfig(); + Detector detector = config.getDetector(); + TikaInputStream stream = TikaInputStream.get(is1); + Metadata metadata = new Metadata(); + metadata.add(Metadata.RESOURCE_NAME_KEY, name); + mediaType = detector.detect(stream, metadata); + String mimeType = mediaType.getBaseType().toString(); + + handler = contenthandlerFactory.create(mimeType); + + is1.reset(); + handler.initiliseSpecificContent(is1, name, mimeType); + + log.trace("TIMING: reading the mimetype - finished in {}",System.currentTimeMillis()-start); + } catch (Throwable e) { + log.error("error retrieving mimeType",e); + throw new RuntimeException(e); + } + return handler; + } + + }; + + Callable uploader = new Callable() { + + @Override + public MetaInfo call() throws Exception { + try(InputStream is1 = mos.get()){ + String uid = UUID.randomUUID().toString(); + String remotePath= String.format("%s/%s-%s",path,uid,name); + MetaInfo info = storageBackend.upload(is1, remotePath); + return info; + }catch (Throwable e) { + log.error("error writing content",e ); + throw e; + } + + } + }; + + Future detectorF = executor.submit(AuthorizedTasks.bind(mimeTypeDector)); + Future uploaderF = executor.submit(AuthorizedTasks.bind(uploader)); + + long start = System.currentTimeMillis(); + log.debug("TIMING: writing the stream - start"); + try { + mos.startWriting(); + log.debug("TIMING: writing the stream - finished in {}",System.currentTimeMillis()-start); + + ContentHandler handler = detectorF.get(); + MetaInfo info = uploaderF.get(); + handler.getContent().setData(NodeConstants.CONTENT_NAME); + handler.getContent().setStorageId(info.getStorageId()); + handler.getContent().setSize(info.getSize()); + handler.getContent().setRemotePath(info.getRemotePath()); + return handler; + }catch (Exception e) { + throw new BackendGenericError(e); + } + + } +}