387 lines
13 KiB
Java
387 lines
13 KiB
Java
/*
|
|
*
|
|
*/
|
|
package org.gcube.common.workspacetaskexecutor.dataminer;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
|
|
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration;
|
|
import org.gcube.common.workspacetaskexecutor.shared.TaskParameter;
|
|
import org.gcube.common.workspacetaskexecutor.shared.TaskStatus;
|
|
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskComputation;
|
|
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration;
|
|
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus;
|
|
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException;
|
|
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException;
|
|
import org.gcube.common.workspacetaskexecutor.util.Converter;
|
|
import org.gcube.common.workspacetaskexecutor.util.EncrypterUtil;
|
|
import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
|
|
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.MapResource;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.Resource;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus.Status;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
/**
|
|
* The Class DataMinerAccessPoint.
|
|
*
|
|
* @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
|
|
* May 2, 2018
|
|
*/
|
|
public class DataMinerAccessPoint {
|
|
|
|
/**
|
|
*
|
|
*/
|
|
private static final String NEW_STATUS_CHARS = "******* ";
|
|
/**
|
|
*
|
|
*/
|
|
private static final String NEW_LINE = "\n";
|
|
private static final String DASH = "-";
|
|
private DataMinerService dataMinerService;
|
|
private static Logger logger = LoggerFactory.getLogger(DataMinerAccessPoint.class);
|
|
|
|
/** The map call back. */
|
|
// Fully synchronized HashMap
|
|
private Map<String, TaskExecutionStatus> mapExecutionTask = Collections.synchronizedMap(new HashMap<String, TaskExecutionStatus>());
|
|
|
|
|
|
/**
|
|
* Instantiates a new data miner access point.
|
|
*/
|
|
public DataMinerAccessPoint() {
|
|
dataMinerService = new DataMinerService();
|
|
}
|
|
|
|
/**
|
|
* Removes the task from memory.
|
|
*
|
|
* @param algConfiguration the alg configuration
|
|
* @return the task execution status
|
|
*/
|
|
private TaskExecutionStatus removeTaskFromMemory(BaseTaskConfiguration algConfiguration){
|
|
|
|
return mapExecutionTask.remove(algConfiguration.getConfigurationKey());
|
|
}
|
|
|
|
|
|
/**
|
|
* Save task in memory.
|
|
*
|
|
* @param algConfiguration the alg configuration
|
|
* @param taskStatus the task status
|
|
*/
|
|
private void saveTaskInMemory(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){
|
|
|
|
mapExecutionTask.put(algConfiguration.getConfigurationKey(), taskStatus);
|
|
}
|
|
|
|
|
|
/**
|
|
* Gets the running task.
|
|
*
|
|
* @param taskConfiguration the task configuration
|
|
* @return the running task
|
|
*/
|
|
public TaskExecutionStatus getRunningTask(BaseTaskConfiguration taskConfiguration){
|
|
return mapExecutionTask.get(taskConfiguration.getConfigurationKey());
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
* Abort running task.
|
|
*
|
|
* @param taskConfiguration the alg configuration
|
|
* @throws TaskErrorException the task error exception
|
|
* @throws TaskNotExecutableException the task not executable exception
|
|
*/
|
|
public void abortRunningTask(TaskConfiguration taskConfiguration) throws TaskErrorException, TaskNotExecutableException{
|
|
|
|
TaskExecutionStatus task = getRunningTask(taskConfiguration);
|
|
|
|
if(task==null)
|
|
throw new TaskErrorException("The task with configuration: "+taskConfiguration+" is not running");
|
|
|
|
SClient sClient;
|
|
try {
|
|
sClient = dataMinerService.getClient();
|
|
sClient.cancelComputation(DMConverter.toComputationId(task.getTaskComputation()));
|
|
}
|
|
catch (Exception e) {
|
|
String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId();
|
|
logger.error(error, e);
|
|
throw new TaskNotExecutableException(error);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* Do run task.
|
|
*
|
|
* @param taskConfiguration the alg configuration
|
|
* @return the task execution status
|
|
* @throws TaskNotExecutableException the task not executable exception
|
|
*/
|
|
public TaskExecutionStatus doRunTask(TaskConfiguration taskConfiguration) throws TaskNotExecutableException{
|
|
|
|
Operator operator;
|
|
SClient sClient;
|
|
try {
|
|
String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken());
|
|
sClient = dataMinerService.getClient(token);
|
|
operator = sClient.getOperatorById(taskConfiguration.getTaskId());
|
|
}
|
|
catch (Exception e) {
|
|
String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId();
|
|
logger.error(error, e);
|
|
throw new TaskNotExecutableException(error);
|
|
}
|
|
|
|
if (operator == null) {
|
|
logger.error("Operator not found");
|
|
throw new TaskNotExecutableException("Data Miner operator not found");
|
|
}
|
|
|
|
try {
|
|
addParametersToOperator(operator, taskConfiguration.getListParameters());
|
|
logger.debug("Start Computation");
|
|
ComputationId computationId = sClient.startComputation(operator);
|
|
logger.debug("Started ComputationId: " + computationId);
|
|
return loadTaskExecutionStatus(taskConfiguration, DMConverter.toDMComputationId(computationId));
|
|
|
|
}
|
|
catch (Exception e) {
|
|
// TODO Auto-generated catch block
|
|
e.printStackTrace();
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Monitor status.
|
|
*
|
|
* @param taskConfiguration the task configuration
|
|
* @param taskComputation the task computation
|
|
* @return the task execution status
|
|
* @throws TaskErrorException the task error exception
|
|
* @throws TaskNotExecutableException the task not executable exception
|
|
*/
|
|
public TaskExecutionStatus monitorStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{
|
|
TaskExecutionStatus theTaskStatus = getRunningTask(taskConfiguration);
|
|
|
|
if(theTaskStatus==null)
|
|
throw new TaskErrorException("No Task is running with the configuration: "+taskConfiguration.getConfigurationKey());
|
|
|
|
TaskExecutionStatus tes = loadTaskExecutionStatus(taskConfiguration, taskComputation);
|
|
|
|
switch (tes.getStatus()) {
|
|
case COMPLETED:
|
|
case CANCELLED:
|
|
case FAILED:
|
|
logger.info("Removing "+tes+ "from memory");
|
|
removeTaskFromMemory(tes.getTaskConfiguration());
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return tes;
|
|
}
|
|
|
|
|
|
/**
|
|
* Load task execution status.
|
|
*
|
|
* @param taskConfiguration the task configuration
|
|
* @param taskComputation the task computation
|
|
* @return the task execution status
|
|
* @throws TaskErrorException the task error exception
|
|
* @throws TaskNotExecutableException the task not executable exception
|
|
*/
|
|
private TaskExecutionStatus loadTaskExecutionStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{
|
|
|
|
TaskExecutionStatus oldTaskExecutionStatus = getRunningTask(taskConfiguration);
|
|
boolean isStart = false;
|
|
if(oldTaskExecutionStatus==null)
|
|
isStart = true;
|
|
|
|
SClient sClient;
|
|
ComputationId computationId = DMConverter.toComputationId(taskComputation);
|
|
try {
|
|
String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken());
|
|
sClient = dataMinerService.getClient(token);
|
|
}
|
|
catch (Exception e) {
|
|
logger.error("Error on get DM client", e);
|
|
throw new TaskErrorException("Error on getting DataMiner client, Try later");
|
|
}
|
|
|
|
TaskExecutionStatus newTaskExecutionStatus = new TaskExecutionStatus(taskConfiguration, taskComputation);
|
|
|
|
if(!isStart)
|
|
//adding previous log
|
|
newTaskExecutionStatus.addLog(oldTaskExecutionStatus.getLog());
|
|
else{
|
|
//first time adding the configurations
|
|
String confs=NEW_LINE+NEW_STATUS_CHARS+Converter.getCurrentFormattedDate(null)+NEW_LINE;
|
|
confs+="Computation starts with configurations...";
|
|
confs+=NEW_LINE+NEW_LINE+DASH+DASH+"Operator Id: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorId();
|
|
confs+=NEW_LINE+NEW_LINE+DASH+DASH+"Operator Name: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorName();
|
|
confs+=NEW_LINE+NEW_LINE+DASH+DASH+"EquivalentRequest: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getEquivalentRequest();
|
|
confs+=NEW_LINE+NEW_LINE+DASH+DASH+"URL ID: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getUrlId();
|
|
newTaskExecutionStatus.addLog(confs+NEW_LINE);
|
|
}
|
|
|
|
logger.debug("Requesting operation progress");
|
|
ComputationStatus computationStatus = null;
|
|
try {
|
|
computationStatus = sClient.getComputationStatus(computationId);
|
|
} catch (Exception e) {
|
|
logger.error(e.getLocalizedMessage());
|
|
e.printStackTrace();
|
|
|
|
}
|
|
|
|
logger.debug("ComputationStatus: " + computationStatus);
|
|
if (computationStatus == null) {
|
|
logger.error("ComputationStatus is null");
|
|
return newTaskExecutionStatus;
|
|
}
|
|
|
|
Status status = computationStatus.getStatus();
|
|
if (status == null) {
|
|
logger.error("Status is null");
|
|
return newTaskExecutionStatus;
|
|
}
|
|
|
|
newTaskExecutionStatus.setCurrentMessage(computationStatus.getMessage());
|
|
newTaskExecutionStatus.setPercentCompleted((float) computationStatus.getPercentage());
|
|
String historyMsg=NEW_LINE+NEW_STATUS_CHARS+Converter.getCurrentFormattedDate(null)+NEW_LINE;
|
|
switch (status) {
|
|
case ACCEPTED:
|
|
logger.debug("Operation "+TaskStatus.ACCEPTED);
|
|
newTaskExecutionStatus.setStatus(TaskStatus.ACCEPTED);
|
|
historyMsg+= "Status "+TaskStatus.ACCEPTED;
|
|
historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
|
|
break;
|
|
case CANCELLED:
|
|
logger.debug("Operation "+TaskStatus.CANCELLED);
|
|
newTaskExecutionStatus.setStatus(TaskStatus.CANCELLED);
|
|
historyMsg+= "Status "+TaskStatus.CANCELLED;
|
|
historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
|
|
break;
|
|
case COMPLETE:
|
|
logger.debug("Operation "+TaskStatus.COMPLETED);
|
|
newTaskExecutionStatus.setStatus(TaskStatus.COMPLETED);
|
|
historyMsg+= "Status "+TaskStatus.COMPLETED;
|
|
historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
|
|
break;
|
|
case FAILED:
|
|
logger.debug("Operation "+TaskStatus.FAILED);
|
|
newTaskExecutionStatus.setStatus(TaskStatus.FAILED);
|
|
historyMsg+= "Status "+TaskStatus.FAILED;
|
|
historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
|
|
break;
|
|
case RUNNING:
|
|
logger.debug("Operation "+TaskStatus.ONGOING);
|
|
newTaskExecutionStatus.setStatus(TaskStatus.ONGOING);
|
|
historyMsg+= "Status "+TaskStatus.ONGOING;
|
|
historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
|
|
break;
|
|
default:
|
|
newTaskExecutionStatus.setStatus(TaskStatus.INITIALIZING);
|
|
historyMsg+= "Status "+TaskStatus.INITIALIZING;
|
|
historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():"";
|
|
break;
|
|
|
|
}
|
|
|
|
newTaskExecutionStatus.addLog(historyMsg+NEW_LINE);
|
|
saveTaskInMemory(taskConfiguration, newTaskExecutionStatus);
|
|
return newTaskExecutionStatus;
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
* Retrieve output.
|
|
*
|
|
* @param computationId the computation id
|
|
* @param sClient the s client
|
|
* @return the string
|
|
*/
|
|
private String retrieveOutput(ComputationId computationId, SClient sClient) {
|
|
StringBuilder builder = new StringBuilder();
|
|
try {
|
|
|
|
OutputData output = sClient.getOutputDataByComputationId(computationId);
|
|
logger.debug("Output: " + output);
|
|
builder.append("Output: " + output);
|
|
Resource resource = output.getResource();
|
|
if (resource.isMap()) {
|
|
MapResource mapResource = (MapResource) resource;
|
|
for (String key : mapResource.getMap().keySet()) {
|
|
logger.debug("Entry: " + key + " = "+ mapResource.getMap().get(key));
|
|
builder.append("Entry: " + key + " = "+ mapResource.getMap().get(key));
|
|
}
|
|
|
|
} else {
|
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
logger.error(e.getLocalizedMessage());
|
|
e.printStackTrace();
|
|
}
|
|
|
|
return builder.toString();
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
* Adds the parameters to operator.
|
|
*
|
|
* @param operator the operator
|
|
* @param parameters the parameters
|
|
* @return the operator
|
|
*/
|
|
private Operator addParametersToOperator(Operator operator, List<TaskParameter> parameters) {
|
|
logger.debug("Adding parameters to operator");
|
|
|
|
List<Parameter> listParameters = new ArrayList<Parameter>();
|
|
for (TaskParameter taskParameter : parameters) {
|
|
|
|
if(taskParameter.getType()==null)
|
|
continue;
|
|
|
|
Parameter dmParameter = DMConverter.toDMParameter(taskParameter);
|
|
if(dmParameter!=null)
|
|
listParameters.add(dmParameter);
|
|
|
|
|
|
}
|
|
logger.debug("Parameters list is: " + listParameters);
|
|
operator.setOperatorParameters(listParameters);
|
|
return operator;
|
|
|
|
}
|
|
|
|
|
|
}
|