/* * */ package org.gcube.common.workspacetaskexecutor.dataminer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration; import org.gcube.common.workspacetaskexecutor.shared.TaskParameter; import org.gcube.common.workspacetaskexecutor.shared.TaskStatus; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskComputation; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException; import org.gcube.common.workspacetaskexecutor.util.Converter; import org.gcube.common.workspacetaskexecutor.util.EncrypterUtil; import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService; import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient; import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData; import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId; import org.gcube.data.analysis.dataminermanagercl.shared.data.output.MapResource; import org.gcube.data.analysis.dataminermanagercl.shared.data.output.Resource; import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter; import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus; import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus.Status; import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * The Class DataMinerAccessPoint. * * @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it * May 2, 2018 */ public class DataMinerAccessPoint { private DataMinerService dataMinerService; private static Logger logger = LoggerFactory.getLogger(DataMinerAccessPoint.class); /** The map call back. */ // Fully synchronized HashMap private Map mapExecutionTask = Collections.synchronizedMap(new HashMap()); /** * Instantiates a new data miner access point. */ public DataMinerAccessPoint() { dataMinerService = new DataMinerService(); } /** * Removes the task. * * @param algConfiguration the alg configuration * @return the task execution status */ private TaskExecutionStatus removeTask(BaseTaskConfiguration algConfiguration){ return mapExecutionTask.remove(algConfiguration.getConfigurationKey()); } /** * Save task. * * @param algConfiguration the alg configuration * @param taskStatus the task status */ private void saveTask(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){ mapExecutionTask.put(algConfiguration.getConfigurationKey(), taskStatus); } /** * Gets the running task. * * @param taskConfiguration the task configuration * @return the running task */ public TaskExecutionStatus getRunningTask(BaseTaskConfiguration taskConfiguration){ return mapExecutionTask.get(taskConfiguration.getConfigurationKey()); } /** * Abort running task. * * @param taskConfiguration the alg configuration * @throws TaskErrorException the task error exception * @throws TaskNotExecutableException the task not executable exception */ public void abortRunningTask(TaskConfiguration taskConfiguration) throws TaskErrorException, TaskNotExecutableException{ TaskExecutionStatus task = getRunningTask(taskConfiguration); if(task==null) throw new TaskErrorException("The task with configuration: "+taskConfiguration+" is not running"); SClient sClient; try { sClient = dataMinerService.getClient(); sClient.cancelComputation(Converter.toComputationId(task.getTaskComputation())); } catch (Exception e) { String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId(); logger.error(error, e); throw new TaskNotExecutableException(error); } } /** * Do run task. * * @param taskConfiguration the alg configuration * @return the task execution status * @throws TaskNotExecutableException the task not executable exception */ public TaskExecutionStatus doRunTask(TaskConfiguration taskConfiguration) throws TaskNotExecutableException{ Operator operator; SClient sClient; try { String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken()); sClient = dataMinerService.getClient(token); operator = sClient.getOperatorById(taskConfiguration.getTaskId()); } catch (Exception e) { String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId(); logger.error(error, e); throw new TaskNotExecutableException(error); } if (operator == null) { logger.error("Operator not found"); throw new TaskNotExecutableException("Data Miner operator not found"); } try { addParametersToOperator(operator, taskConfiguration.getListParameters()); logger.debug("Start Computation"); ComputationId computationId = sClient.startComputation(operator); logger.debug("Started ComputationId: " + computationId); return getTaskStatus(taskConfiguration, Converter.toDMComputationId(computationId)); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } /** * Gets the task status. To monitor the task exection.. * * @param taskConfiguration the alg configuration * @param taskComputation the task computation * @return the task status * @throws TaskErrorException the task error exception * @throws TaskNotExecutableException the task not executable exception */ public TaskExecutionStatus getTaskStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ //TODO ??? TaskExecutionStatus theTaskStatus = getRunningTask(taskConfiguration); if(theTaskStatus==null) throw new TaskErrorException("No Task is running with the configuration: "+taskConfiguration.getConfigurationKey()); SClient sClient; ComputationId computationId = Converter.toComputationId(taskComputation); try { String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken()); sClient = dataMinerService.getClient(token); } catch (Exception e) { logger.error("Error on get DM client", e); throw new TaskErrorException("Error on getting DataMiner client, Try later"); } TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(taskConfiguration, taskComputation); logger.debug("Requesting operation progress"); ComputationStatus computationStatus = null; try { computationStatus = sClient.getComputationStatus(computationId); } catch (Exception e) { logger.error(e.getLocalizedMessage()); e.printStackTrace(); } logger.debug("ComputationStatus: " + computationStatus); if (computationStatus == null) { logger.error("ComputationStatus is null"); return taskExecutiontatus; } Status status = computationStatus.getStatus(); if (status == null) { logger.error("Status is null"); return taskExecutiontatus; } taskExecutiontatus.setCurrentMessage(computationStatus.getMessage()); switch (status) { case ACCEPTED: logger.debug("Operation "+TaskStatus.ACCEPTED); taskExecutiontatus.setStatus(TaskStatus.ACCEPTED); taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage()); saveTask(taskConfiguration, taskExecutiontatus); break; case CANCELLED: logger.debug("Operation "+TaskStatus.CANCELLED); taskExecutiontatus.setStatus(TaskStatus.CANCELLED); taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage()); saveTask(taskConfiguration, taskExecutiontatus); break; case COMPLETE: logger.debug("Operation "+TaskStatus.COMPLETED); taskExecutiontatus.setStatus(TaskStatus.COMPLETED); taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage()); saveTask(taskConfiguration, taskExecutiontatus); break; case FAILED: logger.debug("Operation "+TaskStatus.FAILED); taskExecutiontatus.setStatus(TaskStatus.FAILED); taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage()); taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage()); saveTask(taskConfiguration, taskExecutiontatus); break; case RUNNING: logger.debug("Operation "+TaskStatus.ONGOING); taskExecutiontatus.setStatus(TaskStatus.ONGOING); taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage()); taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage()); saveTask(taskConfiguration, taskExecutiontatus); break; default: break; } return taskExecutiontatus; } /** * Retrieve output. * * @param computationId the computation id * @param sClient the s client * @return the string */ private String retrieveOutput(ComputationId computationId, SClient sClient) { StringBuilder builder = new StringBuilder(); try { OutputData output = sClient.getOutputDataByComputationId(computationId); logger.debug("Output: " + output); builder.append("Output: " + output); Resource resource = output.getResource(); if (resource.isMap()) { MapResource mapResource = (MapResource) resource; for (String key : mapResource.getMap().keySet()) { logger.debug("Entry: " + key + " = "+ mapResource.getMap().get(key)); builder.append("Entry: " + key + " = "+ mapResource.getMap().get(key)); } } else { } } catch (Exception e) { logger.error(e.getLocalizedMessage()); e.printStackTrace(); } return builder.toString(); } /** * Adds the parameters to operator. * * @param operator the operator * @param parameters the parameters * @return the operator */ private Operator addParametersToOperator(Operator operator, List parameters) { logger.debug("Adding parameters to operator"); List listParameters = new ArrayList(); for (TaskParameter taskParameter : parameters) { if(taskParameter.getType()==null) continue; Parameter dmParameter = DMConverter.toDMParameter(taskParameter); if(dmParameter!=null) listParameters.add(dmParameter); } logger.debug("Parameters list is: " + listParameters); operator.setOperatorParameters(listParameters); return operator; } }