improving the monitor messages

git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/Common/workspace-task-executor-library@167536 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Francesco Mangiacrapa 2018-05-16 14:56:23 +00:00
parent 4a85e75565
commit 09bf01b062
3 changed files with 88 additions and 39 deletions

View File

@ -41,6 +41,15 @@ import org.slf4j.LoggerFactory;
*/ */
public class DataMinerAccessPoint { public class DataMinerAccessPoint {
/**
*
*/
private static final String NEW_STATUS_CHARS = "*******";
/**
*
*/
private static final String NEW_LINE = "\n";
private static final String DASH = "-";
private DataMinerService dataMinerService; private DataMinerService dataMinerService;
private static Logger logger = LoggerFactory.getLogger(DataMinerAccessPoint.class); private static Logger logger = LoggerFactory.getLogger(DataMinerAccessPoint.class);
@ -56,26 +65,25 @@ public class DataMinerAccessPoint {
dataMinerService = new DataMinerService(); dataMinerService = new DataMinerService();
} }
/** /**
* Removes the task. * Removes the task from memory.
* *
* @param algConfiguration the alg configuration * @param algConfiguration the alg configuration
* @return the task execution status * @return the task execution status
*/ */
private TaskExecutionStatus removeTask(BaseTaskConfiguration algConfiguration){ private TaskExecutionStatus removeTaskFromMemory(BaseTaskConfiguration algConfiguration){
return mapExecutionTask.remove(algConfiguration.getConfigurationKey()); return mapExecutionTask.remove(algConfiguration.getConfigurationKey());
} }
/** /**
* Save task. * Save task in memory.
* *
* @param algConfiguration the alg configuration * @param algConfiguration the alg configuration
* @param taskStatus the task status * @param taskStatus the task status
*/ */
private void saveTask(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){ private void saveTaskInMemory(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){
mapExecutionTask.put(algConfiguration.getConfigurationKey(), taskStatus); mapExecutionTask.put(algConfiguration.getConfigurationKey(), taskStatus);
} }
@ -152,7 +160,7 @@ public class DataMinerAccessPoint {
logger.debug("Start Computation"); logger.debug("Start Computation");
ComputationId computationId = sClient.startComputation(operator); ComputationId computationId = sClient.startComputation(operator);
logger.debug("Started ComputationId: " + computationId); logger.debug("Started ComputationId: " + computationId);
return getTaskStatus(taskConfiguration, DMConverter.toDMComputationId(computationId)); return loadTaskExecutionStatus(taskConfiguration, DMConverter.toDMComputationId(computationId));
} }
catch (Exception e) { catch (Exception e) {
@ -162,25 +170,53 @@ public class DataMinerAccessPoint {
} }
} }
/** /**
* Gets the task status. To monitor the task exection.. * Monitor status.
* *
* @param taskConfiguration the alg configuration * @param taskConfiguration the task configuration
* @param taskComputation the task computation * @param taskComputation the task computation
* @return the task status * @return the task execution status
* @throws TaskErrorException the task error exception * @throws TaskErrorException the task error exception
* @throws TaskNotExecutableException the task not executable exception * @throws TaskNotExecutableException the task not executable exception
*/ */
public TaskExecutionStatus getTaskStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ public TaskExecutionStatus monitorStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{
//TODO ???
TaskExecutionStatus theTaskStatus = getRunningTask(taskConfiguration); TaskExecutionStatus theTaskStatus = getRunningTask(taskConfiguration);
if(theTaskStatus==null) if(theTaskStatus==null)
throw new TaskErrorException("No Task is running with the configuration: "+taskConfiguration.getConfigurationKey()); throw new TaskErrorException("No Task is running with the configuration: "+taskConfiguration.getConfigurationKey());
TaskExecutionStatus tes = loadTaskExecutionStatus(taskConfiguration, taskComputation);
switch (tes.getStatus()) {
case COMPLETED:
case CANCELLED:
case FAILED:
logger.info("Removing "+tes+ "from memory");
removeTaskFromMemory(tes.getTaskConfiguration());
break;
default:
break;
}
return tes;
}
/**
* Load task execution status.
*
* @param taskConfiguration the task configuration
* @param taskComputation the task computation
* @return the task execution status
* @throws TaskErrorException the task error exception
* @throws TaskNotExecutableException the task not executable exception
*/
private TaskExecutionStatus loadTaskExecutionStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{
TaskExecutionStatus oldTaskExecutionStatus = getRunningTask(taskConfiguration);
boolean isStart = false;
if(oldTaskExecutionStatus==null)
isStart = true;
SClient sClient; SClient sClient;
ComputationId computationId = DMConverter.toComputationId(taskComputation); ComputationId computationId = DMConverter.toComputationId(taskComputation);
@ -193,7 +229,16 @@ public class DataMinerAccessPoint {
throw new TaskErrorException("Error on getting DataMiner client, Try later"); throw new TaskErrorException("Error on getting DataMiner client, Try later");
} }
TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(taskConfiguration, taskComputation); TaskExecutionStatus newTaskExecutionStatus = new TaskExecutionStatus(taskConfiguration, taskComputation);
if(!isStart)
newTaskExecutionStatus.addLog(oldTaskExecutionStatus.getLog());
else{
newTaskExecutionStatus.addLog(NEW_STATUS_CHARS+"Configurations are: ");
newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"Operator Id: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorId());
newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"Operator Name: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorName());
newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"EquivalentRequest: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getEquivalentRequest());
newTaskExecutionStatus.addLog(NEW_LINE+DASH+DASH+"URL ID: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getUrlId());
}
logger.debug("Requesting operation progress"); logger.debug("Requesting operation progress");
ComputationStatus computationStatus = null; ComputationStatus computationStatus = null;
@ -208,56 +253,60 @@ public class DataMinerAccessPoint {
logger.debug("ComputationStatus: " + computationStatus); logger.debug("ComputationStatus: " + computationStatus);
if (computationStatus == null) { if (computationStatus == null) {
logger.error("ComputationStatus is null"); logger.error("ComputationStatus is null");
return taskExecutiontatus; return newTaskExecutionStatus;
} }
Status status = computationStatus.getStatus(); Status status = computationStatus.getStatus();
if (status == null) { if (status == null) {
logger.error("Status is null"); logger.error("Status is null");
return taskExecutiontatus; return newTaskExecutionStatus;
} }
taskExecutiontatus.setCurrentMessage(computationStatus.getMessage()); newTaskExecutionStatus.setCurrentMessage(computationStatus.getMessage());
newTaskExecutionStatus.setPercentCompleted((float) computationStatus.getPercentage());
String msg=NEW_STATUS_CHARS;
switch (status) { switch (status) {
case ACCEPTED: case ACCEPTED:
logger.debug("Operation "+TaskStatus.ACCEPTED); logger.debug("Operation "+TaskStatus.ACCEPTED);
taskExecutiontatus.setStatus(TaskStatus.ACCEPTED); newTaskExecutionStatus.setStatus(TaskStatus.ACCEPTED);
taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage()); msg+= "Status "+TaskStatus.ACCEPTED;
saveTask(taskConfiguration, taskExecutiontatus); msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
break; break;
case CANCELLED: case CANCELLED:
logger.debug("Operation "+TaskStatus.CANCELLED); logger.debug("Operation "+TaskStatus.CANCELLED);
taskExecutiontatus.setStatus(TaskStatus.CANCELLED); newTaskExecutionStatus.setStatus(TaskStatus.CANCELLED);
taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage()); msg+= "Status "+TaskStatus.CANCELLED;
saveTask(taskConfiguration, taskExecutiontatus); msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
break; break;
case COMPLETE: case COMPLETE:
logger.debug("Operation "+TaskStatus.COMPLETED); logger.debug("Operation "+TaskStatus.COMPLETED);
taskExecutiontatus.setStatus(TaskStatus.COMPLETED); newTaskExecutionStatus.setStatus(TaskStatus.COMPLETED);
taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage()); msg+= "Status "+TaskStatus.COMPLETED;
saveTask(taskConfiguration, taskExecutiontatus); msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
break; break;
case FAILED: case FAILED:
logger.debug("Operation "+TaskStatus.FAILED); logger.debug("Operation "+TaskStatus.FAILED);
taskExecutiontatus.setStatus(TaskStatus.FAILED); newTaskExecutionStatus.setStatus(TaskStatus.FAILED);
taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage()); msg+= "Status "+TaskStatus.FAILED;
taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage()); msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
saveTask(taskConfiguration, taskExecutiontatus);
break; break;
case RUNNING: case RUNNING:
logger.debug("Operation "+TaskStatus.ONGOING); logger.debug("Operation "+TaskStatus.ONGOING);
taskExecutiontatus.setStatus(TaskStatus.ONGOING); newTaskExecutionStatus.setStatus(TaskStatus.ONGOING);
taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage()); msg+= "Status "+TaskStatus.ONGOING;
taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage()); msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
saveTask(taskConfiguration, taskExecutiontatus);
break; break;
default: default:
newTaskExecutionStatus.setStatus(TaskStatus.INITIALIZING);
msg+= "Status "+TaskStatus.INITIALIZING;
msg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
break; break;
} }
return taskExecutiontatus; newTaskExecutionStatus.addLog(NEW_LINE+msg);
saveTaskInMemory(taskConfiguration, newTaskExecutionStatus);
return newTaskExecutionStatus;
} }

View File

@ -390,7 +390,7 @@ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfig
ValidateTaskConfiguration(taskConfiguration); ValidateTaskConfiguration(taskConfiguration);
DataMinerAccessPoint dap = getDataMinerAccessPoint(); DataMinerAccessPoint dap = getDataMinerAccessPoint();
return dap.getTaskStatus(taskConfiguration, taskComputation); return dap.monitorStatus(taskConfiguration, taskComputation);
} }

View File

@ -148,7 +148,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
if(log==null) if(log==null)
this.log = logMessage; this.log = logMessage;
else else
log+=logMessage; log=logMessage+log;
} }
/** /**