From 09bf01b062de477d1bc6ba2f6e0257537bb5bb7d Mon Sep 17 00:00:00 2001 From: Francesco Mangiacrapa Date: Wed, 16 May 2018 14:56:23 +0000 Subject: [PATCH] improving the monitor messages git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/Common/workspace-task-executor-library@167536 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../dataminer/DataMinerAccessPoint.java | 123 ++++++++++++------ .../WorkspaceDataMinerTaskExecutor.java | 2 +- .../shared/dataminer/TaskExecutionStatus.java | 2 +- 3 files changed, 88 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java index 8a59a59..e2855f6 100644 --- a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java +++ b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java @@ -41,6 +41,15 @@ import org.slf4j.LoggerFactory; */ public class DataMinerAccessPoint { + /** + * + */ + private static final String NEW_STATUS_CHARS = "*******"; + /** + * + */ + private static final String NEW_LINE = "\n"; + private static final String DASH = "-"; private DataMinerService dataMinerService; private static Logger logger = LoggerFactory.getLogger(DataMinerAccessPoint.class); @@ -56,26 +65,25 @@ public class DataMinerAccessPoint { dataMinerService = new DataMinerService(); } - /** - * Removes the task. + * Removes the task from memory. * * @param algConfiguration the alg configuration * @return the task execution status */ - private TaskExecutionStatus removeTask(BaseTaskConfiguration algConfiguration){ + private TaskExecutionStatus removeTaskFromMemory(BaseTaskConfiguration algConfiguration){ return mapExecutionTask.remove(algConfiguration.getConfigurationKey()); } /** - * Save task. + * Save task in memory. * * @param algConfiguration the alg configuration * @param taskStatus the task status */ - private void saveTask(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){ + private void saveTaskInMemory(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){ mapExecutionTask.put(algConfiguration.getConfigurationKey(), taskStatus); } @@ -152,7 +160,7 @@ public class DataMinerAccessPoint { logger.debug("Start Computation"); ComputationId computationId = sClient.startComputation(operator); logger.debug("Started ComputationId: " + computationId); - return getTaskStatus(taskConfiguration, DMConverter.toDMComputationId(computationId)); + return loadTaskExecutionStatus(taskConfiguration, DMConverter.toDMComputationId(computationId)); } catch (Exception e) { @@ -162,25 +170,53 @@ public class DataMinerAccessPoint { } } - /** - * Gets the task status. To monitor the task exection.. + * Monitor status. * - * @param taskConfiguration the alg configuration + * @param taskConfiguration the task configuration * @param taskComputation the task computation - * @return the task status + * @return the task execution status * @throws TaskErrorException the task error exception * @throws TaskNotExecutableException the task not executable exception */ - public TaskExecutionStatus getTaskStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ - - //TODO ??? - + public TaskExecutionStatus monitorStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ TaskExecutionStatus theTaskStatus = getRunningTask(taskConfiguration); if(theTaskStatus==null) throw new TaskErrorException("No Task is running with the configuration: "+taskConfiguration.getConfigurationKey()); + TaskExecutionStatus tes = loadTaskExecutionStatus(taskConfiguration, taskComputation); + + switch (tes.getStatus()) { + case COMPLETED: + case CANCELLED: + case FAILED: + logger.info("Removing "+tes+ "from memory"); + removeTaskFromMemory(tes.getTaskConfiguration()); + break; + default: + break; + } + + return tes; + } + + + /** + * Load task execution status. + * + * @param taskConfiguration the task configuration + * @param taskComputation the task computation + * @return the task execution status + * @throws TaskErrorException the task error exception + * @throws TaskNotExecutableException the task not executable exception + */ + private TaskExecutionStatus loadTaskExecutionStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ + + TaskExecutionStatus oldTaskExecutionStatus = getRunningTask(taskConfiguration); + boolean isStart = false; + if(oldTaskExecutionStatus==null) + isStart = true; SClient sClient; ComputationId computationId = DMConverter.toComputationId(taskComputation); @@ -193,7 +229,16 @@ public class DataMinerAccessPoint { throw new TaskErrorException("Error on getting DataMiner client, Try later"); } - TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(taskConfiguration, taskComputation); + TaskExecutionStatus newTaskExecutionStatus = new TaskExecutionStatus(taskConfiguration, taskComputation); + if(!isStart) + newTaskExecutionStatus.addLog(oldTaskExecutionStatus.getLog()); + else{ + newTaskExecutionStatus.addLog(NEW_STATUS_CHARS+"Configurations are: "); + newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"Operator Id: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorId()); + newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"Operator Name: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorName()); + newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"EquivalentRequest: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getEquivalentRequest()); + newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"URL ID: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getUrlId()); + } logger.debug("Requesting operation progress"); ComputationStatus computationStatus = null; @@ -208,56 +253,60 @@ public class DataMinerAccessPoint { logger.debug("ComputationStatus: " + computationStatus); if (computationStatus == null) { logger.error("ComputationStatus is null"); - return taskExecutiontatus; + return newTaskExecutionStatus; } Status status = computationStatus.getStatus(); if (status == null) { logger.error("Status is null"); - return taskExecutiontatus; + return newTaskExecutionStatus; } - taskExecutiontatus.setCurrentMessage(computationStatus.getMessage()); - + newTaskExecutionStatus.setCurrentMessage(computationStatus.getMessage()); + newTaskExecutionStatus.setPercentCompleted((float) computationStatus.getPercentage()); + String msg=NEW_STATUS_CHARS; switch (status) { case ACCEPTED: logger.debug("Operation "+TaskStatus.ACCEPTED); - taskExecutiontatus.setStatus(TaskStatus.ACCEPTED); - taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage()); - saveTask(taskConfiguration, taskExecutiontatus); + newTaskExecutionStatus.setStatus(TaskStatus.ACCEPTED); + msg+= "Status "+TaskStatus.ACCEPTED; + msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; case CANCELLED: logger.debug("Operation "+TaskStatus.CANCELLED); - taskExecutiontatus.setStatus(TaskStatus.CANCELLED); - taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage()); - saveTask(taskConfiguration, taskExecutiontatus); + newTaskExecutionStatus.setStatus(TaskStatus.CANCELLED); + msg+= "Status "+TaskStatus.CANCELLED; + msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; case COMPLETE: logger.debug("Operation "+TaskStatus.COMPLETED); - taskExecutiontatus.setStatus(TaskStatus.COMPLETED); - taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage()); - saveTask(taskConfiguration, taskExecutiontatus); + newTaskExecutionStatus.setStatus(TaskStatus.COMPLETED); + msg+= "Status "+TaskStatus.COMPLETED; + msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; case FAILED: logger.debug("Operation "+TaskStatus.FAILED); - taskExecutiontatus.setStatus(TaskStatus.FAILED); - taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage()); - taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage()); - saveTask(taskConfiguration, taskExecutiontatus); + newTaskExecutionStatus.setStatus(TaskStatus.FAILED); + msg+= "Status "+TaskStatus.FAILED; + msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; case RUNNING: logger.debug("Operation "+TaskStatus.ONGOING); - taskExecutiontatus.setStatus(TaskStatus.ONGOING); - taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage()); - taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage()); - saveTask(taskConfiguration, taskExecutiontatus); + newTaskExecutionStatus.setStatus(TaskStatus.ONGOING); + msg+= "Status "+TaskStatus.ONGOING; + msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; default: + newTaskExecutionStatus.setStatus(TaskStatus.INITIALIZING); + msg+= "Status "+TaskStatus.INITIALIZING; + msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; } - return taskExecutiontatus; + newTaskExecutionStatus.addLog(NEW_LINE+msg); + saveTaskInMemory(taskConfiguration, newTaskExecutionStatus); + return newTaskExecutionStatus; } diff --git a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java index b13c1d7..bf755d5 100644 --- a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java +++ b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java @@ -390,7 +390,7 @@ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask