Enhancement on Project Activity #11690

git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/Common/workspace-task-executor-library@167308 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Francesco Mangiacrapa 2018-05-04 09:14:27 +00:00
parent 8bc1186ed0
commit 1cd02963ba
4 changed files with 154 additions and 97 deletions

View File

@ -17,6 +17,7 @@ import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStat
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException;
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException;
import org.gcube.common.workspacetaskexecutor.util.Converter; import org.gcube.common.workspacetaskexecutor.util.Converter;
import org.gcube.common.workspacetaskexecutor.util.EncrypterUtil;
import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService; import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient; import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData; import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData;
@ -122,20 +123,21 @@ public class DataMinerAccessPoint {
/** /**
* Do run task. * Do run task.
* *
* @param algConfiguration the alg configuration * @param taskConfiguration the alg configuration
* @return the task execution status * @return the task execution status
* @throws TaskNotExecutableException the task not executable exception * @throws TaskNotExecutableException the task not executable exception
*/ */
public TaskExecutionStatus doRunTask(TaskConfiguration algConfiguration) throws TaskNotExecutableException{ public TaskExecutionStatus doRunTask(TaskConfiguration taskConfiguration) throws TaskNotExecutableException{
Operator operator; Operator operator;
SClient sClient; SClient sClient;
try { try {
sClient = dataMinerService.getClient(algConfiguration.getToken()); String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken());
operator = sClient.getOperatorById(algConfiguration.getTaskId()); sClient = dataMinerService.getClient(token);
operator = sClient.getOperatorById(taskConfiguration.getTaskId());
} }
catch (Exception e) { catch (Exception e) {
String error = "Error on get Client or the Operator for id: "+algConfiguration.getTaskId(); String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId();
logger.error(error, e); logger.error(error, e);
throw new TaskNotExecutableException(error); throw new TaskNotExecutableException(error);
} }
@ -146,11 +148,11 @@ public class DataMinerAccessPoint {
} }
try { try {
addParametersToOperator(operator, algConfiguration.getMapParameters()); addParametersToOperator(operator, taskConfiguration.getMapParameters());
logger.debug("Start Computation"); logger.debug("Start Computation");
ComputationId computationId = sClient.startComputation(operator); ComputationId computationId = sClient.startComputation(operator);
logger.debug("Started ComputationId: " + computationId); logger.debug("Started ComputationId: " + computationId);
return getTaskStatus(algConfiguration, Converter.toDMComputationId(computationId)); return getTaskStatus(taskConfiguration, Converter.toDMComputationId(computationId));
} }
catch (Exception e) { catch (Exception e) {
@ -164,25 +166,34 @@ public class DataMinerAccessPoint {
/** /**
* Gets the task status. To monitor the task exection.. * Gets the task status. To monitor the task exection..
* *
* @param algConfiguration the alg configuration * @param taskConfiguration the alg configuration
* @param taskComputation the task computation * @param taskComputation the task computation
* @return the task status * @return the task status
* @throws TaskErrorException the task error exception * @throws TaskErrorException the task error exception
* @throws TaskNotExecutableException the task not executable exception * @throws TaskNotExecutableException the task not executable exception
*/ */
public TaskExecutionStatus getTaskStatus(TaskConfiguration algConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ public TaskExecutionStatus getTaskStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{
//TODO ???
TaskExecutionStatus theTaskStatus = getTask(taskConfiguration);
if(theTaskStatus==null)
throw new TaskErrorException("No Task is running with the configuration: "+taskConfiguration.getConfigurationKey());
SClient sClient; SClient sClient;
ComputationId computationId = Converter.toComputationId(taskComputation); ComputationId computationId = Converter.toComputationId(taskComputation);
try { try {
sClient = dataMinerService.getClient(algConfiguration.getToken()); String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken());
sClient = dataMinerService.getClient(token);
} }
catch (Exception e) { catch (Exception e) {
logger.error("Error on get DM client", e); logger.error("Error on get DM client", e);
throw new TaskErrorException("Error on getting DataMiner client, Try later"); throw new TaskErrorException("Error on getting DataMiner client, Try later");
} }
TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(algConfiguration, taskComputation); TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(taskConfiguration, taskComputation);
logger.debug("Requesting operation progress"); logger.debug("Requesting operation progress");
ComputationStatus computationStatus = null; ComputationStatus computationStatus = null;
@ -213,33 +224,33 @@ public class DataMinerAccessPoint {
logger.debug("Operation "+TaskStatus.ACCEPTED); logger.debug("Operation "+TaskStatus.ACCEPTED);
taskExecutiontatus.setStatus(TaskStatus.ACCEPTED); taskExecutiontatus.setStatus(TaskStatus.ACCEPTED);
taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage()); taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage());
saveTask(algConfiguration, taskExecutiontatus); saveTask(taskConfiguration, taskExecutiontatus);
break; break;
case CANCELLED: case CANCELLED:
logger.debug("Operation "+TaskStatus.CANCELLED); logger.debug("Operation "+TaskStatus.CANCELLED);
taskExecutiontatus.setStatus(TaskStatus.CANCELLED); taskExecutiontatus.setStatus(TaskStatus.CANCELLED);
taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage()); taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage());
saveTask(algConfiguration, taskExecutiontatus); saveTask(taskConfiguration, taskExecutiontatus);
break; break;
case COMPLETE: case COMPLETE:
logger.debug("Operation "+TaskStatus.COMPLETED); logger.debug("Operation "+TaskStatus.COMPLETED);
taskExecutiontatus.setStatus(TaskStatus.COMPLETED); taskExecutiontatus.setStatus(TaskStatus.COMPLETED);
taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage()); taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage());
saveTask(algConfiguration, taskExecutiontatus); saveTask(taskConfiguration, taskExecutiontatus);
break; break;
case FAILED: case FAILED:
logger.debug("Operation "+TaskStatus.FAILED); logger.debug("Operation "+TaskStatus.FAILED);
taskExecutiontatus.setStatus(TaskStatus.FAILED); taskExecutiontatus.setStatus(TaskStatus.FAILED);
taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage()); taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage());
taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage()); taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage());
saveTask(algConfiguration, taskExecutiontatus); saveTask(taskConfiguration, taskExecutiontatus);
break; break;
case RUNNING: case RUNNING:
logger.debug("Operation "+TaskStatus.ONGOING); logger.debug("Operation "+TaskStatus.ONGOING);
taskExecutiontatus.setStatus(TaskStatus.ONGOING); taskExecutiontatus.setStatus(TaskStatus.ONGOING);
taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage()); taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage());
taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage()); taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage());
saveTask(algConfiguration, taskExecutiontatus); saveTask(taskConfiguration, taskExecutiontatus);
break; break;
default: default:
break; break;

View File

@ -32,8 +32,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
*/ */
public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfiguration, TaskComputation, TaskExecutionStatus>, ExecutableItem<TaskConfiguration>{ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask<TaskConfiguration, TaskComputation, TaskExecutionStatus>, ExecutableItem<TaskConfiguration>{
/** The logger. */ /** The logger. */
private static Logger logger = LoggerFactory.getLogger(WorkspaceDataMinerTaskExecutor.class); private static Logger logger = LoggerFactory.getLogger(WorkspaceDataMinerTaskExecutor.class);

View File

@ -12,13 +12,10 @@ import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/** /**
* The Class TaskConfiguration. * The Class TaskConfiguration.
* *
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it * @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it May 2, 2018
* May 2, 2018
*/ */
public class TaskConfiguration implements BaseTaskConfiguration, Serializable { public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
@ -27,23 +24,23 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
*/ */
@JsonIgnoreProperties @JsonIgnoreProperties
public static final String FIELD_CONFIGURATION_KEY = "configurationKey"; public static final String FIELD_CONFIGURATION_KEY = "configurationKey";
/** /**
* *
*/ */
private static final long serialVersionUID = -3380573762288127547L; private static final long serialVersionUID = -3380573762288127547L;
private String taskId; private String taskId;
@JsonIgnoreProperties @JsonIgnoreProperties
private String taskDescription; //optional private String taskDescription; // optional
@JsonIgnoreProperties /*
private String token; //optional * The encrypted VRE Token of the user in the VRE where submit the
* computation
*/
private String maskedToken;
private String workspaceItemId; private String workspaceItemId;
@JsonIgnoreProperties @JsonIgnoreProperties
private Map<String, String> mapParameters; //optional private Map<String, String> mapParameters; // optional
private String configurationKey; private String configurationKey;
/** /**
* Instantiates a new task configuration. * Instantiates a new task configuration.
*/ */
@ -51,43 +48,52 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
} }
/** /**
* Instantiates a new task configuration. * Instantiates a new task configuration.
* *
* @param configurationKey the configuration key * @param configurationKey
* @param taskId the task id * the configuration key
* @param taskDescription the task description * @param taskId
* @param token the token * the task id
* @param workspaceItemId the workspace item id * @param taskDescription
* @param mapParameters the map parameters * the task description
* @param maskedToken
* the token
* @param workspaceItemId
* the workspace item id
* @param mapParameters
* the map parameters
*/ */
public TaskConfiguration(String configurationKey, public TaskConfiguration(
String taskId, String taskDescription, String token, String configurationKey, String taskId, String taskDescription,
String workspaceItemId, Map<String, String> mapParameters) { String maskedToken, String workspaceItemId,
Map<String, String> mapParameters) {
setConfigurationKey(configurationKey); setConfigurationKey(configurationKey);
this.taskId = taskId; this.taskId = taskId;
this.taskDescription = taskDescription; this.taskDescription = taskDescription;
this.token = token; this.maskedToken = maskedToken;
this.workspaceItemId = workspaceItemId; this.workspaceItemId = workspaceItemId;
this.mapParameters = mapParameters; this.mapParameters = mapParameters;
} }
/* (non-Javadoc) /*
* @see org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration#getAccessKey() * (non-Javadoc)
* @see org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration#
* getAccessKey()
*/ */
@Override @Override
public String getConfigurationKey() { public String getConfigurationKey() {
if(configurationKey==null) if (configurationKey == null)
configurationKey = hashCode()+""; configurationKey = hashCode() + "";
return configurationKey; return configurationKey;
} }
/* (non-Javadoc) /*
* @see org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration#setAccessKey(java.lang.String) * (non-Javadoc)
* @see org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration#
* setAccessKey(java.lang.String)
*/ */
@Override @Override
public void setConfigurationKey(String configurationKey) { public void setConfigurationKey(String configurationKey) {
@ -95,9 +101,8 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
this.configurationKey = configurationKey; this.configurationKey = configurationKey;
} }
/*
* (non-Javadoc)
/* (non-Javadoc)
* @see org.gcube.common.workspacetaskexecutor.TaskConfiguration#getTaskId() * @see org.gcube.common.workspacetaskexecutor.TaskConfiguration#getTaskId()
*/ */
@Override @Override
@ -106,7 +111,6 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
return taskId; return taskId;
} }
/** /**
* Gets the task description. * Gets the task description.
* *
@ -117,20 +121,6 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
return taskDescription; return taskDescription;
} }
/**
* Gets the token.
*
* @return the token
*/
public String getToken() {
return token;
}
/** /**
* Gets the workspace item id. * Gets the workspace item id.
* *
@ -141,8 +131,6 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
return workspaceItemId; return workspaceItemId;
} }
/** /**
* Gets the map parameters. * Gets the map parameters.
* *
@ -153,66 +141,52 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
return mapParameters; return mapParameters;
} }
/** /**
* Sets the task description. * Sets the task description.
* *
* @param taskDescription the taskDescription to set * @param taskDescription
* the taskDescription to set
*/ */
public void setTaskDescription(String taskDescription) { public void setTaskDescription(String taskDescription) {
this.taskDescription = taskDescription; this.taskDescription = taskDescription;
} }
/** /**
* Sets the task id. * Sets the task id.
* *
* @param taskId the taskId to set * @param taskId
* the taskId to set
*/ */
public void setTaskId(String taskId) { public void setTaskId(String taskId) {
this.taskId = taskId; this.taskId = taskId;
} }
/**
* Sets the token.
*
* @param token the token to set
*/
public void setToken(String token) {
this.token = token;
}
/** /**
* Sets the workspace item id. * Sets the workspace item id.
* *
* @param workspaceItemId the workspaceItemId to set * @param workspaceItemId
* the workspaceItemId to set
*/ */
public void setWorkspaceItemId(String workspaceItemId) { public void setWorkspaceItemId(String workspaceItemId) {
this.workspaceItemId = workspaceItemId; this.workspaceItemId = workspaceItemId;
} }
/** /**
* Sets the map parameters. * Sets the map parameters.
* *
* @param mapParameters the mapParameters to set * @param mapParameters
* the mapParameters to set
*/ */
public void setMapParameters(Map<String, String> mapParameters) { public void setMapParameters(Map<String, String> mapParameters) {
this.mapParameters = mapParameters; this.mapParameters = mapParameters;
} }
/*
/* (non-Javadoc) * (non-Javadoc)
* @see java.lang.Object#hashCode() * @see java.lang.Object#hashCode()
*/ */
@Override @Override
@ -220,12 +194,33 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
int hash = 1; int hash = 1;
hash = hash * 13 + (taskId == null ? 0 : taskId.hashCode()); hash = hash * 13 + (taskId == null ? 0 : taskId.hashCode());
hash = hash * 17 + (workspaceItemId == null ? 0 : workspaceItemId.hashCode()); hash =
hash * 17 +
(workspaceItemId == null ? 0 : workspaceItemId.hashCode());
hash = hash * new Random().nextInt(); hash = hash * new Random().nextInt();
return hash; return hash;
} }
/* (non-Javadoc)
/**
* @return the maskedToken
*/
public String getMaskedToken() {
return maskedToken;
}
/**
* @param maskedToken the maskedToken to set
*/
public void setMaskedToken(String maskedToken) {
this.maskedToken = maskedToken;
}
/*
* (non-Javadoc)
* @see java.lang.Object#toString() * @see java.lang.Object#toString()
*/ */
@Override @Override
@ -237,7 +232,7 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
builder.append(", taskDescription="); builder.append(", taskDescription=");
builder.append(taskDescription); builder.append(taskDescription);
builder.append(", token="); builder.append(", token=");
builder.append(token); builder.append(maskedToken);
builder.append(", workspaceItemId="); builder.append(", workspaceItemId=");
builder.append(workspaceItemId); builder.append(workspaceItemId);
builder.append(", mapParameters="); builder.append(", mapParameters=");
@ -247,7 +242,4 @@ public class TaskConfiguration implements BaseTaskConfiguration, Serializable {
builder.append("]"); builder.append("]");
return builder.toString(); return builder.toString();
} }
} }

View File

@ -0,0 +1,56 @@
/**
*
*/
package org.gcube.common.workspacetaskexecutor.util;
import org.gcube.common.encryption.StringEncrypter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Class EncrypterUtil.
*
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
* May 4, 2018
*/
public class EncrypterUtil {
private static Logger logger = LoggerFactory.getLogger(EncrypterUtil.class);
/**
* Encrypt string.
*
* @param toEncrypt the to encrypt
* @return the string
*/
public static String encryptString(String toEncrypt){
try {
return StringEncrypter.getEncrypter().encrypt(toEncrypt);
}
catch (Exception e) {
//silent
logger.warn("Encrypt error for the string: "+toEncrypt);
}
return toEncrypt;
}
/**
* Decrypt string.
*
* @param toDecrypt the to decrypt
* @return the string
*/
public static String decryptString(String toDecrypt){
try {
return StringEncrypter.getEncrypter().decrypt(toDecrypt);
}
catch (Exception e) {
//silent
logger.warn("Decrypt error for the string: "+toDecrypt);
}
return toDecrypt;
}
}