diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/history/WorkflowLogger.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/history/WorkflowLogger.java index c1dd8fd8..6212f9a7 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/history/WorkflowLogger.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/history/WorkflowLogger.java @@ -40,8 +40,8 @@ public class WorkflowLogger { return wfProcessExecutionRepository.findById(processId).get(); } - public void saveProcessExecution(final WfProcessExecution proc) { - wfProcessExecutionRepository.save(proc); + public void saveProcessExecution(final WfProcessExecution pe) { + wfProcessExecutionRepository.save(pe); } public Optional getLastExecutionForInstance(final String id) { diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java index 0eec4f58..3551d986 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java @@ -17,15 +17,12 @@ import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; -import eu.dnetlib.utils.DateUtils; @Service public class ScheduledWorkflowLauncher { private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class); - private static final DateUtils dateUtils = new DateUtils(); - @Autowired private WorkflowExecutor workflowExecutor; @@ -54,7 +51,7 @@ public class ScheduledWorkflowLauncher { .filter(this::isReady) .forEach(instance -> { try { - workflowExecutor.startWorkflowInstance(instance, null, null); + workflowExecutor.startWorkflowInstance(instance, null, null, null); } catch (final Exception e) { log.error("Error launching scheduled wf instance: " + instance.getId(), e); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java index e74cff41..212fd659 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java @@ -13,9 +13,12 @@ public abstract class AbstractJobNode extends ProcessNode { protected void doExecute(final Token token) { try { log.debug("START NODE: " + getBeanName()); + setProgressMessage(getNodeName()); + beforeStart(token); execute(token.getEnv()); beforeCompleted(token); + log.debug("END NODE (SUCCESS): " + getBeanName()); } catch (final Throwable e) { log.error("got exception while executing workflow node", e); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java index 8b9f758a..f642d589 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java @@ -9,8 +9,6 @@ import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; import eu.dnetlib.manager.wf.workflows.procs.Token; import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; -import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; -import eu.dnetlib.manager.wf.workflows.util.SubWorkflowProgressProvider; /** * Created by michele on 18/11/15. @@ -33,28 +31,23 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware { public final void execute(final Token token) { try { - final String procId = executor.startWorkflowInstance(getWfInstanceId(), new ProcessCallback() { - - @Override - public void onSuccess() { - log.debug("Child workflow has been completed successfully"); - token.release(); - } - - @Override - public void onFail() { - log.error("Child workflow is failed"); - token.releaseAsFailed("Child workflow is failed"); - } - }, process.getWfInstanceId()); + final String procId = executor.startWorkflowInstance(getWfInstanceId(), process.getWfInstanceId(), (proc) -> { + log.debug("Child workflow has been completed successfully"); + token.release(); + }, (proc) -> { + log.error("Child workflow is failed"); + token.releaseAsFailed("Child workflow is failed"); + }); if (log.isDebugEnabled()) { log.debug("The child workflow [instance: " + getWfInstanceId() + "] is starting with procId: " + procId); } - token.setProgressProvider(new SubWorkflowProgressProvider(procId, processRegistry)); + setProgressMessage("Launched sub workflow, proc: " + procId); - } catch (final Throwable e) { + } catch ( + + final Throwable e) { log.error("got exception while launching child workflow", e); token.releaseAsFailed(e); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java index 5052bb31..48664307 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java @@ -13,6 +13,8 @@ public abstract class ProcessNode implements BeanNameAware { private String nodeName; + private String progressMessage; + public abstract void execute(final Token token); public String getBeanName() { @@ -36,4 +38,12 @@ public abstract class ProcessNode implements BeanNameAware { public String toString() { return String.format("[node beanName=%s, name=%s]", this.beanName, this.nodeName); } + + public String getProgressMessage() { + return progressMessage; + } + + public void setProgressMessage(final String progressMessage) { + this.progressMessage = progressMessage; + } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java index 50eead65..3755628a 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java @@ -4,11 +4,9 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -17,14 +15,10 @@ import org.springframework.stereotype.Service; import com.google.common.base.Throwables; import eu.dnetlib.manager.history.WorkflowLogger; -import eu.dnetlib.manager.history.model.WfProcessExecution; import eu.dnetlib.manager.wf.nodes.ProcessNode; import eu.dnetlib.manager.wf.notification.EmailSender; import eu.dnetlib.manager.wf.workflows.graph.GraphNode; -import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess.Status; import eu.dnetlib.manager.wf.workflows.util.NodeHelper; -import eu.dnetlib.manager.wf.workflows.util.NodeTokenCallback; -import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; @Service public class ProcessEngine { @@ -51,8 +45,7 @@ public class ProcessEngine { try { for (final GraphNode node : process.getGraph().startNodes()) { final ProcessNode pNode = nodeHelper.newProcessNode(node, process, process.getEnv()); - final Token token = new Token(node.getName(), newNodeTokenCallback(process, node)); - + final Token token = prepareNewToken(process, node); token.getEnv().addAttributes(process.getEnv().getAttributes()); process.getTokens().add(token); @@ -79,7 +72,7 @@ public class ProcessEngine { list.add(oldToken); if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) { - final Token token = new Token(node.getName(), newNodeTokenCallback(process, node)); + final Token token = prepareNewToken(process, node); token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()]))); final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); @@ -87,13 +80,13 @@ public class ProcessEngine { process.setLastActivityDate(LocalDateTime.now()); if (node.isSucessNode()) { - markAsCompleted(process, token); + completeProcess(process, token); } else { pNode.execute(token); } } } else { - final Token token = new Token(node.getName(), newNodeTokenCallback(process, node)); + final Token token = prepareNewToken(process, node); token.getEnv().addAttributes(oldToken.getEnv().getAttributes()); final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); @@ -112,19 +105,10 @@ public class ProcessEngine { } - private NodeTokenCallback newNodeTokenCallback(final WorkflowProcess process, final GraphNode node) { - return new NodeTokenCallback() { - - @Override - public void onSuccess(final Token token) { - releaseToken(process, node, token); - } - - @Override - public void onFail(final Token token) { - completeProcess(process, token); - } - }; + private Token prepareNewToken(final WorkflowProcess process, final GraphNode node) { + return new Token( + token -> releaseToken(process, node, token), + token -> completeProcess(process, token)); } private Map mergeEnvParams(final Token... tokens) { @@ -133,71 +117,11 @@ public class ProcessEngine { return map; } - private void markAsCompleted(final WorkflowProcess process, final Token token) { - completeProcess(process, token); - } - private void completeProcess(final WorkflowProcess process, final Token token) { - if (token.isActive()) { - if (StringUtils.isNotBlank(token.getError())) { - token.releaseAsFailed(token.getError()); - } else { - token.release(); - } - } - - final LocalDateTime now = token.getEndDate(); - - process.setLastActivityDate(now); - process.setEndDate(now); - process.setStatus(token.isFailed() ? WorkflowProcess.Status.FAILURE : WorkflowProcess.Status.SUCCESS); - - if (token.isFailed()) { - process.setStatus(Status.FAILURE); - process.setError(token.getError()); - process.setErrorStacktrace(token.getErrorStackTrace()); - process.setLastActivityDate(LocalDateTime.now()); - } - - if (process.getCallback() != null) { - if (token.isFailed()) { - process.getCallback().onFail(); - } else { - process.getCallback().onSuccess(); - } - } - - final Map details = new LinkedHashMap<>(); - details.putAll(process.getOutputParams()); - details.put(WorkflowsConstants.LOG_WF_PRIORITY, "" + process.getPriority()); - details.put(WorkflowsConstants.LOG_WF_ID, process.getWfId()); - details.put(WorkflowsConstants.LOG_WF_ID, process.getWfInstanceId()); - - if (process.getError() != null) { - details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError()); - details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, process.getErrorStacktrace()); - } - - final WfProcessExecution pe = new WfProcessExecution(); - pe.setProcessId(process.getId()); - pe.setName(process.getName()); - pe.setFamily(process.getFamily()); - - pe.setDsId(process.getDsId()); - pe.setDsName(process.getDsName()); - pe.setDsApi(process.getDsInterface()); - - pe.setStartDate(process.getStartDate()); - pe.setEndDate(process.getEndDate()); - - pe.setStatus(process.getStatus().toString()); - - pe.setDetails(details); - - wfLogger.saveProcessExecution(pe); - + token.checkStatus(); + process.complete(token); + wfLogger.saveProcessExecution(process.asLog()); emailSender.sendMails(process); - } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java index c7c07ace..c6479f17 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java @@ -4,6 +4,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -16,7 +17,6 @@ import eu.dnetlib.manager.wf.model.WorkflowGraph; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.workflows.graph.Graph; import eu.dnetlib.manager.wf.workflows.graph.GraphLoader; -import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; @Component public class ProcessFactory { @@ -33,8 +33,9 @@ public class ProcessFactory { public WorkflowProcess newProcess(final SimpleResource wfMetadata, final WorkflowGraph wfGraph, final WorkflowInstance wfInstance, - final ProcessCallback processCallback, - final String parent) throws WorkflowManagerException { + final String parent, + final Consumer onSuccess, + final Consumer onFail) throws WorkflowManagerException { final Map globalParams = new HashMap<>(); globalParams.putAll(wfInstance.getSystemParams()); @@ -42,7 +43,7 @@ public class ProcessFactory { final Graph graph = graphLoader.loadGraph(wfGraph, globalParams); - return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, processCallback, parent); + return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, parent, onSuccess, onFail); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Token.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Token.java index 3b565dc1..c4635939 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Token.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Token.java @@ -2,34 +2,36 @@ package eu.dnetlib.manager.wf.workflows.procs; import java.time.LocalDateTime; import java.util.UUID; +import java.util.function.Consumer; + +import org.apache.commons.lang3.StringUtils; import com.google.common.base.Throwables; -import eu.dnetlib.manager.wf.workflows.util.NodeTokenCallback; -import eu.dnetlib.manager.wf.workflows.util.ProgressProvider; - /** * Created by michele on 19/11/15. */ public class Token { private final String id; - private final String nodeName; private final Env env = new Env(); private final LocalDateTime startDate; - private final NodeTokenCallback callback; - private boolean failed = false; private LocalDateTime endDate = LocalDateTime.MIN; + + private final Consumer onSuccess; + private final Consumer onFail; + + private boolean failed = false; + private boolean active = true; private String error = ""; private String errorStackTrace = ""; - private ProgressProvider progressProvider; - public Token(final String nodeName, final NodeTokenCallback callback) { + public Token(final Consumer onSuccess, final Consumer onFail) { this.id = "token-" + UUID.randomUUID(); - this.nodeName = nodeName; this.startDate = LocalDateTime.now(); - this.callback = callback; + this.onSuccess = onSuccess; + this.onFail = onFail; } public String getId() { @@ -71,8 +73,8 @@ public class Token { public void release() { setEndDate(LocalDateTime.now()); setActive(false); - if (this.callback != null) { - this.callback.onSuccess(this); + if (this.onSuccess != null) { + this.onSuccess.accept(this); } } @@ -82,8 +84,8 @@ public class Token { setFailed(true); setError(e.getMessage()); setErrorStackTrace(Throwables.getStackTraceAsString(e)); - if (this.callback != null) { - this.callback.onFail(this); + if (this.onFail != null) { + this.onFail.accept(this); } } @@ -92,13 +94,19 @@ public class Token { setActive(false); setFailed(true); setError(error); - if (this.callback != null) { - this.callback.onFail(this); + if (this.onFail != null) { + this.onFail.accept(this); } } - public String getNodeName() { - return this.nodeName; + public void checkStatus() { + if (isActive()) { + if (StringUtils.isNotBlank(error)) { + releaseAsFailed(error); + } else { + release(); + } + } } public String getError() { @@ -117,12 +125,4 @@ public class Token { this.errorStackTrace = errorStackTrace; } - public ProgressProvider getProgressProvider() { - return this.progressProvider; - } - - public void setProgressProvider(final ProgressProvider progressProvider) { - this.progressProvider = progressProvider; - } - } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java index e83add35..dc1a727c 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java @@ -5,6 +5,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.annotation.PostConstruct; @@ -23,7 +24,6 @@ import eu.dnetlib.is.resource.repository.SimpleResourceRepository; import eu.dnetlib.manager.wf.model.WorkflowGraph; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; -import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; import eu.dnetlib.utils.Stoppable; import eu.dnetlib.utils.StoppableDetails; @@ -63,7 +63,12 @@ public class WorkflowExecutor implements Stoppable { }, 10, 10, TimeUnit.SECONDS); } - public String startRepoHiWorkflow(final String wfId, final String dsId, final String apiId, final ProcessCallback processCallback, final String parent) + public String startRepoHiWorkflow(final String wfId, + final String dsId, + final String apiId, + final String parent, + final Consumer onSuccess, + final Consumer onFail) throws WorkflowManagerException { if (isPaused()) { @@ -91,13 +96,16 @@ public class WorkflowExecutor implements Stoppable { instance.setSystemParams(new HashMap<>()); instance.setUserParams(new HashMap<>()); - return startWorkflowInstance(instance, processCallback, parent); + return startWorkflowInstance(instance, parent, onSuccess, onFail); } catch (final DsmException e) { throw new WorkflowManagerException("Invalid datasource: " + dsId, e); } } - public String startWorkflowInstance(final String wfInstanceId, final ProcessCallback processCallback, final String parent) throws Exception { + public String startWorkflowInstance(final String wfInstanceId, + final String parent, + final Consumer onSuccess, + final Consumer onFail) throws Exception { if (isPaused()) { log.warn("Wf instance " + wfInstanceId + " not launched, because WorkflowExecutor is preparing for shutdown"); @@ -106,10 +114,13 @@ public class WorkflowExecutor implements Stoppable { final WorkflowInstance instance = workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId)); - return startWorkflowInstance(instance, processCallback, parent); + return startWorkflowInstance(instance, parent, onSuccess, onFail); } - public String startWorkflowInstance(final WorkflowInstance wfInstance, final ProcessCallback processCallback, final String parent) + public String startWorkflowInstance(final WorkflowInstance wfInstance, + final String parent, + final Consumer onSuccess, + final Consumer onFail) throws WorkflowManagerException { if (!wfInstance.isEnabled() || !wfInstance.isConfigured()) { @@ -134,7 +145,7 @@ public class WorkflowExecutor implements Stoppable { .orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId())); final WorkflowProcess process = - processFactory.newProcess(wfMetadata, wfGraph, wfInstance, processCallback, parent); + processFactory.newProcess(wfMetadata, wfGraph, wfInstance, parent, onSuccess, onFail); return processRegistry.registerProcess(process, wfInstance); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java index a62bc6bd..2b7f09a8 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java @@ -2,16 +2,19 @@ package eu.dnetlib.manager.wf.workflows.procs; import java.time.LocalDateTime; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import org.apache.commons.lang3.math.NumberUtils; import eu.dnetlib.is.model.resource.SimpleResource; +import eu.dnetlib.manager.history.model.WfProcessExecution; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.workflows.graph.Graph; -import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; +import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; /** * Created by michele on 19/11/15. @@ -36,7 +39,8 @@ public class WorkflowProcess implements Comparable { private final SimpleResource wfMetadata; private final WorkflowInstance wfInstance; private final Graph graph; - private final ProcessCallback callback; + private final Consumer onSuccess; + private final Consumer onFail; private final Env env; private final List tokens = new CopyOnWriteArrayList<>(); private LocalDateTime lastActivityDate; @@ -56,13 +60,15 @@ public class WorkflowProcess implements Comparable { final WorkflowInstance wfInstance, final Graph graph, final Map globalParams, - final ProcessCallback callback, - final String parentProfileId) { + final String parentProfileId, + final Consumer onSuccess, + final Consumer onFail) { this.id = id; this.wfMetadata = wfMetadata; this.wfInstance = wfInstance; this.graph = graph; - this.callback = callback; + this.onSuccess = onSuccess; + this.onFail = onFail; this.status = Status.CREATED; this.env = new Env(); this.globalParams = globalParams; @@ -134,10 +140,6 @@ public class WorkflowProcess implements Comparable { setStatus(Status.KILLED); } - public ProcessCallback getCallback() { - return callback; - } - public boolean isTerminated() { switch (status) { case SUCCESS: @@ -211,4 +213,56 @@ public class WorkflowProcess implements Comparable { return parentProfileId; } + public void complete(final Token token) { + final LocalDateTime now = token.getEndDate(); + setLastActivityDate(now); + setEndDate(now); + setStatus(token.isFailed() ? WorkflowProcess.Status.FAILURE : WorkflowProcess.Status.SUCCESS); + + if (token.isFailed()) { + setStatus(Status.FAILURE); + setError(token.getError()); + setErrorStacktrace(token.getErrorStackTrace()); + setLastActivityDate(LocalDateTime.now()); + } + + if (token.isFailed() && onFail != null) { + onFail.accept(this); + } + if (!token.isFailed() && onSuccess != null) { + onSuccess.accept(this);; + } + + } + + public WfProcessExecution asLog() { + final Map details = new LinkedHashMap<>(); + details.putAll(getOutputParams()); + details.put(WorkflowsConstants.LOG_WF_PRIORITY, "" + getPriority()); + details.put(WorkflowsConstants.LOG_WF_ID, getWfId()); + details.put(WorkflowsConstants.LOG_WF_ID, getWfInstanceId()); + + if (getError() != null) { + details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, getError()); + details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, getErrorStacktrace()); + } + + final WfProcessExecution pe = new WfProcessExecution(); + pe.setProcessId(getId()); + pe.setName(getName()); + pe.setFamily(getFamily()); + + pe.setDsId(getDsId()); + pe.setDsName(getDsName()); + pe.setDsApi(getDsInterface()); + + pe.setStartDate(getStartDate()); + pe.setEndDate(getEndDate()); + pe.setStatus(getStatus().toString()); + + pe.setDetails(details); + + return pe; + } + } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeTokenCallback.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeTokenCallback.java deleted file mode 100644 index 7496186b..00000000 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeTokenCallback.java +++ /dev/null @@ -1,13 +0,0 @@ -package eu.dnetlib.manager.wf.workflows.util; - -import eu.dnetlib.manager.wf.workflows.procs.Token; - -/** - * Created by michele on 26/11/15. - */ -public interface NodeTokenCallback { - - void onSuccess(Token token); - - void onFail(Token token); -} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java deleted file mode 100644 index 4f70b5da..00000000 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java +++ /dev/null @@ -1,12 +0,0 @@ -package eu.dnetlib.manager.wf.workflows.util; - -/** - * Created by michele on 18/11/15. - */ -public interface ProcessCallback { - - void onSuccess(); - - void onFail(); - -} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProgressProvider.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProgressProvider.java deleted file mode 100644 index d9f84f45..00000000 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProgressProvider.java +++ /dev/null @@ -1,6 +0,0 @@ -package eu.dnetlib.manager.wf.workflows.util; - -public interface ProgressProvider { - - String getProgressDescription(); -} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/SubWorkflowProgressProvider.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/SubWorkflowProgressProvider.java deleted file mode 100644 index 499349a3..00000000 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/SubWorkflowProgressProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -package eu.dnetlib.manager.wf.workflows.util; - -import java.util.List; -import java.util.stream.Collectors; - -import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; -import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; - -public class SubWorkflowProgressProvider implements ProgressProvider { - - private final String procId; - private final ProcessRegistry processRegistry; - - public SubWorkflowProgressProvider(final String procId, final ProcessRegistry processRegistry) { - super(); - this.procId = procId; - this.processRegistry = processRegistry; - } - - @Override - public String getProgressDescription() { - final WorkflowProcess proc = this.processRegistry.findProcess(this.procId); - - if (proc == null) { return "-"; } - - final List list = proc.getTokens() - .stream() - .filter(t -> t.isActive()) - .map(t -> t.getProgressProvider() != null ? String.format("%s (%s)", t.getNodeName(), t.getProgressProvider().getProgressDescription()) - : t.getNodeName()) - .collect(Collectors.toList()); - if (!list.isEmpty()) { return list.stream().collect(Collectors.joining(", ")); } - - return "-"; - } - -}