From 5cd6efbba683d7a17697693ffdd3aaad9554b838 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 20 Apr 2023 11:53:35 +0200 Subject: [PATCH] annotation params management --- .../manager/wf/WorkflowManagerService.java | 6 +- .../manager/wf/nodes/AbstractJobNode.java | 20 ++--- .../manager/wf/nodes/DefaultJobNode.java | 4 +- .../wf/nodes/LaunchWorkflowJobNode.java | 18 ++--- .../dnetlib/manager/wf/nodes/ProcessNode.java | 74 +++++++++++++++++-- .../dnetlib/manager/wf/nodes/SuccessNode.java | 4 +- .../wf/nodes/aggregation/CollectOAINode.java | 3 +- .../aggregation/MetadataCleanerNode.java | 3 +- .../nodes/aggregation/MetadataFilterNode.java | 3 +- .../MetadataXsltTransformNode.java | 3 +- .../wf/nodes/stream/StreamConsumerNode.java | 7 +- .../wf/nodes/stream/StreamFilterNode.java | 7 +- .../wf/nodes/stream/StreamMapperNode.java | 7 +- .../wf/nodes/stream/StreamSupplierNode.java | 7 +- .../wf/workflows/procs/ProcessEngine.java | 41 +++++----- .../wf/workflows/procs/ProcessFactory.java | 4 +- .../wf/workflows/procs/WorkflowProcess.java | 6 +- .../wf/workflows/util/ExecutionCallback.java | 9 --- .../wf/workflows/util/NodeCallback.java | 11 +++ .../manager/wf/workflows/util/NodeHelper.java | 14 +--- .../wf/workflows/util/ProcessCallback.java | 11 +++ 21 files changed, 154 insertions(+), 108 deletions(-) delete mode 100644 libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ExecutionCallback.java create mode 100644 libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeCallback.java create mode 100644 libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java index 9529a0d0..799b8ed3 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java @@ -35,7 +35,7 @@ import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine; import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory; import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; -import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; import eu.dnetlib.utils.Stoppable; import eu.dnetlib.utils.StoppableDetails; @@ -86,7 +86,7 @@ public class WorkflowManagerService implements Stoppable { public ExecutionStatus startRepoHiWorkflow(final String wfId, final String dsId, final String apiId, - final ExecutionCallback callback) throws WorkflowManagerException { + final ProcessCallback callback) throws WorkflowManagerException { if (isPaused()) { log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown"); @@ -134,7 +134,7 @@ public class WorkflowManagerService implements Stoppable { public ExecutionStatus startWorkflow(final String wfId, final WorkflowConfiguration conf, - final ExecutionCallback callback) + final ProcessCallback callback) throws WorkflowManagerException { if (!conf.isEnabled() || !conf.isConfigured()) { diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java index 35ee8a0f..fbfb374c 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java @@ -1,13 +1,12 @@ package eu.dnetlib.manager.wf.nodes; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.dnetlib.manager.wf.workflows.procs.Token; -import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; +import eu.dnetlib.manager.wf.workflows.util.NodeCallback; public abstract class AbstractJobNode extends ProcessNode { @@ -19,17 +18,14 @@ public abstract class AbstractJobNode extends ProcessNode { this.async = async; } - private final ExecutorService executor = Executors.newCachedThreadPool(); - @Override - public final void execute(final Token token, final ExecutionCallback callback) { + public final void execute(final Token token, final NodeCallback callback) { try { log.debug("START NODE: " + getBeanName()); token.setProgressMessage(getNodeName()); - onStart(token); if (isAsync()) { - executor.execute(() -> doExecute(token, callback)); + Executors.newSingleThreadExecutor().execute(() -> doExecute(token, callback)); } else { doExecute(token, callback); } @@ -38,19 +34,17 @@ public abstract class AbstractJobNode extends ProcessNode { } catch (final Throwable e) { log.error("got exception while executing workflow node", e); log.debug("END NODE (FAILED): " + getBeanName()); - onFailed(token); callback.onFail(token); } } - private final void doExecute(final Token token, final ExecutionCallback callback) { - execute(token); - onDone(token); - callback.onSuccess(token); + private final void doExecute(final Token token, final NodeCallback callback) { + execute(); + callback.onComplete(token); } - protected abstract void execute(Token token); + protected abstract void execute(); public final boolean isAsync() { return async; diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/DefaultJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/DefaultJobNode.java index 96319cd6..96afc760 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/DefaultJobNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/DefaultJobNode.java @@ -1,7 +1,5 @@ package eu.dnetlib.manager.wf.nodes; -import eu.dnetlib.manager.wf.workflows.procs.Token; - public final class DefaultJobNode extends AbstractJobNode { public DefaultJobNode(final String name) { @@ -10,6 +8,6 @@ public final class DefaultJobNode extends AbstractJobNode { } @Override - public final void execute(final Token token) {} + public final void execute() {} } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java index a10682df..6b852e8b 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java @@ -13,7 +13,8 @@ import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.Token; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; -import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; +import eu.dnetlib.manager.wf.workflows.util.NodeCallback; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; /** * Created by michele on 18/11/15. @@ -30,8 +31,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware { private WorkflowProcess process; @Override - public final void execute(final Token token, final ExecutionCallback callback) { - onStart(token); + public final void execute(final Token token, final NodeCallback nodeCallback) { try { final WorkflowConfiguration conf = new WorkflowConfiguration(); conf.setId("CHILD_" + UUID.randomUUID()); @@ -51,22 +51,18 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware { conf.setSystemParams(process.getGlobalParams()); conf.setUserParams(new HashMap<>()); - final ExecutionStatus info = wfManagerService.startWorkflow(wfId, conf, new ExecutionCallback() { + final ExecutionStatus info = wfManagerService.startWorkflow(wfId, conf, new ProcessCallback() { @Override public void onSuccess(final WorkflowProcess t) { log.debug("Child workflow has been completed successfully"); - onDone(token); - token.release(); - callback.onSuccess(token); + nodeCallback.onComplete(token); } @Override public void onFail(final WorkflowProcess t) { log.error("Child workflow is failed"); - onFailed(token); - token.releaseAsFailed("Child workflow is failed"); - callback.onFail(token); + nodeCallback.onFail(token); } }); @@ -79,7 +75,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware { } catch (final Throwable e) { log.error("got exception while launching child workflow", e); - callback.onFail(token); + nodeCallback.onFail(token); } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java index e18a1e27..125c43c2 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java @@ -1,28 +1,72 @@ package eu.dnetlib.manager.wf.nodes; +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanNameAware; +import eu.dnetlib.manager.wf.annotations.WfInputParam; +import eu.dnetlib.manager.wf.annotations.WfOutputParam; +import eu.dnetlib.manager.wf.workflows.procs.Env; +import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine; import eu.dnetlib.manager.wf.workflows.procs.Token; -import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; +import eu.dnetlib.manager.wf.workflows.util.NodeCallback; public abstract class ProcessNode implements BeanNameAware { + private static final Log log = LogFactory.getLog(ProcessEngine.class); + private String beanName; private String nodeName; - public abstract void execute(final Token token, ExecutionCallback callback); + public abstract void execute(final Token token, NodeCallback callback); - protected void onStart(final Token token) { - // For optional overwrites + public final void initInputParams(final Env env) { + findFields(getClass(), WfInputParam.class).forEach(field -> { + final String annName = field.getAnnotation(WfInputParam.class).value(); + if (StringUtils.isNotBlank(annName)) { + fieldValue(field, env.getAttribute(annName, field.getType())); + } else { + fieldValue(field, env.getAttribute(field.getName(), field.getType())); + } + + }); } - protected void onDone(final Token token) { - // For optional overwrites + public final void saveOutputParams(final Env env) { + findFields(getClass(), WfOutputParam.class).forEach(field -> { + final String annName = field.getAnnotation(WfOutputParam.class).value(); + if (StringUtils.isNotBlank(annName)) { + env.setAttribute(annName, fieldValue(field)); + } else { + env.setAttribute(field.getName(), fieldValue(field)); + } + + }); } - protected void onFailed(final Token token) { - // For optional overwrites + private final void fieldValue(final Field field, final Object object) { + try { + field.set(this, object); + } catch (IllegalArgumentException | IllegalAccessException e) { + log.error("Error setting field " + field.getName(), e); + throw new RuntimeException(e); + } + } + + private Object fieldValue(final Field field) { + try { + return field.get(this); + } catch (IllegalArgumentException | IllegalAccessException e) { + log.error("Error getting field " + field.getName(), e); + throw new RuntimeException(e); + } } public String getBeanName() { @@ -47,4 +91,18 @@ public abstract class ProcessNode implements BeanNameAware { return String.format("[node beanName=%s, name=%s]", this.beanName, this.nodeName); } + private final Set findFields(final Class clazz, final Class ann) { + final Set fields = new HashSet<>(); + if (clazz != null) { + fields.addAll(findFields(clazz.getSuperclass(), ann)); + + for (final Field field : clazz.getDeclaredFields()) { + if (field.isAnnotationPresent(ann)) { + fields.add(field); + } + } + } + return fields; + } + } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SuccessNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SuccessNode.java index d0f015d2..4b884587 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SuccessNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SuccessNode.java @@ -1,7 +1,5 @@ package eu.dnetlib.manager.wf.nodes; -import eu.dnetlib.manager.wf.workflows.procs.Token; - public class SuccessNode extends AbstractJobNode { public SuccessNode() { @@ -10,6 +8,6 @@ public class SuccessNode extends AbstractJobNode { } @Override - public final void execute(final Token token) {} + public final void execute() {} } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/CollectOAINode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/CollectOAINode.java index e62819f0..679d0d9a 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/CollectOAINode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/CollectOAINode.java @@ -6,7 +6,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecord; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; @WfNode("oai_collect") public class CollectOAINode extends StreamSupplierNode { @@ -18,7 +17,7 @@ public class CollectOAINode extends StreamSupplierNode { private String apiId; @Override - protected Stream prepareStream(final Token token) { + protected Stream prepareStream() { // TODO Auto-generated method stub return null; } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataCleanerNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataCleanerNode.java index af2eb4f0..226b5b30 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataCleanerNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataCleanerNode.java @@ -12,7 +12,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecordImpl; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; @WfNode("clean") public class MetadataCleanerNode extends StreamMapperNode { @@ -24,7 +23,7 @@ public class MetadataCleanerNode extends StreamMapperNode mapStream(final Stream input, final Token token) { + protected Stream mapStream(final Stream input) { try { final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataFilterNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataFilterNode.java index 24788209..47e9eb69 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataFilterNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataFilterNode.java @@ -10,7 +10,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecord; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; @WfNode("xpath_filter") public class MetadataFilterNode extends StreamMapperNode { @@ -19,7 +18,7 @@ public class MetadataFilterNode extends StreamMapperNode mapStream(final Stream input, final Token token) { + protected Stream mapStream(final Stream input) { return input.filter(in -> { try { final Document doc = DocumentHelper.parseText(in.getBody()); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java index d3837e65..fc011de5 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java @@ -9,7 +9,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecordImpl; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; @WfNode("applyXslt") public class MetadataXsltTransformNode extends StreamMapperNode { @@ -21,7 +20,7 @@ public class MetadataXsltTransformNode extends StreamMapperNode mapStream(final Stream input, final Token token) { + protected Stream mapStream(final Stream input) { try { // final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java index f3ed994d..05216a8f 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java @@ -4,7 +4,6 @@ import java.util.stream.Stream; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.nodes.AbstractJobNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; public abstract class StreamConsumerNode extends AbstractJobNode { @@ -15,11 +14,11 @@ public abstract class StreamConsumerNode extends AbstractJobNode { super(false); } - abstract protected Stream consumeStream(Stream stream, final Token token); + abstract protected Stream consumeStream(Stream stream); @Override - protected final void execute(final Token token) { - consumeStream(inputStream, token); + protected final void execute() { + consumeStream(inputStream); } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java index f2f2fe61..8d7f174a 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java @@ -5,7 +5,6 @@ import java.util.stream.Stream; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfOutputParam; import eu.dnetlib.manager.wf.nodes.AbstractJobNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; public abstract class StreamFilterNode extends AbstractJobNode { @@ -19,11 +18,11 @@ public abstract class StreamFilterNode extends AbstractJobNode { super(false); } - abstract protected Stream filterStream(Stream input, Token token); + abstract protected Stream filterStream(Stream input); @Override - protected final void execute(final Token token) { - outputStream = filterStream(inputStream, token); + protected final void execute() { + outputStream = filterStream(inputStream); } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamMapperNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamMapperNode.java index 1bce4228..c452595c 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamMapperNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamMapperNode.java @@ -5,7 +5,6 @@ import java.util.stream.Stream; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfOutputParam; import eu.dnetlib.manager.wf.nodes.AbstractJobNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; public abstract class StreamMapperNode extends AbstractJobNode { @@ -19,11 +18,11 @@ public abstract class StreamMapperNode extends AbstractJobNode { super(false); } - abstract protected Stream mapStream(Stream input, Token token); + abstract protected Stream mapStream(Stream input); @Override - protected void execute(final Token token) { - outputStream = mapStream(inputStream, token); + protected void execute() { + outputStream = mapStream(inputStream); } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamSupplierNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamSupplierNode.java index 020e015a..a9492128 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamSupplierNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamSupplierNode.java @@ -4,7 +4,6 @@ import java.util.stream.Stream; import eu.dnetlib.manager.wf.annotations.WfOutputParam; import eu.dnetlib.manager.wf.nodes.AbstractJobNode; -import eu.dnetlib.manager.wf.workflows.procs.Token; public abstract class StreamSupplierNode extends AbstractJobNode { @@ -15,11 +14,11 @@ public abstract class StreamSupplierNode extends AbstractJobNode { super(false); } - abstract protected Stream prepareStream(Token token); + abstract protected Stream prepareStream(); @Override - protected void execute(final Token token) { - outputStream = prepareStream(token); + protected void execute() { + outputStream = prepareStream(); } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java index fc9d4056..d5776c8c 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java @@ -18,7 +18,7 @@ import eu.dnetlib.manager.history.WorkflowLogger; import eu.dnetlib.manager.wf.nodes.ProcessNode; import eu.dnetlib.manager.wf.notification.EmailSender; import eu.dnetlib.manager.wf.workflows.graph.GraphNode; -import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; +import eu.dnetlib.manager.wf.workflows.util.NodeCallback; import eu.dnetlib.manager.wf.workflows.util.NodeHelper; @Service @@ -44,13 +44,14 @@ public class ProcessEngine { process.setLastActivityDate(now); try { - for (final GraphNode node : process.getGraph().startNodes()) { - final ProcessNode pNode = nodeHelper.newProcessNode(node, process, process.getEnv()); + for (final GraphNode graphNode : process.getGraph().startNodes()) { + final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process); final Token token = new Token(); token.getEnv().addAttributes(process.getEnv().getAttributes()); process.getTokens().add(token); - pNode.execute(token, newNodeCallback(process, node)); + pNode.initInputParams(token.getEnv()); + pNode.execute(token, newNodeCallback(process, pNode, graphNode)); } } catch (final Throwable e) { log.error("WorkflowProcess node instantiation failed", e); @@ -58,16 +59,18 @@ public class ProcessEngine { } } - private ExecutionCallback newNodeCallback(final WorkflowProcess process, final GraphNode node) { - return new ExecutionCallback<>() { + private NodeCallback newNodeCallback(final WorkflowProcess process, final ProcessNode pNode, final GraphNode graphNode) { + return new NodeCallback() { @Override - public void onSuccess(final Token t) { - releaseToken(process, node, t); + public void onComplete(final Token t) { + pNode.saveOutputParams(t.getEnv()); + releaseToken(process, graphNode, t); } @Override public void onFail(final Token t) { + pNode.saveOutputParams(t.getEnv()); completeProcess(process, t); } }; @@ -77,38 +80,38 @@ public class ProcessEngine { process.setLastActivityDate(LocalDateTime.now()); try { - for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) { - if (node.isJoin() || node.isSucessNode()) { - if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) { - process.getPausedJoinNodeTokens().put(node.getName(), new ArrayList()); + for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) { + if (graphNode.isJoin() || graphNode.isSucessNode()) { + if (!process.getPausedJoinNodeTokens().containsKey(graphNode.getName())) { + process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList()); } - final List list = process.getPausedJoinNodeTokens().get(node.getName()); + final List list = process.getPausedJoinNodeTokens().get(graphNode.getName()); list.add(oldToken); - if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) { + if (list.size() == process.getGraph().getNumberOfIncomingArcs(graphNode)) { final Token token = new Token(); token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()]))); - final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); + final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process); process.getTokens().add(token); process.setLastActivityDate(LocalDateTime.now()); - if (node.isSucessNode()) { + if (graphNode.isSucessNode()) { completeProcess(process, token); } else { - pNode.execute(token, newNodeCallback(process, node)); + pNode.execute(token, newNodeCallback(process, pNode, graphNode)); } } } else { final Token token = new Token(); token.getEnv().addAttributes(oldToken.getEnv().getAttributes()); - final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); + final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process); process.getTokens().add(token); process.setLastActivityDate(LocalDateTime.now()); - pNode.execute(token, newNodeCallback(process, node)); + pNode.execute(token, newNodeCallback(process, pNode, graphNode)); } } } catch (final Throwable e) { diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java index 5e8832b4..134f0cc3 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java @@ -16,7 +16,7 @@ import eu.dnetlib.manager.wf.model.WorkflowConfiguration; import eu.dnetlib.manager.wf.model.WorkflowTemplate; import eu.dnetlib.manager.wf.workflows.graph.Graph; import eu.dnetlib.manager.wf.workflows.graph.GraphLoader; -import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; @Component public class ProcessFactory { @@ -33,7 +33,7 @@ public class ProcessFactory { public WorkflowProcess newProcess(final SimpleResource wfMetadata, final WorkflowTemplate wfTemplate, final WorkflowConfiguration conf, - final ExecutionCallback callback) throws WorkflowManagerException { + final ProcessCallback callback) throws WorkflowManagerException { final Map globalParams = new HashMap<>(); globalParams.putAll(conf.getSystemParams()); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java index 84cce528..3a260feb 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java @@ -13,7 +13,7 @@ import eu.dnetlib.is.model.resource.SimpleResource; import eu.dnetlib.manager.history.model.WfHistoryEntry; import eu.dnetlib.manager.wf.model.WorkflowConfiguration; import eu.dnetlib.manager.wf.workflows.graph.Graph; -import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; /** @@ -39,7 +39,7 @@ public class WorkflowProcess implements Comparable { private final SimpleResource wfMetadata; private final WorkflowConfiguration wfConf; private final Graph graph; - private final ExecutionCallback callback; + private final ProcessCallback callback; private final Env env; private final List tokens = new CopyOnWriteArrayList<>(); private LocalDateTime lastActivityDate; @@ -58,7 +58,7 @@ public class WorkflowProcess implements Comparable { final WorkflowConfiguration wfConf, final Graph graph, final Map globalParams, - final ExecutionCallback callback) { + final ProcessCallback callback) { this.id = id; this.wfMetadata = wfMetadata; this.wfConf = wfConf; diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ExecutionCallback.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ExecutionCallback.java deleted file mode 100644 index 04bcdf38..00000000 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ExecutionCallback.java +++ /dev/null @@ -1,9 +0,0 @@ -package eu.dnetlib.manager.wf.workflows.util; - -public interface ExecutionCallback { - - void onSuccess(T t); - - void onFail(T t); - -} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeCallback.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeCallback.java new file mode 100644 index 00000000..235cd011 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeCallback.java @@ -0,0 +1,11 @@ +package eu.dnetlib.manager.wf.workflows.util; + +import eu.dnetlib.manager.wf.workflows.procs.Token; + +public interface NodeCallback { + + void onComplete(Token t); + + void onFail(Token t); + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java index c9c11307..20ec408c 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java @@ -4,17 +4,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeansException; -import org.springframework.beans.PropertyAccessorFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.manager.wf.nodes.ProcessNode; import eu.dnetlib.manager.wf.nodes.DefaultJobNode; +import eu.dnetlib.manager.wf.nodes.ProcessNode; import eu.dnetlib.manager.wf.nodes.SuccessNode; import eu.dnetlib.manager.wf.workflows.graph.GraphNode; -import eu.dnetlib.manager.wf.workflows.procs.Env; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; @@ -26,21 +24,17 @@ public class NodeHelper implements ApplicationContextAware { private ApplicationContext applicationContext; - public ProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process, final Env env) throws WorkflowManagerException { + public ProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process) throws WorkflowManagerException { if (node.isSucessNode()) { return new SuccessNode(); } else if (StringUtils.isBlank(node.getType())) { return new DefaultJobNode(node.getName()); } else { + + // TODO: Considerare i nodi annotati con WfNode final ProcessNode pnode = this.applicationContext.getBean(beanNamePrefix + node.getType(), ProcessNode.class); if (pnode != null) { pnode.setNodeName(node.getName()); - // I invoke the setter methods using the static params of the graph node - try { - PropertyAccessorFactory.forBeanPropertyAccess(pnode).setPropertyValues(node.resolveParams(env)); - } catch (final Throwable e) { - throw new WorkflowManagerException(String.format("error setting parameters in wfNode %s", node.getName()), e); - } if (pnode instanceof ProcessAware) { ((ProcessAware) pnode).setProcess(process); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java new file mode 100644 index 00000000..7bc4ec9b --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java @@ -0,0 +1,11 @@ +package eu.dnetlib.manager.wf.workflows.util; + +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; + +public interface ProcessCallback { + + void onSuccess(WorkflowProcess proc); + + void onFail(WorkflowProcess proc); + +}