package org.gcube.data.access.storagehub.handlers.items; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.inject.Inject; import javax.inject.Singleton; import javax.jcr.Node; import javax.jcr.PathNotFoundException; import javax.jcr.RepositoryException; import javax.jcr.Session; import javax.jcr.lock.LockException; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.ArchiveInputStream; import org.apache.commons.compress.archivers.ArchiveStreamFactory; import org.apache.tika.config.TikaConfig; import org.apache.tika.detect.Detector; import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; import org.gcube.common.authorization.library.AuthorizedTasks; import org.gcube.common.storagehub.model.NodeConstants; import org.gcube.common.storagehub.model.exceptions.BackendGenericError; import org.gcube.common.storagehub.model.exceptions.IdNotFoundException; import org.gcube.common.storagehub.model.exceptions.InvalidCallParameters; import org.gcube.common.storagehub.model.exceptions.InvalidItemException; import org.gcube.common.storagehub.model.exceptions.ItemLockedException; import org.gcube.common.storagehub.model.exceptions.UserNotAuthorizedException; import org.gcube.common.storagehub.model.items.AbstractFileItem; import org.gcube.common.storagehub.model.items.FolderItem; import org.gcube.common.storagehub.model.storages.MetaInfo; import org.gcube.common.storagehub.model.types.ItemAction; import org.gcube.common.storagehub.model.types.NodeProperty; import org.gcube.data.access.storagehub.AuthorizationChecker; import org.gcube.data.access.storagehub.MultipleOutputStream; import org.gcube.data.access.storagehub.Utils; import org.gcube.data.access.storagehub.accounting.AccountingHandler; import org.gcube.data.access.storagehub.handlers.StorageBackendHandler; import org.gcube.data.access.storagehub.handlers.VersionHandler; import org.gcube.data.access.storagehub.handlers.content.ContentHandler; import org.gcube.data.access.storagehub.handlers.content.ContentHandlerFactory; import org.gcube.data.access.storagehub.handlers.items.builders.ArchiveStructureCreationParameter; import org.gcube.data.access.storagehub.handlers.items.builders.CreateParameters; import org.gcube.data.access.storagehub.handlers.items.builders.FileCreationParameters; import org.gcube.data.access.storagehub.handlers.items.builders.FolderCreationParameters; import org.gcube.data.access.storagehub.handlers.items.builders.GCubeItemCreationParameters; import org.gcube.data.access.storagehub.handlers.items.builders.URLCreationParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Singleton public class ItemHandler { @Inject AuthorizationChecker authChecker; @Inject AccountingHandler accountingHandler; @Inject ContentHandlerFactory contenthandlerFactory; @Inject VersionHandler versionHandler; @Inject StorageBackendHandler storageBackend; private static ExecutorService executor = Executors.newFixedThreadPool(100); @Inject Node2ItemConverter node2Item; @Inject Item2NodeConverter item2Node; private static Logger log = LoggerFactory.getLogger(ItemHandler.class); public String create(T parameters) throws Exception{ Session ses = parameters.getSession(); Node destination; try { destination = ses.getNodeByIdentifier(parameters.getParentId()); }catch(RepositoryException inf) { throw new IdNotFoundException(parameters.getParentId()); } if (!node2Item.checkNodeType(destination, FolderItem.class)) throw new InvalidItemException("the destination item is not a folder"); authChecker.checkWriteAuthorizationControl(ses, destination.getIdentifier(), true); try { Node newNode = null; switch (parameters.getMangedType()) { case FILE: newNode = create((FileCreationParameters)parameters, destination); break; case FOLDER: newNode = create((FolderCreationParameters)parameters, destination); break; case ARCHIVE: newNode = create((ArchiveStructureCreationParameter)parameters, destination); break; case URL: newNode = create((URLCreationParameters) parameters, destination); break; case GCUBEITEM: newNode = create((GCubeItemCreationParameters) parameters, destination); break; default: throw new InvalidCallParameters("Item not supported"); } log.debug("item with id {} correctly created",newNode.getIdentifier()); return newNode.getIdentifier(); } finally { if (parameters.getSession().getWorkspace().getLockManager().isLocked(destination.getPath())) parameters.getSession().getWorkspace().getLockManager().unlock(destination.getPath()); } } private Node create(FolderCreationParameters params, Node destination) throws Exception{ Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); Node newNode = Utils.createFolderInternally(params.getSession(), destination, params.getName(), params.getDescription(), params.isHidden(), params.getUser(), accountingHandler); params.getSession().save(); return newNode; } private Node create(FileCreationParameters params, Node destination) throws Exception{ Node newNode = createFileItemInternally(params.getSession(), destination, params.getStream(), params.getName(), params.getDescription(), params.getUser(), true); params.getSession().save(); versionHandler.checkinContentNode(newNode, params.getSession()); log.info("file with id {} correctly created",newNode.getIdentifier()); return newNode; } private Node create(URLCreationParameters params, Node destination) throws Exception{ Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); Node newNode = Utils.createURLInternally(params.getSession(), destination, params.getName(), params.getUrl(), params.getDescription(), params.getUser(), accountingHandler); params.getSession().save(); return newNode; } private Node create(ArchiveStructureCreationParameter params, Node destination) throws Exception{ Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); Node parentDirectoryNode = Utils.createFolderInternally(params.getSession(), destination, params.getParentFolderName(), "", false, params.getUser(), accountingHandler); try { if (params.getSession().getWorkspace().getLockManager().isLocked(destination.getPath())) params.getSession().getWorkspace().getLockManager().unlock(destination.getPath()); } catch (Throwable t){ log.warn("error unlocking {}", destination.getPath(), t); } Set fileNodes = new HashSet<>(); HashMap directoryNodeMap = new HashMap<>(); try (ArchiveInputStream input = new ArchiveStreamFactory() .createArchiveInputStream(new BufferedInputStream(params.getStream(), 1024*64))){ ArchiveEntry entry; while ((entry = input.getNextEntry()) != null) { if (entry.isDirectory()) { String entirePath = entry.getName(); String name = entirePath.replaceAll("(.*/)*(.*)/", "$2"); String parentPath = entirePath.replaceAll("(.*/)*(.*)/", "$1"); log.debug("creating directory with entire path {}, name {}, parentPath {} ", entirePath, name, parentPath); Node createdNode; if (parentPath.isEmpty()) { createdNode = Utils.createFolderInternally(params.getSession(), parentDirectoryNode, name, "", false, params.getUser(), accountingHandler); }else { Node parentNode = directoryNodeMap.get(parentPath); createdNode = Utils.createFolderInternally(params.getSession(), parentNode, name, "", false, params.getUser(), accountingHandler); } directoryNodeMap.put(entirePath, createdNode); continue; } else { try { String entirePath = entry.getName(); String name = entirePath.replaceAll("(.*/)*(.*)", "$2"); String parentPath = entirePath.replaceAll("(.*/)*(.*)", "$1"); log.debug("creating file with entire path {}, name {}, parentPath {} ", entirePath, name, parentPath); Node fileNode = null; if (parentPath.isEmpty()) fileNode = createFileItemInternally(params.getSession(), parentDirectoryNode, input, name, "", params.getUser(), false); else { Node parentNode = directoryNodeMap.get(parentPath); fileNode = createFileItemInternally(params.getSession(), parentNode, input, name, "", params.getUser(), false); } fileNodes.add(fileNode); }catch(Exception e) { log.warn("error getting file {}",entry.getName(),e); } } } } params.getSession().save(); for (Node node : fileNodes) versionHandler.checkinContentNode(node, params.getSession()); return parentDirectoryNode; } private Node create(GCubeItemCreationParameters params, Node destination) throws Exception{ Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10); Node newNode = Utils.createGcubeItemInternally(params.getSession(), destination, params.getItem().getName(), params.getItem().getDescription(), params.getUser(), params.getItem(), accountingHandler); params.getSession().save(); return newNode; } private Node createFileItemInternally(Session ses, Node destinationNode, InputStream stream, String name, String description, String login, boolean withLock) throws RepositoryException, UserNotAuthorizedException, ItemLockedException, BackendGenericError{ Node newNode; try { newNode = ses.getNode(org.gcube.common.storagehub.model.Paths.append(org.gcube.common.storagehub.model.Paths.getPath(destinationNode.getPath()), name).toPath()); authChecker.checkWriteAuthorizationControl(ses, newNode.getIdentifier(), false); AbstractFileItem item = fillItemWithContent(stream, name, description, destinationNode.getPath(), login); item.setHidden(destinationNode.getProperty(NodeProperty.HIDDEN.toString()).getBoolean()); if (withLock) { try { ses.getWorkspace().getLockManager().lock(newNode.getPath(), true, true, 0,login); }catch (LockException le) { throw new ItemLockedException(le); } } try { versionHandler.checkoutContentNode(newNode, ses); log.trace("replacing content of class {}",item.getContent().getClass()); item2Node.replaceContent(newNode,item, ItemAction.UPDATED); accountingHandler.createFileUpdated(item.getTitle(), ses, newNode, false); ses.save(); }finally { if (withLock) ses.getWorkspace().getLockManager().unlock(newNode.getPath()); } }catch(PathNotFoundException pnf) { authChecker.checkWriteAuthorizationControl(ses, destinationNode.getIdentifier(), true); AbstractFileItem item = fillItemWithContent(stream, name, description, destinationNode.getPath(), login); if (withLock) { try { log.debug("trying to acquire lock"); Utils.acquireLockWithWait(ses, destinationNode.getPath(), false, login, 10); }catch (LockException le) { throw new ItemLockedException(le); } } try { newNode = item2Node.getNode(destinationNode, item); accountingHandler.createEntryCreate(item.getTitle(), ses, newNode, false); ses.save(); }finally { if (withLock) ses.getWorkspace().getLockManager().unlock(destinationNode.getPath()); } versionHandler.makeVersionableContent(newNode, ses); accountingHandler.createFolderAddObj(name, item.getClass().getSimpleName(), item.getContent().getMimeType(), ses, destinationNode, false); } return newNode; } private AbstractFileItem fillItemWithContent(InputStream stream, String name, String description, String path, String login) throws BackendGenericError{ ContentHandler handler = getContentHandler(stream , name, path, login); AbstractFileItem item =handler.buildItem(name, description, login); return item ; } private ContentHandler getContentHandler(InputStream stream , String name, String path, String login) throws BackendGenericError { final MultipleOutputStream mos; try{ mos = new MultipleOutputStream(stream, 2); }catch (IOException e) { throw new BackendGenericError(e); } Callable mimeTypeDector = new Callable() { @Override public ContentHandler call() throws Exception { ContentHandler handler =null; long start = System.currentTimeMillis(); log.debug("TIMING: reading the mimetype - start"); try(InputStream is1 = new BufferedInputStream(mos.get(), 1024*64)){ org.apache.tika.mime.MediaType mediaType = null; TikaConfig config = TikaConfig.getDefaultConfig(); Detector detector = config.getDetector(); TikaInputStream stream = TikaInputStream.get(is1); Metadata metadata = new Metadata(); metadata.add(Metadata.RESOURCE_NAME_KEY, name); mediaType = detector.detect(stream, metadata); String mimeType = mediaType.getBaseType().toString(); handler = contenthandlerFactory.create(mimeType); is1.reset(); handler.initiliseSpecificContent(is1, name, mimeType); log.trace("TIMING: reading the mimetype - finished in {}",System.currentTimeMillis()-start); } catch (Throwable e) { log.error("error retrieving mimeType",e); throw new RuntimeException(e); } return handler; } }; Callable uploader = new Callable() { @Override public MetaInfo call() throws Exception { try(InputStream is1 = mos.get()){ String uid = UUID.randomUUID().toString(); String remotePath= String.format("%s/%s-%s",path,uid,name); MetaInfo info = storageBackend.upload(is1, remotePath); return info; }catch (Throwable e) { log.error("error writing content",e ); throw e; } } }; Future detectorF = executor.submit(AuthorizedTasks.bind(mimeTypeDector)); Future uploaderF = executor.submit(AuthorizedTasks.bind(uploader)); long start = System.currentTimeMillis(); log.debug("TIMING: writing the stream - start"); try { mos.startWriting(); log.debug("TIMING: writing the stream - finished in {}",System.currentTimeMillis()-start); ContentHandler handler = detectorF.get(); MetaInfo info = uploaderF.get(); handler.getContent().setData(NodeConstants.CONTENT_NAME); handler.getContent().setStorageId(info.getStorageId()); handler.getContent().setSize(info.getSize()); handler.getContent().setRemotePath(info.getRemotePath()); return handler; }catch (Exception e) { throw new BackendGenericError(e); } } }