Enhancement on Project Activity #11690
git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/Common/workspace-task-executor-library@167305 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
0e55e61c24
commit
4b4e6e0e12
|
@ -10,21 +10,21 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.Status;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.TaskStatus;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException;
|
||||
import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
|
||||
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
|
||||
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
|
||||
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.MapResource;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.Resource;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.FileParameter;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus.Status;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -118,7 +118,7 @@ public class DataMinerAccessPoint {
|
|||
Operator operator;
|
||||
SClient sClient;
|
||||
try {
|
||||
sClient = dataMinerService.getClient();
|
||||
sClient = dataMinerService.getClient(algConfiguration.getToken());
|
||||
operator = sClient.getOperatorById(algConfiguration.getTaskId());
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
@ -180,67 +180,70 @@ public class DataMinerAccessPoint {
|
|||
*/
|
||||
private TaskExecutionStatus startComputationOnDataMiner(BaseTaskConfiguration algConfiguration, final ComputationId computationId, final SClient sClient){
|
||||
|
||||
TaskExecutionStatus status = new TaskExecutionStatus(algConfiguration, computationId);
|
||||
TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(algConfiguration, computationId);
|
||||
|
||||
DMMonitorListener listener = new DMMonitorListener() {
|
||||
logger.debug("Requesting operation progress");
|
||||
ComputationStatus computationStatus = null;
|
||||
try {
|
||||
computationStatus = sClient.getComputationStatus(computationId);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getLocalizedMessage());
|
||||
e.printStackTrace();
|
||||
|
||||
@Override
|
||||
public void running(double percentage) {
|
||||
logger.debug("Operation Running: " + percentage);
|
||||
status.setStatus(Status.ONGOING);
|
||||
status.setPercentCompleted((float) percentage);
|
||||
status.setCurrentMessage("Operation Running...");
|
||||
saveTask(algConfiguration, status);
|
||||
}
|
||||
logger.debug("ComputationStatus: " + computationStatus);
|
||||
if (computationStatus == null) {
|
||||
logger.error("ComputationStatus is null");
|
||||
return taskExecutiontatus;
|
||||
}
|
||||
|
||||
}
|
||||
Status status = computationStatus.getStatus();
|
||||
if (status == null) {
|
||||
logger.error("Status is null");
|
||||
return taskExecutiontatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(String message, Exception exception) {
|
||||
logger.error("Operation Failed");
|
||||
logger.error(message, exception);
|
||||
status.setStatus(Status.FAILED);
|
||||
status.setCurrentMessage(message);
|
||||
status.addLog(exception.getMessage());
|
||||
saveTask(algConfiguration, status);
|
||||
taskExecutiontatus.setCurrentMessage(computationStatus.getMessage());
|
||||
|
||||
}
|
||||
switch (status) {
|
||||
case ACCEPTED:
|
||||
logger.debug("Operation "+TaskStatus.ACCEPTED);
|
||||
taskExecutiontatus.setStatus(TaskStatus.ACCEPTED);
|
||||
taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage());
|
||||
saveTask(algConfiguration, taskExecutiontatus);
|
||||
break;
|
||||
case CANCELLED:
|
||||
logger.debug("Operation "+TaskStatus.CANCELLED);
|
||||
taskExecutiontatus.setStatus(TaskStatus.CANCELLED);
|
||||
taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage());
|
||||
saveTask(algConfiguration, taskExecutiontatus);
|
||||
break;
|
||||
case COMPLETE:
|
||||
logger.debug("Operation "+TaskStatus.COMPLETED);
|
||||
taskExecutiontatus.setStatus(TaskStatus.COMPLETED);
|
||||
taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage());
|
||||
saveTask(algConfiguration, taskExecutiontatus);
|
||||
break;
|
||||
case FAILED:
|
||||
logger.debug("Operation "+TaskStatus.FAILED);
|
||||
taskExecutiontatus.setStatus(TaskStatus.FAILED);
|
||||
taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage());
|
||||
taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage());
|
||||
saveTask(algConfiguration, taskExecutiontatus);
|
||||
break;
|
||||
case RUNNING:
|
||||
logger.debug("Operation "+TaskStatus.ONGOING);
|
||||
taskExecutiontatus.setStatus(TaskStatus.ONGOING);
|
||||
taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage());
|
||||
taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage());
|
||||
saveTask(algConfiguration, taskExecutiontatus);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
||||
@Override
|
||||
public void complete(double percentage) {
|
||||
logger.debug("Operation Completed");
|
||||
logger.debug("Perc: " + percentage);
|
||||
String log = retrieveOutput(computationId, sClient);
|
||||
status.setStatus(Status.COMPLETED);
|
||||
status.setCurrentMessage("Operation Completed");
|
||||
status.addLog(log);
|
||||
saveTask(algConfiguration, status);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelled() {
|
||||
logger.debug("Operation Cancelled");
|
||||
status.setStatus(Status.COMPLETED);
|
||||
status.setCurrentMessage("Operation Cancelled");
|
||||
status.addLog("Operation Cancelled");
|
||||
saveTask(algConfiguration, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accepted() {
|
||||
logger.debug("Operation Accepted");
|
||||
status.setStatus(Status.ACCEPTED);
|
||||
status.setCurrentMessage("Operation Accepted");
|
||||
status.addLog("Operation Accepted");
|
||||
saveTask(algConfiguration, status);
|
||||
}
|
||||
};
|
||||
|
||||
DMMonitor dmMonitor = new DMMonitor(computationId, sClient);
|
||||
dmMonitor.add(listener);
|
||||
dmMonitor.start();
|
||||
|
||||
return status;
|
||||
return taskExecutiontatus;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -31,10 +31,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
|||
*/
|
||||
public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfiguration, TaskExecutionStatus>, ExecutableItem<TaskConfiguration>{
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public static final String FIELD_CONFIGURATION_KEY = "configurationKey";
|
||||
|
||||
|
||||
/** The logger. */
|
||||
private static Logger logger = LoggerFactory.getLogger(WorkspaceDataMinerTaskExecutor.class);
|
||||
|
@ -141,7 +138,7 @@ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfig
|
|||
}
|
||||
}
|
||||
|
||||
throw new TaskConfigurationNotFoundException("The configuration with "+FIELD_CONFIGURATION_KEY+" "+configurationKey+" does not exist");
|
||||
throw new TaskConfigurationNotFoundException("The configuration with "+TaskConfiguration.FIELD_CONFIGURATION_KEY+" "+configurationKey+" does not exist");
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -280,13 +277,13 @@ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfig
|
|||
}else{
|
||||
//Configuration found
|
||||
found = true;
|
||||
logger.info("The configuration with "+FIELD_CONFIGURATION_KEY +" found, updating it");
|
||||
logger.info("The configuration with "+TaskConfiguration.FIELD_CONFIGURATION_KEY +" found, updating it");
|
||||
newConfigurations.add(taskConfiguration);
|
||||
}
|
||||
}
|
||||
|
||||
if(!found){
|
||||
logger.info("The configuration with "+FIELD_CONFIGURATION_KEY +" not found, adding it as new");
|
||||
logger.info("The configuration with "+TaskConfiguration.FIELD_CONFIGURATION_KEY +" not found, adding it as new");
|
||||
newConfigurations.add(taskConfiguration);
|
||||
}
|
||||
try{
|
||||
|
|
|
@ -23,7 +23,7 @@ public interface BaseTaskExecutionStatus {
|
|||
*
|
||||
* @return the status
|
||||
*/
|
||||
public Status getStatus();
|
||||
public TaskStatus getStatus();
|
||||
|
||||
/**
|
||||
* Gets the log.
|
||||
|
|
|
@ -9,7 +9,7 @@ import java.io.Serializable;
|
|||
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
|
||||
* Apr 27, 2018
|
||||
*/
|
||||
public enum Status implements Serializable{
|
||||
public enum TaskStatus implements Serializable{
|
||||
INITIALIZING("INITIALIZING"),
|
||||
ONGOING("ONGOING"),
|
||||
ACCEPTED("ACCEPTED"),
|
||||
|
@ -20,7 +20,7 @@ public enum Status implements Serializable{
|
|||
/**
|
||||
* Instantiates a new status.
|
||||
*/
|
||||
Status(){}
|
||||
TaskStatus(){}
|
||||
|
||||
|
||||
private String label;
|
||||
|
@ -30,7 +30,7 @@ public enum Status implements Serializable{
|
|||
*
|
||||
* @param label the label
|
||||
*/
|
||||
Status(String label){
|
||||
TaskStatus(String label){
|
||||
this.label = label;
|
||||
}
|
||||
|
|
@ -22,6 +22,12 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|||
*/
|
||||
public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@JsonIgnoreProperties
|
||||
public static final String FIELD_CONFIGURATION_KEY = "configurationKey";
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
|
|
@ -4,7 +4,7 @@ import java.io.Serializable;
|
|||
|
||||
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskExecutionStatus;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.Status;
|
||||
import org.gcube.common.workspacetaskexecutor.shared.TaskStatus;
|
||||
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
|
||||
|
||||
|
||||
|
@ -22,7 +22,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
|
|||
private static final long serialVersionUID = 6733806455721902864L;
|
||||
|
||||
private Long errorCount;
|
||||
private Status status;
|
||||
private TaskStatus status;
|
||||
private Float percentCompleted = new Float(0);
|
||||
private String log;
|
||||
private String currentMessage="Waiting to start..";
|
||||
|
@ -60,7 +60,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
|
|||
* @param taskConfiguration the task configuration
|
||||
*/
|
||||
public TaskExecutionStatus(
|
||||
Long errorCount, Status status, Float percentCompleted, String log,
|
||||
Long errorCount, TaskStatus status, Float percentCompleted, String log,
|
||||
String currentMessage, TaskConfiguration taskConfiguration) {
|
||||
this.errorCount = errorCount;
|
||||
this.status = status;
|
||||
|
@ -117,7 +117,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
|
|||
*
|
||||
* @return the status
|
||||
*/
|
||||
public Status getStatus() {
|
||||
public TaskStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
|
@ -126,7 +126,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
|
|||
*
|
||||
* @param status the new status
|
||||
*/
|
||||
public void setStatus(Status status) {
|
||||
public void setStatus(TaskStatus status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
|
|
|
@ -99,14 +99,14 @@ public class TestDataMinerTaskExecutor {
|
|||
getConfigurations(exec);
|
||||
|
||||
//ERASE ALL CONFIGURATIONS
|
||||
//eraseAllTaskConfigurations(exec);
|
||||
eraseAllTaskConfigurations(exec);
|
||||
|
||||
//SET TASK CONFIGURATION
|
||||
//setDummyTaskConfigurations(exec);
|
||||
setDummyTaskConfigurations(exec);
|
||||
|
||||
//deleteConfiguration(exec, listDummyConf.get(1));
|
||||
deleteConfiguration(exec, listDummyConf.get(1));
|
||||
|
||||
//getConfigurations(exec);
|
||||
getConfigurations(exec);
|
||||
|
||||
|
||||
listDummyConf.get(2).setTaskId("Updated task id");
|
||||
|
|
Loading…
Reference in New Issue