token and env
This commit is contained in:
parent
8ab33cf333
commit
89773c53a9
|
@ -68,11 +68,11 @@ public class WfConfiguration implements Serializable {
|
|||
|
||||
@Type(JsonType.class)
|
||||
@Column(name = "system_params")
|
||||
private Map<String, String> systemParams = new LinkedHashMap<>();
|
||||
private Map<String, Object> systemParams = new LinkedHashMap<>();
|
||||
|
||||
@Type(JsonType.class)
|
||||
@Column(name = "user_params")
|
||||
private Map<String, String> userParams = new LinkedHashMap<>();
|
||||
private Map<String, Object> userParams = new LinkedHashMap<>();
|
||||
|
||||
@Transient
|
||||
private String parentId;
|
||||
|
@ -197,19 +197,19 @@ public class WfConfiguration implements Serializable {
|
|||
this.destroyWf = destroyWf;
|
||||
}
|
||||
|
||||
public Map<String, String> getSystemParams() {
|
||||
public Map<String, Object> getSystemParams() {
|
||||
return systemParams;
|
||||
}
|
||||
|
||||
public void setSystemParams(final Map<String, String> systemParams) {
|
||||
public void setSystemParams(final Map<String, Object> systemParams) {
|
||||
this.systemParams = systemParams;
|
||||
}
|
||||
|
||||
public Map<String, String> getUserParams() {
|
||||
public Map<String, Object> getUserParams() {
|
||||
return userParams;
|
||||
}
|
||||
|
||||
public void setUserParams(final Map<String, String> userParams) {
|
||||
public void setUserParams(final Map<String, Object> userParams) {
|
||||
this.userParams = userParams;
|
||||
}
|
||||
|
||||
|
|
|
@ -217,15 +217,15 @@ public class WfTemplate implements Serializable {
|
|||
this.output = output;
|
||||
}
|
||||
|
||||
public Map<String, String> findEnvParams() {
|
||||
public Map<String, Object> findEnvParams() {
|
||||
return input.stream()
|
||||
.filter(p -> StringUtils.isNotBlank(p.getEnv()))
|
||||
.collect(Collectors.toMap(NodeInputParam::getName, NodeInputParam::getEnv));
|
||||
}
|
||||
|
||||
public Map<String, String> calculateInitialParams(final Map<String, String> globalParams, final Environment environment) {
|
||||
public Map<String, Object> calculateInitialParams(final Map<String, Object> globalParams, final Environment environment) {
|
||||
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
final Map<String, Object> map = new HashMap<>();
|
||||
for (final NodeInputParam p : input) {
|
||||
if (StringUtils.isBlank(p.getEnv())) {
|
||||
map.put(p.getName(), calculateSimpleValue(p, globalParams, environment));
|
||||
|
@ -235,23 +235,25 @@ public class WfTemplate implements Serializable {
|
|||
return map;
|
||||
}
|
||||
|
||||
private String calculateSimpleValue(final NodeInputParam p, final Map<String, String> globalParams, final Environment environment) {
|
||||
private Object calculateSimpleValue(final NodeInputParam p, final Map<String, Object> globalParams, final Environment environment) {
|
||||
String value = p.getValue();
|
||||
final String ref = p.getRef();
|
||||
final String prop = p.getProperty();
|
||||
|
||||
if (StringUtils.isNotBlank(ref) && StringUtils.isNotBlank(globalParams.get(ref))) { return globalParams.get(ref); }
|
||||
if (StringUtils.isNotBlank(ref) && (globalParams.get(ref) != null)) { return globalParams.get(ref); }
|
||||
|
||||
if (StringUtils.isNotBlank(value)) {
|
||||
final Matcher matcher = Pattern.compile(regExRef, Pattern.MULTILINE).matcher(value);
|
||||
while (matcher.find()) {
|
||||
final String rName = matcher.group(1);
|
||||
final String rValue = globalParams.get(rName);
|
||||
if (StringUtils.isBlank(rValue)) { return null; }
|
||||
value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue);
|
||||
final Object rValue = globalParams.get(rName);
|
||||
|
||||
value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue.toString());
|
||||
}
|
||||
return value;
|
||||
}
|
||||
if (StringUtils.isNotBlank(prop)) { return environment.getProperty(prop); }
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
package eu.dnetlib.wfs.env;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Created by michele on 23/11/15.
|
||||
*/
|
||||
public class Env {
|
||||
|
||||
private final Map<String, Object> attrs;
|
||||
|
||||
public Env() {
|
||||
this.attrs = new HashMap<>();
|
||||
}
|
||||
|
||||
public Env(final Map<String, Object> attrs) {
|
||||
this.attrs = attrs;
|
||||
}
|
||||
|
||||
public Map<String, Object> getAttributes() {
|
||||
return attrs;
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
attrs.clear();
|
||||
}
|
||||
|
||||
public void addAttributes(final Map<String, Object> map) {
|
||||
if (map != null) {
|
||||
attrs.putAll(map);
|
||||
}
|
||||
}
|
||||
|
||||
public void setAttribute(final String name, final Object value) {
|
||||
attrs.put(name, value);
|
||||
}
|
||||
|
||||
public Object getAttribute(final String name) {
|
||||
return attrs.get(name);
|
||||
}
|
||||
|
||||
public <T> T getAttribute(final String name, Class<T> clazz) {
|
||||
return clazz.cast(attrs.get(name));
|
||||
}
|
||||
|
||||
public boolean hasAttribute(final String name) {
|
||||
return attrs.containsKey(name);
|
||||
}
|
||||
|
||||
public Object removeAttribute(final String name) {
|
||||
return attrs.remove(name);
|
||||
}
|
||||
}
|
|
@ -2,15 +2,15 @@ package eu.dnetlib.wfs.graph;
|
|||
|
||||
import java.util.function.Function;
|
||||
|
||||
import eu.dnetlib.wfs.env.Env;
|
||||
import eu.dnetlib.wfs.procs.Token;
|
||||
|
||||
public class Arc {
|
||||
|
||||
private final String from;
|
||||
private final String to;
|
||||
private final Function<Env, Boolean> condFunction;
|
||||
private final Function<Token, Boolean> condFunction;
|
||||
|
||||
public Arc(final String from, final String to, final Function<Env, Boolean> condFunction) {
|
||||
public Arc(final String from, final String to, final Function<Token, Boolean> condFunction) {
|
||||
this.from = from;
|
||||
this.to = to;
|
||||
this.condFunction = condFunction;
|
||||
|
@ -24,8 +24,8 @@ public class Arc {
|
|||
return to;
|
||||
}
|
||||
|
||||
public boolean isValid(final Env env) {
|
||||
if (condFunction != null) { return condFunction.apply(env); }
|
||||
public boolean isValid(final Token token) {
|
||||
if (condFunction != null) { return condFunction.apply(token); }
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import java.util.stream.Collectors;
|
|||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import eu.dnetlib.wfs.env.Env;
|
||||
import eu.dnetlib.wfs.procs.Token;
|
||||
|
||||
public class Graph {
|
||||
|
||||
|
@ -56,10 +56,10 @@ public class Graph {
|
|||
return res;
|
||||
}
|
||||
|
||||
public Set<GraphNode> nextNodes(final GraphNode current, final Env env) {
|
||||
public Set<GraphNode> nextNodes(final GraphNode current, final Token token) {
|
||||
return arcs.stream()
|
||||
.filter(arc -> StringUtils.equals(arc.getFrom(), current.getName()))
|
||||
.filter(arc -> arc.isValid(env))
|
||||
.filter(arc -> arc.isValid(token))
|
||||
.map(Arc::getTo)
|
||||
.distinct()
|
||||
.map(to -> nodes.get(to))
|
||||
|
|
|
@ -14,44 +14,44 @@ public class GraphNode {
|
|||
private final String type;
|
||||
private final boolean isStart;
|
||||
private final boolean isJoin;
|
||||
private final boolean isSucessNode;
|
||||
private final Map<String, String> params;
|
||||
private final Map<String, String> envParams;
|
||||
private final boolean isSuccessNode;
|
||||
private final Map<String, Object> params;
|
||||
private final Map<String, Object> envParams;
|
||||
|
||||
private GraphNode(final String name,
|
||||
final String type,
|
||||
final boolean isStart,
|
||||
final boolean isJoin,
|
||||
final boolean isSuccessNode,
|
||||
final Map<String, String> params,
|
||||
final Map<String, String> envParams) {
|
||||
final String type,
|
||||
final boolean isStart,
|
||||
final boolean isJoin,
|
||||
final boolean isSuccessNode,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.isStart = isStart;
|
||||
this.isJoin = isJoin;
|
||||
this.isSucessNode = isSuccessNode;
|
||||
this.isSuccessNode = isSuccessNode;
|
||||
this.params = params;
|
||||
this.envParams = envParams;
|
||||
}
|
||||
|
||||
public static GraphNode newNode(final String name,
|
||||
final String type,
|
||||
final Map<String, String> params,
|
||||
final Map<String, String> envParams) {
|
||||
final String type,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
return new GraphNode(name, type, false, false, false, params, envParams);
|
||||
}
|
||||
|
||||
public static GraphNode newStartNode(final String name,
|
||||
final String type,
|
||||
final Map<String, String> params,
|
||||
final Map<String, String> envParams) {
|
||||
final String type,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
return new GraphNode(name, type, true, false, false, params, envParams);
|
||||
}
|
||||
|
||||
public static GraphNode newJoinNode(final String name,
|
||||
final String type,
|
||||
final Map<String, String> params,
|
||||
final Map<String, String> envParams) {
|
||||
final String type,
|
||||
final Map<String, Object> params,
|
||||
final Map<String, Object> envParams) {
|
||||
return new GraphNode(name, type, false, true, false, params, envParams);
|
||||
}
|
||||
|
||||
|
@ -60,33 +60,33 @@ public class GraphNode {
|
|||
}
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
return name;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return this.type;
|
||||
return type;
|
||||
}
|
||||
|
||||
public boolean isStart() {
|
||||
return this.isStart;
|
||||
return isStart;
|
||||
}
|
||||
|
||||
public boolean isJoin() {
|
||||
return this.isJoin;
|
||||
return isJoin;
|
||||
}
|
||||
|
||||
public boolean isSucessNode() {
|
||||
return this.isSucessNode;
|
||||
public boolean isSuccessNode() {
|
||||
return isSuccessNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringWriter sw = new StringWriter();
|
||||
sw.append("[ name: ");
|
||||
sw.append(this.name);
|
||||
if (StringUtils.isNotBlank(this.type)) {
|
||||
sw.append(name);
|
||||
if (StringUtils.isNotBlank(type)) {
|
||||
sw.append(", type: ");
|
||||
sw.append(this.type);
|
||||
sw.append(type);
|
||||
}
|
||||
if (isStart()) {
|
||||
sw.append(" - isStart");
|
||||
|
@ -98,12 +98,12 @@ public class GraphNode {
|
|||
return sw.toString();
|
||||
}
|
||||
|
||||
public Map<String, String> getParams() {
|
||||
return this.params;
|
||||
public Map<String, Object> getParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
public Map<String, String> getEnvParams() {
|
||||
return this.envParams;
|
||||
public Map<String, Object> getEnvParams() {
|
||||
return envParams;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,20 +1,20 @@
|
|||
package eu.dnetlib.wfs.procs;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
|
||||
import eu.dnetlib.wfs.env.Env;
|
||||
|
||||
/**
|
||||
* Created by michele on 19/11/15.
|
||||
*/
|
||||
public class Token {
|
||||
|
||||
private final String id;
|
||||
private final Env env = new Env();
|
||||
private final Map<String, Object> env = new HashMap<>();
|
||||
|
||||
private String progressMessage;
|
||||
private final LocalDateTime startDate;
|
||||
|
@ -27,26 +27,56 @@ public class Token {
|
|||
private String errorStackTrace = "";
|
||||
|
||||
public Token() {
|
||||
this.id = "token-" + UUID.randomUUID();
|
||||
this.startDate = LocalDateTime.now();
|
||||
this.failed = false;
|
||||
this.active = true;
|
||||
id = "token-" + UUID.randomUUID();
|
||||
startDate = LocalDateTime.now();
|
||||
failed = false;
|
||||
active = true;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return this.id;
|
||||
return id;
|
||||
}
|
||||
|
||||
public Env getEnv() {
|
||||
return this.env;
|
||||
public Map<String, Object> getEnv() {
|
||||
return env;
|
||||
}
|
||||
|
||||
public void clearEnv() {
|
||||
env.clear();
|
||||
}
|
||||
|
||||
public void addEnvAttributes(final Map<String, Object> map) {
|
||||
if (map != null) {
|
||||
env.putAll(map);
|
||||
}
|
||||
}
|
||||
|
||||
public void setEnvAttribute(final String name, final Object value) {
|
||||
env.put(name, value);
|
||||
}
|
||||
|
||||
public Object getEnvAttribute(final String name) {
|
||||
return env.get(name);
|
||||
}
|
||||
|
||||
public <T> T getEnvAttribute(final String name, final Class<T> clazz) {
|
||||
return clazz.cast(env.get(name));
|
||||
}
|
||||
|
||||
public boolean hasEnvAttribute(final String name) {
|
||||
return env.containsKey(name);
|
||||
}
|
||||
|
||||
public Object removeEnvAttribute(final String name) {
|
||||
return env.remove(name);
|
||||
}
|
||||
|
||||
public LocalDateTime getStartDate() {
|
||||
return this.startDate;
|
||||
return startDate;
|
||||
}
|
||||
|
||||
public LocalDateTime getEndDate() {
|
||||
return this.endDate;
|
||||
return endDate;
|
||||
}
|
||||
|
||||
public void setEndDate(final LocalDateTime endDate) {
|
||||
|
@ -54,7 +84,7 @@ public class Token {
|
|||
}
|
||||
|
||||
public boolean isActive() {
|
||||
return this.active;
|
||||
return active;
|
||||
}
|
||||
|
||||
public void setActive(final boolean active) {
|
||||
|
@ -62,7 +92,7 @@ public class Token {
|
|||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return this.failed;
|
||||
return failed;
|
||||
}
|
||||
|
||||
public void setFailed(final boolean failed) {
|
||||
|
@ -108,7 +138,7 @@ public class Token {
|
|||
}
|
||||
|
||||
public String getError() {
|
||||
return this.error;
|
||||
return error;
|
||||
}
|
||||
|
||||
public void setError(final String error) {
|
||||
|
@ -116,7 +146,7 @@ public class Token {
|
|||
}
|
||||
|
||||
public String getErrorStackTrace() {
|
||||
return this.errorStackTrace;
|
||||
return errorStackTrace;
|
||||
}
|
||||
|
||||
public void setErrorStackTrace(final String errorStackTrace) {
|
|
@ -17,11 +17,11 @@ public class WfConfigurationUtils {
|
|||
|
||||
public static boolean isConfigured(final List<WfParam> wfTemplateParams, final WfConfiguration conf) {
|
||||
|
||||
final Map<String, String> confParams = allConfiguredParameters(wfTemplateParams, conf);
|
||||
final Map<String, Object> confParams = allConfiguredParameters(wfTemplateParams, conf);
|
||||
|
||||
final long count = wfTemplateParams.stream()
|
||||
.filter(WfParam::isRequired)
|
||||
.filter(p -> StringUtils.isAllBlank(p.getDefaultValue(), confParams.get(p.getName())))
|
||||
.filter(p -> StringUtils.isNotBlank(p.getDefaultValue()) || (confParams.get(p.getName()) != null))
|
||||
.count();
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
|
@ -31,8 +31,8 @@ public class WfConfigurationUtils {
|
|||
return count == 0;
|
||||
}
|
||||
|
||||
public static Map<String, String> allConfiguredParameters(final List<WfParam> wfTemplateParams, final WfConfiguration conf) {
|
||||
final Map<String, String> all = new HashMap<>();
|
||||
public static Map<String, Object> allConfiguredParameters(final List<WfParam> wfTemplateParams, final WfConfiguration conf) {
|
||||
final Map<String, Object> all = new HashMap<>();
|
||||
|
||||
if (wfTemplateParams != null) {
|
||||
wfTemplateParams.stream()
|
||||
|
|
|
@ -24,8 +24,8 @@ import org.springframework.stereotype.Service;
|
|||
import eu.dnetlib.domain.wfs.WfTemplate;
|
||||
import eu.dnetlib.errors.WorkflowManagerException;
|
||||
import eu.dnetlib.wfs.annotations.WfNode;
|
||||
import eu.dnetlib.wfs.env.Env;
|
||||
import eu.dnetlib.wfs.nodes.ProcessNode;
|
||||
import eu.dnetlib.wfs.procs.Token;
|
||||
|
||||
@Service
|
||||
public class GraphLoader {
|
||||
|
@ -59,7 +59,7 @@ public class GraphLoader {
|
|||
log.info("************************************");
|
||||
}
|
||||
|
||||
public Graph loadGraph(final WfTemplate wfTemplate, final Map<String, String> globalParams)
|
||||
public Graph loadGraph(final WfTemplate wfTemplate, final Map<String, Object> globalParams)
|
||||
throws WorkflowManagerException {
|
||||
final Graph graph = new Graph();
|
||||
|
||||
|
@ -71,8 +71,8 @@ public class GraphLoader {
|
|||
final boolean isStart = node.isStart();
|
||||
final boolean isJoin = node.isJoin();
|
||||
|
||||
final Map<String, String> params = node.calculateInitialParams(globalParams, environment);
|
||||
final Map<String, String> envParams = node.findEnvParams();
|
||||
final Map<String, Object> params = node.calculateInitialParams(globalParams, environment);
|
||||
final Map<String, Object> envParams = node.findEnvParams();
|
||||
|
||||
if (isStart) {
|
||||
graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params, envParams));
|
||||
|
@ -87,7 +87,7 @@ public class GraphLoader {
|
|||
} else {
|
||||
for (final WfTemplate.Arc a : node.getArcs()) {
|
||||
final String to = a.getTo();
|
||||
final Function<Env, Boolean> condFunction = generateFunction(a.getCondition());
|
||||
final Function<Token, Boolean> condFunction = generateFunction(a.getCondition());
|
||||
graph.addArc(new Arc(nodeName, to, condFunction));
|
||||
}
|
||||
}
|
||||
|
@ -99,12 +99,12 @@ public class GraphLoader {
|
|||
return graph;
|
||||
}
|
||||
|
||||
private Function<Env, Boolean> generateFunction(final String condition) {
|
||||
private Function<Token, Boolean> generateFunction(final String condition) {
|
||||
if (StringUtils.isBlank(condition)) { return env -> true; }
|
||||
return env -> {
|
||||
return token -> {
|
||||
final ExpressionParser parser = new SpelExpressionParser();
|
||||
|
||||
final StandardEvaluationContext context = new StandardEvaluationContext(env.getAttributes());
|
||||
final StandardEvaluationContext context = new StandardEvaluationContext(token.getEnv());
|
||||
context.addPropertyAccessor(new MapAccessor());
|
||||
|
||||
return parser.parseExpression(condition).getValue(context, Boolean.class);
|
||||
|
|
|
@ -17,7 +17,6 @@ import eu.dnetlib.errors.WorkflowManagerException;
|
|||
import eu.dnetlib.wfs.annotations.WfInputParam;
|
||||
import eu.dnetlib.wfs.annotations.WfNode;
|
||||
import eu.dnetlib.wfs.annotations.WfOutputParam;
|
||||
import eu.dnetlib.wfs.env.Env;
|
||||
import eu.dnetlib.wfs.graph.GraphNode;
|
||||
import eu.dnetlib.wfs.procs.ProcessEngine;
|
||||
import eu.dnetlib.wfs.procs.Token;
|
||||
|
@ -47,7 +46,7 @@ public abstract class ProcessNode implements BeanNameAware {
|
|||
@Override
|
||||
public void onSuccess(final Token t) {
|
||||
try {
|
||||
saveOutputParams(t.getEnv());
|
||||
saveOutputParams(t);
|
||||
engine.releaseToken(process, graphNode, t);
|
||||
} catch (final WorkflowManagerException e) {
|
||||
onFail(t);
|
||||
|
@ -100,7 +99,7 @@ public abstract class ProcessNode implements BeanNameAware {
|
|||
}
|
||||
}
|
||||
|
||||
private void saveOutputParams(final Env env) throws WorkflowManagerException {
|
||||
private void saveOutputParams(final Token token) throws WorkflowManagerException {
|
||||
for (final Field field : findFields(getClass(), WfOutputParam.class)) {
|
||||
final String annName = field.getAnnotation(WfOutputParam.class).value();
|
||||
final boolean optional = field.getAnnotation(WfOutputParam.class).optional();
|
||||
|
@ -109,10 +108,10 @@ public abstract class ProcessNode implements BeanNameAware {
|
|||
|
||||
if (!optional && (value == null)) { throw new WorkflowManagerException("NULL value in OUTPUT parameter: " + field.getName()); }
|
||||
|
||||
env.setAttribute(StringUtils.isNotBlank(annName) ? annName : field.getName(), value);
|
||||
token.setEnvAttribute(StringUtils.isNotBlank(annName) ? annName : field.getName(), value);
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
env.getAttributes().forEach((k, v) -> log.debug("ENV (END NODE EXECUTION) " + k + ": " + v));
|
||||
token.getEnv().forEach((k, v) -> log.debug("ENV (END NODE EXECUTION) " + k + ": " + v));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -91,8 +91,8 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
|
|||
wfConfigurationRepository.save(conf);
|
||||
}
|
||||
|
||||
private Map<String, String> prepareSystemParams() {
|
||||
final Map<String, String> map = new LinkedHashMap<>();
|
||||
private Map<String, Object> prepareSystemParams() {
|
||||
final Map<String, Object> map = new LinkedHashMap<>();
|
||||
map.put("nativeMdStoreId", nativeMdStoreId);
|
||||
map.put("cleanedMdStoreId", cleanedMdStoreId);
|
||||
return map;
|
||||
|
|
|
@ -153,7 +153,7 @@ public class ProcessEngine {
|
|||
|
||||
final WfTemplate wfTmpl = simpleResourceClient.findResourceContent(job.getWfTemplateId(), WfTemplate.class);
|
||||
|
||||
final Map<String, String> globalParams = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf);
|
||||
final Map<String, Object> globalParams = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf);
|
||||
|
||||
final Graph graph = graphLoader.loadGraph(wfTmpl, globalParams);
|
||||
|
||||
|
@ -197,7 +197,7 @@ public class ProcessEngine {
|
|||
|
||||
try {
|
||||
for (final GraphNode graphNode : process.getGraph().startNodes()) {
|
||||
final Token token = process.newToken(process.getEnv().getAttributes());
|
||||
final Token token = process.newToken(process.getGlobalParams());
|
||||
executeNode(process, graphNode, token);
|
||||
}
|
||||
} catch (final Throwable e) {
|
||||
|
@ -210,8 +210,8 @@ public class ProcessEngine {
|
|||
process.setLastActivityDate(LocalDateTime.now());
|
||||
|
||||
try {
|
||||
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) {
|
||||
if (graphNode.isJoin() || graphNode.isSucessNode()) {
|
||||
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken)) {
|
||||
if (graphNode.isJoin() || graphNode.isSuccessNode()) {
|
||||
if (!process.getPausedJoinNodeTokens().containsKey(graphNode.getName())) {
|
||||
process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList<>());
|
||||
}
|
||||
|
@ -223,14 +223,14 @@ public class ProcessEngine {
|
|||
if (list.size() == process.getGraph().getNumberOfIncomingArcs(graphNode)) {
|
||||
final Token token = process.newToken(mergeEnvParams(list.toArray(new Token[list.size()])));
|
||||
|
||||
if (graphNode.isSucessNode()) {
|
||||
if (graphNode.isSuccessNode()) {
|
||||
completeProcess(process, token);
|
||||
} else {
|
||||
executeNode(process, graphNode, token);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
final Token token = process.newToken(oldToken.getEnv().getAttributes());
|
||||
final Token token = process.newToken(oldToken.getEnv());
|
||||
executeNode(process, graphNode, token);
|
||||
}
|
||||
}
|
||||
|
@ -254,7 +254,7 @@ public class ProcessEngine {
|
|||
graphNode.getParams().forEach((k, v) -> params.put(k, v));
|
||||
}
|
||||
if (graphNode.getEnvParams() != null) {
|
||||
graphNode.getEnvParams().forEach((k, v) -> params.put(k, token.getEnv().getAttribute(v)));
|
||||
graphNode.getEnvParams().forEach((k, v) -> params.put(k, token.getEnvAttribute(v.toString())));
|
||||
}
|
||||
|
||||
final ProcessNode pNode = newProcessNode(graphNode, process);
|
||||
|
@ -264,7 +264,7 @@ public class ProcessEngine {
|
|||
}
|
||||
|
||||
private ProcessNode newProcessNode(final GraphNode graphNode, final WorkflowProcess process) throws WorkflowManagerException {
|
||||
if (graphNode.isSucessNode()) { return new SuccessNode(); }
|
||||
if (graphNode.isSuccessNode()) { return new SuccessNode(); }
|
||||
if (StringUtils.isBlank(graphNode.getType())) { return new DefaultJobNode(graphNode.getName()); }
|
||||
final ProcessNode pnode = (ProcessNode) applicationContext.getBean(graphNode.getType());
|
||||
if (pnode == null) {
|
||||
|
@ -282,7 +282,7 @@ public class ProcessEngine {
|
|||
|
||||
private Map<String, Object> mergeEnvParams(final Token... tokens) {
|
||||
final Map<String, Object> map = new HashMap<>();
|
||||
Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes()));
|
||||
Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv()));
|
||||
return map;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@ import eu.dnetlib.domain.wfs.JobStatus;
|
|||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.WorkflowsConstants;
|
||||
import eu.dnetlib.wfs.env.Env;
|
||||
import eu.dnetlib.wfs.graph.Graph;
|
||||
import eu.dnetlib.wfs.utils.ProcessCallback;
|
||||
|
||||
|
@ -34,14 +33,13 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
private final WfConfiguration wfConf;
|
||||
private final Graph graph;
|
||||
private final ProcessCallback callback;
|
||||
private final Env env;
|
||||
private final List<Token> tokens = new CopyOnWriteArrayList<>();
|
||||
private LocalDateTime lastActivityDate;
|
||||
private JobStatus status;
|
||||
private LocalDateTime startDate = LocalDateTime.MIN;
|
||||
private LocalDateTime endDate = LocalDateTime.MIN;
|
||||
private final Map<String, List<Token>> pausedJoinNodeTokens = new HashMap<>();
|
||||
private final Map<String, String> globalParams;
|
||||
private final Map<String, Object> globalParams;
|
||||
private String error;
|
||||
private String errorStacktrace;
|
||||
private final Map<String, String> outputParams = new HashMap<>();
|
||||
|
@ -51,7 +49,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
final SimpleResource wfMetadata,
|
||||
final WfConfiguration wfConf,
|
||||
final Graph graph,
|
||||
final Map<String, String> globalParams,
|
||||
final Map<String, Object> globalParams,
|
||||
final ProcessCallback callback) {
|
||||
this.id = id;
|
||||
this.wfMetadata = wfMetadata;
|
||||
|
@ -59,7 +57,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
this.graph = graph;
|
||||
this.callback = callback;
|
||||
status = JobStatus.created;
|
||||
env = new Env();
|
||||
this.globalParams = globalParams;
|
||||
lastActivityDate = LocalDateTime.now();
|
||||
}
|
||||
|
@ -108,10 +105,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
return pausedJoinNodeTokens;
|
||||
}
|
||||
|
||||
public Env getEnv() {
|
||||
return env;
|
||||
}
|
||||
|
||||
public JobStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
@ -157,7 +150,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
return NumberUtils.compare(getPriority(), wp.getPriority());
|
||||
}
|
||||
|
||||
public Map<String, String> getGlobalParams() {
|
||||
public Map<String, Object> getGlobalParams() {
|
||||
return globalParams;
|
||||
}
|
||||
|
||||
|
@ -199,7 +192,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
|
||||
public Token newToken(final Map<String, Object> attrs) {
|
||||
final Token token = new Token();
|
||||
token.getEnv().addAttributes(attrs);
|
||||
token.addEnvAttributes(attrs);
|
||||
tokens.add(token);
|
||||
return token;
|
||||
}
|
||||
|
|
|
@ -12,39 +12,39 @@ import org.springframework.expression.ExpressionParser;
|
|||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.expression.spel.support.StandardEvaluationContext;
|
||||
|
||||
import eu.dnetlib.wfs.env.Env;
|
||||
import eu.dnetlib.wfs.procs.Token;
|
||||
|
||||
public class GraphLoaderTest {
|
||||
|
||||
private Env env;
|
||||
private Token token;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
env = new Env();
|
||||
env.setAttribute("author", "Michele Artini");
|
||||
env.setAttribute("age", 47);
|
||||
token = new Token();
|
||||
token.setEnvAttribute("author", "Michele Artini");
|
||||
token.setEnvAttribute("age", 47);
|
||||
}
|
||||
|
||||
@Test
|
||||
final void testExpressions() {
|
||||
assertTrue(evalFunction("age == 47").apply(env));
|
||||
assertTrue(evalFunction("age > 40").apply(env));
|
||||
assertTrue(evalFunction("author == 'Michele Artini'").apply(env));
|
||||
assertTrue(evalFunction("age == 47 && author == 'Michele Artini'").apply(env));
|
||||
assertTrue(evalFunction("age == 47 || author == 'Michele Artini'").apply(env));
|
||||
assertTrue(evalFunction("age == 47 || author == 'Claudio Atzori'").apply(env));
|
||||
assertTrue(evalFunction("age == 22 || author == 'Michele Artini'").apply(env));
|
||||
assertFalse(evalFunction("age != 47").apply(env));
|
||||
assertFalse(evalFunction("age < 40").apply(env));
|
||||
assertFalse(evalFunction("author != 'Michele Artini'").apply(env));
|
||||
assertFalse(evalFunction("age == 47 && author == 'Claudio Atzori'").apply(env));
|
||||
assertTrue(evalFunction("age == 47").apply(token));
|
||||
assertTrue(evalFunction("age > 40").apply(token));
|
||||
assertTrue(evalFunction("author == 'Michele Artini'").apply(token));
|
||||
assertTrue(evalFunction("age == 47 && author == 'Michele Artini'").apply(token));
|
||||
assertTrue(evalFunction("age == 47 || author == 'Michele Artini'").apply(token));
|
||||
assertTrue(evalFunction("age == 47 || author == 'Claudio Atzori'").apply(token));
|
||||
assertTrue(evalFunction("age == 22 || author == 'Michele Artini'").apply(token));
|
||||
assertFalse(evalFunction("age != 47").apply(token));
|
||||
assertFalse(evalFunction("age < 40").apply(token));
|
||||
assertFalse(evalFunction("author != 'Michele Artini'").apply(token));
|
||||
assertFalse(evalFunction("age == 47 && author == 'Claudio Atzori'").apply(token));
|
||||
}
|
||||
|
||||
private Function<Env, Boolean> evalFunction(final String f) {
|
||||
return env -> {
|
||||
private Function<Token, Boolean> evalFunction(final String f) {
|
||||
return token -> {
|
||||
final ExpressionParser parser = new SpelExpressionParser();
|
||||
|
||||
final StandardEvaluationContext context = new StandardEvaluationContext(env.getAttributes());
|
||||
final StandardEvaluationContext context = new StandardEvaluationContext(token.getEnv());
|
||||
context.addPropertyAccessor(new MapAccessor());
|
||||
|
||||
return parser.parseExpression(f).getValue(context, Boolean.class);
|
||||
|
|
Loading…
Reference in New Issue