diff --git a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java index 44b808b..5357089 100644 --- a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java +++ b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/DataMinerAccessPoint.java @@ -10,21 +10,21 @@ import java.util.List; import java.util.Map; import org.gcube.common.workspacetaskexecutor.shared.BaseTaskConfiguration; -import org.gcube.common.workspacetaskexecutor.shared.Status; +import org.gcube.common.workspacetaskexecutor.shared.TaskStatus; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskConfiguration; import org.gcube.common.workspacetaskexecutor.shared.dataminer.TaskExecutionStatus; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskErrorException; import org.gcube.common.workspacetaskexecutor.shared.exception.TaskNotExecutableException; import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService; import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient; -import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor; -import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener; import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData; import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId; import org.gcube.data.analysis.dataminermanagercl.shared.data.output.MapResource; import org.gcube.data.analysis.dataminermanagercl.shared.data.output.Resource; import org.gcube.data.analysis.dataminermanagercl.shared.parameters.FileParameter; import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter; +import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus; +import org.gcube.data.analysis.dataminermanagercl.shared.process.ComputationStatus.Status; import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +118,7 @@ public class DataMinerAccessPoint { Operator operator; SClient sClient; try { - sClient = dataMinerService.getClient(); + sClient = dataMinerService.getClient(algConfiguration.getToken()); operator = sClient.getOperatorById(algConfiguration.getTaskId()); } catch (Exception e) { @@ -180,67 +180,70 @@ public class DataMinerAccessPoint { */ private TaskExecutionStatus startComputationOnDataMiner(BaseTaskConfiguration algConfiguration, final ComputationId computationId, final SClient sClient){ - TaskExecutionStatus status = new TaskExecutionStatus(algConfiguration, computationId); + TaskExecutionStatus taskExecutiontatus = new TaskExecutionStatus(algConfiguration, computationId); - DMMonitorListener listener = new DMMonitorListener() { + logger.debug("Requesting operation progress"); + ComputationStatus computationStatus = null; + try { + computationStatus = sClient.getComputationStatus(computationId); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); + e.printStackTrace(); - @Override - public void running(double percentage) { - logger.debug("Operation Running: " + percentage); - status.setStatus(Status.ONGOING); - status.setPercentCompleted((float) percentage); - status.setCurrentMessage("Operation Running..."); - saveTask(algConfiguration, status); + } + logger.debug("ComputationStatus: " + computationStatus); + if (computationStatus == null) { + logger.error("ComputationStatus is null"); + return taskExecutiontatus; + } - } + Status status = computationStatus.getStatus(); + if (status == null) { + logger.error("Status is null"); + return taskExecutiontatus; + } - @Override - public void failed(String message, Exception exception) { - logger.error("Operation Failed"); - logger.error(message, exception); - status.setStatus(Status.FAILED); - status.setCurrentMessage(message); - status.addLog(exception.getMessage()); - saveTask(algConfiguration, status); + taskExecutiontatus.setCurrentMessage(computationStatus.getMessage()); - } + switch (status) { + case ACCEPTED: + logger.debug("Operation "+TaskStatus.ACCEPTED); + taskExecutiontatus.setStatus(TaskStatus.ACCEPTED); + taskExecutiontatus.addLog("Status "+TaskStatus.ACCEPTED+": "+computationStatus.getMessage()); + saveTask(algConfiguration, taskExecutiontatus); + break; + case CANCELLED: + logger.debug("Operation "+TaskStatus.CANCELLED); + taskExecutiontatus.setStatus(TaskStatus.CANCELLED); + taskExecutiontatus.addLog("Status "+TaskStatus.CANCELLED+": "+computationStatus.getMessage()); + saveTask(algConfiguration, taskExecutiontatus); + break; + case COMPLETE: + logger.debug("Operation "+TaskStatus.COMPLETED); + taskExecutiontatus.setStatus(TaskStatus.COMPLETED); + taskExecutiontatus.addLog("Status "+TaskStatus.COMPLETED+": "+computationStatus.getMessage()); + saveTask(algConfiguration, taskExecutiontatus); + break; + case FAILED: + logger.debug("Operation "+TaskStatus.FAILED); + taskExecutiontatus.setStatus(TaskStatus.FAILED); + taskExecutiontatus.setCurrentMessage(computationStatus.getError().getMessage()); + taskExecutiontatus.addLog("Status "+TaskStatus.FAILED+": "+computationStatus.getMessage()); + saveTask(algConfiguration, taskExecutiontatus); + break; + case RUNNING: + logger.debug("Operation "+TaskStatus.ONGOING); + taskExecutiontatus.setStatus(TaskStatus.ONGOING); + taskExecutiontatus.addLog("Status "+TaskStatus.ONGOING+": "+computationStatus.getMessage()); + taskExecutiontatus.setPercentCompleted((float) computationStatus.getPercentage()); + saveTask(algConfiguration, taskExecutiontatus); + break; + default: + break; - @Override - public void complete(double percentage) { - logger.debug("Operation Completed"); - logger.debug("Perc: " + percentage); - String log = retrieveOutput(computationId, sClient); - status.setStatus(Status.COMPLETED); - status.setCurrentMessage("Operation Completed"); - status.addLog(log); - saveTask(algConfiguration, status); + } - } - - @Override - public void cancelled() { - logger.debug("Operation Cancelled"); - status.setStatus(Status.COMPLETED); - status.setCurrentMessage("Operation Cancelled"); - status.addLog("Operation Cancelled"); - saveTask(algConfiguration, status); - } - - @Override - public void accepted() { - logger.debug("Operation Accepted"); - status.setStatus(Status.ACCEPTED); - status.setCurrentMessage("Operation Accepted"); - status.addLog("Operation Accepted"); - saveTask(algConfiguration, status); - } - }; - - DMMonitor dmMonitor = new DMMonitor(computationId, sClient); - dmMonitor.add(listener); - dmMonitor.start(); - - return status; + return taskExecutiontatus; } diff --git a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java index 029cc5a..2538e61 100644 --- a/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java +++ b/src/main/java/org/gcube/common/workspacetaskexecutor/dataminer/WorkspaceDataMinerTaskExecutor.java @@ -31,10 +31,7 @@ import com.fasterxml.jackson.core.type.TypeReference; */ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask, ExecutableItem{ - /** - * - */ - public static final String FIELD_CONFIGURATION_KEY = "configurationKey"; + /** The logger. */ private static Logger logger = LoggerFactory.getLogger(WorkspaceDataMinerTaskExecutor.class); @@ -141,7 +138,7 @@ public class WorkspaceDataMinerTaskExecutor implements ExecutableTask