ws-thredds/src/main/java/org/gcube/usecases/ws/thredds/engine/impl/WorkspaceUtils.java

423 lines
17 KiB
Java

package org.gcube.usecases.ws.thredds.engine.impl;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.gcube.common.storagehub.client.dsl.ContainerType;
import org.gcube.common.storagehub.client.dsl.FolderContainer;
import org.gcube.common.storagehub.client.dsl.ItemContainer;
import org.gcube.common.storagehub.client.dsl.StorageHubClient;
import org.gcube.common.storagehub.model.Metadata;
import org.gcube.common.storagehub.model.exceptions.StorageHubException;
import org.gcube.common.storagehub.model.items.FolderItem;
import org.gcube.common.storagehub.model.items.Item;
import org.gcube.common.storagehub.model.items.nodes.accounting.AccountEntry;
import org.gcube.common.storagehub.model.items.nodes.accounting.AccountFolderEntryRemoval;
import org.gcube.common.storagehub.model.items.nodes.accounting.AccountFolderEntryRenaming;
import org.gcube.common.storagehub.model.types.WorkspaceItemType;
import org.gcube.data.transfer.model.RemoteFileDescriptor;
import org.gcube.usecases.ws.thredds.Constants;
import org.gcube.usecases.ws.thredds.engine.impl.threads.DeleteRemoteRequest;
import org.gcube.usecases.ws.thredds.engine.impl.threads.SynchronizationThread;
import org.gcube.usecases.ws.thredds.engine.impl.threads.TransferFromThreddsRequest;
import org.gcube.usecases.ws.thredds.faults.InternalException;
import org.gcube.usecases.ws.thredds.faults.ItemNotFoundException;
import org.gcube.usecases.ws.thredds.faults.RemoteFileNotFoundException;
import org.gcube.usecases.ws.thredds.model.StepReport;
import org.gcube.usecases.ws.thredds.model.SynchFolderConfiguration;
import org.gcube.usecases.ws.thredds.model.SynchronizedElementInfo.SynchronizationStatus;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WorkspaceUtils {
public static StorageHubClient getClient() throws StorageHubException{
return new StorageHubClient();
}
/**
* Checks current accounting info in order to infer synchronization status.
* OwnerProcess and service parameters can be null for check purposes.
*
* @param folderPath
* @param toScanFolder
* @param config
* @param localChildrenNames
* @param remoteChildrenNames
* @param folderController
*
* @param ownerProcess
* @param service
* @return set of Item names that have been found to be synchronized
* @throws InternalErrorException
*/
static Set<String> scanAccountingForStatus(
FolderContainer toScanFolder,
SynchFolderConfiguration config,
Set<String> localChildrenNames,
Set<String> remoteChildrenNames,
ThreddsController folderController,
Process ownerProcess,
ExecutorService service) throws StorageHubException{
Set<String> handledAccountingEntries=new HashSet<>();
FolderItem toScanFolderItem=toScanFolder.get();
log.debug("Checking history of {} ",toScanFolderItem.getPath());
String relativePath=toScanFolderItem.getMetadata().getMap().get(Constants.WorkspaceProperties.REMOTE_PATH)+"";
Date folderLastUpdateTime=null;
try{
folderLastUpdateTime=WorkspaceUtils.safelyGetLastUpdate(toScanFolderItem);
}catch(Throwable t) {
log.warn("Unable to get folder {} last update time. Assuming first run.. ",toScanFolderItem.getName(),t);
folderLastUpdateTime=new Date(0l);
}
// scanning for deletions
log.debug("Checking Accounting for {}. Last update time is {} ",toScanFolderItem.getName(),Constants.DATE_FORMAT.format(folderLastUpdateTime));
for(AccountEntry entry:toScanFolderItem.getAccounting().getEntries()) {
try {
Date eventTime=entry.getDate().getTime();
if(folderLastUpdateTime==null|| eventTime.after(folderLastUpdateTime)) { // SKIP IF ENTRY OLDER THAN LAST UPDATE TIME
String toDeleteRemote=null;
switch(entry.getType()) {
case CUT:
case REMOVAL:{
AccountFolderEntryRemoval removalEntry=(AccountFolderEntryRemoval) entry;
if(removalEntry.getItemType().equals(WorkspaceItemType.FOLDER.toString())||
config.matchesFilter(removalEntry.getItemName()))
toDeleteRemote=removalEntry.getItemName();
break;
}
case RENAMING:{
AccountFolderEntryRenaming renamingEntry=(AccountFolderEntryRenaming) entry;
ItemContainer<?> newItem=toScanFolder.findByName(renamingEntry.getNewItemName()).getContainers().get(0);
if(newItem.getType().equals(ContainerType.FOLDER)||config.matchesFilter(renamingEntry.getOldItemName()))
toDeleteRemote=renamingEntry.getOldItemName();
break;
}
default : {
log.debug("Skpping accounting entry {} ",entry.getType());
}
}
if(toDeleteRemote!=null){
// SKIP IF LOCAL EXISTS
if(localChildrenNames.contains(toDeleteRemote))
log.debug("Skipping accounting entry for existing local item {} ",toDeleteRemote);
else if(remoteChildrenNames.contains(toDeleteRemote)) {
log.debug("Checking age of remote {} ",toDeleteRemote);
// IF REMOTE OLDER THAN ENTRY -> DELETE REQUEST
// IF REMOTE NEWER -> IMPORT REQUEST
RemoteFileDescriptor remote=folderController.getFileDescriptor(relativePath+"/"+toDeleteRemote);
Date remoteDate=new Date(remote.getLastUpdate());
log.debug("Last remote update : {} . Event date {} ",Constants.DATE_FORMAT.format(remoteDate),Constants.DATE_FORMAT.format(eventTime));
if(service!=null) {
log.debug("Service is not null. Submitting request ... ");
if(eventTime.after(remoteDate)) {
service.execute(new SynchronizationThread(new DeleteRemoteRequest(ownerProcess, toScanFolderItem,toDeleteRemote)));
handledAccountingEntries.add(toDeleteRemote);
log.debug("Submitted DELETION request number {} ",ownerProcess.getStatus().getQueuedTransfers().incrementAndGet());
}
// }else {
// service.execute(new SynchronizationThread(new TransferFromThreddsRequest(ownerProcess, null, toScanFolder, toDeleteRemote)));
// log.debug("Submitted UPDATE-LOCAL request number {} ",ownerProcess.getStatus().getQueuedTransfers().incrementAndGet());
// }
}
}else log.debug("To delete remote {} not found. skipping it.. ",toDeleteRemote);
// SKIP IF REMOTE NOT FOUND
}
}
}catch(Throwable t) {
log.error("Unable to submit deletion request for {} ",entry,t);
}
}
return handledAccountingEntries;
}
/**
* Scans remote Folder in order to gather elements to be synchronized.
* OwnerProcess and Service can be null for check purposes.
*
*
* @param folderPath
* @param folderDesc
* @param handledAccountingEntries
* @param handledWorkspaceItemEntries
* @param toScanFolder
* @param folderController
* @param config
* @param ownerProcess
* @param service
* @return
* @throws InternalException
* @throws InternalErrorException
*/
static Set<String> scanRemoteFolder(
RemoteFileDescriptor folderDesc,
Set<String> handledAccountingEntries,
Set<String> handledWorkspaceItemEntries,
FolderContainer toScanFolder,
ThreddsController folderController,
SynchFolderConfiguration config,
Process ownerProcess,
ExecutorService service) throws InternalException, StorageHubException{
FolderItem toScanItem=toScanFolder.get();
log.debug("Checking remote content for {}. Remote Absolute Path is {} ",toScanItem.getPath(),folderDesc.getAbsolutePath());
Set<String> handledRemoteElements=new HashSet<String>();
// String relativePath=toScanFolder.getProperties().getPropertyValue(Constants.WorkspaceProperties.REMOTE_PATH);
if(!folderDesc.isDirectory()) throw new InternalException("Remote Descriptor "+folderDesc.getAbsolutePath()+" Is not a directory. ");
for(String child:folderDesc.getChildren()) {
// skip if already handled with accounting
if(handledAccountingEntries.contains(child))
log.debug("Skipping remote child {} because already handled with accouting", child);
// skip if already handled with local items
else if(handledWorkspaceItemEntries.contains(child))
log.debug("Skipping remote child {} because already handled with respective item",child);
else {
RemoteFileDescriptor childDesc=folderController.getFileDescriptor(child);
if(childDesc.isDirectory()) {
handledRemoteElements.add(child);
}else if (config.matchesFilter(child)){
log.debug("Child {} matches filter...");
handledRemoteElements.add(child);
if(service!=null) {
service.execute(new SynchronizationThread(new TransferFromThreddsRequest(ownerProcess, null, toScanItem, child)));
log.debug("Submitted IMPORT request number {} ",ownerProcess.getStatus().getQueuedTransfers().incrementAndGet());
}
// import if matching
}else log.debug("Skipping not matching remote {} ",child);
// skip if doesn't match filter or isn't folder
}
}
return handledRemoteElements;
}
static void initProperties(ItemContainer<?> toInit, String remotePath, String filter, String targetToken,
String catalogName,Boolean validateMeta, String rootFolderId) throws StorageHubException {
Metadata meta=toInit.get().getMetadata();
Map<String,Object> toSetProperties=meta.getMap();
initIfMissing(toSetProperties,Constants.WorkspaceProperties.TBS,"true");
initIfMissing(toSetProperties,Constants.WorkspaceProperties.LAST_UPDATE_TIME,0l+"");
initIfMissing(toSetProperties,Constants.WorkspaceProperties.LAST_UPDATE_STATUS,StepReport.Status.OK+"");
initIfMissing(toSetProperties,Constants.WorkspaceProperties.SYNCHRONIZATION_STATUS,SynchronizationStatus.UP_TO_DATE+"");
if(toInit.getType().equals(ContainerType.FOLDER)) {
initIfMissing(toSetProperties,Constants.WorkspaceProperties.SYNCH_FILTER,filter);
initIfMissing(toSetProperties,Constants.WorkspaceProperties.REMOTE_PATH,remotePath);
initIfMissing(toSetProperties,Constants.WorkspaceProperties.REMOTE_PERSISTENCE,Constants.THREDDS_PERSISTENCE);
initIfMissing(toSetProperties,Constants.WorkspaceProperties.TARGET_TOKEN,targetToken);
initIfMissing(toSetProperties,Constants.WorkspaceProperties.RELATED_CATALOG,catalogName);
initIfMissing(toSetProperties,Constants.WorkspaceProperties.VALIDATE_METADATA,validateMeta+"");
initIfMissing(toSetProperties,Constants.WorkspaceProperties.ROOT_FOLDER_ID,rootFolderId);
}else {
initIfMissing(toSetProperties,Constants.WorkspaceProperties.METADATA_UUID,null);
}
meta.setMap(toSetProperties);
toInit.setMetadata(meta);
}
private static void initIfMissing(Map<String,Object> current,String key,String defaultValue) {
if(!current.containsKey(key)||
current.get(key)==null||
current.get(key).equals("null")) current.put(key, defaultValue);
}
static boolean isConfigured(Item toCheck) throws StorageHubException {
return isConfigured(toCheck.getMetadata().getMap());
}
static boolean isConfigured(Map<String,Object> toCheckProperties) {
return (toCheckProperties.containsKey(Constants.WorkspaceProperties.TBS)&&toCheckProperties.get(Constants.WorkspaceProperties.TBS)!=null);
}
static SynchronizationStatus getStatusAgainstRemote(Item item, Set<String> existingRemote, ThreddsController remoteFolderController,Date lastUpdateRoutine) throws NumberFormatException,RemoteFileNotFoundException {
String itemName=item.getName();
SynchronizationStatus status=SynchronizationStatus.OUTDATED_REMOTE;
if(existingRemote.contains(itemName)) {
RemoteFileDescriptor desc=remoteFolderController.getFileDescriptor(itemName);
Date remoteDate=new Date(desc.getLastUpdate());
Date localDate=item.getLastModificationTime().getTime();
Date lastUpdate=safelyGetLastUpdate(item);
if(localDate.equals(lastUpdate)) {
//LAST MODIFCATION WAS FROM SYNCHRONIZATION
if(remoteDate.after(lastUpdate)) status=SynchronizationStatus.OUTDATED_WS;
else status=SynchronizationStatus.UP_TO_DATE;
}else
if(remoteDate.before(localDate)) { // REMOTE OLDER THAN LOCAL
if(isModifiedAfter(item,lastUpdateRoutine)) status=SynchronizationStatus.OUTDATED_REMOTE; // IT's been locally modified from last routine
else status=SynchronizationStatus.UP_TO_DATE;
}
else if(remoteDate.after(localDate)) { // REMOTE NEWER &..
if (remoteDate.equals(lastUpdate))status =SynchronizationStatus.UP_TO_DATE; // REMOTE DATE == LAST UPDATE ROUTINE -> UP TO DATE
else if (remoteDate.before(lastUpdate))status =SynchronizationStatus.OUTDATED_REMOTE; // REMOTE DATE < LAST UPDATE -> transfer to thredds, last update was faulty
else status=SynchronizationStatus.OUTDATED_WS; // REMOTE DATE != LAST UPDATE -> import from thredds
}
}
return status;
}
static Date safelyGetLastUpdate(Item item){
try {
return new Date(Long.parseLong(item.getMetadata().getMap().get(Constants.WorkspaceProperties.LAST_UPDATE_TIME)+""));
}catch(NumberFormatException e) {
log.debug("Unable to get last update time for {} ",item.getName(),e);
return new Date(0l);
}
}
public static boolean isModifiedAfter(Item item,Date fromDate) {
return item.getLastModificationTime().after(fromDate);
// for(AccountingEntry entry:item.getAccounting()) {
// if(entry.getDate().getTime().after(fromDate)) {
// switch(entry.getEntryType()) {
// case PASTE:
// case CREATE:
// case RESTORE:
// case UPDATE:
// case ADD: return true;
// }
// }
// }
// return false;
}
static void cleanItem(ItemContainer<?> itemContainer) throws StorageHubException{
Metadata meta=itemContainer.get().getMetadata();
Map<String,Object> map=meta.getMap();
if(map.containsKey(Constants.WorkspaceProperties.TBS)) {
if(itemContainer.getType().equals(ContainerType.FOLDER)) {
map=Constants.cleanedFolderPropertiesMap;
for(ItemContainer<?> child : ((FolderContainer)itemContainer).list().withMetadata().getContainers())
cleanItem(child);
}else map=Constants.cleanedItemPropertiesMap;
//Actually posting cleaned map
meta.setMap(map);
log.debug("Setting meta to item "+itemContainer.getId()+" : "+itemContainer.get().getPath());
itemContainer.setMetadata(meta);
}
}
static void setLastUpdateTime(FolderContainer folder,long toSetTime) throws StorageHubException {
Metadata meta=folder.get().getMetadata();
Map<String,Object> map=meta.getMap();
StepReport.Status currentWSStatus=StepReport.Status.valueOf(map.get(Constants.WorkspaceProperties.LAST_UPDATE_STATUS)+"");
if(currentWSStatus.equals(StepReport.Status.OK))
map.put(Constants.WorkspaceProperties.LAST_UPDATE_TIME, toSetTime+"");
//Actually posting cleaned map
meta.setMap(map);
folder.setMetadata(meta);
// for(ItemContainer<?> child : folder.list().withMetadata().getContainers())
// if(child.getType().equals(ContainerType.FOLDER))
// setLastUpdateTime((FolderContainer) child, toSetTime);
}
public static SynchFolderConfiguration loadConfiguration(ItemContainer<?> item) throws StorageHubException {
if(item.getType().equals(ContainerType.FOLDER)) {
Map<String,Object> map=item.get().getMetadata().getMap();
SynchFolderConfiguration config=new SynchFolderConfiguration();
config.setFilter(""+map.get(Constants.WorkspaceProperties.SYNCH_FILTER));
config.setRemotePath(""+map.get(Constants.WorkspaceProperties.REMOTE_PATH));
config.setRemotePersistence(""+map.get(Constants.WorkspaceProperties.REMOTE_PERSISTENCE));
config.setTargetToken(""+map.get(Constants.WorkspaceProperties.TARGET_TOKEN));
config.setToCreateCatalogName(""+map.get(Constants.WorkspaceProperties.RELATED_CATALOG));
config.setValidateMetadata(Boolean.parseBoolean(""+map.get(Constants.WorkspaceProperties.VALIDATE_METADATA)));
config.setRootFolderId(""+map.get(Constants.WorkspaceProperties.ROOT_FOLDER_ID));
return config;
}else {
FolderContainer parentFolder=getClient().open(item.get().getParentId()).asFolder();
return loadConfiguration(parentFolder);
}
}
static void resetStatus(ItemContainer<?> item) throws StorageHubException {
if(item.getType().equals(ContainerType.FOLDER)) {
for(ItemContainer<?> child : ((FolderContainer)item).list().withMetadata().getContainers())
resetStatus(child);
}
Metadata meta=item.get().getMetadata();
Map<String,Object> map=meta.getMap();
if(map.containsKey(Constants.WorkspaceProperties.LAST_UPDATE_STATUS)) {
map.put(Constants.WorkspaceProperties.LAST_UPDATE_STATUS, StepReport.Status.OK+"");
//Actually posting cleaned map
meta.setMap(map);
item.setMetadata(meta);
}
}
public static void addParameters(ItemContainer<?> item, Map<String,Object> toAdd) throws StorageHubException {
Metadata meta=item.get().getMetadata();
Map<String,Object> current=meta.getMap();
current.putAll(toAdd);
meta.setMap(current);
item.setMetadata(meta);
}
public static void addParameter(ItemContainer<?> item, String key, Object value) throws StorageHubException {
Metadata meta=item.get().getMetadata();
Map<String,Object> current=meta.getMap();
current.put(key,value);
meta.setMap(current);
item.setMetadata(meta);
}
public static ItemContainer<?> scan(FolderContainer folder,String path) throws ItemNotFoundException, StorageHubException{
String toLookFor=path.substring((path.startsWith("/")?1:0), path.length());
String[] split=toLookFor.split("/");
toLookFor=split[0];
for(ItemContainer<?> item:folder.list().withMetadata().getContainers())
if(item.get().getName().equals(toLookFor)) {
if(split.length>1) return scan((FolderContainer)item,path.substring(toLookFor.length()));
else return item;
}
throw new ItemNotFoundException("Unable to find "+path);
}
}