diff --git a/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/procs/RuntimeEnv.java b/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/procs/RuntimeEnv.java new file mode 100644 index 0000000..18c9253 --- /dev/null +++ b/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/procs/RuntimeEnv.java @@ -0,0 +1,60 @@ +package eu.dnetlib.wfs.procs; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by michele on 19/11/15. + */ +public class RuntimeEnv { + + private final Map attributes = new HashMap<>(); + private Throwable error; + + public RuntimeEnv() { + this.error = null; + } + + public Map getAttributes() { + return this.attributes; + } + + public void clearEnv() { + this.attributes.clear(); + } + + public void addAttributes(final Map map) { + if (map != null) { + this.attributes.putAll(map); + } + } + + public void setAttribute(final String name, final Object value) { + this.attributes.put(name, value); + } + + public Object getAttribute(final String name) { + return this.attributes.get(name); + } + + public T getAttribute(final String name, final Class clazz) { + return clazz.cast(this.attributes.get(name)); + } + + public boolean hasAttribute(final String name) { + return this.attributes.containsKey(name); + } + + public Object removeAttribute(final String name) { + return this.attributes.remove(name); + } + + public Throwable getError() { + return this.error; + } + + public void setError(final Throwable error) { + this.error = error; + } + +} diff --git a/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/procs/Token.java b/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/procs/Token.java deleted file mode 100644 index 8cb3c10..0000000 --- a/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/procs/Token.java +++ /dev/null @@ -1,119 +0,0 @@ -package eu.dnetlib.wfs.procs; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -/** - * Created by michele on 19/11/15. - */ -public class Token { - - private final String id; - private final Map env = new HashMap<>(); - - private String progressMessage; - private final LocalDateTime startDate; - private LocalDateTime endDate; - - private boolean active; - - private Throwable error; - - public Token() { - id = "token-" + UUID.randomUUID(); - startDate = LocalDateTime.now(); - active = true; - error = null; - } - - public String getId() { - return id; - } - - public Map getEnv() { - return env; - } - - public void clearEnv() { - env.clear(); - } - - public void addEnvAttributes(final Map map) { - if (map != null) { - env.putAll(map); - } - } - - public void setEnvAttribute(final String name, final Object value) { - env.put(name, value); - } - - public Object getEnvAttribute(final String name) { - return env.get(name); - } - - public T getEnvAttribute(final String name, final Class clazz) { - return clazz.cast(env.get(name)); - } - - public boolean hasEnvAttribute(final String name) { - return env.containsKey(name); - } - - public Object removeEnvAttribute(final String name) { - return env.remove(name); - } - - public LocalDateTime getStartDate() { - return startDate; - } - - public LocalDateTime getEndDate() { - return endDate; - } - - public void setEndDate(final LocalDateTime endDate) { - this.endDate = endDate; - } - - public boolean isActive() { - return active; - } - - public void setActive(final boolean active) { - this.active = active; - } - - public boolean isFailed() { - return error != null; - } - - public void release() { - setEndDate(LocalDateTime.now()); - setActive(false); - } - - public void releaseAsFailed(final Throwable e) { - setError(e); - release(); - } - - public String getProgressMessage() { - return progressMessage; - } - - public void setProgressMessage(final String progressMessage) { - this.progressMessage = progressMessage; - } - - public Throwable getError() { - return error; - } - - public void setError(final Throwable error) { - this.error = error; - } - -} diff --git a/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/utils/GraphUtils.java b/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/utils/GraphUtils.java index c66393a..304892f 100644 --- a/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/utils/GraphUtils.java +++ b/dnet-app/libs/dnet-wf-common/src/main/java/eu/dnetlib/wfs/utils/GraphUtils.java @@ -21,9 +21,8 @@ import eu.dnetlib.domain.wfs.graph.Node; import eu.dnetlib.domain.wfs.graph.runtime.RuntimeArc; import eu.dnetlib.domain.wfs.graph.runtime.RuntimeGraph; import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNode; -import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNodeStatus; import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.wfs.procs.Token; +import eu.dnetlib.wfs.procs.RuntimeEnv; public class GraphUtils { @@ -122,11 +121,11 @@ public class GraphUtils { .count(); } - public static Set nextNodes(final RuntimeGraph graph, final RuntimeNode current, final Token token) { + public static Set nextNodes(final RuntimeGraph graph, final RuntimeNode current, final RuntimeEnv env) { final List arcs = graph.getArcs() .stream() .filter(arc -> StringUtils.equals(arc.getFrom(), current.getName())) - .filter(arc -> isValidArc(arc, token)) + .filter(arc -> isValidArc(arc, env)) .toList(); final Set res = new HashSet<>(); @@ -139,31 +138,22 @@ public class GraphUtils { return res; } - private static boolean isValidArc(final RuntimeArc arc, final Token token) { - final Function condFunction = generateFunction(arc.getCondition()); - if (condFunction != null) { return condFunction.apply(token); } + private static boolean isValidArc(final RuntimeArc arc, final RuntimeEnv env) { + final Function condFunction = generateFunction(arc.getCondition()); + if (condFunction != null) { return condFunction.apply(env); } return true; } - private static Function generateFunction(final String condition) { - if (StringUtils.isBlank(condition)) { return token -> true; } + private static Function generateFunction(final String condition) { + if (StringUtils.isBlank(condition)) { return env -> true; } return token -> { final ExpressionParser parser = new SpelExpressionParser(); - final StandardEvaluationContext context = new StandardEvaluationContext(token.getEnv()); + final StandardEvaluationContext context = new StandardEvaluationContext(token.getAttributes()); context.addPropertyAccessor(new MapAccessor()); return parser.parseExpression(condition).getValue(context, Boolean.class); }; } - public static void prepareNewExecution(final RuntimeNode graphNode) { - graphNode.setStatus(RuntimeNodeStatus.running); - graphNode.setnExecutions(graphNode.getnExecutions() + 1); - } - - public static void completeExecution(final RuntimeNode graphNode, final boolean success) { - graphNode.setStatus(success ? RuntimeNodeStatus.completed : RuntimeNodeStatus.failed); - } - } diff --git a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/nodes/ProcessNode.java b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/nodes/ProcessNode.java index 1d81d05..646fe7c 100644 --- a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/nodes/ProcessNode.java +++ b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/nodes/ProcessNode.java @@ -14,12 +14,13 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanNameAware; import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNode; +import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNodeStatus; import eu.dnetlib.errors.WorkflowManagerException; import eu.dnetlib.wfs.annotations.WfInputParam; import eu.dnetlib.wfs.annotations.WfNode; import eu.dnetlib.wfs.annotations.WfOutputParam; import eu.dnetlib.wfs.procs.ProcessEngine; -import eu.dnetlib.wfs.procs.Token; +import eu.dnetlib.wfs.procs.RuntimeEnv; import eu.dnetlib.wfs.procs.WorkflowProcess; import eu.dnetlib.wfs.utils.NodeCallback; @@ -37,7 +38,10 @@ public abstract class ProcessNode implements BeanNameAware { private ProcessEngine engine; - public void execute(final Token token, final Map params) { + public void execute(final RuntimeEnv env, final Map params) { + + this.graphNode.setnExecutions(this.graphNode.getnExecutions() + 1); + this.graphNode.setStatus(RuntimeNodeStatus.running); try { initInputParams(params); @@ -45,45 +49,47 @@ public abstract class ProcessNode implements BeanNameAware { final NodeCallback callback = new NodeCallback() { @Override - public void onSuccess(final Token t) { + public void onSuccess(final RuntimeEnv env) { try { - saveOutputParams(t); - ProcessNode.this.engine.releaseToken(ProcessNode.this.process, ProcessNode.this.graphNode, t); + saveOutputParams(env); + ProcessNode.this.engine.releaseToken(ProcessNode.this.process, ProcessNode.this.graphNode, env); } catch (final WorkflowManagerException e) { - onFail(t, e); + onFail(env, e); } } @Override - public void onFail(final Token t, final Throwable e) { + public void onFail(final RuntimeEnv env, final Throwable e) { log.debug("FAILURE IN NODE " + getNodeName()); - token.releaseAsFailed(e); - ProcessNode.this.engine.completeProcess(ProcessNode.this.process, t); + env.setError(e); + ProcessNode.this.engine.completeProcess(ProcessNode.this.process, env); } }; if (isAsync()) { - Executors.newSingleThreadExecutor().execute(() -> execute(token, callback)); + Executors.newSingleThreadExecutor().execute(() -> execute(env, callback)); } else { - execute(token, callback); + execute(env, callback); } } catch (final Throwable e) { - token.releaseAsFailed(e); - this.engine.completeProcess(this.process, token); + env.setError(e); + this.graphNode.setStatus(RuntimeNodeStatus.failed); + this.engine.completeProcess(this.process, env); } } - private final void execute(final Token token, final NodeCallback callback) { + private final void execute(final RuntimeEnv env, final NodeCallback callback) { try { log.debug("START NODE: " + getBeanName()); - token.setProgressMessage(getNodeName()); execute(); - callback.onSuccess(token); + this.graphNode.setStatus(RuntimeNodeStatus.completed); + callback.onSuccess(env); log.debug("END NODE (SUCCESS): " + getBeanName()); } catch (final Throwable e) { log.error("got exception while executing workflow node", e); log.debug("END NODE (FAILED): " + getBeanName()); - callback.onFail(token, e); + this.graphNode.setStatus(RuntimeNodeStatus.failed); + callback.onFail(env, e); } } @@ -117,7 +123,7 @@ public abstract class ProcessNode implements BeanNameAware { } } - private void saveOutputParams(final Token token) throws WorkflowManagerException { + private void saveOutputParams(final RuntimeEnv env) throws WorkflowManagerException { final Map outputToEnv = this.graphNode.getOutputEnvMap(); @@ -142,13 +148,13 @@ public abstract class ProcessNode implements BeanNameAware { if (!optional && (value == null)) { throw new WorkflowManagerException("NULL value in OUTPUT parameter: " + field.getName()); } - token.setEnvAttribute(envParam, value); + env.setAttribute(envParam, value); log.debug("SAVED ENV PARAMETER " + envParam + "=" + value); } if (log.isDebugEnabled()) { - token.getEnv().forEach((k, v) -> log.debug("ENV (END NODE EXECUTION) " + k + ": " + v)); + env.getAttributes().forEach((k, v) -> log.debug("ENV (END NODE EXECUTION) " + k + ": " + v)); } } @@ -251,4 +257,8 @@ public abstract class ProcessNode implements BeanNameAware { return fields; } + protected void updateProcessMessage(final String message) { + this.graphNode.setProgressMessage(message); + } + } diff --git a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/ProcessEngine.java b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/ProcessEngine.java index 269365e..4ae4745 100644 --- a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/ProcessEngine.java +++ b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/ProcessEngine.java @@ -206,8 +206,8 @@ public class ProcessEngine { try { for (final RuntimeNode graphNode : GraphUtils.startNodes(process.getJobDetails().getGraph())) { - final Token token = process.newToken(process.getJobDetails().getInputParams()); - executeNode(process, graphNode, token); + final RuntimeEnv env = process.newEnv(process.getJobDetails().getInputParams()); + executeNode(process, graphNode, env); } } catch (final Throwable e) { log.error("WorkflowProcess node instantiation failed", e); @@ -216,30 +216,30 @@ public class ProcessEngine { } } - public void releaseToken(final WorkflowProcess process, final RuntimeNode oldGraphNode, final Token oldToken) { + public void releaseToken(final WorkflowProcess process, final RuntimeNode oldGraphNode, final RuntimeEnv oldEnv) { try { - for (final RuntimeNode graphNode : GraphUtils.nextNodes(process.getJobDetails().getGraph(), oldGraphNode, oldToken)) { + for (final RuntimeNode graphNode : GraphUtils.nextNodes(process.getJobDetails().getGraph(), oldGraphNode, oldEnv)) { if (graphNode.isJoin() || graphNode.isSuccessNode()) { if (!process.getPausedJoinNodeTokens().containsKey(graphNode.getName())) { process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList<>()); } - final List list = process.getPausedJoinNodeTokens().get(graphNode.getName()); + final List list = process.getPausedJoinNodeTokens().get(graphNode.getName()); - list.add(oldToken); + list.add(oldEnv); if (list.size() == GraphUtils.getNumberOfIncomingArcs(process.getJobDetails().getGraph(), graphNode)) { - final Token token = process.newToken(mergeEnvParams(list.toArray(new Token[list.size()]))); + final RuntimeEnv env = process.newEnv(mergeEnvParams(list.toArray(new RuntimeEnv[list.size()]))); - executeNode(process, graphNode, token); + executeNode(process, graphNode, env); if (graphNode.isSuccessNode()) { - completeProcess(process, token); + completeProcess(process, env); } } } else { - executeNode(process, graphNode, process.newToken(oldToken.getEnv())); + executeNode(process, graphNode, process.newEnv(oldEnv.getAttributes())); } } @@ -250,8 +250,7 @@ public class ProcessEngine { } } - private void executeNode(final WorkflowProcess process, final RuntimeNode graphNode, final Token token) throws WorkflowManagerException { - GraphUtils.prepareNewExecution(graphNode); + private void executeNode(final WorkflowProcess process, final RuntimeNode graphNode, final RuntimeEnv env) throws WorkflowManagerException { updateRunningJob(process); try { @@ -260,17 +259,14 @@ public class ProcessEngine { graphNode.getParams().forEach((k, v) -> params.put(k, v)); } if (graphNode.getEnvParams() != null) { - graphNode.getEnvParams().forEach((k, v) -> params.put(k, token.getEnvAttribute(v.toString()))); + graphNode.getEnvParams().forEach((k, v) -> params.put(k, env.getAttribute(v.toString()))); } final ProcessNode pNode = newProcessNode(graphNode, process); - pNode.execute(token, params); - - GraphUtils.completeExecution(graphNode, true); + pNode.execute(env, params); } catch (final Throwable e) { - GraphUtils.completeExecution(graphNode, false); throw new WorkflowManagerException("Error executing node " + graphNode.getName(), e); } finally { updateRunningJob(process); @@ -302,15 +298,15 @@ public class ProcessEngine { } - private Map mergeEnvParams(final Token... tokens) { + private Map mergeEnvParams(final RuntimeEnv... envs) { final Map map = new HashMap<>(); - Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv())); + Arrays.stream(envs).forEach(t -> map.putAll(t.getAttributes())); return map; } - public void completeProcess(final WorkflowProcess process, final Token token) { + public void completeProcess(final WorkflowProcess process, final RuntimeEnv env) { - process.complete(token); + process.complete(env); updateRunningJob(process); saveHistoryJob(process); diff --git a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/WorkflowProcess.java b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/WorkflowProcess.java index ec4dd5e..b9a7f91 100644 --- a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/WorkflowProcess.java +++ b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/WorkflowProcess.java @@ -27,9 +27,9 @@ public class WorkflowProcess implements Comparable { private final WfRunningJob jobDetails; private ProcessCallback callback; - private List tokens = new CopyOnWriteArrayList<>(); + private List envs = new CopyOnWriteArrayList<>(); - private final Map> pausedJoinNodeTokens = new HashMap<>(); + private final Map> pausedJoinNodeTokens = new HashMap<>(); private final Map outputParams = new HashMap<>(); private Throwable error; @@ -47,12 +47,12 @@ public class WorkflowProcess implements Comparable { return this.jobDetails; } - public Map> getPausedJoinNodeTokens() { + public Map> getPausedJoinNodeTokens() { return this.pausedJoinNodeTokens; } - public List getTokens() { - return this.tokens; + public List getEnvs() { + return this.envs; } public void kill() { @@ -91,28 +91,28 @@ public class WorkflowProcess implements Comparable { return this.outputParams; } - public Token newToken(final Map attrs) { - final Token token = new Token(); - token.addEnvAttributes(attrs); - this.tokens.add(token); - return token; + public RuntimeEnv newEnv(final Map attrs) { + final RuntimeEnv env = new RuntimeEnv(); + env.addAttributes(attrs); + this.envs.add(env); + return env; } - public void complete(final Token token) { - final LocalDateTime now = token.getEndDate(); + public void complete(final RuntimeEnv env) { + final LocalDateTime now = LocalDateTime.now(); this.jobDetails.setLastUpdate(now); this.jobDetails.setEndDate(now); - this.jobDetails.setStatus(token.isFailed() ? JobStatus.failure : JobStatus.success); + this.jobDetails.setStatus(env.getError() != null ? JobStatus.failure : JobStatus.success); - if (token.isFailed()) { - setError(token.getError()); + if (env.getError() != null) { + setError(env.getError()); } if (this.callback != null) { - if (token.isFailed()) { - this.callback.onFail(this, token.getError()); + if (env.getError() != null) { + this.callback.onFail(this, env.getError()); } else { this.callback.onSuccess(this); } @@ -128,8 +128,8 @@ public class WorkflowProcess implements Comparable { this.callback = callback; } - public void setTokens(final List tokens) { - this.tokens = tokens; + public void setEnvs(final List envs) { + this.envs = envs; } } diff --git a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/utils/NodeCallback.java b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/utils/NodeCallback.java index e0a44a0..884d597 100644 --- a/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/utils/NodeCallback.java +++ b/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/utils/NodeCallback.java @@ -1,8 +1,8 @@ package eu.dnetlib.wfs.utils; import eu.dnetlib.utils.DnetCallback; -import eu.dnetlib.wfs.procs.Token; +import eu.dnetlib.wfs.procs.RuntimeEnv; -public interface NodeCallback extends DnetCallback { +public interface NodeCallback extends DnetCallback { } diff --git a/dnet-app/libs/dnet-wf-executor-common/src/test/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoaderTest.java b/dnet-app/libs/dnet-wf-executor-common/src/test/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoaderTest.java index d2a3b4e..c1db4d4 100644 --- a/dnet-app/libs/dnet-wf-executor-common/src/test/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoaderTest.java +++ b/dnet-app/libs/dnet-wf-executor-common/src/test/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoaderTest.java @@ -12,39 +12,39 @@ import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; -import eu.dnetlib.wfs.procs.Token; +import eu.dnetlib.wfs.procs.RuntimeEnv; public class GraphLoaderTest { - private Token token; + private RuntimeEnv env; @BeforeEach void setUp() throws Exception { - token = new Token(); - token.setEnvAttribute("author", "Michele Artini"); - token.setEnvAttribute("age", 47); + this.env = new RuntimeEnv(); + this.env.setAttribute("author", "Michele Artini"); + this.env.setAttribute("age", 47); } @Test final void testExpressions() { - assertTrue(evalFunction("age == 47").apply(token)); - assertTrue(evalFunction("age > 40").apply(token)); - assertTrue(evalFunction("author == 'Michele Artini'").apply(token)); - assertTrue(evalFunction("age == 47 && author == 'Michele Artini'").apply(token)); - assertTrue(evalFunction("age == 47 || author == 'Michele Artini'").apply(token)); - assertTrue(evalFunction("age == 47 || author == 'Claudio Atzori'").apply(token)); - assertTrue(evalFunction("age == 22 || author == 'Michele Artini'").apply(token)); - assertFalse(evalFunction("age != 47").apply(token)); - assertFalse(evalFunction("age < 40").apply(token)); - assertFalse(evalFunction("author != 'Michele Artini'").apply(token)); - assertFalse(evalFunction("age == 47 && author == 'Claudio Atzori'").apply(token)); + assertTrue(evalFunction("age == 47").apply(this.env)); + assertTrue(evalFunction("age > 40").apply(this.env)); + assertTrue(evalFunction("author == 'Michele Artini'").apply(this.env)); + assertTrue(evalFunction("age == 47 && author == 'Michele Artini'").apply(this.env)); + assertTrue(evalFunction("age == 47 || author == 'Michele Artini'").apply(this.env)); + assertTrue(evalFunction("age == 47 || author == 'Claudio Atzori'").apply(this.env)); + assertTrue(evalFunction("age == 22 || author == 'Michele Artini'").apply(this.env)); + assertFalse(evalFunction("age != 47").apply(this.env)); + assertFalse(evalFunction("age < 40").apply(this.env)); + assertFalse(evalFunction("author != 'Michele Artini'").apply(this.env)); + assertFalse(evalFunction("age == 47 && author == 'Claudio Atzori'").apply(this.env)); } - private Function evalFunction(final String f) { - return token -> { + private Function evalFunction(final String f) { + return env -> { final ExpressionParser parser = new SpelExpressionParser(); - final StandardEvaluationContext context = new StandardEvaluationContext(token.getEnv()); + final StandardEvaluationContext context = new StandardEvaluationContext(env.getAttributes()); context.addPropertyAccessor(new MapAccessor()); return parser.parseExpression(f).getValue(context, Boolean.class);