Enhancement on Project Activity #11690

git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/Common/workspace-task-executor-library@167306 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Francesco Mangiacrapa 2018-05-03 16:07:31 +00:00
parent 4b4e6e0e12
commit 8bc1186ed0
7 changed files with 359 additions and 37 deletions

View File

@ -11,10 +11,12 @@ import java.util.Map;
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration;
import org.gcube.common.workspacetaskexecutor.shared.TaskStatus;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskComputation;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus;
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException;
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException;
import org.gcube.common.workspacetaskexecutor.util.Converter;
import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData;
@ -44,7 +46,7 @@ public class DataMinerAccessPoint {
/** The map call back. */
// Fully synchronized HashMap
private Map<Integer, TaskExecutionStatus> mapExecutionTask = Collections.synchronizedMap(new HashMap<>());
private Map<String, TaskExecutionStatus> mapExecutionTask = Collections.synchronizedMap(new HashMap<>());
/**
@ -54,6 +56,17 @@ public class DataMinerAccessPoint {
dataMinerService = new DataMinerService();
}
/**
* Removes the task.
*
* @param algConfiguration the alg configuration
* @return
*/
private TaskExecutionStatus removeTask(BaseTaskConfiguration algConfiguration){
return mapExecutionTask.remove(algConfiguration.getConfigurationKey());
}
/**
* Save task.
@ -63,7 +76,7 @@ public class DataMinerAccessPoint {
*/
private void saveTask(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){
mapExecutionTask.put(algConfiguration.hashCode(), taskStatus);
mapExecutionTask.put(algConfiguration.getConfigurationKey(), taskStatus);
}
@ -96,7 +109,7 @@ public class DataMinerAccessPoint {
SClient sClient;
try {
sClient = dataMinerService.getClient();
sClient.cancelComputation(task.getComputationId());
sClient.cancelComputation(Converter.toComputationId(task.getComputationId()));
}
catch (Exception e) {
String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId();
@ -137,7 +150,7 @@ public class DataMinerAccessPoint {
logger.debug("Start Computation");
ComputationId computationId = sClient.startComputation(operator);
logger.debug("Started ComputationId: " + computationId);
return startComputationOnDataMiner(algConfiguration, computationId, sClient);
return getTaskStatus(algConfiguration, Converter.toDMComputationId(computationId));
}
catch (Exception e) {
@ -152,35 +165,24 @@ public class DataMinerAccessPoint {
* Gets the task status. To monitor the task exection..
*
* @param algConfiguration the alg configuration
* @param taskComputation the task computation
* @return the task status
* @throws TaskErrorException the task error exception
* @throws TaskNotExecutableException the task not executable exception
*/
public TaskExecutionStatus getTaskStatus(BaseTaskConfiguration algConfiguration) throws TaskErrorException{
public TaskExecutionStatus getTaskStatus(TaskConfiguration algConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{
TaskExecutionStatus task = getTask(algConfiguration);
if(task==null){
//TODO
//startComputationOnDM(computationId, sClient);
throw new TaskErrorException("No Task running with configuration: "+algConfiguration);
SClient sClient;
ComputationId computationId = Converter.toComputationId(taskComputation);
try {
sClient = dataMinerService.getClient(algConfiguration.getToken());
}
catch (Exception e) {
logger.error("Error on get DM client", e);
throw new TaskErrorException("Error on getting DataMiner client, Try later");
}
return task;
}
/**
* Start computation on data miner.
*
* @param algConfiguration the alg configuration
* @param computationId the computation id
* @param sClient the s client
* @return the task execution status
*/
private TaskExecutionStatus startComputationOnDataMiner(BaseTaskConfiguration algConfiguration, final ComputationId computationId, final SClient sClient){
TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(algConfiguration, computationId);
TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(algConfiguration, taskComputation);
logger.debug("Requesting operation progress");
ComputationStatus computationStatus = null;
@ -191,6 +193,7 @@ public class DataMinerAccessPoint {
e.printStackTrace();
}
logger.debug("ComputationStatus: " + computationStatus);
if (computationStatus == null) {
logger.error("ComputationStatus is null");
@ -244,9 +247,87 @@ public class DataMinerAccessPoint {
}
return taskExecutiontatus;
}
// /**
// * Start computation on data miner.
// *
// * @param algConfiguration the alg configuration
// * @param computationId the computation id
// * @param sClient the s client
// * @return the task execution status
// */
// private TaskExecutionStatus startComputationOnDataMiner(BaseTaskConfiguration algConfiguration, final ComputationId computationId, final SClient sClient){
//
// TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(algConfiguration, computationId);
//
// logger.debug("Requesting operation progress");
// ComputationStatus computationStatus = null;
// try {
// computationStatus = sClient.getComputationStatus(computationId);
// } catch (Exception e) {
// logger.error(e.getLocalizedMessage());
// e.printStackTrace();
//
// }
// logger.debug("ComputationStatus: " + computationStatus);
// if (computationStatus == null) {
// logger.error("ComputationStatus is null");
// return taskExecutiontatus;
// }
//
// Status status = computationStatus.getStatus();
// if (status == null) {
// logger.error("Status is null");
// return taskExecutiontatus;
// }
//
// taskExecutiontatus.setCurrentMessage(computationStatus.getMessage());
//
// switch (status) {
// case ACCEPTED:
// logger.debug("Operation "+TaskStatus.ACCEPTED);
// taskExecutiontatus.setStatus(TaskStatus.ACCEPTED);
// taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage());
// saveTask(algConfiguration, taskExecutiontatus);
// break;
// case CANCELLED:
// logger.debug("Operation "+TaskStatus.CANCELLED);
// taskExecutiontatus.setStatus(TaskStatus.CANCELLED);
// taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage());
// saveTask(algConfiguration, taskExecutiontatus);
// break;
// case COMPLETE:
// logger.debug("Operation "+TaskStatus.COMPLETED);
// taskExecutiontatus.setStatus(TaskStatus.COMPLETED);
// taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage());
// saveTask(algConfiguration, taskExecutiontatus);
// break;
// case FAILED:
// logger.debug("Operation "+TaskStatus.FAILED);
// taskExecutiontatus.setStatus(TaskStatus.FAILED);
// taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage());
// taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage());
// saveTask(algConfiguration, taskExecutiontatus);
// break;
// case RUNNING:
// logger.debug("Operation "+TaskStatus.ONGOING);
// taskExecutiontatus.setStatus(TaskStatus.ONGOING);
// taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage());
// taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage());
// saveTask(algConfiguration, taskExecutiontatus);
// break;
// default:
// break;
//
// }
//
// return taskExecutiontatus;
// }
/**
* Retrieve output.
*

View File

@ -7,6 +7,7 @@ import org.apache.commons.lang.Validate;
import org.gcube.common.homelibrary.home.workspace.WorkspaceItem;
import org.gcube.common.workspacetaskexecutor.shared.ExecutableItem;
import org.gcube.common.workspacetaskexecutor.shared.ExecutableTask;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskComputation;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus;
import org.gcube.common.workspacetaskexecutor.shared.exception.ItemNotExecutableException;
@ -29,7 +30,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
* May 3, 2018
*/
public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfiguration, TaskExecutionStatus>, ExecutableItem<TaskConfiguration>{
public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfiguration, TaskComputation, TaskExecutionStatus>, ExecutableItem<TaskConfiguration>{
@ -349,12 +350,12 @@ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfig
*/
@Override
public TaskExecutionStatus monitorRunStatus(
TaskConfiguration taskConfiguration)
TaskConfiguration taskConfiguration, TaskComputation taskComputation)
throws TaskErrorException, Exception {
ValidateTaskConfiguration(taskConfiguration);
DataMinerAccessPoint dap = getDataMinerAccessPoint();
return dap.getTaskStatus(taskConfiguration);
return dap.getTaskStatus(taskConfiguration, taskComputation);
}

View File

@ -0,0 +1,22 @@
/**
*
*/
package org.gcube.common.workspacetaskexecutor.shared;
/**
* The Interface BaseTaskComputation.
*
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
* May 3, 2018
*/
public interface BaseTaskComputation {
/**
* Gets the id.
*
* @return the id
*/
public String getId();
}

View File

@ -13,7 +13,7 @@ import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutable
* @param <I> the generic type must extend {@link BaseTaskConfiguration}
* @param <O> the generic type must extend {@link BaseTaskExecutionStatus}
*/
public interface ExecutableTask<I extends BaseTaskConfiguration, O extends BaseTaskExecutionStatus> {
public interface ExecutableTask<I extends BaseTaskConfiguration, C extends BaseTaskComputation, O extends BaseTaskExecutionStatus> {
/**
@ -43,10 +43,11 @@ public interface ExecutableTask<I extends BaseTaskConfiguration, O extends BaseT
* Monitor run status.
*
* @param taskConfiguration the task configuration
* @param taskComputation the task computation
* @return the o
* @throws TaskErrorException the task error exception
* @throws Exception the exception
*/
O monitorRunStatus(I taskConfiguration) throws TaskErrorException, Exception;
O monitorRunStatus(I taskConfiguration, C taskComputation) throws TaskErrorException, Exception;
}

View File

@ -0,0 +1,171 @@
/**
*
*/
package org.gcube.common.workspacetaskexecutor.shared.dataminer;
import java.io.Serializable;
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskComputation;
/**
* The Class TaskComputation.
*
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
* May 3, 2018
*/
public class TaskComputation implements BaseTaskComputation, Serializable {
/**
*
*/
private static final long serialVersionUID = 7987759965326087786L;
/** The id. */
private String id;
private String urlId;
private String operatorId;
private String operatorName;
private String equivalentRequest;
/**
* Instantiates a new DM computation id.
*/
public TaskComputation() {
}
/**
* Instantiates a new DM computation id.
*
* @param id the id
* @param urlId the url id
* @param operatorId the operator id
* @param operatorName the operator name
* @param equivalentRequest the equivalent request
*/
public TaskComputation(String id, String urlId, String operatorId,
String operatorName, String equivalentRequest) {
super();
this.id = id;
this.urlId = urlId;
this.operatorId = operatorId;
this.operatorName = operatorName;
this.equivalentRequest = equivalentRequest;
}
/**
* Gets the id.
*
* @return the id
*/
public String getId() {
return id;
}
/**
* Sets the id.
*
* @param id the new id
*/
public void setId(String id) {
this.id = id;
}
/**
* Gets the url id.
*
* @return the url id
*/
public String getUrlId() {
return urlId;
}
/**
* Sets the url id.
*
* @param urlId the new url id
*/
public void setUrlId(String urlId) {
this.urlId = urlId;
}
/**
* Gets the operator id.
*
* @return the operator id
*/
public String getOperatorId() {
return operatorId;
}
/**
* Sets the operator id.
*
* @param operatorId the new operator id
*/
public void setOperatorId(String operatorId) {
this.operatorId = operatorId;
}
/**
* Gets the operator name.
*
* @return the operator name
*/
public String getOperatorName() {
return operatorName;
}
/**
* Sets the operator name.
*
* @param operatorName the new operator name
*/
public void setOperatorName(String operatorName) {
this.operatorName = operatorName;
}
/**
* Gets the equivalent request.
*
* @return the equivalent request
*/
public String getEquivalentRequest() {
return equivalentRequest;
}
/**
* Sets the equivalent request.
*
* @param equivalentRequest the new equivalent request
*/
public void setEquivalentRequest(String equivalentRequest) {
this.equivalentRequest = equivalentRequest;
}
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("DMComputationId [id=");
builder.append(id);
builder.append(", urlId=");
builder.append(urlId);
builder.append(", operatorId=");
builder.append(operatorId);
builder.append(", operatorName=");
builder.append(operatorName);
builder.append(", equivalentRequest=");
builder.append(equivalentRequest);
builder.append("]");
return builder.toString();
}
}

View File

@ -5,7 +5,6 @@ import java.io.Serializable;
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration;
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskExecutionStatus;
import org.gcube.common.workspacetaskexecutor.shared.TaskStatus;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
/**
@ -26,7 +25,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
private Float percentCompleted = new Float(0);
private String log;
private String currentMessage="Waiting to start..";
private ComputationId computationId;
private TaskComputation computationId;
private BaseTaskConfiguration taskConfiguration;
@ -43,7 +42,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
* @param taskConfiguration the task configuration
* @param computationId the computation id
*/
public TaskExecutionStatus(BaseTaskConfiguration taskConfiguration, ComputationId computationId) {
public TaskExecutionStatus(BaseTaskConfiguration taskConfiguration, TaskComputation computationId) {
this.taskConfiguration = taskConfiguration;
this.computationId = computationId;
}
@ -76,7 +75,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
*
* @return the computationId
*/
public ComputationId getComputationId() {
public TaskComputation getComputationId() {
return computationId;
}
@ -88,7 +87,7 @@ public class TaskExecutionStatus implements BaseTaskExecutionStatus, Serializabl
*
* @param computationId the computationId to set
*/
public void setComputationId(ComputationId computationId) {
public void setComputationId(TaskComputation computationId) {
this.computationId = computationId;
}

View File

@ -0,0 +1,47 @@
/**
*
*/
package org.gcube.common.workspacetaskexecutor.util;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskComputation;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
/**
*
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
* May 3, 2018
*/
public class Converter {
/**
* To dm computation id.
*
* @param computationId the computation id
* @return the DM computation id
*/
public static TaskComputation toDMComputationId(ComputationId computationId){
if(computationId==null)
return null;
return new TaskComputation(computationId.getId(), computationId.getUrlId(), computationId.getOperatorId(), computationId.getOperatorName(), computationId.getEquivalentRequest());
}
/**
* To computation id.
*
* @param computationId the computation id
* @return the computation id
*/
public static ComputationId toComputationId(TaskComputation computationId){
if(computationId==null)
return null;
return new ComputationId(computationId.getId(), computationId.getUrlId(), computationId.getOperatorId(), computationId.getOperatorName(), computationId.getEquivalentRequest());
}
}