ws-thredds/src/main/java/org/gcube/usecases/ws/thredds/engine/impl/SynchEngineImpl.java

290 lines
11 KiB
Java
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package org.gcube.usecases.ws.thredds.engine.impl;
import java.io.File;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.data.transfer.model.plugins.thredds.DataSetScan;
import org.gcube.data.transfer.model.plugins.thredds.ThreddsCatalog;
import org.gcube.data.transfer.model.plugins.thredds.ThreddsInfo;
import org.gcube.usecases.ws.thredds.Constants;
import org.gcube.usecases.ws.thredds.LocalConfiguration;
import org.gcube.usecases.ws.thredds.SyncEngine;
import org.gcube.usecases.ws.thredds.engine.impl.threads.ProcessInitializationThread;
import org.gcube.usecases.ws.thredds.engine.impl.threads.RequestLogger;
import org.gcube.usecases.ws.thredds.faults.InternalException;
import org.gcube.usecases.ws.thredds.faults.ProcessNotFoundException;
import org.gcube.usecases.ws.thredds.faults.WorkspaceFolderNotRootException;
import org.gcube.usecases.ws.thredds.faults.WorkspaceInteractionException;
import org.gcube.usecases.ws.thredds.faults.WorkspaceLockedException;
import org.gcube.usecases.ws.thredds.faults.WorkspaceNotSynchedException;
import org.gcube.usecases.ws.thredds.model.CompletionCallback;
import org.gcube.usecases.ws.thredds.model.SyncEngineStatusDescriptor;
import org.gcube.usecases.ws.thredds.model.SyncFolderDescriptor;
import org.gcube.usecases.ws.thredds.model.SyncOperationCallBack;
import org.gcube.usecases.ws.thredds.model.SynchFolderConfiguration;
import org.gcube.usecases.ws.thredds.model.SynchronizedElementInfo;
import org.gcube.usecases.ws.thredds.model.gui.CatalogBean;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SynchEngineImpl implements SyncEngine{
private static SynchEngineImpl instance=null;
public static synchronized SyncEngine get() {
if(instance==null) {
instance=new SynchEngineImpl();
}
return instance;
}
private SynchEngineImpl() {
localProcesses=new ConcurrentHashMap<>();
// NB UNBOUNDED QUEUE MEANS ONLY CORE THREADS ARE EXECUTED
int scannerMaxSize=Integer.parseInt(LocalConfiguration.getProperty(Constants.Configuration.SCANNER_POOL_MAX_SIZE));
int scannerCoreSize=Integer.parseInt(LocalConfiguration.getProperty(Constants.Configuration.SCANNER_POOL_CORE_SIZE));
// int scannerCoreSize=scannerMaxSize;
int scannerIdleMs=Integer.parseInt(LocalConfiguration.getProperty(Constants.Configuration.SCANNER_POOL_IDLE_MS));
int transfersMaxSize=Integer.parseInt(LocalConfiguration.getProperty(Constants.Configuration.TRANSFERS_POOL_MAX_SIZE));
int transfersCoreSize=Integer.parseInt(LocalConfiguration.getProperty(Constants.Configuration.TRANSFERS_POOL_CORE_SIZE));
// int transfersCoreSize=transfersMaxSize;
int transfersIdleMs=Integer.parseInt(LocalConfiguration.getProperty(Constants.Configuration.TRANSFERS_POOL_IDLE_MS));
initializationExecutor= new ThreadPoolExecutor(scannerCoreSize, scannerMaxSize, scannerIdleMs,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
synchronizationExecutor=new ThreadPoolExecutor(transfersCoreSize, transfersMaxSize, transfersIdleMs,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
}
private String requestLoggerPath=null;
@Override
public void setRequestLogger(String path) {
requestLoggerPath=path;
}
@Override
public boolean isRequestLoggerEnabled() {
return requestLoggerPath!=null;
}
@Override
public String getRequestLoggerPath() {
return requestLoggerPath;
}
//folder ID -> Process
private ConcurrentHashMap<String, Process> localProcesses;
private ExecutorService initializationExecutor=null;
private ExecutorService synchronizationExecutor=null;
private final CompletionCallback completionCallback=new CompletionCallback() {
@Override
public void onProcessCompleted(Process completedProcess) {
try {
ProcessDescriptor descriptor=completedProcess.getDescriptor();
log.info("Process {} is completed. Going to cleanup.. ",descriptor);
localProcesses.remove(descriptor.getFolderId());
completedProcess.cleanup();
}catch(Throwable t) {
log.warn("Unable to cleanup {} ",completedProcess,t);
}
}
};
@Override
public SyncFolderDescriptor check(String folderId, boolean recursively) throws WorkspaceInteractionException, InternalException {
WorkspaceFolderManager manager=new WorkspaceFolderManager(folderId);
return manager.check(recursively);
}
@Override
public void registerCallBack(String folderId, SyncOperationCallBack callback) throws ProcessNotFoundException {
if(!localProcesses.containsKey(folderId)) throw new ProcessNotFoundException(folderId+" is not under local processes");
localProcesses.get(folderId).addCallBack(callback);
}
@Override
public ProcessDescriptor doSync(String folderId) throws WorkspaceInteractionException, InternalException {
if(localProcesses.containsKey(folderId))
return localProcesses.get(folderId).getDescriptor();
else {
WorkspaceFolderManager manager=new WorkspaceFolderManager(folderId);
if (!manager.isSynched()) throw new WorkspaceNotSynchedException("Folder "+folderId+" is not configured for synchronization.");
if(manager.isLocked()) throw new WorkspaceLockedException("Folder "+folderId+"is locked by an external process.");
if(!manager.isRoot()) throw new WorkspaceFolderNotRootException("Unable to launch synch operation. Folder "+folderId+" is not root configuration");
String callerContext=Security.getCurrentScope();
log.debug("Checking context. Caller is {} ",callerContext);
String configurationContext=Security.getContext(manager.getSynchConfiguration().getTargetToken());
if(!callerContext.equals(configurationContext))
throw new WorkspaceInteractionException("Cannot sync folder from context "+callerContext+". Expected context is "+configurationContext);
Process toLaunch=new Process(folderId,completionCallback);
localProcesses.put(folderId, toLaunch);
initializationExecutor.submit(new ProcessInitializationThread(toLaunch,synchronizationExecutor));
return toLaunch.getDescriptor();
}
}
@Override
public void stopSynch(String folderId) throws ProcessNotFoundException {
if(!localProcesses.containsKey(folderId)) throw new ProcessNotFoundException(folderId+" is not under local processes");
localProcesses.get(folderId).cancel();
}
@Override
public void setSynchronizedFolder(SynchFolderConfiguration config,String folderId) throws WorkspaceInteractionException, InternalException {
// Check config
if(config==null) throw new InternalException("Passed config is null : "+config);
String remotePath=config.getRemotePath();
if(remotePath==null||remotePath.isEmpty()||remotePath.startsWith("/"))
throw new InternalException("Invalid remote path "+remotePath+".");
new WorkspaceFolderManager(folderId).configure(config);
}
@Override
public void unsetSynchronizedFolder(String folderId,boolean deleteRemoteContent) throws WorkspaceInteractionException, InternalException {
new WorkspaceFolderManager(folderId).dismiss(deleteRemoteContent);
}
@Override
public SynchronizedElementInfo getInfo(String elementId) {
return WorkspaceFolderManager.getInfo(elementId);
}
@Override
public void updateCatalogFile(String folderId, File toUpdate) throws InternalException {
File previousCatalogFile=null;
try {
WorkspaceFolderManager manager=new WorkspaceFolderManager(folderId);
previousCatalogFile=manager.loadCatalogFile();
String lockId=UUID.randomUUID().toString();
manager.lock(lockId);
manager.updateCatalogFile(toUpdate);
manager.unlock(lockId);
}catch(Throwable t) {
log.warn("Unable to update catalogFile for {}. Trying to restore previous one..",folderId,t);
throw new InternalException("Unable to restore previous catalog.",t);
//TODO try to restore previous catalog
}
}
@Override
public void shutDown() {
log.trace("Cancelling processes...");
for(Entry<String,Process> entry:localProcesses.entrySet())
entry.getValue().cancel();
log.trace("Shutting down services... ");
initializationExecutor.shutdown();
synchronizationExecutor.shutdown();
do {
log.trace("Waiting for services to terminate..");
try {Thread.sleep(1000l);
} catch (InterruptedException e) {}
}while(!initializationExecutor.isTerminated()||!synchronizationExecutor.isTerminated());
RequestLogger.get().close();
log.trace("Terminated.");
}
@Override
public void forceUnlock(String folderId) throws InternalException, WorkspaceInteractionException {
log.warn("Forcing unlock of {} ",folderId);
new WorkspaceFolderManager(folderId).forceUnlock();
}
@Override
public ProcessDescriptor getProcessDescriptorByFolderId(String folderId) throws ProcessNotFoundException {
if(!localProcesses.containsKey(folderId)) throw new ProcessNotFoundException(folderId+" is not under processes or process is not in this host");
return localProcesses.get(folderId).getDescriptor();
}
@Override
public ProcessStatus getProcessStatusByFolderId(String folderId) throws ProcessNotFoundException {
if(!localProcesses.containsKey(folderId)) throw new ProcessNotFoundException(folderId+" is not under processes or process is not in this host");
return localProcesses.get(folderId).getStatus();
}
@Override
public Set<CatalogBean> getAvailableCatalogsByToken(String token) throws InternalException {
ThreddsController controller=new ThreddsController("",token);
ThreddsInfo info=controller.getThreddsInfo();
Set<CatalogBean> toReturn=asCatalogBeanSet(info.getCatalog());
DataSetScan mainScan=info.getCatalog().getDeclaredDataSetScan().iterator().next();
CatalogBean defaultBean=new CatalogBean(mainScan.getName(),mainScan.getLocation(),true);
toReturn.remove(defaultBean);
toReturn.add(defaultBean);
//*** Cleaning :
// absolute paths to relative paths (from thredds persistence)
// leading/ending '/'
String threddsPersistencePath=info.getLocalBasePath();
for(CatalogBean bean:toReturn) {
String path=bean.getPath();
if(path.startsWith(threddsPersistencePath))
path=path.substring(threddsPersistencePath.length());
if(path.startsWith("/")) path=path.substring(1);
if(path.endsWith("/"))path=path.substring(0, path.length()-1);
bean.setPath(path);
}
return toReturn;
}
private static HashSet<CatalogBean> asCatalogBeanSet(ThreddsCatalog catalog){
HashSet<CatalogBean> toReturn=new HashSet<>();
for(DataSetScan scan:catalog.getDeclaredDataSetScan())
toReturn.add(new CatalogBean(scan.getName(),
scan.getLocation(),false));
if(catalog.getSubCatalogs()!=null&&catalog.getSubCatalogs().getLinkedCatalogs()!=null)
for(ThreddsCatalog sub:catalog.getSubCatalogs().getLinkedCatalogs())
toReturn.addAll(asCatalogBeanSet(sub));
return toReturn;
}
@Override
public SyncEngineStatusDescriptor getStatus() {
ThreadPoolExecutor exec=(ThreadPoolExecutor) synchronizationExecutor;
return new SyncEngineStatusDescriptor(exec.getActiveCount(), exec.getQueue().size(), LocalConfiguration.get().asMap());
}
}