You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
storagehub/src/main/java/org/gcube/data/access/storagehub/handlers/items/ItemHandler.java

400 lines
16 KiB
Java

package org.gcube.data.access.storagehub.handlers.items;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.lock.LockException;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.detect.Detector;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.gcube.common.authorization.library.AuthorizedTasks;
import org.gcube.common.storagehub.model.Excludes;
import org.gcube.common.storagehub.model.NodeConstants;
import org.gcube.common.storagehub.model.Paths;
import org.gcube.common.storagehub.model.exceptions.BackendGenericError;
import org.gcube.common.storagehub.model.exceptions.IdNotFoundException;
import org.gcube.common.storagehub.model.exceptions.InvalidCallParameters;
import org.gcube.common.storagehub.model.exceptions.InvalidItemException;
import org.gcube.common.storagehub.model.exceptions.ItemLockedException;
import org.gcube.common.storagehub.model.exceptions.StorageHubException;
import org.gcube.common.storagehub.model.items.AbstractFileItem;
import org.gcube.common.storagehub.model.items.FolderItem;
import org.gcube.common.storagehub.model.plugins.FolderManager;
import org.gcube.common.storagehub.model.storages.MetaInfo;
import org.gcube.common.storagehub.model.storages.StorageBackend;
import org.gcube.common.storagehub.model.types.ItemAction;
import org.gcube.common.storagehub.model.types.NodeProperty;
import org.gcube.data.access.storagehub.AuthorizationChecker;
import org.gcube.data.access.storagehub.MultipleOutputStream;
import org.gcube.data.access.storagehub.Utils;
import org.gcube.data.access.storagehub.accounting.AccountingHandler;
import org.gcube.data.access.storagehub.handlers.VersionHandler;
import org.gcube.data.access.storagehub.handlers.content.ContentHandler;
import org.gcube.data.access.storagehub.handlers.content.ContentHandlerFactory;
import org.gcube.data.access.storagehub.handlers.items.builders.ArchiveStructureCreationParameter;
import org.gcube.data.access.storagehub.handlers.items.builders.CreateParameters;
import org.gcube.data.access.storagehub.handlers.items.builders.FileCreationParameters;
import org.gcube.data.access.storagehub.handlers.items.builders.FolderCreationParameters;
import org.gcube.data.access.storagehub.handlers.items.builders.GCubeItemCreationParameters;
import org.gcube.data.access.storagehub.handlers.items.builders.URLCreationParameters;
import org.gcube.data.access.storagehub.handlers.plugins.FolderPluginHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Creates items below a destination folder node: plain files, folders, URLs,
 * gCube items and the folder tree contained in an uploaded archive.
 *
 * <p>The public entry point is {@link #create(CreateParameters)}; it resolves and
 * checks the destination, dispatches on the managed type of the parameters and
 * finally releases any lock left on the destination.
 */
@Singleton
public class ItemHandler {

    private static final Logger log = LoggerFactory.getLogger(ItemHandler.class);

    /** Pool shared by all requests; runs the mime-type detection and the storage upload tasks. */
    private static final ExecutorService executor = Executors.newFixedThreadPool(100);

    @Inject
    AccountingHandler accountingHandler;

    @Inject
    ContentHandlerFactory contenthandlerFactory;

    @Inject
    AuthorizationChecker authChecker;

    @Inject
    VersionHandler versionHandler;

    @Inject
    FolderPluginHandler pluginHandler;

    @Inject
    Node2ItemConverter node2Item;

    @Inject
    Item2NodeConverter item2Node;

    /**
     * Creates the item described by {@code parameters} inside the folder identified by
     * {@code parameters.getParentId()}.
     *
     * @param parameters creation parameters; their managed type selects the kind of item
     * @return the identifier of the created node (for archives, of the root folder created for it)
     * @throws IdNotFoundException if the parent id cannot be resolved in the session
     * @throws InvalidItemException if the parent node is not a folder
     * @throws InvalidCallParameters if the managed type is not supported
     * @throws Exception any other error raised while creating the item
     */
    public <T extends CreateParameters> String create(T parameters) throws Exception {
        Session ses = parameters.getSession();

        Node destination;
        try {
            destination = ses.getNodeByIdentifier(parameters.getParentId());
        } catch (RepositoryException inf) {
            throw new IdNotFoundException(parameters.getParentId());
        }

        if (!node2Item.checkNodeType(destination, FolderItem.class)) {
            throw new InvalidItemException("the destination item is not a folder");
        }

        authChecker.checkWriteAuthorizationControl(ses, parameters.getUser(), destination.getIdentifier(), true);

        try {
            Node newNode = null;
            switch (parameters.getMangedType()) {
            case FILE:
                newNode = create((FileCreationParameters) parameters, destination);
                break;
            case FOLDER:
                newNode = create((FolderCreationParameters) parameters, destination);
                break;
            case ARCHIVE:
                newNode = create((ArchiveStructureCreationParameter) parameters, destination);
                break;
            case URL:
                newNode = create((URLCreationParameters) parameters, destination);
                break;
            case GCUBEITEM:
                newNode = create((GCubeItemCreationParameters) parameters, destination);
                break;
            default:
                throw new InvalidCallParameters("Item not supported");
            }
            log.debug("item with id {} correctly created", newNode.getIdentifier());
            return newNode.getIdentifier();
        } finally {
            // Best effort: a failing unlock must neither hide the exception raised above
            // nor turn an already saved creation into a failure.
            releaseLockQuietly(ses, destination);
        }
    }

    /**
     * Unlocks {@code node} if the lock manager reports it as locked. Any failure is
     * logged and swallowed so that callers can use it from cleanup code.
     */
    private void releaseLockQuietly(Session ses, Node node) {
        String path = null;
        try {
            path = node.getPath();
            if (ses.getWorkspace().getLockManager().isLocked(path)) {
                ses.getWorkspace().getLockManager().unlock(path);
            }
        } catch (Exception e) {
            log.warn("error unlocking {}", path, e);
        }
    }

    /** Locks the destination, creates the folder and saves the session. */
    private Node create(FolderCreationParameters params, Node destination) throws Exception {
        Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10);
        Node newNode = Utils.createFolderInternally(params, accountingHandler);
        params.getSession().save();
        return newNode;
    }

    /** Creates (or updates) the file with locking enabled, saves the session and checks in its content node. */
    private Node create(FileCreationParameters params, Node destination) throws Exception {
        Node newNode = createFileItemInternally(params.getSession(), destination, params.getStream(),
                params.getName(), params.getDescription(), params.getUser(), true);
        params.getSession().save();
        versionHandler.checkinContentNode(newNode);
        log.info("file with id {} correctly created", newNode.getIdentifier());
        return newNode;
    }

    /** Locks the destination, creates the URL item and saves the session. */
    private Node create(URLCreationParameters params, Node destination) throws Exception {
        Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10);
        Node newNode = Utils.createURLInternally(params.getSession(), destination, params.getName(),
                params.getUrl(), params.getDescription(), params.getUser(), accountingHandler);
        params.getSession().save();
        return newNode;
    }

    /**
     * Unpacks the archive read from {@code params.getStream()} into a new folder named
     * {@code params.getParentFolderName()} created under {@code destination}.
     *
     * <p>The destination is locked only while that root folder is created. A failure on a
     * single file entry is logged and the entry skipped; a failure while creating a
     * directory entry aborts the whole operation.
     *
     * @return the node of the root folder created for the archive
     */
    private Node create(ArchiveStructureCreationParameter params, Node destination) throws Exception {
        Session ses = params.getSession();
        String user = params.getUser();

        Utils.acquireLockWithWait(ses, destination.getPath(), false, user, 10);
        FolderCreationParameters folderParameters = FolderCreationParameters.builder()
                .name(params.getParentFolderName())
                .author(user)
                .on(destination.getIdentifier())
                .with(ses)
                .build();
        Node parentDirectoryNode = Utils.createFolderInternally(folderParameters, accountingHandler);
        ses.save();

        // The archive content goes below the new folder, so the destination lock can be released now.
        releaseLockQuietly(ses, destination);

        Set<Node> fileNodes = new HashSet<>();
        // Keys are archive-relative directory paths with a trailing '/', e.g. "a/b/".
        Map<String, Node> directoryNodeMap = new HashMap<>();

        try (ArchiveInputStream input = new ArchiveStreamFactory()
                .createArchiveInputStream(new BufferedInputStream(params.getStream(), 1024 * 64))) {
            ArchiveEntry entry;
            while ((entry = input.getNextEntry()) != null) {
                String entirePath = entry.getName();

                if (entry.isDirectory()) {
                    log.debug("creating directory with entire path {} ", entirePath);
                    createPath(entirePath, directoryNodeMap, parentDirectoryNode, ses, user);
                    continue;
                }

                try {
                    // Split "dir/sub/file" into parentPath "dir/sub/" and name "file".
                    int lastSlash = entirePath.lastIndexOf('/');
                    String name = entirePath.substring(lastSlash + 1);
                    String parentPath = entirePath.substring(0, lastSlash + 1);
                    log.debug("creating file with entire path {}, name {}, parentPath {} ", entirePath, name, parentPath);

                    // createPath returns the already known folder when the path was created before.
                    Node parentNode = parentPath.isEmpty()
                            ? parentDirectoryNode
                            : createPath(parentPath, directoryNodeMap, parentDirectoryNode, ses, user);

                    fileNodes.add(createFileItemInternally(ses, parentNode, input, name, "", user, false));
                } catch (Exception e) {
                    log.warn("error getting file {}", entry.getName(), e);
                }
            }
        }

        ses.save();
        for (Node node : fileNodes) {
            versionHandler.checkinContentNode(node);
        }
        return parentDirectoryNode;
    }

    /**
     * Returns the folder node for the archive-relative directory {@code parentPath},
     * creating every folder along the path that is not yet in {@code directoryNodeMap}.
     *
     * <p>Folders already registered in the map are reused, so the same directory can be
     * requested several times (e.g. by a directory entry and by the files inside it)
     * without being created twice. Empty path segments (leading, trailing or doubled
     * '/') are ignored; a path without any segment resolves to {@code rootNode}.
     *
     * @param parentPath       '/'-separated directory path, with or without a trailing '/'
     * @param directoryNodeMap cache of created folders keyed by path with trailing '/'; updated in place
     * @param rootNode         folder under which the first path segment is created
     * @param ses              session used to create the folders
     * @param user             author of the created folders
     * @return the node of the last folder of the path
     */
    private Node createPath(String parentPath, Map<String, Node> directoryNodeMap, Node rootNode, Session ses, String user) throws StorageHubException, RepositoryException {
        Node current = rootNode;
        StringBuilder pathSoFar = new StringBuilder();

        for (String segment : parentPath.split("/")) {
            if (segment.isEmpty()) {
                continue;
            }
            pathSoFar.append(segment).append('/');
            String key = pathSoFar.toString();

            Node folder = directoryNodeMap.get(key);
            if (folder == null) {
                FolderCreationParameters folderParameters = FolderCreationParameters.builder()
                        .name(segment)
                        .author(user)
                        .on(current.getIdentifier())
                        .with(ses)
                        .build();
                folder = Utils.createFolderInternally(folderParameters, accountingHandler);
                directoryNodeMap.put(key, folder);
            }
            current = folder;
        }
        return current;
    }

    /** Locks the destination, creates the gCube item and saves the session. */
    private Node create(GCubeItemCreationParameters params, Node destination) throws Exception {
        Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10);
        Node newNode = Utils.createGcubeItemInternally(params.getSession(), destination,
                params.getItem().getName(), params.getItem().getDescription(), params.getUser(),
                params.getItem(), accountingHandler);
        params.getSession().save();
        return newNode;
    }

    /**
     * Stores the content of {@code stream} as a file called {@code name} inside
     * {@code destinationNode}.
     *
     * <p>If a node with that name already exists its content is replaced (only when the
     * folder manager of the destination manages versions); otherwise a new node is created.
     * In both cases the content is uploaded before the lock is taken.
     *
     * @param withLock when {@code true} the existing node (update) or the destination
     *                 folder (creation) is locked around the repository change and
     *                 unlocked afterwards
     * @return the updated or newly created node
     * @throws InvalidCallParameters if the file exists and the storage does not support versioning
     * @throws ItemLockedException   if the required lock cannot be obtained
     */
    private Node createFileItemInternally(Session ses, Node destinationNode, InputStream stream, String name, String description, String login, boolean withLock) throws RepositoryException, StorageHubException {
        FolderItem destinationItem = node2Item.getItem(destinationNode, Excludes.ALL);
        FolderManager folderManager = pluginHandler.getFolderManager(destinationItem);
        StorageBackend storageBackend = folderManager.getStorageBackend();

        String relativePath = destinationNode.getPath();
        if (destinationItem.isExternalManaged()) {
            // For externally managed folders the path is made relative to the manager's root folder.
            relativePath = relativePath.replace(folderManager.getRootFolder().getPath(), "");
        }

        String newNodePath = Paths.append(Paths.getPath(destinationNode.getPath()), name).toPath();

        Node newNode;
        if (ses.nodeExists(newNodePath)) {
            // A file with this name is already there: replace its content.
            if (!folderManager.manageVersion()) {
                throw new InvalidCallParameters("storage for plugin " + folderManager.getClass().getName() + " doesn't support versioning");
            }

            newNode = ses.getNode(newNodePath);
            authChecker.checkWriteAuthorizationControl(ses, login, newNode.getIdentifier(), false);

            AbstractFileItem item = fillItemWithContent(stream, storageBackend, name, description, relativePath, login);
            item.setHidden(destinationNode.getProperty(NodeProperty.HIDDEN.toString()).getBoolean());

            if (withLock) {
                try {
                    ses.getWorkspace().getLockManager().lock(newNode.getPath(), true, true, 0, login);
                } catch (LockException le) {
                    throw new ItemLockedException(le);
                }
            }
            try {
                versionHandler.checkoutContentNode(newNode);
                log.trace("replacing content of class {}", item.getContent().getClass());
                item2Node.replaceContent(newNode, item, ItemAction.UPDATED);
                accountingHandler.createFileUpdated(item.getTitle(), ses, newNode, login, false);
                ses.save();
            } finally {
                if (withLock) {
                    ses.getWorkspace().getLockManager().unlock(newNode.getPath());
                }
            }
        } else {
            // No file with this name yet: create a new node in the destination folder.
            authChecker.checkWriteAuthorizationControl(ses, login, destinationNode.getIdentifier(), true);

            AbstractFileItem item = fillItemWithContent(stream, storageBackend, name, description, relativePath, login);

            if (withLock) {
                try {
                    log.debug("trying to acquire lock");
                    Utils.acquireLockWithWait(ses, destinationNode.getPath(), false, login, 10);
                } catch (LockException le) {
                    throw new ItemLockedException(le);
                }
            }
            try {
                newNode = item2Node.getNode(destinationNode, item);
                accountingHandler.createEntryCreate(item.getTitle(), ses, newNode, login, false);
                ses.save();
            } finally {
                if (withLock) {
                    ses.getWorkspace().getLockManager().unlock(destinationNode.getPath());
                }
            }
            versionHandler.makeVersionableContent(newNode);
            accountingHandler.createFolderAddObj(name, item.getClass().getSimpleName(), item.getContent().getMimeType(), ses, login, destinationNode, false);
        }
        return newNode;
    }

    /** Uploads the stream and builds the file item through the content handler selected for its mime type. */
    private AbstractFileItem fillItemWithContent(InputStream stream, StorageBackend storageBackend, String name, String description, String relPath, String login) throws BackendGenericError {
        ContentHandler handler = getContentHandler(stream, storageBackend, name, relPath, login);
        return handler.buildItem(name, description, login);
    }

    /**
     * Reads {@code stream} once and feeds it to two tasks submitted to the shared executor:
     * one detects the mime type with Tika and initialises the matching
     * {@link ContentHandler}, the other uploads the bytes to {@code storageBackend}.
     * The upload result (storage id, size, remote path) is then copied into the
     * handler's content.
     *
     * <p>If either task fails, or the calling thread is interrupted, both tasks are
     * cancelled; on interruption the interrupt flag of the calling thread is restored.
     *
     * @return the initialised content handler
     * @throws BackendGenericError wrapping any failure of the stream duplication or of the two tasks
     */
    private ContentHandler getContentHandler(InputStream stream, StorageBackend storageBackend, String name, String relPath, String login) throws BackendGenericError {
        final MultipleOutputStream mos;
        try {
            mos = new MultipleOutputStream(stream, 2);
        } catch (IOException e) {
            throw new BackendGenericError(e);
        }

        // Anonymous classes (not lambdas) are kept so that the AuthorizedTasks.bind call resolves as before.
        Callable<ContentHandler> mimeTypeDetector = new Callable<ContentHandler>() {
            @Override
            public ContentHandler call() throws Exception {
                ContentHandler handler = null;
                long start = System.currentTimeMillis();
                log.debug("TIMING: reading the mimetype - start");
                try (InputStream is1 = new BufferedInputStream(mos.get(), 1024 * 64)) {
                    TikaConfig config = TikaConfig.getDefaultConfig();
                    Detector detector = config.getDetector();
                    TikaInputStream tikaStream = TikaInputStream.get(is1);
                    Metadata metadata = new Metadata();
                    metadata.add(Metadata.RESOURCE_NAME_KEY, name);
                    org.apache.tika.mime.MediaType mediaType = detector.detect(tikaStream, metadata);
                    String mimeType = mediaType.getBaseType().toString();
                    handler = contenthandlerFactory.create(mimeType);
                    // Rewind so that the handler reads the content from where detection started.
                    is1.reset();
                    handler.initiliseSpecificContent(is1, name, mimeType);
                    log.trace("TIMING: reading the mimetype - finished in {}", System.currentTimeMillis() - start);
                } catch (Throwable e) {
                    log.error("error retrieving mimeType", e);
                    throw new RuntimeException(e);
                }
                return handler;
            }
        };

        Callable<MetaInfo> uploader = new Callable<MetaInfo>() {
            @Override
            public MetaInfo call() throws Exception {
                try (InputStream is1 = mos.get()) {
                    return storageBackend.upload(is1, relPath, name);
                } catch (Throwable e) {
                    log.error("error writing content", e);
                    throw e;
                }
            }
        };

        Future<ContentHandler> detectorF = executor.submit(AuthorizedTasks.bind(mimeTypeDetector));
        Future<MetaInfo> uploaderF = executor.submit(AuthorizedTasks.bind(uploader));

        long start = System.currentTimeMillis();
        log.debug("TIMING: writing the stream - start");
        try {
            mos.startWriting();
            log.debug("TIMING: writing the stream - finished in {}", System.currentTimeMillis() - start);

            ContentHandler handler = detectorF.get();
            MetaInfo info = uploaderF.get();

            handler.getContent().setData(NodeConstants.CONTENT_NAME);
            handler.getContent().setStorageId(info.getStorageId());
            handler.getContent().setSize(info.getSize());
            handler.getContent().setRemotePath(info.getRemotePath());
            return handler;
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
            detectorF.cancel(true);
            uploaderF.cancel(true);
            throw new BackendGenericError(e);
        } catch (Exception e) {
            // Do not leave the sibling task running in the shared pool once the operation has failed.
            detectorF.cancel(true);
            uploaderF.cancel(true);
            throw new BackendGenericError(e);
        }
    }
}