annotation params management

This commit is contained in:
Michele Artini 2023-04-20 11:53:35 +02:00
parent f348e5c1a0
commit 5cd6efbba6
21 changed files with 154 additions and 108 deletions

View File

@ -35,7 +35,7 @@ import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory; import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.utils.Stoppable; import eu.dnetlib.utils.Stoppable;
import eu.dnetlib.utils.StoppableDetails; import eu.dnetlib.utils.StoppableDetails;
@ -86,7 +86,7 @@ public class WorkflowManagerService implements Stoppable {
public ExecutionStatus startRepoHiWorkflow(final String wfId, public ExecutionStatus startRepoHiWorkflow(final String wfId,
final String dsId, final String dsId,
final String apiId, final String apiId,
final ExecutionCallback<WorkflowProcess> callback) throws WorkflowManagerException { final ProcessCallback callback) throws WorkflowManagerException {
if (isPaused()) { if (isPaused()) {
log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown"); log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown");
@ -134,7 +134,7 @@ public class WorkflowManagerService implements Stoppable {
public ExecutionStatus startWorkflow(final String wfId, public ExecutionStatus startWorkflow(final String wfId,
final WorkflowConfiguration conf, final WorkflowConfiguration conf,
final ExecutionCallback<WorkflowProcess> callback) final ProcessCallback callback)
throws WorkflowManagerException { throws WorkflowManagerException {
if (!conf.isEnabled() || !conf.isConfigured()) { if (!conf.isEnabled() || !conf.isConfigured()) {

View File

@ -1,13 +1,12 @@
package eu.dnetlib.manager.wf.nodes; package eu.dnetlib.manager.wf.nodes;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.workflows.procs.Token; import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
public abstract class AbstractJobNode extends ProcessNode { public abstract class AbstractJobNode extends ProcessNode {
@ -19,17 +18,14 @@ public abstract class AbstractJobNode extends ProcessNode {
this.async = async; this.async = async;
} }
private final ExecutorService executor = Executors.newCachedThreadPool();
@Override @Override
public final void execute(final Token token, final ExecutionCallback<Token> callback) { public final void execute(final Token token, final NodeCallback callback) {
try { try {
log.debug("START NODE: " + getBeanName()); log.debug("START NODE: " + getBeanName());
token.setProgressMessage(getNodeName()); token.setProgressMessage(getNodeName());
onStart(token);
if (isAsync()) { if (isAsync()) {
executor.execute(() -> doExecute(token, callback)); Executors.newSingleThreadExecutor().execute(() -> doExecute(token, callback));
} else { } else {
doExecute(token, callback); doExecute(token, callback);
} }
@ -38,19 +34,17 @@ public abstract class AbstractJobNode extends ProcessNode {
} catch (final Throwable e) { } catch (final Throwable e) {
log.error("got exception while executing workflow node", e); log.error("got exception while executing workflow node", e);
log.debug("END NODE (FAILED): " + getBeanName()); log.debug("END NODE (FAILED): " + getBeanName());
onFailed(token);
callback.onFail(token); callback.onFail(token);
} }
} }
private final void doExecute(final Token token, final ExecutionCallback<Token> callback) { private final void doExecute(final Token token, final NodeCallback callback) {
execute(token); execute();
onDone(token); callback.onComplete(token);
callback.onSuccess(token);
} }
protected abstract void execute(Token token); protected abstract void execute();
public final boolean isAsync() { public final boolean isAsync() {
return async; return async;

View File

@ -1,7 +1,5 @@
package eu.dnetlib.manager.wf.nodes; package eu.dnetlib.manager.wf.nodes;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public final class DefaultJobNode extends AbstractJobNode { public final class DefaultJobNode extends AbstractJobNode {
public DefaultJobNode(final String name) { public DefaultJobNode(final String name) {
@ -10,6 +8,6 @@ public final class DefaultJobNode extends AbstractJobNode {
} }
@Override @Override
public final void execute(final Token token) {} public final void execute() {}
} }

View File

@ -13,7 +13,8 @@ import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.Token; import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
/** /**
* Created by michele on 18/11/15. * Created by michele on 18/11/15.
@ -30,8 +31,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
private WorkflowProcess process; private WorkflowProcess process;
@Override @Override
public final void execute(final Token token, final ExecutionCallback<Token> callback) { public final void execute(final Token token, final NodeCallback nodeCallback) {
onStart(token);
try { try {
final WorkflowConfiguration conf = new WorkflowConfiguration(); final WorkflowConfiguration conf = new WorkflowConfiguration();
conf.setId("CHILD_" + UUID.randomUUID()); conf.setId("CHILD_" + UUID.randomUUID());
@ -51,22 +51,18 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
conf.setSystemParams(process.getGlobalParams()); conf.setSystemParams(process.getGlobalParams());
conf.setUserParams(new HashMap<>()); conf.setUserParams(new HashMap<>());
final ExecutionStatus info = wfManagerService.startWorkflow(wfId, conf, new ExecutionCallback<WorkflowProcess>() { final ExecutionStatus info = wfManagerService.startWorkflow(wfId, conf, new ProcessCallback() {
@Override @Override
public void onSuccess(final WorkflowProcess t) { public void onSuccess(final WorkflowProcess t) {
log.debug("Child workflow has been completed successfully"); log.debug("Child workflow has been completed successfully");
onDone(token); nodeCallback.onComplete(token);
token.release();
callback.onSuccess(token);
} }
@Override @Override
public void onFail(final WorkflowProcess t) { public void onFail(final WorkflowProcess t) {
log.error("Child workflow is failed"); log.error("Child workflow is failed");
onFailed(token); nodeCallback.onFail(token);
token.releaseAsFailed("Child workflow is failed");
callback.onFail(token);
} }
}); });
@ -79,7 +75,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
} catch (final Throwable e) { } catch (final Throwable e) {
log.error("got exception while launching child workflow", e); log.error("got exception while launching child workflow", e);
callback.onFail(token); nodeCallback.onFail(token);
} }
} }

View File

@ -1,28 +1,72 @@
package eu.dnetlib.manager.wf.nodes; package eu.dnetlib.manager.wf.nodes;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.BeanNameAware;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.Token; import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
public abstract class ProcessNode implements BeanNameAware { public abstract class ProcessNode implements BeanNameAware {
private static final Log log = LogFactory.getLog(ProcessEngine.class);
private String beanName; private String beanName;
private String nodeName; private String nodeName;
public abstract void execute(final Token token, ExecutionCallback<Token> callback); public abstract void execute(final Token token, NodeCallback callback);
protected void onStart(final Token token) { public final void initInputParams(final Env env) {
// For optional overwrites findFields(getClass(), WfInputParam.class).forEach(field -> {
final String annName = field.getAnnotation(WfInputParam.class).value();
if (StringUtils.isNotBlank(annName)) {
fieldValue(field, env.getAttribute(annName, field.getType()));
} else {
fieldValue(field, env.getAttribute(field.getName(), field.getType()));
}
});
} }
protected void onDone(final Token token) { public final void saveOutputParams(final Env env) {
// For optional overwrites findFields(getClass(), WfOutputParam.class).forEach(field -> {
final String annName = field.getAnnotation(WfOutputParam.class).value();
if (StringUtils.isNotBlank(annName)) {
env.setAttribute(annName, fieldValue(field));
} else {
env.setAttribute(field.getName(), fieldValue(field));
}
});
} }
protected void onFailed(final Token token) { private final void fieldValue(final Field field, final Object object) {
// For optional overwrites try {
field.set(this, object);
} catch (IllegalArgumentException | IllegalAccessException e) {
log.error("Error setting field " + field.getName(), e);
throw new RuntimeException(e);
}
}
private Object fieldValue(final Field field) {
try {
return field.get(this);
} catch (IllegalArgumentException | IllegalAccessException e) {
log.error("Error getting field " + field.getName(), e);
throw new RuntimeException(e);
}
} }
public String getBeanName() { public String getBeanName() {
@ -47,4 +91,18 @@ public abstract class ProcessNode implements BeanNameAware {
return String.format("[node beanName=%s, name=%s]", this.beanName, this.nodeName); return String.format("[node beanName=%s, name=%s]", this.beanName, this.nodeName);
} }
private final Set<Field> findFields(final Class<?> clazz, final Class<? extends Annotation> ann) {
final Set<Field> fields = new HashSet<>();
if (clazz != null) {
fields.addAll(findFields(clazz.getSuperclass(), ann));
for (final Field field : clazz.getDeclaredFields()) {
if (field.isAnnotationPresent(ann)) {
fields.add(field);
}
}
}
return fields;
}
} }

View File

@ -1,7 +1,5 @@
package eu.dnetlib.manager.wf.nodes; package eu.dnetlib.manager.wf.nodes;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public class SuccessNode extends AbstractJobNode { public class SuccessNode extends AbstractJobNode {
public SuccessNode() { public SuccessNode() {
@ -10,6 +8,6 @@ public class SuccessNode extends AbstractJobNode {
} }
@Override @Override
public final void execute(final Token token) {} public final void execute() {}
} }

View File

@ -6,7 +6,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode; import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
@WfNode("oai_collect") @WfNode("oai_collect")
public class CollectOAINode extends StreamSupplierNode<MetadataRecord> { public class CollectOAINode extends StreamSupplierNode<MetadataRecord> {
@ -18,7 +17,7 @@ public class CollectOAINode extends StreamSupplierNode<MetadataRecord> {
private String apiId; private String apiId;
@Override @Override
protected Stream<MetadataRecord> prepareStream(final Token token) { protected Stream<MetadataRecord> prepareStream() {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }

View File

@ -12,7 +12,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecordImpl;
import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode; import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
@WfNode("clean") @WfNode("clean")
public class MetadataCleanerNode extends StreamMapperNode<MetadataRecord, MetadataRecord> { public class MetadataCleanerNode extends StreamMapperNode<MetadataRecord, MetadataRecord> {
@ -24,7 +23,7 @@ public class MetadataCleanerNode extends StreamMapperNode<MetadataRecord, Metada
private CleanerFactory cleanerFactory; private CleanerFactory cleanerFactory;
@Override @Override
protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input, final Token token) { protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input) {
try { try {
final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId); final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId);

View File

@ -10,7 +10,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode; import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
@WfNode("xpath_filter") @WfNode("xpath_filter")
public class MetadataFilterNode extends StreamMapperNode<MetadataRecord, MetadataRecord> { public class MetadataFilterNode extends StreamMapperNode<MetadataRecord, MetadataRecord> {
@ -19,7 +18,7 @@ public class MetadataFilterNode extends StreamMapperNode<MetadataRecord, Metadat
private String xpath; private String xpath;
@Override @Override
protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input, final Token token) { protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input) {
return input.filter(in -> { return input.filter(in -> {
try { try {
final Document doc = DocumentHelper.parseText(in.getBody()); final Document doc = DocumentHelper.parseText(in.getBody());

View File

@ -9,7 +9,6 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecordImpl;
import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode; import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
@WfNode("applyXslt") @WfNode("applyXslt")
public class MetadataXsltTransformNode extends StreamMapperNode<MetadataRecord, MetadataRecord> { public class MetadataXsltTransformNode extends StreamMapperNode<MetadataRecord, MetadataRecord> {
@ -21,7 +20,7 @@ public class MetadataXsltTransformNode extends StreamMapperNode<MetadataRecord,
// private CleanerFactory cleanerFactory; // private CleanerFactory cleanerFactory;
@Override @Override
protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input, final Token token) { protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input) {
try { try {
// final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId); // final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId);

View File

@ -4,7 +4,6 @@ import java.util.stream.Stream;
import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode; import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public abstract class StreamConsumerNode<T, K> extends AbstractJobNode { public abstract class StreamConsumerNode<T, K> extends AbstractJobNode {
@ -15,11 +14,11 @@ public abstract class StreamConsumerNode<T, K> extends AbstractJobNode {
super(false); super(false);
} }
abstract protected Stream<T> consumeStream(Stream<T> stream, final Token token); abstract protected Stream<T> consumeStream(Stream<T> stream);
@Override @Override
protected final void execute(final Token token) { protected final void execute() {
consumeStream(inputStream, token); consumeStream(inputStream);
} }
} }

View File

@ -5,7 +5,6 @@ import java.util.stream.Stream;
import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfOutputParam; import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode; import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public abstract class StreamFilterNode<T> extends AbstractJobNode { public abstract class StreamFilterNode<T> extends AbstractJobNode {
@ -19,11 +18,11 @@ public abstract class StreamFilterNode<T> extends AbstractJobNode {
super(false); super(false);
} }
abstract protected Stream<T> filterStream(Stream<T> input, Token token); abstract protected Stream<T> filterStream(Stream<T> input);
@Override @Override
protected final void execute(final Token token) { protected final void execute() {
outputStream = filterStream(inputStream, token); outputStream = filterStream(inputStream);
} }
} }

View File

@ -5,7 +5,6 @@ import java.util.stream.Stream;
import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfOutputParam; import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode; import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public abstract class StreamMapperNode<T, K> extends AbstractJobNode { public abstract class StreamMapperNode<T, K> extends AbstractJobNode {
@ -19,11 +18,11 @@ public abstract class StreamMapperNode<T, K> extends AbstractJobNode {
super(false); super(false);
} }
abstract protected Stream<K> mapStream(Stream<T> input, Token token); abstract protected Stream<K> mapStream(Stream<T> input);
@Override @Override
protected void execute(final Token token) { protected void execute() {
outputStream = mapStream(inputStream, token); outputStream = mapStream(inputStream);
} }
} }

View File

@ -4,7 +4,6 @@ import java.util.stream.Stream;
import eu.dnetlib.manager.wf.annotations.WfOutputParam; import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode; import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public abstract class StreamSupplierNode<T> extends AbstractJobNode { public abstract class StreamSupplierNode<T> extends AbstractJobNode {
@ -15,11 +14,11 @@ public abstract class StreamSupplierNode<T> extends AbstractJobNode {
super(false); super(false);
} }
abstract protected Stream<T> prepareStream(Token token); abstract protected Stream<T> prepareStream();
@Override @Override
protected void execute(final Token token) { protected void execute() {
outputStream = prepareStream(token); outputStream = prepareStream();
} }
} }

View File

@ -18,7 +18,7 @@ import eu.dnetlib.manager.history.WorkflowLogger;
import eu.dnetlib.manager.wf.nodes.ProcessNode; import eu.dnetlib.manager.wf.nodes.ProcessNode;
import eu.dnetlib.manager.wf.notification.EmailSender; import eu.dnetlib.manager.wf.notification.EmailSender;
import eu.dnetlib.manager.wf.workflows.graph.GraphNode; import eu.dnetlib.manager.wf.workflows.graph.GraphNode;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
import eu.dnetlib.manager.wf.workflows.util.NodeHelper; import eu.dnetlib.manager.wf.workflows.util.NodeHelper;
@Service @Service
@ -44,13 +44,14 @@ public class ProcessEngine {
process.setLastActivityDate(now); process.setLastActivityDate(now);
try { try {
for (final GraphNode node : process.getGraph().startNodes()) { for (final GraphNode graphNode : process.getGraph().startNodes()) {
final ProcessNode pNode = nodeHelper.newProcessNode(node, process, process.getEnv()); final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process);
final Token token = new Token(); final Token token = new Token();
token.getEnv().addAttributes(process.getEnv().getAttributes()); token.getEnv().addAttributes(process.getEnv().getAttributes());
process.getTokens().add(token); process.getTokens().add(token);
pNode.execute(token, newNodeCallback(process, node)); pNode.initInputParams(token.getEnv());
pNode.execute(token, newNodeCallback(process, pNode, graphNode));
} }
} catch (final Throwable e) { } catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e); log.error("WorkflowProcess node instantiation failed", e);
@ -58,16 +59,18 @@ public class ProcessEngine {
} }
} }
private ExecutionCallback<Token> newNodeCallback(final WorkflowProcess process, final GraphNode node) { private NodeCallback newNodeCallback(final WorkflowProcess process, final ProcessNode pNode, final GraphNode graphNode) {
return new ExecutionCallback<>() { return new NodeCallback() {
@Override @Override
public void onSuccess(final Token t) { public void onComplete(final Token t) {
releaseToken(process, node, t); pNode.saveOutputParams(t.getEnv());
releaseToken(process, graphNode, t);
} }
@Override @Override
public void onFail(final Token t) { public void onFail(final Token t) {
pNode.saveOutputParams(t.getEnv());
completeProcess(process, t); completeProcess(process, t);
} }
}; };
@ -77,38 +80,38 @@ public class ProcessEngine {
process.setLastActivityDate(LocalDateTime.now()); process.setLastActivityDate(LocalDateTime.now());
try { try {
for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) { for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) {
if (node.isJoin() || node.isSucessNode()) { if (graphNode.isJoin() || graphNode.isSucessNode()) {
if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) { if (!process.getPausedJoinNodeTokens().containsKey(graphNode.getName())) {
process.getPausedJoinNodeTokens().put(node.getName(), new ArrayList<Token>()); process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList<Token>());
} }
final List<Token> list = process.getPausedJoinNodeTokens().get(node.getName()); final List<Token> list = process.getPausedJoinNodeTokens().get(graphNode.getName());
list.add(oldToken); list.add(oldToken);
if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) { if (list.size() == process.getGraph().getNumberOfIncomingArcs(graphNode)) {
final Token token = new Token(); final Token token = new Token();
token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()]))); token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()])));
final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process);
process.getTokens().add(token); process.getTokens().add(token);
process.setLastActivityDate(LocalDateTime.now()); process.setLastActivityDate(LocalDateTime.now());
if (node.isSucessNode()) { if (graphNode.isSucessNode()) {
completeProcess(process, token); completeProcess(process, token);
} else { } else {
pNode.execute(token, newNodeCallback(process, node)); pNode.execute(token, newNodeCallback(process, pNode, graphNode));
} }
} }
} else { } else {
final Token token = new Token(); final Token token = new Token();
token.getEnv().addAttributes(oldToken.getEnv().getAttributes()); token.getEnv().addAttributes(oldToken.getEnv().getAttributes());
final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process);
process.getTokens().add(token); process.getTokens().add(token);
process.setLastActivityDate(LocalDateTime.now()); process.setLastActivityDate(LocalDateTime.now());
pNode.execute(token, newNodeCallback(process, node)); pNode.execute(token, newNodeCallback(process, pNode, graphNode));
} }
} }
} catch (final Throwable e) { } catch (final Throwable e) {

View File

@ -16,7 +16,7 @@ import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.model.WorkflowTemplate; import eu.dnetlib.manager.wf.model.WorkflowTemplate;
import eu.dnetlib.manager.wf.workflows.graph.Graph; import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.graph.GraphLoader; import eu.dnetlib.manager.wf.workflows.graph.GraphLoader;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
@Component @Component
public class ProcessFactory { public class ProcessFactory {
@ -33,7 +33,7 @@ public class ProcessFactory {
public WorkflowProcess newProcess(final SimpleResource wfMetadata, public WorkflowProcess newProcess(final SimpleResource wfMetadata,
final WorkflowTemplate wfTemplate, final WorkflowTemplate wfTemplate,
final WorkflowConfiguration conf, final WorkflowConfiguration conf,
final ExecutionCallback<WorkflowProcess> callback) throws WorkflowManagerException { final ProcessCallback callback) throws WorkflowManagerException {
final Map<String, String> globalParams = new HashMap<>(); final Map<String, String> globalParams = new HashMap<>();
globalParams.putAll(conf.getSystemParams()); globalParams.putAll(conf.getSystemParams());

View File

@ -13,7 +13,7 @@ import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.manager.history.model.WfHistoryEntry; import eu.dnetlib.manager.history.model.WfHistoryEntry;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration; import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.workflows.graph.Graph; import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
/** /**
@ -39,7 +39,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
private final SimpleResource wfMetadata; private final SimpleResource wfMetadata;
private final WorkflowConfiguration wfConf; private final WorkflowConfiguration wfConf;
private final Graph graph; private final Graph graph;
private final ExecutionCallback<WorkflowProcess> callback; private final ProcessCallback callback;
private final Env env; private final Env env;
private final List<Token> tokens = new CopyOnWriteArrayList<>(); private final List<Token> tokens = new CopyOnWriteArrayList<>();
private LocalDateTime lastActivityDate; private LocalDateTime lastActivityDate;
@ -58,7 +58,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
final WorkflowConfiguration wfConf, final WorkflowConfiguration wfConf,
final Graph graph, final Graph graph,
final Map<String, String> globalParams, final Map<String, String> globalParams,
final ExecutionCallback<WorkflowProcess> callback) { final ProcessCallback callback) {
this.id = id; this.id = id;
this.wfMetadata = wfMetadata; this.wfMetadata = wfMetadata;
this.wfConf = wfConf; this.wfConf = wfConf;

View File

@ -1,9 +0,0 @@
package eu.dnetlib.manager.wf.workflows.util;
public interface ExecutionCallback<T> {
void onSuccess(T t);
void onFail(T t);
}

View File

@ -0,0 +1,11 @@
package eu.dnetlib.manager.wf.workflows.util;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public interface NodeCallback {
void onComplete(Token t);
void onFail(Token t);
}

View File

@ -4,17 +4,15 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import eu.dnetlib.errors.WorkflowManagerException; import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.nodes.ProcessNode;
import eu.dnetlib.manager.wf.nodes.DefaultJobNode; import eu.dnetlib.manager.wf.nodes.DefaultJobNode;
import eu.dnetlib.manager.wf.nodes.ProcessNode;
import eu.dnetlib.manager.wf.nodes.SuccessNode; import eu.dnetlib.manager.wf.nodes.SuccessNode;
import eu.dnetlib.manager.wf.workflows.graph.GraphNode; import eu.dnetlib.manager.wf.workflows.graph.GraphNode;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
@ -26,21 +24,17 @@ public class NodeHelper implements ApplicationContextAware {
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
public ProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process, final Env env) throws WorkflowManagerException { public ProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process) throws WorkflowManagerException {
if (node.isSucessNode()) { if (node.isSucessNode()) {
return new SuccessNode(); return new SuccessNode();
} else if (StringUtils.isBlank(node.getType())) { } else if (StringUtils.isBlank(node.getType())) {
return new DefaultJobNode(node.getName()); return new DefaultJobNode(node.getName());
} else { } else {
// TODO: Considerare i nodi annotati con WfNode
final ProcessNode pnode = this.applicationContext.getBean(beanNamePrefix + node.getType(), ProcessNode.class); final ProcessNode pnode = this.applicationContext.getBean(beanNamePrefix + node.getType(), ProcessNode.class);
if (pnode != null) { if (pnode != null) {
pnode.setNodeName(node.getName()); pnode.setNodeName(node.getName());
// I invoke the setter methods using the static params of the graph node
try {
PropertyAccessorFactory.forBeanPropertyAccess(pnode).setPropertyValues(node.resolveParams(env));
} catch (final Throwable e) {
throw new WorkflowManagerException(String.format("error setting parameters in wfNode %s", node.getName()), e);
}
if (pnode instanceof ProcessAware) { if (pnode instanceof ProcessAware) {
((ProcessAware) pnode).setProcess(process); ((ProcessAware) pnode).setProcess(process);
} }

View File

@ -0,0 +1,11 @@
package eu.dnetlib.manager.wf.workflows.util;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
public interface ProcessCallback {
void onSuccess(WorkflowProcess proc);
void onFail(WorkflowProcess proc);
}