// Source: workspace-task-executor-lib.../src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java
// Repository viewer metadata: 315 lines, 9.1 KiB, Java

/*
*
*/
package org.gcube.common.workspacetaskexecutor.dataminer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration;
import org.gcube.common.workspacetaskexecutor.shared.Status;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration;
import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus;
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException;
import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException;
import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.MapResource;
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.Resource;
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.FileParameter;
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter;
import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Access point used to run, monitor and abort tasks on the DataMiner service.
 *
 * <p>The {@link TaskExecutionStatus} of every started task is kept in an in-memory map and is
 * updated by a {@link DMMonitorListener} while the computation progresses. Entries are never
 * removed by this class.</p>
 *
 * @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it
 * May 2, 2018
 */
public class DataMinerAccessPoint {

    private static final Logger logger = LoggerFactory.getLogger(DataMinerAccessPoint.class);

    private final DataMinerService dataMinerService;

    /**
     * Last known execution status of each task, indexed by the key computed in
     * {@link #keyOf(BaseTaskConfiguration)}. Wrapped in a synchronized map because it is written
     * by the monitor callbacks and read by the public methods of this class.
     */
    private final Map<Object, TaskExecutionStatus> mapExecutionTask =
        Collections.synchronizedMap(new HashMap<Object, TaskExecutionStatus>());

    /**
     * Instantiates a new data miner access point.
     */
    public DataMinerAccessPoint() {
        dataMinerService = new DataMinerService();
    }

    /**
     * Computes the map key identifying a task configuration.
     *
     * <p>Saving and reading MUST use the same key: previously the status was stored under
     * {@code hashCode()} but looked up by {@code getConfigurationKey()}, so lookups could miss.</p>
     *
     * @param taskConfiguration the task configuration
     * @return the configuration key, or the configuration hash code when no key is set
     */
    private static Object keyOf(BaseTaskConfiguration taskConfiguration) {
        Object key = taskConfiguration.getConfigurationKey();
        // Fall back to the hash code so configurations without a key do not all share one entry.
        return key != null ? key : Integer.valueOf(taskConfiguration.hashCode());
    }

    /**
     * Stores (or replaces) the status of the task identified by the given configuration.
     *
     * @param algConfiguration the alg configuration
     * @param taskStatus the task status
     */
    private void saveTask(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus) {
        mapExecutionTask.put(keyOf(algConfiguration), taskStatus);
    }

    /**
     * Gets the stored status of the task identified by the given configuration.
     *
     * @param taskConfiguration the task configuration
     * @return the stored status, or {@code null} if none has been saved for this configuration
     */
    private TaskExecutionStatus getTask(BaseTaskConfiguration taskConfiguration) {
        return mapExecutionTask.get(keyOf(taskConfiguration));
    }

    /**
     * Aborts the computation associated with the given task configuration.
     *
     * @param taskConfiguration the configuration of the task to abort
     * @throws TaskErrorException if no status is stored for the configuration
     * @throws TaskNotExecutableException if the client cannot be obtained or the cancellation fails
     */
    public void abortRunningTask(TaskConfiguration taskConfiguration) throws TaskErrorException, TaskNotExecutableException {
        TaskExecutionStatus task = getTask(taskConfiguration);
        if (task == null) {
            throw new TaskErrorException("The task with configuration: " + taskConfiguration + " is not running");
        }

        try {
            SClient sClient = dataMinerService.getClient();
            sClient.cancelComputation(task.getComputationId());
        } catch (Exception e) {
            String error = "Error on cancelling the computation for task id: " + taskConfiguration.getTaskId();
            logger.error(error, e);
            throw new TaskNotExecutableException(error);
        }
    }

    /**
     * Resolves the operator for the configured task id, sets its parameters and starts the
     * computation, attaching a monitor that keeps the returned status up to date.
     *
     * @param algConfiguration the alg configuration
     * @return the task execution status (never {@code null})
     * @throws TaskNotExecutableException if the client or the operator cannot be obtained, the
     *         operator does not exist, or the computation cannot be started
     */
    public TaskExecutionStatus doRunTask(TaskConfiguration algConfiguration) throws TaskNotExecutableException {
        Operator operator;
        SClient sClient;
        try {
            sClient = dataMinerService.getClient();
            operator = sClient.getOperatorById(algConfiguration.getTaskId());
        } catch (Exception e) {
            String error = "Error on get Client or the Operator for id: " + algConfiguration.getTaskId();
            logger.error(error, e);
            throw new TaskNotExecutableException(error);
        }

        if (operator == null) {
            logger.error("Operator not found");
            throw new TaskNotExecutableException("Data Miner operator not found");
        }

        try {
            addParametersToOperator(operator, algConfiguration.getMapParameters());
            logger.debug("Start Computation");
            ComputationId computationId = sClient.startComputation(operator);
            logger.debug("Started ComputationId: {}", computationId);
            return startComputationOnDataMiner(algConfiguration, computationId, sClient);
        } catch (Exception e) {
            // Report the failure to the caller instead of swallowing it and returning null.
            String error = "Error on starting the computation for task id: " + algConfiguration.getTaskId();
            logger.error(error, e);
            throw new TaskNotExecutableException(error);
        }
    }

