refactoring
This commit is contained in:
parent
08351b2005
commit
487eae33e2
|
@ -0,0 +1,359 @@
|
|||
package org.gcube.data.access.storagehub.handlers.items;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
import javax.jcr.Node;
|
||||
import javax.jcr.PathNotFoundException;
|
||||
import javax.jcr.RepositoryException;
|
||||
import javax.jcr.Session;
|
||||
import javax.jcr.lock.LockException;
|
||||
|
||||
import org.apache.commons.compress.archivers.ArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.ArchiveInputStream;
|
||||
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
|
||||
import org.apache.tika.config.TikaConfig;
|
||||
import org.apache.tika.detect.Detector;
|
||||
import org.apache.tika.io.TikaInputStream;
|
||||
import org.apache.tika.metadata.Metadata;
|
||||
import org.gcube.common.authorization.library.AuthorizedTasks;
|
||||
import org.gcube.common.storagehub.model.NodeConstants;
|
||||
import org.gcube.common.storagehub.model.exceptions.BackendGenericError;
|
||||
import org.gcube.common.storagehub.model.exceptions.IdNotFoundException;
|
||||
import org.gcube.common.storagehub.model.exceptions.InvalidCallParameters;
|
||||
import org.gcube.common.storagehub.model.exceptions.InvalidItemException;
|
||||
import org.gcube.common.storagehub.model.exceptions.ItemLockedException;
|
||||
import org.gcube.common.storagehub.model.exceptions.UserNotAuthorizedException;
|
||||
import org.gcube.common.storagehub.model.items.AbstractFileItem;
|
||||
import org.gcube.common.storagehub.model.items.FolderItem;
|
||||
import org.gcube.common.storagehub.model.storages.MetaInfo;
|
||||
import org.gcube.common.storagehub.model.types.ItemAction;
|
||||
import org.gcube.common.storagehub.model.types.NodeProperty;
|
||||
import org.gcube.data.access.storagehub.AuthorizationChecker;
|
||||
import org.gcube.data.access.storagehub.MultipleOutputStream;
|
||||
import org.gcube.data.access.storagehub.Utils;
|
||||
import org.gcube.data.access.storagehub.accounting.AccountingHandler;
|
||||
import org.gcube.data.access.storagehub.handlers.StorageBackendHandler;
|
||||
import org.gcube.data.access.storagehub.handlers.VersionHandler;
|
||||
import org.gcube.data.access.storagehub.handlers.content.ContentHandler;
|
||||
import org.gcube.data.access.storagehub.handlers.content.ContentHandlerFactory;
|
||||
import org.gcube.data.access.storagehub.handlers.items.builders.ArchiveStructureCreationParameter;
|
||||
import org.gcube.data.access.storagehub.handlers.items.builders.CreateParameters;
|
||||
import org.gcube.data.access.storagehub.handlers.items.builders.FileCreationParameters;
|
||||
import org.gcube.data.access.storagehub.handlers.items.builders.FolderCreationParameters;
|
||||
import org.gcube.data.access.storagehub.handlers.items.builders.GCubeItemCreationParameters;
|
||||
import org.gcube.data.access.storagehub.handlers.items.builders.URLCreationParameters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Singleton
|
||||
public class ItemHandler {
|
||||
|
||||
@Inject
|
||||
AuthorizationChecker authChecker;
|
||||
|
||||
@Inject
|
||||
AccountingHandler accountingHandler;
|
||||
|
||||
@Inject
|
||||
ContentHandlerFactory contenthandlerFactory;
|
||||
|
||||
@Inject
|
||||
VersionHandler versionHandler;
|
||||
|
||||
@Inject StorageBackendHandler storageBackend;
|
||||
|
||||
private static ExecutorService executor = Executors.newFixedThreadPool(100);
|
||||
|
||||
@Inject Node2ItemConverter node2Item;
|
||||
@Inject Item2NodeConverter item2Node;
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(ItemHandler.class);
|
||||
|
||||
|
||||
public <T extends CreateParameters> String create(T parameters) throws Exception{
|
||||
Session ses = parameters.getSession();
|
||||
|
||||
Node destination;
|
||||
try {
|
||||
destination = ses.getNodeByIdentifier(parameters.getParentId());
|
||||
}catch(RepositoryException inf) {
|
||||
throw new IdNotFoundException(parameters.getParentId());
|
||||
}
|
||||
|
||||
if (!node2Item.checkNodeType(destination, FolderItem.class))
|
||||
throw new InvalidItemException("the destination item is not a folder");
|
||||
|
||||
authChecker.checkWriteAuthorizationControl(ses, destination.getIdentifier(), true);
|
||||
|
||||
|
||||
try {
|
||||
Node newNode = null;
|
||||
switch (parameters.getMangedType()) {
|
||||
case FILE:
|
||||
newNode = create((FileCreationParameters)parameters, destination);
|
||||
break;
|
||||
case FOLDER:
|
||||
newNode = create((FolderCreationParameters)parameters, destination);
|
||||
break;
|
||||
case ARCHIVE:
|
||||
newNode = create((ArchiveStructureCreationParameter)parameters, destination);
|
||||
break;
|
||||
case URL:
|
||||
newNode = create((URLCreationParameters) parameters, destination);
|
||||
break;
|
||||
case GCUBEITEM:
|
||||
newNode = create((GCubeItemCreationParameters) parameters, destination);
|
||||
break;
|
||||
default:
|
||||
throw new InvalidCallParameters("Item not supported");
|
||||
}
|
||||
log.debug("item with id {} correctly created",newNode.getIdentifier());
|
||||
return newNode.getIdentifier();
|
||||
} finally {
|
||||
if (parameters.getSession().getWorkspace().getLockManager().isLocked(destination.getPath()))
|
||||
parameters.getSession().getWorkspace().getLockManager().unlock(destination.getPath());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private Node create(FolderCreationParameters params, Node destination) throws Exception{
|
||||
Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10);
|
||||
Node newNode = Utils.createFolderInternally(params.getSession(), destination, params.getName(), params.getDescription(), params.isHidden(), params.getUser(), accountingHandler);
|
||||
params.getSession().save();
|
||||
return newNode;
|
||||
}
|
||||
|
||||
private Node create(FileCreationParameters params, Node destination) throws Exception{
|
||||
Node newNode = createFileItemInternally(params.getSession(), destination, params.getStream(), params.getName(), params.getDescription(), params.getUser(), true);
|
||||
params.getSession().save();
|
||||
|
||||
versionHandler.checkinContentNode(newNode, params.getSession());
|
||||
log.info("file with id {} correctly created",newNode.getIdentifier());
|
||||
return newNode;
|
||||
}
|
||||
|
||||
private Node create(URLCreationParameters params, Node destination) throws Exception{
|
||||
Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10);
|
||||
Node newNode = Utils.createURLInternally(params.getSession(), destination, params.getName(), params.getUrl(), params.getDescription(), params.getUser(), accountingHandler);
|
||||
params.getSession().save();
|
||||
return newNode;
|
||||
}
|
||||
|
||||
private Node create(ArchiveStructureCreationParameter params, Node destination) throws Exception{
|
||||
Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10);
|
||||
Node parentDirectoryNode = Utils.createFolderInternally(params.getSession(), destination, params.getParentFolderName(), "", false, params.getUser(), accountingHandler);
|
||||
try {
|
||||
if (params.getSession().getWorkspace().getLockManager().isLocked(destination.getPath()))
|
||||
params.getSession().getWorkspace().getLockManager().unlock(destination.getPath());
|
||||
} catch (Throwable t){
|
||||
log.warn("error unlocking {}", destination.getPath(), t);
|
||||
}
|
||||
|
||||
Set<Node> fileNodes = new HashSet<>();
|
||||
|
||||
|
||||
HashMap<String, Node> directoryNodeMap = new HashMap<>();
|
||||
|
||||
try (ArchiveInputStream input = new ArchiveStreamFactory()
|
||||
.createArchiveInputStream(new BufferedInputStream(params.getStream(), 1024*64))){
|
||||
ArchiveEntry entry;
|
||||
while ((entry = input.getNextEntry()) != null) {
|
||||
if (entry.isDirectory()) {
|
||||
String entirePath = entry.getName();
|
||||
String name = entirePath.replaceAll("(.*/)*(.*)/", "$2");
|
||||
String parentPath = entirePath.replaceAll("(.*/)*(.*)/", "$1");
|
||||
log.debug("creating directory with entire path {}, name {}, parentPath {} ", entirePath, name, parentPath);
|
||||
Node createdNode;
|
||||
if (parentPath.isEmpty()) {
|
||||
createdNode = Utils.createFolderInternally(params.getSession(), parentDirectoryNode, name, "", false, params.getUser(), accountingHandler);
|
||||
}else {
|
||||
Node parentNode = directoryNodeMap.get(parentPath);
|
||||
createdNode = Utils.createFolderInternally(params.getSession(), parentNode, name, "", false, params.getUser(), accountingHandler);
|
||||
}
|
||||
directoryNodeMap.put(entirePath, createdNode);
|
||||
continue;
|
||||
} else {
|
||||
try {
|
||||
String entirePath = entry.getName();
|
||||
String name = entirePath.replaceAll("(.*/)*(.*)", "$2");
|
||||
String parentPath = entirePath.replaceAll("(.*/)*(.*)", "$1");
|
||||
log.debug("creating file with entire path {}, name {}, parentPath {} ", entirePath, name, parentPath);
|
||||
Node fileNode = null;
|
||||
if (parentPath.isEmpty())
|
||||
fileNode = createFileItemInternally(params.getSession(), parentDirectoryNode, input, name, "", params.getUser(), false);
|
||||
else {
|
||||
Node parentNode = directoryNodeMap.get(parentPath);
|
||||
fileNode = createFileItemInternally(params.getSession(), parentNode, input, name, "", params.getUser(), false);
|
||||
}
|
||||
fileNodes.add(fileNode);
|
||||
}catch(Exception e) {
|
||||
log.warn("error getting file {}",entry.getName(),e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
params.getSession().save();
|
||||
for (Node node : fileNodes)
|
||||
versionHandler.checkinContentNode(node, params.getSession());
|
||||
return parentDirectoryNode;
|
||||
}
|
||||
|
||||
|
||||
private Node create(GCubeItemCreationParameters params, Node destination) throws Exception{
|
||||
Utils.acquireLockWithWait(params.getSession(), destination.getPath(), false, params.getUser(), 10);
|
||||
Node newNode = Utils.createGcubeItemInternally(params.getSession(), destination, params.getItem().getName(), params.getItem().getDescription(), params.getUser(), params.getItem(), accountingHandler);
|
||||
params.getSession().save();
|
||||
return newNode;
|
||||
}
|
||||
|
||||
private Node createFileItemInternally(Session ses, Node destinationNode, InputStream stream, String name, String description, String login, boolean withLock) throws RepositoryException, UserNotAuthorizedException, ItemLockedException, BackendGenericError{
|
||||
|
||||
Node newNode;
|
||||
try {
|
||||
newNode = ses.getNode(org.gcube.common.storagehub.model.Paths.append(org.gcube.common.storagehub.model.Paths.getPath(destinationNode.getPath()), name).toPath());
|
||||
authChecker.checkWriteAuthorizationControl(ses, newNode.getIdentifier(), false);
|
||||
AbstractFileItem item = fillItemWithContent(stream, name, description, destinationNode.getPath(), login);
|
||||
item.setHidden(destinationNode.getProperty(NodeProperty.HIDDEN.toString()).getBoolean());
|
||||
if (withLock) {
|
||||
try {
|
||||
ses.getWorkspace().getLockManager().lock(newNode.getPath(), true, true, 0,login);
|
||||
}catch (LockException le) {
|
||||
throw new ItemLockedException(le);
|
||||
}
|
||||
}
|
||||
try {
|
||||
versionHandler.checkoutContentNode(newNode, ses);
|
||||
log.trace("replacing content of class {}",item.getContent().getClass());
|
||||
item2Node.replaceContent(newNode,item, ItemAction.UPDATED);
|
||||
accountingHandler.createFileUpdated(item.getTitle(), ses, newNode, false);
|
||||
ses.save();
|
||||
}finally {
|
||||
if (withLock) ses.getWorkspace().getLockManager().unlock(newNode.getPath());
|
||||
}
|
||||
}catch(PathNotFoundException pnf) {
|
||||
authChecker.checkWriteAuthorizationControl(ses, destinationNode.getIdentifier(), true);
|
||||
AbstractFileItem item = fillItemWithContent(stream, name, description, destinationNode.getPath(), login);
|
||||
if (withLock) {
|
||||
try {
|
||||
log.debug("trying to acquire lock");
|
||||
Utils.acquireLockWithWait(ses, destinationNode.getPath(), false, login, 10);
|
||||
}catch (LockException le) {
|
||||
throw new ItemLockedException(le);
|
||||
}
|
||||
}
|
||||
try {
|
||||
newNode = item2Node.getNode(destinationNode, item);
|
||||
accountingHandler.createEntryCreate(item.getTitle(), ses, newNode, false);
|
||||
ses.save();
|
||||
}finally {
|
||||
if (withLock) ses.getWorkspace().getLockManager().unlock(destinationNode.getPath());
|
||||
}
|
||||
versionHandler.makeVersionableContent(newNode, ses);
|
||||
accountingHandler.createFolderAddObj(name, item.getClass().getSimpleName(), item.getContent().getMimeType(), ses, destinationNode, false);
|
||||
}
|
||||
|
||||
return newNode;
|
||||
}
|
||||
|
||||
private AbstractFileItem fillItemWithContent(InputStream stream, String name, String description, String path, String login) throws BackendGenericError{
|
||||
ContentHandler handler = getContentHandler(stream , name, path, login);
|
||||
AbstractFileItem item =handler.buildItem(name, description, login);
|
||||
return item ;
|
||||
}
|
||||
|
||||
private ContentHandler getContentHandler(InputStream stream , String name, String path, String login) throws BackendGenericError {
|
||||
|
||||
|
||||
final MultipleOutputStream mos;
|
||||
try{
|
||||
mos = new MultipleOutputStream(stream, 2);
|
||||
}catch (IOException e) {
|
||||
throw new BackendGenericError(e);
|
||||
}
|
||||
|
||||
Callable<ContentHandler> mimeTypeDector = new Callable<ContentHandler>() {
|
||||
|
||||
@Override
|
||||
public ContentHandler call() throws Exception {
|
||||
ContentHandler handler =null;
|
||||
long start = System.currentTimeMillis();
|
||||
log.debug("TIMING: reading the mimetype - start");
|
||||
try(InputStream is1 = new BufferedInputStream(mos.get(), 1024*64)){
|
||||
org.apache.tika.mime.MediaType mediaType = null;
|
||||
TikaConfig config = TikaConfig.getDefaultConfig();
|
||||
Detector detector = config.getDetector();
|
||||
TikaInputStream stream = TikaInputStream.get(is1);
|
||||
Metadata metadata = new Metadata();
|
||||
metadata.add(Metadata.RESOURCE_NAME_KEY, name);
|
||||
mediaType = detector.detect(stream, metadata);
|
||||
String mimeType = mediaType.getBaseType().toString();
|
||||
|
||||
handler = contenthandlerFactory.create(mimeType);
|
||||
|
||||
is1.reset();
|
||||
handler.initiliseSpecificContent(is1, name, mimeType);
|
||||
|
||||
log.trace("TIMING: reading the mimetype - finished in {}",System.currentTimeMillis()-start);
|
||||
} catch (Throwable e) {
|
||||
log.error("error retrieving mimeType",e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return handler;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
Callable<MetaInfo> uploader = new Callable<MetaInfo>() {
|
||||
|
||||
@Override
|
||||
public MetaInfo call() throws Exception {
|
||||
try(InputStream is1 = mos.get()){
|
||||
String uid = UUID.randomUUID().toString();
|
||||
String remotePath= String.format("%s/%s-%s",path,uid,name);
|
||||
MetaInfo info = storageBackend.upload(is1, remotePath);
|
||||
return info;
|
||||
}catch (Throwable e) {
|
||||
log.error("error writing content",e );
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
Future<ContentHandler> detectorF = executor.submit(AuthorizedTasks.bind(mimeTypeDector));
|
||||
Future<MetaInfo> uploaderF = executor.submit(AuthorizedTasks.bind(uploader));
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
log.debug("TIMING: writing the stream - start");
|
||||
try {
|
||||
mos.startWriting();
|
||||
log.debug("TIMING: writing the stream - finished in {}",System.currentTimeMillis()-start);
|
||||
|
||||
ContentHandler handler = detectorF.get();
|
||||
MetaInfo info = uploaderF.get();
|
||||
handler.getContent().setData(NodeConstants.CONTENT_NAME);
|
||||
handler.getContent().setStorageId(info.getStorageId());
|
||||
handler.getContent().setSize(info.getSize());
|
||||
handler.getContent().setRemotePath(info.getRemotePath());
|
||||
return handler;
|
||||
}catch (Exception e) {
|
||||
throw new BackendGenericError(e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue