exceptions in wf execution
This commit is contained in:
parent
89773c53a9
commit
edec46df93
|
@ -4,5 +4,5 @@ public interface DnetCallback<T> {
|
|||
|
||||
void onSuccess(T t);
|
||||
|
||||
void onFail(T t);
|
||||
void onFail(T t, Throwable e);
|
||||
}
|
||||
|
|
|
@ -256,6 +256,16 @@ public class WfTemplate implements Serializable {
|
|||
|
||||
return null;
|
||||
}
|
||||
|
||||
public Map<String, String> outputToEnvMap() {
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
for (final NodeOutputParam p : output) {
|
||||
if (StringUtils.isBlank(p.getEnv())) {
|
||||
map.put(p.getName(), p.getEnv());
|
||||
}
|
||||
}
|
||||
return map;
|
||||
}
|
||||
}
|
||||
|
||||
@JsonInclude(Include.NON_NULL)
|
||||
|
|
|
@ -17,6 +17,7 @@ public class GraphNode {
|
|||
private final boolean isSuccessNode;
|
||||
private final Map<String, Object> params;
|
||||
private final Map<String, Object> envParams;
|
||||
private final Map<String, String> outputEnvMap;
|
||||
|
||||
private GraphNode(final String name,
|
||||
final String type,
|
||||
|
@ -24,7 +25,8 @@ public class GraphNode {
|
|||
final boolean isJoin,
|
||||
final boolean isSuccessNode,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
final Map<String, Object> envParams,
|
||||
final Map<String, String> outputEnvMap) {
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.isStart = isStart;
|
||||
|
@ -32,31 +34,35 @@ public class GraphNode {
|
|||
this.isSuccessNode = isSuccessNode;
|
||||
this.params = params;
|
||||
this.envParams = envParams;
|
||||
this.outputEnvMap = outputEnvMap;
|
||||
}
|
||||
|
||||
public static GraphNode newNode(final String name,
|
||||
final String type,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
return new GraphNode(name, type, false, false, false, params, envParams);
|
||||
final Map<String, Object> envParams,
|
||||
final Map<String, String> outputEnvMap) {
|
||||
return new GraphNode(name, type, false, false, false, params, envParams, outputEnvMap);
|
||||
}
|
||||
|
||||
public static GraphNode newStartNode(final String name,
|
||||
final String type,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
return new GraphNode(name, type, true, false, false, params, envParams);
|
||||
final Map<String, Object> envParams,
|
||||
final Map<String, String> outputEnvMap) {
|
||||
return new GraphNode(name, type, true, false, false, params, envParams, outputEnvMap);
|
||||
}
|
||||
|
||||
public static GraphNode newJoinNode(final String name,
|
||||
final String type,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
return new GraphNode(name, type, false, true, false, params, envParams);
|
||||
final Map<String, Object> envParams,
|
||||
final Map<String, String> outputEnvMap) {
|
||||
return new GraphNode(name, type, false, true, false, params, envParams, outputEnvMap);
|
||||
}
|
||||
|
||||
public static GraphNode newSuccessNode() {
|
||||
return new GraphNode(SUCCESS_NODE, null, false, true, true, new HashMap<>(), new HashMap<>());
|
||||
return new GraphNode(SUCCESS_NODE, null, false, true, true, new HashMap<>(), new HashMap<>(), new HashMap<>());
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
|
@ -106,4 +112,8 @@ public class GraphNode {
|
|||
return envParams;
|
||||
}
|
||||
|
||||
public Map<String, String> getOutputEnvMap() {
|
||||
return outputEnvMap;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,9 +5,6 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
|
||||
/**
|
||||
* Created by michele on 19/11/15.
|
||||
*/
|
||||
|
@ -20,17 +17,15 @@ public class Token {
|
|||
private final LocalDateTime startDate;
|
||||
private LocalDateTime endDate;
|
||||
|
||||
private boolean failed;
|
||||
private boolean active;
|
||||
|
||||
private String error = "";
|
||||
private String errorStackTrace = "";
|
||||
private Throwable error;
|
||||
|
||||
public Token() {
|
||||
id = "token-" + UUID.randomUUID();
|
||||
startDate = LocalDateTime.now();
|
||||
failed = false;
|
||||
active = true;
|
||||
error = null;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
@ -92,11 +87,7 @@ public class Token {
|
|||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return failed;
|
||||
}
|
||||
|
||||
public void setFailed(final boolean failed) {
|
||||
this.failed = failed;
|
||||
return error != null;
|
||||
}
|
||||
|
||||
public void release() {
|
||||
|
@ -105,28 +96,8 @@ public class Token {
|
|||
}
|
||||
|
||||
public void releaseAsFailed(final Throwable e) {
|
||||
setEndDate(LocalDateTime.now());
|
||||
setActive(false);
|
||||
setFailed(true);
|
||||
setError(e.getMessage());
|
||||
setErrorStackTrace(ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
|
||||
public void releaseAsFailed(final String error) {
|
||||
setEndDate(LocalDateTime.now());
|
||||
setActive(false);
|
||||
setFailed(true);
|
||||
setError(error);
|
||||
}
|
||||
|
||||
public void checkStatus() {
|
||||
if (isActive()) {
|
||||
if (StringUtils.isNotBlank(error)) {
|
||||
releaseAsFailed(error);
|
||||
} else {
|
||||
release();
|
||||
}
|
||||
}
|
||||
setError(e);
|
||||
release();
|
||||
}
|
||||
|
||||
public String getProgressMessage() {
|
||||
|
@ -137,20 +108,12 @@ public class Token {
|
|||
this.progressMessage = progressMessage;
|
||||
}
|
||||
|
||||
public String getError() {
|
||||
public Throwable getError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
public void setError(final String error) {
|
||||
public void setError(final Throwable error) {
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
public String getErrorStackTrace() {
|
||||
return errorStackTrace;
|
||||
}
|
||||
|
||||
public void setErrorStackTrace(final String errorStackTrace) {
|
||||
this.errorStackTrace = errorStackTrace;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -73,13 +73,14 @@ public class GraphLoader {
|
|||
|
||||
final Map<String, Object> params = node.calculateInitialParams(globalParams, environment);
|
||||
final Map<String, Object> envParams = node.findEnvParams();
|
||||
final Map<String, String> outputEnvMap = node.outputToEnvMap();
|
||||
|
||||
if (isStart) {
|
||||
graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params, envParams));
|
||||
graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params, envParams, outputEnvMap));
|
||||
} else if (isJoin) {
|
||||
graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params, envParams));
|
||||
graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params, envParams, outputEnvMap));
|
||||
} else {
|
||||
graph.addNode(GraphNode.newNode(nodeName, nodeType, params, envParams));
|
||||
graph.addNode(GraphNode.newNode(nodeName, nodeType, params, envParams, outputEnvMap));
|
||||
}
|
||||
|
||||
if ((node.getArcs() == null) || node.getArcs().isEmpty()) {
|
||||
|
|
|
@ -21,7 +21,7 @@ public abstract class AbstractJobNode extends ProcessNode {
|
|||
} catch (final Throwable e) {
|
||||
log.error("got exception while executing workflow node", e);
|
||||
log.debug("END NODE (FAILED): " + getBeanName());
|
||||
callback.onFail(token);
|
||||
callback.onFail(token, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ public abstract class ProcessNode implements BeanNameAware {
|
|||
|
||||
private ProcessEngine engine;
|
||||
|
||||
public void execute(final Token token, final Map<String, Object> params) {
|
||||
public void execute(final Token token, final Map<String, Object> params, final Map<String, String> outputToEnv) {
|
||||
try {
|
||||
initInputParams(params);
|
||||
|
||||
|
@ -46,16 +46,17 @@ public abstract class ProcessNode implements BeanNameAware {
|
|||
@Override
|
||||
public void onSuccess(final Token t) {
|
||||
try {
|
||||
saveOutputParams(t);
|
||||
saveOutputParams(t, outputToEnv);
|
||||
engine.releaseToken(process, graphNode, t);
|
||||
} catch (final WorkflowManagerException e) {
|
||||
onFail(t);
|
||||
onFail(t, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFail(final Token t) {
|
||||
log.debug("FAILURE IN NODE + " + getNodeName());
|
||||
public void onFail(final Token t, final Throwable e) {
|
||||
log.debug("FAILURE IN NODE " + getNodeName());
|
||||
token.releaseAsFailed(e);
|
||||
engine.completeProcess(process, t);
|
||||
}
|
||||
};
|
||||
|
@ -99,17 +100,28 @@ public abstract class ProcessNode implements BeanNameAware {
|
|||
}
|
||||
}
|
||||
|
||||
private void saveOutputParams(final Token token) throws WorkflowManagerException {
|
||||
private void saveOutputParams(final Token token, final Map<String, String> outputToEnv) throws WorkflowManagerException {
|
||||
|
||||
for (final Field field : findFields(getClass(), WfOutputParam.class)) {
|
||||
final String annName = field.getAnnotation(WfOutputParam.class).value();
|
||||
|
||||
final boolean optional = field.getAnnotation(WfOutputParam.class).optional();
|
||||
|
||||
final String outputParam = StringUtils.isNotBlank(annName) ? annName : field.getName();
|
||||
|
||||
if (!outputToEnv.containsKey(outputParam)) { throw new WorkflowManagerException("Not mapped output field: " + outputParam); }
|
||||
|
||||
final String envParam = outputToEnv.get(outputParam);
|
||||
|
||||
final Object value = fieldValue(field);
|
||||
|
||||
if (!optional && (value == null)) { throw new WorkflowManagerException("NULL value in OUTPUT parameter: " + field.getName()); }
|
||||
|
||||
token.setEnvAttribute(StringUtils.isNotBlank(annName) ? annName : field.getName(), value);
|
||||
token.setEnvAttribute(envParam, value);
|
||||
|
||||
log.debug("SAVED ENV PARAMETER " + envParam + "=" + value);
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
token.getEnv().forEach((k, v) -> log.debug("ENV (END NODE EXECUTION) " + k + ": " + v));
|
||||
}
|
||||
|
|
|
@ -86,9 +86,9 @@ public class LaunchWorkflowJobNode extends ProcessNode {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onFail(final WorkflowProcess t) {
|
||||
public void onFail(final WorkflowProcess t, final Throwable e) {
|
||||
log.error("Child workflow is failed");
|
||||
nodeCallback.onFail(token);
|
||||
nodeCallback.onFail(token, e);
|
||||
}
|
||||
|
||||
});
|
||||
|
@ -101,7 +101,7 @@ public class LaunchWorkflowJobNode extends ProcessNode {
|
|||
|
||||
} catch (final Throwable e) {
|
||||
log.error("got exception while launching child workflow", e);
|
||||
nodeCallback.onFail(token);
|
||||
nodeCallback.onFail(token, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -259,7 +259,7 @@ public class ProcessEngine {
|
|||
|
||||
final ProcessNode pNode = newProcessNode(graphNode, process);
|
||||
|
||||
pNode.execute(token, params);
|
||||
pNode.execute(token, params, graphNode.getOutputEnvMap());
|
||||
|
||||
}
|
||||
|
||||
|
@ -287,7 +287,6 @@ public class ProcessEngine {
|
|||
}
|
||||
|
||||
public void completeProcess(final WorkflowProcess process, final Token token) {
|
||||
token.checkStatus();
|
||||
process.complete(token);
|
||||
|
||||
log.debug("Process completed " + process.getId());
|
||||
|
|
|
@ -7,6 +7,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
|
||||
import eu.dnetlib.domain.resource.SimpleResource;
|
||||
|
@ -205,14 +206,14 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
|
||||
if (token.isFailed()) {
|
||||
setStatus(JobStatus.failure);
|
||||
setError(token.getError());
|
||||
setErrorStacktrace(token.getErrorStackTrace());
|
||||
setError(token.getError().getMessage());
|
||||
setErrorStacktrace(ExceptionUtils.getStackTrace(token.getError()));
|
||||
setLastActivityDate(LocalDateTime.now());
|
||||
}
|
||||
|
||||
if (callback != null) {
|
||||
if (token.isFailed()) {
|
||||
callback.onFail(this);
|
||||
callback.onFail(this, token.getError());
|
||||
} else {
|
||||
callback.onSuccess(this);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue