partial wf engine refactoring

This commit is contained in:
Michele Artini 2023-12-12 15:07:12 +01:00
parent 94c38ac79b
commit a7042a967f
4 changed files with 81 additions and 57 deletions

View File

@ -5,6 +5,7 @@ import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@ -17,7 +18,10 @@ import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.env.Env;
import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.utils.NodeCallback;
public abstract class ProcessNode implements BeanNameAware {
@ -28,15 +32,54 @@ public abstract class ProcessNode implements BeanNameAware {
private String nodeName;
public abstract void execute(final Token token, NodeCallback callback);
private GraphNode graphNode;
private WorkflowProcess process;
private ProcessEngine engine;
public void execute(final Token token, final Map<String, Object> params) {
try {
initInputParams(params);
final NodeCallback callback = new NodeCallback() {
@Override
public void onSuccess(final Token t) {
try {
saveOutputParams(t.getEnv());
engine.releaseToken(process, graphNode, t);
} catch (final WorkflowManagerException e) {
onFail(t);
}
}
@Override
public void onFail(final Token t) {
log.debug("FAILURE IN NODE + " + getNodeName());
engine.completeProcess(process, t);
}
};
if (isAsync()) {
Executors.newSingleThreadExecutor().execute(() -> execute(token, callback));
} else {
execute(token, callback);
}
} catch (final Throwable e) {
token.releaseAsFailed(e);
engine.completeProcess(process, token);
}
}
protected abstract void execute(final Token token, NodeCallback callback);
public final boolean isAsync() {
// return this.getClass().getAnnotation(WfNode.class).async;
return WfNode.async;
}
public final void initInputParams(final Map<String, Object> params) throws WorkflowManagerException {
private void initInputParams(final Map<String, Object> params) throws WorkflowManagerException {
for (final Field field : findFields(getClass(), WfInputParam.class)) {
final String annName = field.getAnnotation(WfInputParam.class).value();
final boolean optional = field.getAnnotation(WfInputParam.class).optional();
@ -57,8 +100,7 @@ public abstract class ProcessNode implements BeanNameAware {
}
}
public final void saveOutputParams(final Env env) throws WorkflowManagerException {
private void saveOutputParams(final Env env) throws WorkflowManagerException {
for (final Field field : findFields(getClass(), WfOutputParam.class)) {
final String annName = field.getAnnotation(WfOutputParam.class).value();
final boolean optional = field.getAnnotation(WfOutputParam.class).optional();
@ -69,6 +111,9 @@ public abstract class ProcessNode implements BeanNameAware {
env.setAttribute(StringUtils.isNotBlank(annName) ? annName : field.getName(), value);
}
if (log.isDebugEnabled()) {
env.getAttributes().forEach((k, v) -> log.debug("ENV (END NODE EXECUTION) " + k + ": " + v));
}
}
private final void fieldValue(final Field field, final Object value) {
@ -127,6 +172,30 @@ public abstract class ProcessNode implements BeanNameAware {
this.nodeName = nodeName;
}
public WorkflowProcess getProcess() {
return process;
}
public void setProcess(final WorkflowProcess process) {
this.process = process;
}
public GraphNode getGraphNode() {
return graphNode;
}
public void setGraphNode(final GraphNode graphNode) {
this.graphNode = graphNode;
}
public ProcessEngine getEngine() {
return engine;
}
public void setEngine(final ProcessEngine engine) {
this.engine = engine;
}
@Override
public String toString() {
return String.format("[node beanName=%s, name=%s, object: %s]", beanName, nodeName, super.toString());

View File

@ -17,7 +17,6 @@ import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.procs.ProcessAware;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.service.WfExecutorService;
@ -26,7 +25,7 @@ import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WfProcessUtils;
@WfNode("launchWorkflow")
public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
public class LaunchWorkflowJobNode extends ProcessNode {
private static final Log log = LogFactory.getLog(LaunchWorkflowJobNode.class);

View File

@ -1,10 +0,0 @@
package eu.dnetlib.wfs.procs;
/**
* Created by michele on 24/11/15.
*/
public interface ProcessAware {
void setProcess(WorkflowProcess process);
}

View File

@ -6,7 +6,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@ -25,7 +24,6 @@ import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.nodes.SuccessNode;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.utils.EmailSender;
import eu.dnetlib.wfs.utils.NodeCallback;
@Service
public class ProcessEngine {
@ -62,32 +60,6 @@ public class ProcessEngine {
}
}
private NodeCallback newNodeCallback(final WorkflowProcess process, final ProcessNode pNode, final GraphNode graphNode) {
return new NodeCallback() {
@Override
public void onSuccess(final Token t) {
try {
pNode.saveOutputParams(t.getEnv());
releaseToken(process, graphNode, t);
} catch (final WorkflowManagerException e) {
onFail(t);
}
}
@Override
public void onFail(final Token t) {
try {
pNode.saveOutputParams(t.getEnv());
} catch (final WorkflowManagerException e) {
// Probably the LOG LEVEL should be DEBUG
log.warn("Output parameters not correctly saved");
}
completeProcess(process, t);
}
};
}
public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
process.setLastActivityDate(LocalDateTime.now());
@ -140,15 +112,8 @@ public class ProcessEngine {
}
final ProcessNode pNode = newProcessNode(graphNode, process);
pNode.initInputParams(params);
final NodeCallback callback = newNodeCallback(process, pNode, graphNode);
if (pNode.isAsync()) {
Executors.newSingleThreadExecutor().execute(() -> pNode.execute(token, callback));
} else {
pNode.execute(token, callback);
}
pNode.execute(token, params);
}
@ -161,9 +126,10 @@ public class ProcessEngine {
throw new WorkflowManagerException("cannot find bean of type " + graphNode.getType());
}
pnode.setNodeName(graphNode.getName());
if (pnode instanceof ProcessAware) {
((ProcessAware) pnode).setProcess(process);
}
pnode.setProcess(process);
pnode.setGraphNode(graphNode);
pnode.setEngine(this);
return pnode;
}
@ -174,7 +140,7 @@ public class ProcessEngine {
return map;
}
private void completeProcess(final WorkflowProcess process, final Token token) {
public void completeProcess(final WorkflowProcess process, final Token token) {
token.checkStatus();
process.complete(token);