167 lines
5.3 KiB
Java
167 lines
5.3 KiB
Java
package eu.dnetlib.manager.wf.workflows.procs;
|
|
|
|
import java.time.LocalDateTime;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.context.ApplicationContext;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import com.google.common.base.Throwables;
|
|
|
|
import eu.dnetlib.errors.WorkflowManagerException;
|
|
import eu.dnetlib.manager.history.WorkflowLogger;
|
|
import eu.dnetlib.manager.wf.nodes.DefaultJobNode;
|
|
import eu.dnetlib.manager.wf.nodes.ProcessNode;
|
|
import eu.dnetlib.manager.wf.nodes.SuccessNode;
|
|
import eu.dnetlib.manager.wf.notification.EmailSender;
|
|
import eu.dnetlib.manager.wf.workflows.graph.GraphNode;
|
|
import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
|
|
|
|
@Service
|
|
public class ProcessEngine {
|
|
|
|
private static final Log log = LogFactory.getLog(ProcessEngine.class);
|
|
|
|
@Autowired
|
|
private EmailSender emailSender;
|
|
|
|
@Autowired
|
|
private WorkflowLogger wfLogger;
|
|
|
|
@Autowired
|
|
private ApplicationContext applicationContext;
|
|
|
|
public void startProcess(final WorkflowProcess process) {
|
|
log.info(process.getGraph());
|
|
|
|
log.info("Starting workflow: " + process);
|
|
|
|
final LocalDateTime now = LocalDateTime.now();
|
|
process.setStatus(WorkflowProcess.Status.running);
|
|
process.setStartDate(now);
|
|
process.setLastActivityDate(now);
|
|
|
|
try {
|
|
for (final GraphNode graphNode : process.getGraph().startNodes()) {
|
|
final Token token = process.newToken(process.getEnv().getAttributes());
|
|
executeNode(process, graphNode, token);
|
|
}
|
|
} catch (final Throwable e) {
|
|
log.error("WorkflowProcess node instantiation failed", e);
|
|
process.setStatus(WorkflowProcess.Status.failure);
|
|
}
|
|
}
|
|
|
|
private NodeCallback newNodeCallback(final WorkflowProcess process, final ProcessNode pNode, final GraphNode graphNode) {
|
|
return new NodeCallback() {
|
|
|
|
@Override
|
|
public void onComplete(final Token t) {
|
|
pNode.saveOutputParams(t.getEnv());
|
|
releaseToken(process, graphNode, t);
|
|
}
|
|
|
|
@Override
|
|
public void onFail(final Token t) {
|
|
pNode.saveOutputParams(t.getEnv());
|
|
completeProcess(process, t);
|
|
}
|
|
};
|
|
}
|
|
|
|
public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
|
|
process.setLastActivityDate(LocalDateTime.now());
|
|
|
|
try {
|
|
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) {
|
|
if (graphNode.isJoin() || graphNode.isSucessNode()) {
|
|
if (!process.getPausedJoinNodeTokens().containsKey(graphNode.getName())) {
|
|
process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList<Token>());
|
|
}
|
|
|
|
final List<Token> list = process.getPausedJoinNodeTokens().get(graphNode.getName());
|
|
|
|
list.add(oldToken);
|
|
|
|
if (list.size() == process.getGraph().getNumberOfIncomingArcs(graphNode)) {
|
|
final Token token = process.newToken(mergeEnvParams(list.toArray(new Token[list.size()])));
|
|
|
|
if (graphNode.isSucessNode()) {
|
|
completeProcess(process, token);
|
|
} else {
|
|
executeNode(process, graphNode, token);
|
|
}
|
|
}
|
|
} else {
|
|
final Token token = process.newToken(oldToken.getEnv().getAttributes());
|
|
executeNode(process, graphNode, token);
|
|
}
|
|
}
|
|
} catch (
|
|
|
|
final Throwable e) {
|
|
log.error("WorkflowProcess node instantiation failed", e);
|
|
process.setStatus(WorkflowProcess.Status.failure);
|
|
process.setError(e.getMessage());
|
|
process.setErrorStacktrace(Throwables.getStackTraceAsString(e));
|
|
process.setLastActivityDate(LocalDateTime.now());
|
|
}
|
|
|
|
}
|
|
|
|
private void executeNode(final WorkflowProcess process, final GraphNode graphNode, final Token token) throws WorkflowManagerException {
|
|
process.setLastActivityDate(LocalDateTime.now());
|
|
|
|
final ProcessNode pNode = newProcessNode(graphNode, process);
|
|
|
|
// TODO Is it correct?
|
|
token.getEnv().addAttributes(graphNode.resolveParams(token.getEnv()));
|
|
|
|
pNode.initInputParams(token.getEnv());
|
|
pNode.execute(token, newNodeCallback(process, pNode, graphNode));
|
|
}
|
|
|
|
private ProcessNode newProcessNode(final GraphNode graphNode, final WorkflowProcess process) throws WorkflowManagerException {
|
|
if (graphNode.isSucessNode()) {
|
|
return new SuccessNode();
|
|
} else if (StringUtils.isBlank(graphNode.getType())) {
|
|
return new DefaultJobNode(graphNode.getName());
|
|
} else {
|
|
final ProcessNode pnode = (ProcessNode) applicationContext.getBean(graphNode.getType());
|
|
if (pnode != null) {
|
|
pnode.setNodeName(graphNode.getName());
|
|
if (pnode instanceof ProcessAware) {
|
|
((ProcessAware) pnode).setProcess(process);
|
|
}
|
|
return pnode;
|
|
} else {
|
|
log.error("cannot find bean of type " + graphNode.getType());
|
|
throw new WorkflowManagerException("cannot find bean of type " + graphNode.getType());
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
private Map<String, Object> mergeEnvParams(final Token... tokens) {
|
|
final Map<String, Object> map = new HashMap<>();
|
|
Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes()));
|
|
return map;
|
|
}
|
|
|
|
private void completeProcess(final WorkflowProcess process, final Token token) {
|
|
token.checkStatus();
|
|
process.complete(token);
|
|
wfLogger.saveLog(process.asLog());
|
|
emailSender.sendMails(process);
|
|
}
|
|
|
|
}
|