/* * */ package org.gcube.common.workspacetaskexecutor.dataminer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration; import org.gcube.common.workspacetaskexecutor.shared.TaskParameter; import org.gcube.common.workspacetaskexecutor.shared.TaskStatus; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskComputation; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException; import org.gcube.common.workspacetaskexecutor.util.EncrypterUtil; import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService; import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient; import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData; import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId; import org.gcube.data.analysis.dataminermanagercl.shared.data.output.MapResource; import org.gcube.data.analysis.dataminermanagercl.shared.data.output.Resource; import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter; import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus; import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus.Status; import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * The Class DataMinerAccessPoint. * * @author Francesco Mangiacrapa francesco.mangiacrapa@isti.cnr.it * May 2, 2018 */ public class DataMinerAccessPoint { private DataMinerService dataMinerService; private static Logger logger = LoggerFactory.getLogger(DataMinerAccessPoint.class); /** The map call back. */ // Fully synchronized HashMap private Map mapExecutionTask = Collections.synchronizedMap(new HashMap()); /** * Instantiates a new data miner access point. */ public DataMinerAccessPoint() { dataMinerService = new DataMinerService(); } /** * Removes the task from memory. * * @param algConfiguration the alg configuration * @return the task execution status */ private TaskExecutionStatus removeTaskFromMemory(BaseTaskConfiguration algConfiguration){ return mapExecutionTask.remove(algConfiguration.getConfigurationKey()); } /** * Save task in memory. * * @param algConfiguration the alg configuration * @param taskStatus the task status */ private void saveTaskInMemory(BaseTaskConfiguration algConfiguration, TaskExecutionStatus taskStatus){ mapExecutionTask.put(algConfiguration.getConfigurationKey(), taskStatus); } /** * Gets the running task. * * @param taskConfiguration the task configuration * @return the running task */ public TaskExecutionStatus getRunningTask(BaseTaskConfiguration taskConfiguration){ return mapExecutionTask.get(taskConfiguration.getConfigurationKey()); } /** * Abort running task. * * @param taskConfiguration the alg configuration * @throws TaskErrorException the task error exception * @throws TaskNotExecutableException the task not executable exception */ public void abortRunningTask(TaskConfiguration taskConfiguration) throws TaskErrorException, TaskNotExecutableException{ TaskExecutionStatus task = getRunningTask(taskConfiguration); if(task==null) throw new TaskErrorException("The task with configuration: "+taskConfiguration+" is not running"); SClient sClient; try { sClient = dataMinerService.getClient(); sClient.cancelComputation(DMConverter.toComputationId(task.getTaskComputation())); } catch (Exception e) { String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId(); logger.error(error, e); throw new TaskNotExecutableException(error); } } /** * Do run task. * * @param taskConfiguration the alg configuration * @return the task execution status * @throws TaskNotExecutableException the task not executable exception */ public TaskExecutionStatus doRunTask(TaskConfiguration taskConfiguration) throws TaskNotExecutableException{ Operator operator; SClient sClient; try { String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken()); sClient = dataMinerService.getClient(token); operator = sClient.getOperatorById(taskConfiguration.getTaskId()); } catch (Exception e) { String error = "Error on get Client or the Operator for id: "+taskConfiguration.getTaskId(); logger.error(error, e); throw new TaskNotExecutableException(error); } if (operator == null) { logger.error("Operator not found"); throw new TaskNotExecutableException("Data Miner operator not found"); } try { addParametersToOperator(operator, taskConfiguration.getListParameters()); logger.debug("Start Computation"); ComputationId computationId = sClient.startComputation(operator); logger.debug("Started ComputationId: " + computationId); return loadTaskExecutionStatus(taskConfiguration, DMConverter.toDMComputationId(computationId, System.currentTimeMillis(), null)); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } /** * Monitor status. * * @param taskConfiguration the task configuration * @param taskComputation the task computation * @return the task execution status * @throws TaskErrorException the task error exception * @throws TaskNotExecutableException the task not executable exception */ public TaskExecutionStatus monitorStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ TaskExecutionStatus theTaskStatus = getRunningTask(taskConfiguration); if(theTaskStatus==null) throw new TaskErrorException("No Task is running with the configuration: "+taskConfiguration.getConfigurationKey()); TaskExecutionStatus tes = loadTaskExecutionStatus(taskConfiguration, taskComputation); switch (tes.getStatus()) { case COMPLETED: case CANCELLED: case FAILED: logger.info("Removing "+tes+ "from memory"); removeTaskFromMemory(tes.getTaskConfiguration()); break; default: break; } return tes; } /** * Load task execution status. * * @param taskConfiguration the task configuration * @param taskComputation the task computation * @return the task execution status * @throws TaskErrorException the task error exception * @throws TaskNotExecutableException the task not executable exception */ private TaskExecutionStatus loadTaskExecutionStatus(TaskConfiguration taskConfiguration, final TaskComputation taskComputation) throws TaskErrorException, TaskNotExecutableException{ TaskExecutionStatus oldTaskExecutionStatus = getRunningTask(taskConfiguration); // Long startTime = taskComputation.getStartTime(); // if(oldTaskExecutionStatus!=null) // startTime = oldTaskExecutionStatus.getTaskComputation().getStartTime(); //copyng start time from bean saved on server SClient sClient; ComputationId computationId = DMConverter.toComputationId(taskComputation); try { String token = EncrypterUtil.decryptString(taskConfiguration.getMaskedToken()); sClient = dataMinerService.getClient(token); } catch (Exception e) { logger.error("Error on get DM client", e); throw new TaskErrorException("Error on getting DataMiner client, Try later"); } TaskExecutionStatus newTaskExecutionStatus = new TaskExecutionStatus(taskConfiguration, taskComputation); /* if(!isStart) //adding previous log if(oldTaskExecutionStatus.getLog().length()>maxLogSize){ String confs=NEW_LINE+NEW_STATUS_CHARS+Converter.getCurrentFormattedDate(null)+NEW_LINE; confs+="Cancelled previous logs"; newTaskExecutionStatus.addLog(confs); }else newTaskExecutionStatus.addLog(oldTaskExecutionStatus.getLog()); else{ //first time adding the configurations String confs=NEW_LINE+NEW_STATUS_CHARS+Converter.getCurrentFormattedDate(null)+NEW_LINE; confs+="Computation starts with configurations..."; confs+=NEW_LINE+NEW_LINE+DASH+DASH+"Operator Id: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorId(); confs+=NEW_LINE+NEW_LINE+DASH+DASH+"Operator Name: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getOperatorName(); confs+=NEW_LINE+NEW_LINE+DASH+DASH+"EquivalentRequest: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getEquivalentRequest(); confs+=NEW_LINE+NEW_LINE+DASH+DASH+"URL ID: "+NEW_LINE+newTaskExecutionStatus.getTaskComputation().getUrlId(); newTaskExecutionStatus.addLog(confs+NEW_LINE); }*/ logger.debug("Requesting operation progress"); ComputationStatus computationStatus = null; try { computationStatus = sClient.getComputationStatus(computationId); } catch (Exception e) { logger.error(e.getLocalizedMessage()); e.printStackTrace(); } logger.debug("ComputationStatus: " + computationStatus); if (computationStatus == null) { logger.error("ComputationStatus is null"); return newTaskExecutionStatus; } Status status = computationStatus.getStatus(); if (status == null) { logger.error("Status is null"); return newTaskExecutionStatus; } newTaskExecutionStatus.setMessage(computationStatus.getMessage()); newTaskExecutionStatus.setPercentCompleted((float) computationStatus.getPercentage()); //String historyMsg=NEW_LINE+NEW_STATUS_CHARS+Converter.getCurrentFormattedDate(null)+NEW_LINE; String historyMsg=""; switch (status) { case ACCEPTED: logger.debug("Operation "+TaskStatus.ACCEPTED); newTaskExecutionStatus.setStatus(TaskStatus.ACCEPTED); historyMsg+= "Status "+TaskStatus.ACCEPTED; historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; case CANCELLED: logger.debug("Operation "+TaskStatus.CANCELLED); newTaskExecutionStatus.setStatus(TaskStatus.CANCELLED); historyMsg+= "Status "+TaskStatus.CANCELLED; historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; taskComputation.setEndTime(System.currentTimeMillis()); break; case COMPLETE: logger.debug("Operation "+TaskStatus.COMPLETED); newTaskExecutionStatus.setStatus(TaskStatus.COMPLETED); historyMsg+= "Status "+TaskStatus.COMPLETED; historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; taskComputation.setEndTime(System.currentTimeMillis()); break; case FAILED: logger.debug("Operation "+TaskStatus.FAILED); newTaskExecutionStatus.setStatus(TaskStatus.FAILED); historyMsg+= "Status "+TaskStatus.FAILED; historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; taskComputation.setEndTime(System.currentTimeMillis()); break; case RUNNING: logger.debug("Operation "+TaskStatus.ONGOING); newTaskExecutionStatus.setStatus(TaskStatus.ONGOING); historyMsg+= "Status "+TaskStatus.ONGOING; historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; default: newTaskExecutionStatus.setStatus(TaskStatus.INITIALIZING); historyMsg+= "Status "+TaskStatus.INITIALIZING; historyMsg+=computationStatus.getMessage()!=null?": "+computationStatus.getMessage():""; break; } newTaskExecutionStatus.setMessage(historyMsg); saveTaskInMemory(taskConfiguration, newTaskExecutionStatus); return newTaskExecutionStatus; } /** * Retrieve output. * * @param computationId the computation id * @param sClient the s client * @return the string */ private String retrieveOutput(ComputationId computationId, SClient sClient) { StringBuilder builder = new StringBuilder(); try { OutputData output = sClient.getOutputDataByComputationId(computationId); logger.debug("Output: " + output); builder.append("Output: " + output); Resource resource = output.getResource(); if (resource.isMap()) { MapResource mapResource = (MapResource) resource; for (String key : mapResource.getMap().keySet()) { logger.debug("Entry: " + key + " = "+ mapResource.getMap().get(key)); builder.append("Entry: " + key + " = "+ mapResource.getMap().get(key)); } } else { } } catch (Exception e) { logger.error(e.getLocalizedMessage()); e.printStackTrace(); } return builder.toString(); } /** * Adds the parameters to operator. * * @param operator the operator * @param parameters the parameters * @return the operator */ private Operator addParametersToOperator(Operator operator, List parameters) { logger.debug("Adding parameters to operator"); List listParameters = new ArrayList(); for (TaskParameter taskParameter : parameters) { if(taskParameter.getType()==null) continue; Parameter dmParameter = DMConverter.toDMParameter(taskParameter); if(dmParameter!=null) listParameters.add(dmParameter); } logger.debug("Parameters list is: " + listParameters); operator.setOperatorParameters(listParameters); return operator; } }