refactoring

This commit is contained in:
Michele Artini 2023-03-20 12:26:40 +01:00
parent 836c31b6d6
commit ab3c0851ed
14 changed files with 95 additions and 106 deletions

View File

@ -24,11 +24,11 @@ import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.utils.CountedValue;
import eu.dnetlib.utils.Stoppable;

View File

@ -1,34 +1,60 @@
package eu.dnetlib.manager.wf.nodes;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
public abstract class AbstractJobNode extends ProcessNode {
private static final Log log = LogFactory.getLog(SimpleJobNode.class);
private static final Log log = LogFactory.getLog(AbstractJobNode.class);
protected void doExecute(final Token token) {
private final boolean async;
public AbstractJobNode(final boolean async) {
this.async = async;
}
private final ExecutorService executor = Executors.newCachedThreadPool();
@Override
public final void execute(final Token token, final ExecutionCallback<Token> callback) {
try {
log.debug("START NODE: " + getBeanName());
setProgressMessage(getNodeName());
token.setProgressMessage(getNodeName());
beforeStart(token);
execute(token.getEnv());
beforeCompleted(token);
if (isAsync()) {
executor.execute(() -> doExecute(token, callback));
} else {
doExecute(token, callback);
}
log.debug("END NODE (SUCCESS): " + getBeanName());
} catch (final Throwable e) {
log.error("got exception while executing workflow node", e);
log.debug("END NODE (FAILED): " + getBeanName());
beforeFailed(token);
token.releaseAsFailed(e);
callback.onFail(token);
}
}
abstract protected void execute(final Env env) throws Exception;
private final void doExecute(final Token token, final ExecutionCallback<Token> callback) {
execute(token);
beforeCompleted(token);
callback.onSuccess(token);
}
protected abstract void execute(Token token);
public final boolean isAsync() {
return async;
}
protected void beforeStart(final Token token) {
// For optional overwrites

View File

@ -1,28 +0,0 @@
package eu.dnetlib.manager.wf.nodes;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public abstract class AsyncJobNode extends AbstractJobNode {
/**
* logger.
*/
private static final Log log = LogFactory.getLog(AsyncJobNode.class);
private final ExecutorService executor = Executors.newCachedThreadPool();
@Override
public final void execute(final Token token) {
log.info("executing async node");
executor.execute(() -> doExecute(token));
}
}

View File

@ -5,15 +5,14 @@ import eu.dnetlib.manager.wf.workflows.procs.Token;
/**
* Created by michele on 26/11/15.
*/
public final class DefaultJobNode extends ProcessNode {
public final class DefaultJobNode extends AbstractJobNode {
public DefaultJobNode(final String name) {
super();
super(false);
setNodeName(name);
}
@Override
public void execute(final Token token) {
token.release();
}
public void execute(final Token token) {}
}

View File

@ -9,10 +9,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.WorkflowManagerService;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
/**
* Created by michele on 18/11/15.
@ -29,7 +29,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
private WorkflowProcess process;
@Override
public final void execute(final Token token) {
public final void execute(final Token token, final ExecutionCallback<Token> callback) {
try {
final WorkflowInstance instance = new WorkflowInstance();
@ -56,12 +56,14 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
public void onSuccess(final WorkflowProcess t) {
log.debug("Child workflow has been completed successfully");
token.release();
callback.onSuccess(token);
}
@Override
public void onFail(final WorkflowProcess t) {
log.error("Child workflow is failed");
token.releaseAsFailed("Child workflow is failed");
callback.onFail(token);
}
});
@ -70,11 +72,11 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
log.debug("The child workflow [instance: " + instance.getId() + "] is starting with procId: " + procId);
}
setProgressMessage("Launched sub workflow, proc: " + procId);
token.setProgressMessage("Launched sub workflow, proc: " + procId);
} catch (final Throwable e) {
log.error("got exception while launching child workflow", e);
token.releaseAsFailed(e);
callback.onFail(token);
}
}

View File

@ -3,19 +3,15 @@ package eu.dnetlib.manager.wf.nodes;
import org.springframework.beans.factory.BeanNameAware;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
/**
* Created by michele on 19/11/15.
*/
public abstract class ProcessNode implements BeanNameAware {
private String beanName;
private String nodeName;
private String progressMessage;
public abstract void execute(final Token token);
public abstract void execute(final Token token, ExecutionCallback<Token> callback);
public String getBeanName() {
return this.beanName;
@ -39,11 +35,4 @@ public abstract class ProcessNode implements BeanNameAware {
return String.format("[node beanName=%s, name=%s]", this.beanName, this.nodeName);
}
public String getProgressMessage() {
return progressMessage;
}
public void setProgressMessage(final String progressMessage) {
this.progressMessage = progressMessage;
}
}

View File

@ -1,12 +1,9 @@
package eu.dnetlib.manager.wf.nodes;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public abstract class SimpleJobNode extends AbstractJobNode {
@Override
public final void execute(final Token token) {
doExecute(token);
public SimpleJobNode() {
super(false);
}
}

View File

@ -5,13 +5,14 @@ import eu.dnetlib.manager.wf.workflows.procs.Token;
/**
* Created by michele on 26/11/15.
*/
public class SuccessNode extends ProcessNode {
public class SuccessNode extends AbstractJobNode {
public SuccessNode() {
super();
super(false);
setNodeName("success");
}
@Override
public void execute(final Token token) {}
protected void execute(final Token token) {}
}

View File

@ -18,6 +18,7 @@ import eu.dnetlib.manager.history.WorkflowLogger;
import eu.dnetlib.manager.wf.nodes.ProcessNode;
import eu.dnetlib.manager.wf.notification.EmailSender;
import eu.dnetlib.manager.wf.workflows.graph.GraphNode;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.util.NodeHelper;
@Service
@ -45,11 +46,11 @@ public class ProcessEngine {
try {
for (final GraphNode node : process.getGraph().startNodes()) {
final ProcessNode pNode = nodeHelper.newProcessNode(node, process, process.getEnv());
final Token token = prepareNewToken(process, node);
final Token token = new Token();
token.getEnv().addAttributes(process.getEnv().getAttributes());
process.getTokens().add(token);
pNode.execute(token);
pNode.execute(token, newNodeCallback(process, node));
}
} catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e);
@ -57,6 +58,21 @@ public class ProcessEngine {
}
}
private ExecutionCallback<Token> newNodeCallback(final WorkflowProcess process, final GraphNode node) {
return new ExecutionCallback<Token>() {
@Override
public void onSuccess(final Token t) {
releaseToken(process, node, t);
}
@Override
public void onFail(final Token t) {
completeProcess(process, t);
}
};
}
public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
process.setLastActivityDate(LocalDateTime.now());
@ -72,7 +88,7 @@ public class ProcessEngine {
list.add(oldToken);
if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) {
final Token token = prepareNewToken(process, node);
final Token token = new Token();
token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()])));
final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv());
@ -82,17 +98,17 @@ public class ProcessEngine {
if (node.isSucessNode()) {
completeProcess(process, token);
} else {
pNode.execute(token);
pNode.execute(token, newNodeCallback(process, node));
}
}
} else {
final Token token = prepareNewToken(process, node);
final Token token = new Token();
token.getEnv().addAttributes(oldToken.getEnv().getAttributes());
final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv());
process.getTokens().add(token);
process.setLastActivityDate(LocalDateTime.now());
pNode.execute(token);
pNode.execute(token, newNodeCallback(process, node));
}
}
} catch (final Throwable e) {
@ -105,21 +121,6 @@ public class ProcessEngine {
}
private Token prepareNewToken(final WorkflowProcess process, final GraphNode node) {
return new Token(new ExecutionCallback<Token>() {
@Override
public void onSuccess(final Token t) {
releaseToken(process, node, t);
}
@Override
public void onFail(final Token t) {
completeProcess(process, t);
}
});
}
private Map<String, Object> mergeEnvParams(final Token... tokens) {
final Map<String, Object> map = new HashMap<>();
Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes()));