    /**
     * Gets the task status. Used to monitor the task execution.
     *
     * @param algConfiguration the alg configuration
     * @return the stored task status
     * @throws TaskErrorException if no status is stored for the configuration
     */
    public TaskExecutionStatus getTaskStatus(BaseTaskConfiguration algConfiguration) throws TaskErrorException {
        TaskExecutionStatus task = getTask(algConfiguration);
        if (task == null) {
            // TODO: evaluate re-attaching a monitor to the computation instead of failing.
            throw new TaskErrorException("No Task running with configuration: " + algConfiguration);
        }
        return task;
    }

    /**
     * Creates the status for a started computation, registers it and starts a {@link DMMonitor}
     * whose listener updates and re-saves the status on every notification.
     *
     * @param algConfiguration the alg configuration
     * @param computationId the computation id
     * @param sClient the s client
     * @return the task execution status, shared with the monitor listener
     */
    private TaskExecutionStatus startComputationOnDataMiner(final BaseTaskConfiguration algConfiguration, final ComputationId computationId, final SClient sClient) {
        final TaskExecutionStatus status = new TaskExecutionStatus(algConfiguration, computationId);

        // Register the task right away so that status requests and aborts issued before the
        // first monitor notification can find it.
        saveTask(algConfiguration, status);

        DMMonitorListener listener = new DMMonitorListener() {

            @Override
            public void running(double percentage) {
                logger.debug("Operation Running: {}", percentage);
                status.setStatus(Status.ONGOING);
                status.setPercentCompleted((float) percentage);
                status.setCurrentMessage("Operation Running...");
                saveTask(algConfiguration, status);
            }

            @Override
            public void failed(String message, Exception exception) {
                logger.error("Operation Failed");
                logger.error(message, exception);
                status.setStatus(Status.FAILED);
                status.setCurrentMessage(message);
                // The monitor may report a failure without an exception attached.
                if (exception != null) {
                    status.addLog(exception.getMessage());
                }
                saveTask(algConfiguration, status);
            }

            @Override
            public void complete(double percentage) {
                logger.debug("Operation Completed");
                logger.debug("Perc: {}", percentage);
                String log = retrieveOutput(computationId, sClient);
                status.setStatus(Status.COMPLETED);
                status.setCurrentMessage("Operation Completed");
                status.addLog(log);
                saveTask(algConfiguration, status);
            }

            @Override
            public void cancelled() {
                logger.debug("Operation Cancelled");
                // NOTE(review): a cancelled computation is reported as COMPLETED; confirm whether
                // Status offers a dedicated value for cancellation.
                status.setStatus(Status.COMPLETED);
                status.setCurrentMessage("Operation Cancelled");
                status.addLog("Operation Cancelled");
                saveTask(algConfiguration, status);
            }

            @Override
            public void accepted() {
                logger.debug("Operation Accepted");
                status.setStatus(Status.ACCEPTED);
                status.setCurrentMessage("Operation Accepted");
                status.addLog("Operation Accepted");
                saveTask(algConfiguration, status);
            }
        };

        DMMonitor dmMonitor = new DMMonitor(computationId, sClient);
        dmMonitor.add(listener);
        dmMonitor.start();
        return status;
    }

    /**
     * Builds a textual description of the output of a computation. Errors are logged and do not
     * propagate: whatever was collected before the error is returned.
     *
     * @param computationId the computation id
     * @param sClient the s client
     * @return the output description, possibly empty
     */
    private String retrieveOutput(ComputationId computationId, SClient sClient) {
        StringBuilder builder = new StringBuilder();
        try {
            OutputData output = sClient.getOutputDataByComputationId(computationId);
            logger.debug("Output: {}", output);
            builder.append("Output: ").append(output);

            Resource resource = output != null ? output.getResource() : null;
            if (resource != null && resource.isMap()) {
                MapResource mapResource = (MapResource) resource;
                for (Map.Entry<String, ?> entry : mapResource.getMap().entrySet()) {
                    logger.debug("Entry: {} = {}", entry.getKey(), entry.getValue());
                    builder.append("Entry: ").append(entry.getKey()).append(" = ").append(entry.getValue());
                }
            }
        } catch (Exception e) {
            logger.error("Error on retrieving the output for computation: " + computationId, e);
        }
        return builder.toString();
    }

    /**
     * Replaces the operator parameters with one {@link FileParameter} per map entry
     * (entry key as name, entry value as value).
     *
     * @param operator the operator
     * @param parameters the parameters; {@code null} is treated as an empty map
     * @return the same operator instance
     */
    private Operator addParametersToOperator(Operator operator, Map<String, String> parameters) {
        logger.debug("Adding parameters to operator");
        List<Parameter> listParameters = new ArrayList<Parameter>();

        if (parameters != null) {
            for (Map.Entry<String, String> entry : parameters.entrySet()) {
                // Every parameter is currently passed to the operator as a FileParameter.
                FileParameter fp = new FileParameter();
                fp.setName(entry.getKey());
                fp.setValue(entry.getValue());
                listParameters.add(fp);
            }
        }

        logger.debug("Parameters list is: {}", listParameters);
        operator.setOperatorParameters(listParameters);
        return operator;
    }
}