393 lines
13 KiB
Java
393 lines
13 KiB
Java
|
package org.gcube.portal.wssynclibrary.thredds;
|
||
|
|
||
|
import java.util.Collections;
|
||
|
import java.util.HashMap;
|
||
|
import java.util.Map;
|
||
|
|
||
|
import org.gcube.common.homelibrary.home.HomeLibrary;
|
||
|
import org.gcube.common.homelibrary.home.workspace.Properties;
|
||
|
import org.gcube.common.homelibrary.home.workspace.Workspace;
|
||
|
import org.gcube.common.homelibrary.home.workspace.WorkspaceFolder;
|
||
|
import org.gcube.common.scope.api.ScopeProvider;
|
||
|
import org.gcube.portal.wssynclibrary.shared.ItemNotSynched;
|
||
|
import org.gcube.portal.wssynclibrary.shared.thredds.Status;
|
||
|
import org.gcube.portal.wssynclibrary.shared.thredds.Sync_Status;
|
||
|
import org.gcube.portal.wssynclibrary.shared.thredds.ThProcessDescriptor;
|
||
|
import org.gcube.portal.wssynclibrary.shared.thredds.ThProcessStatus;
|
||
|
import org.gcube.portal.wssynclibrary.shared.thredds.ThSyncFolderDescriptor;
|
||
|
import org.gcube.portal.wssynclibrary.shared.thredds.ThSyncStatus;
|
||
|
import org.gcube.portal.wssynclibrary.shared.thredds.ThSynchFolderConfiguration;
|
||
|
import org.gcube.usecases.ws.thredds.SyncEngine;
|
||
|
import org.gcube.usecases.ws.thredds.engine.impl.ProcessDescriptor;
|
||
|
import org.gcube.usecases.ws.thredds.engine.impl.ProcessStatus;
|
||
|
import org.gcube.usecases.ws.thredds.faults.InternalException;
|
||
|
import org.gcube.usecases.ws.thredds.faults.ProcessNotFoundException;
|
||
|
import org.gcube.usecases.ws.thredds.faults.WorkspaceInteractionException;
|
||
|
import org.gcube.usecases.ws.thredds.faults.WorkspaceLockedException;
|
||
|
import org.gcube.usecases.ws.thredds.faults.WorkspaceNotSynchedException;
|
||
|
import org.gcube.usecases.ws.thredds.model.SyncFolderDescriptor;
|
||
|
import org.gcube.usecases.ws.thredds.model.SyncOperationCallBack;
|
||
|
import org.gcube.usecases.ws.thredds.model.SynchFolderConfiguration;
|
||
|
import org.slf4j.Logger;
|
||
|
import org.slf4j.LoggerFactory;
|
||
|
|
||
|
|
||
|
// TODO: Auto-generated Javadoc
|
||
|
/**
|
||
|
* The Class WorkspaceThreddsSynchronize.
|
||
|
*
|
||
|
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
|
||
|
* Feb 14, 2018
|
||
|
*/
|
||
|
public class WorkspaceThreddsSynchronize implements WorkspaceThreddsSynchronizedRepository<ThSyncStatus, ThSyncFolderDescriptor>{
|
||
|
|
||
|
/** The logger. */
|
||
|
private static Logger logger = LoggerFactory.getLogger(WorkspaceThreddsSynchronize.class);
|
||
|
|
||
|
/** The engine. */
|
||
|
private SyncEngine engine=null;
|
||
|
|
||
|
/** The instance. */
|
||
|
private static WorkspaceThreddsSynchronize instance = null;
|
||
|
|
||
|
public static final String WS_SYNCH_SYNCH_STATUS = "WS-SYNCH.SYNCH-STATUS";
|
||
|
|
||
|
/** The map call back. */
|
||
|
// Fully synchronized HashMap
|
||
|
private Map<String, ThSyncStatus> mapCallBack = Collections.synchronizedMap(new HashMap<>());
|
||
|
|
||
|
/**
|
||
|
* Instantiates a new workspace thredds synchronize.
|
||
|
*/
|
||
|
private WorkspaceThreddsSynchronize() {
|
||
|
// GET ENGINE : SINGLETON INSTANCE
|
||
|
engine = SyncEngine.get();
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Gets the single instance of WorkspaceThreddsSynchronize.
|
||
|
*
|
||
|
* @return single instance of WorkspaceThreddsSynchronize
|
||
|
*/
|
||
|
public static WorkspaceThreddsSynchronize getInstance() {
|
||
|
if (instance == null) {
|
||
|
instance = new WorkspaceThreddsSynchronize();
|
||
|
}
|
||
|
return instance;
|
||
|
}
|
||
|
|
||
|
//setSynchronizedFolder(config, folder.getId());
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.WorkspaceSynchronizedRepository#isItemSynchronized(java.lang.String)
|
||
|
*/
|
||
|
@Override
|
||
|
public ThSyncFolderDescriptor checkItemSynched(String itemId) throws ItemNotSynched, Exception {
|
||
|
logger.debug("Perfoming checkItemSynched for id: "+itemId);
|
||
|
try {
|
||
|
// WHEN OPENING A FOLDER, INVOKE CHECK TO UPDATE SYNCH STATUS
|
||
|
SyncFolderDescriptor desc = engine.check(itemId, false);
|
||
|
|
||
|
logger.debug("The item id: "+itemId +" is synched");
|
||
|
ThSyncFolderDescriptor descr = ThreddsConverter.toThSyncFolderDescriptor.apply(desc);
|
||
|
logger.trace("CheckItemSynched for id: "+itemId+" returning descriptor: "+descr);
|
||
|
return descr;
|
||
|
|
||
|
}catch(WorkspaceNotSynchedException e) {
|
||
|
String err = "The item id: "+itemId +" is not synched";
|
||
|
logger.trace(err);
|
||
|
throw new ItemNotSynched("The item id: "+itemId +" is not synched");
|
||
|
// System.out.println("Folder not synched, configurin it..");
|
||
|
//engine.setSynchronizedFolder(config, folder.getId());
|
||
|
}catch(WorkspaceLockedException e) {
|
||
|
logger.warn("Workspace locked, going to force unlock..");
|
||
|
engine.forceUnlock(itemId);
|
||
|
//System.out.println("Workspace locked, going to force unlock.."); // MAINLY FOR TEST PURPOSES, OR WHEN SOMETHIGN GOES WRONG.. USE CAUTIOUSLY
|
||
|
//engine.forceUnlock(folder.getId());
|
||
|
|
||
|
//TODO???
|
||
|
return null;
|
||
|
//???
|
||
|
} catch (WorkspaceInteractionException | InternalException e) {
|
||
|
logger.error("Error: ",e);
|
||
|
if(e instanceof WorkspaceInteractionException)
|
||
|
throw new Exception("Sorry, an error occurred during check syncronization due to WS interection for the itemId: "+itemId);
|
||
|
else if(e instanceof InternalException)
|
||
|
throw new Exception("Sorry, an Internal Exception occurred during check syncronization for the itemId: "+itemId);
|
||
|
|
||
|
throw new Exception("Sorry, an error occurred server side during chck syncronization for the itemId: "+itemId);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Sets the synchronized folder.
|
||
|
*
|
||
|
* @param thConfig the th config
|
||
|
* @param itemId the item id
|
||
|
* @return the th sync folder descriptor
|
||
|
* @throws Exception the exception
|
||
|
*/
|
||
|
public ThSyncFolderDescriptor setSynchronizedFolder(ThSynchFolderConfiguration thConfig, String itemId) throws Exception {
|
||
|
SynchFolderConfiguration config = ThreddsConverter.toSynchFolderConfiguration.apply(thConfig);
|
||
|
|
||
|
if(thConfig.getRemotePath()==null || thConfig.getRemotePath().isEmpty())
|
||
|
throw new Exception("A valid remote path must be provided");
|
||
|
|
||
|
if(thConfig.getToCreateCatalogName()==null || thConfig.getToCreateCatalogName().isEmpty())
|
||
|
throw new Exception("A valid Catalogue Name must be provided");
|
||
|
|
||
|
if(thConfig.getTargetToken()==null || thConfig.getTargetToken().isEmpty())
|
||
|
throw new Exception("A valid Target Token must be provided");
|
||
|
|
||
|
try {
|
||
|
engine.setSynchronizedFolder(config, itemId);
|
||
|
} catch (WorkspaceInteractionException | InternalException e) {
|
||
|
logger.error("Error on setSynchronizedFolder for config: "+thConfig);
|
||
|
logger.error("Using itemId: "+itemId,e);
|
||
|
throw new Exception("Error on setSynchronizedFolder");
|
||
|
}
|
||
|
return null;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.WorkspaceSynchronizedRepository#doSynchronization(java.lang.String)
|
||
|
*/
|
||
|
@Override
|
||
|
public ThSyncStatus doSync(String itemId) throws Exception {
|
||
|
|
||
|
try {
|
||
|
// INVOKE SYNCHRONIZATION ON FOLDER
|
||
|
ProcessDescriptor descriptor = engine.doSync(itemId);
|
||
|
|
||
|
ThSyncStatus synStatus = mapCallBack.get(itemId);
|
||
|
|
||
|
if(synStatus==null) {
|
||
|
|
||
|
registerCallbackForId(itemId);
|
||
|
descriptor = engine.getProcessDescriptorByFolderId(itemId);
|
||
|
ProcessStatus status = engine.getProcessStatusByFolderId(itemId);
|
||
|
updateMapCallback(itemId, status, descriptor);
|
||
|
}
|
||
|
|
||
|
logger.debug("DoSync returning status: "+synStatus);
|
||
|
return synStatus;
|
||
|
|
||
|
} catch (WorkspaceInteractionException | InternalException | ProcessNotFoundException e) {
|
||
|
logger.error("Error: ",e);
|
||
|
|
||
|
if(e instanceof WorkspaceInteractionException)
|
||
|
throw new Exception("Sorry, an error occurred during syncronization due to WS interection for the itemId: "+itemId);
|
||
|
else if(e instanceof InternalException)
|
||
|
throw new Exception("Sorry, an Internal Exception occurred during syncronization for the itemId: "+itemId);
|
||
|
|
||
|
throw new Exception("Sorry, an error occurred server side during syncronization for the itemId: "+itemId);
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
}
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.DoSyncItem#getSyncStatus(java.lang.String)
|
||
|
*/
|
||
|
@Override
|
||
|
public ThSyncStatus monitorSyncStatus(String itemId) throws ItemNotSynched, Exception {
|
||
|
|
||
|
|
||
|
try {
|
||
|
//ThSyncFolderDescriptor descr = checkItemSynched(itemId);
|
||
|
|
||
|
ThSyncStatus synStatus = mapCallBack.get(itemId);
|
||
|
|
||
|
|
||
|
if(synStatus==null) {
|
||
|
logger.info("The sync is not on-going...,returning process status null");
|
||
|
// String err = "The item id: "+itemId +" is not cached as synched";
|
||
|
// throw new ItemNotSynched(err);
|
||
|
return new ThSyncStatus(null, null);
|
||
|
}
|
||
|
|
||
|
if(synStatus.getProcessStatus()!=null) {
|
||
|
if(synStatus.getProcessStatus().getStatus()!=null) {
|
||
|
if(synStatus.getProcessStatus().getStatus().equals(Status.COMPLETED)) {
|
||
|
//TODO NOW?
|
||
|
mapCallBack.put(itemId, null);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
logger.trace("MonitorSyncStatus for item: "+itemId+" returning: "+synStatus);
|
||
|
return synStatus;
|
||
|
|
||
|
// } catch (ItemNotSynched e) {
|
||
|
// String err = "The item id: "+itemId +" is not synched repository side";
|
||
|
// throw new ItemNotSynched(err);
|
||
|
} catch (Exception e) {
|
||
|
throw new Exception("Sorry, an error occurred during syncronization for itemId: "+itemId);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.WorkspaceSynchronizedRepository#removeSync(java.lang.String)
|
||
|
*/
|
||
|
@Override
|
||
|
public Boolean removeSync(String itemId) {
|
||
|
return null;
|
||
|
// TODO Auto-generated method stub
|
||
|
|
||
|
}
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.WorkspaceSynchronizedRepository#stopSync(java.lang.String)
|
||
|
*/
|
||
|
@Override
|
||
|
public Boolean stopSync(String itemId) {
|
||
|
return false;
|
||
|
|
||
|
}
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.WorkspaceSynchronizedRepository#initRepository()
|
||
|
*/
|
||
|
@Override
|
||
|
public Boolean initRepository() {
|
||
|
return false;
|
||
|
|
||
|
}
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.WorkspaceSynchronizedRepository#shutDownRepository()
|
||
|
*/
|
||
|
@Override
|
||
|
public Boolean shutDownRepository() {
|
||
|
try {
|
||
|
engine.shutDown();
|
||
|
return true;
|
||
|
}catch (Exception e) {
|
||
|
return null;
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
|
||
|
/* (non-Javadoc)
|
||
|
* @see org.gcube.portal.wssynclibrary.thredds.WorkspaceThreddsSynchronizedRepository#decribeSyncRepository()
|
||
|
*/
|
||
|
@Override
|
||
|
public String decribeSyncRepository() {
|
||
|
return "Sync repository for Thredds";
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Gets the synched status from item property.
|
||
|
*
|
||
|
* @param folderId the folder id
|
||
|
* @param username the username
|
||
|
* @return the synched status from item property
|
||
|
* @throws ItemNotSynched the item not synched
|
||
|
* @throws Exception the exception
|
||
|
*/
|
||
|
public Sync_Status getSynchedStatusFromItemProperty(String folderId, String username) throws ItemNotSynched, Exception{
|
||
|
|
||
|
if(folderId==null)
|
||
|
throw new Exception("Invalid pareter itemId is null");
|
||
|
|
||
|
String scope = ScopeProvider.instance.get();
|
||
|
if(scope == null || scope.isEmpty())
|
||
|
throw new Exception("You must set a valid scope into ScopeProvider instance");
|
||
|
|
||
|
WorkspaceFolder folder = null;
|
||
|
|
||
|
try {
|
||
|
|
||
|
logger.debug("Scope provider instancied at: "+scope);
|
||
|
Workspace workspace = HomeLibrary.getUserWorkspace(username);
|
||
|
folder = (WorkspaceFolder) workspace.getItem(folderId);
|
||
|
|
||
|
}catch (Exception e) {
|
||
|
logger.error("Error HL side: "+folderId, e);
|
||
|
throw new Exception("Sorry an error occurred getting HL item for id: "+folderId);
|
||
|
}
|
||
|
|
||
|
if(folder==null)
|
||
|
throw new Exception("Sorry an error occurred getting HL item for id: "+folderId);
|
||
|
|
||
|
Properties properties = folder.getProperties();
|
||
|
|
||
|
if(properties==null || properties.getProperties()==null || properties.getProperties().size()==0) {
|
||
|
throw new ItemNotSynched("No properties to read");
|
||
|
}
|
||
|
|
||
|
String wsSyncStatus = properties.getProperties().get(WS_SYNCH_SYNCH_STATUS);
|
||
|
logger.info("Folder id: "+folder.getId()+" has current: "+WS_SYNCH_SYNCH_STATUS +" value at: "+wsSyncStatus);
|
||
|
try {
|
||
|
return Sync_Status.valueOf(wsSyncStatus);
|
||
|
}catch (Exception e) {
|
||
|
logger.warn(wsSyncStatus + "is not value of "+Sync_Status.values());
|
||
|
return null;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Register callback for id.
|
||
|
*
|
||
|
* @param itemId the item id
|
||
|
* @throws Exception the exception
|
||
|
*/
|
||
|
@Override
|
||
|
public void registerCallbackForId(String itemId) throws Exception{
|
||
|
|
||
|
try {
|
||
|
|
||
|
SyncOperationCallBack callback = new SyncOperationCallBack() {
|
||
|
|
||
|
@Override
|
||
|
public void onStep(ProcessStatus status, ProcessDescriptor descriptor) {
|
||
|
logger.debug("ON STEP : "+status+" "+descriptor);
|
||
|
logger.debug("LOG : \n"+ status.getLogBuilder().toString());
|
||
|
if(status.getStatus().equals(ProcessStatus.Status.COMPLETED)) {
|
||
|
//mapCallBack.remove(itemId);
|
||
|
}
|
||
|
updateMapCallback(itemId, status, descriptor);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
// REGISTER CALLBACK TO MONITOR PROGRESS
|
||
|
logger.debug("Registering callback on itemId: "+itemId);
|
||
|
engine.registerCallBack(itemId, callback);
|
||
|
}catch (ProcessNotFoundException e) {
|
||
|
logger.error("Register callback exception: ",e);
|
||
|
throw new Exception("No Process found for item id: "+itemId+ ", refresh an try again");
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Update map callback.
|
||
|
*
|
||
|
* @param itemId the item id
|
||
|
* @param status the status
|
||
|
* @param descriptor the descriptor
|
||
|
*/
|
||
|
private void updateMapCallback(String itemId, ProcessStatus status, ProcessDescriptor descriptor) {
|
||
|
ThProcessDescriptor thDesc = ThreddsConverter.toThProcessDescriptor.apply(descriptor);
|
||
|
ThProcessStatus thStatus = ThreddsConverter.toThProcessStatus.apply(status);
|
||
|
mapCallBack.put(itemId, new ThSyncStatus(thDesc, thStatus));
|
||
|
logger.debug("Update map for "+itemId +" with new "+thStatus);
|
||
|
}
|
||
|
}
|