View File

@ -16,6 +16,7 @@ import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.graph.GraphLoader;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
@Component
public class ProcessFactory {

View File

@ -14,21 +14,22 @@ public class Token {
private final String id;
private final Env env = new Env();
private String progressMessage;
private final LocalDateTime startDate;
private LocalDateTime endDate = LocalDateTime.MIN;
private LocalDateTime endDate;
private final ExecutionCallback<Token> callback;
private boolean failed;
private boolean active;
private boolean failed = false;
private boolean active = true;
private String error = "";
private String errorStackTrace = "";
public Token(final ExecutionCallback<Token> callback) {
public Token() {
this.id = "token-" + UUID.randomUUID();
this.startDate = LocalDateTime.now();
this.callback = callback;
this.failed = false;
this.active = true;
}
public String getId() {
@ -70,9 +71,6 @@ public class Token {
public void release() {
setEndDate(LocalDateTime.now());
setActive(false);
if (this.callback != null) {
this.callback.onSuccess(this);
}
}
public void releaseAsFailed(final Throwable e) {
@ -81,9 +79,6 @@ public class Token {
setFailed(true);
setError(e.getMessage());
setErrorStackTrace(Throwables.getStackTraceAsString(e));
if (this.callback != null) {
this.callback.onFail(this);
}
}
public void releaseAsFailed(final String error) {
@ -91,9 +86,6 @@ public class Token {
setActive(false);
setFailed(true);
setError(error);
if (this.callback != null) {
this.callback.onFail(this);
}
}
public void checkStatus() {
@ -106,6 +98,14 @@ public class Token {
}
}
public String getProgressMessage() {
return progressMessage;
}
public void setProgressMessage(final String progressMessage) {
this.progressMessage = progressMessage;
}
public String getError() {
return this.error;
}

View File

@ -13,6 +13,7 @@ import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.manager.history.model.WfProcessExecution;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
/**

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.manager.wf.workflows.util;
public interface ExecutionCallback<T> {

View File

@ -10,8 +10,8 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.nodes.DefaultJobNode;
import eu.dnetlib.manager.wf.nodes.ProcessNode;
import eu.dnetlib.manager.wf.nodes.DefaultJobNode;
import eu.dnetlib.manager.wf.nodes.SuccessNode;
import eu.dnetlib.manager.wf.workflows.graph.GraphNode;
import eu.dnetlib.manager.wf.workflows.procs.Env;