refactoring

This commit is contained in:
Michele Artini 2024-01-11 09:56:20 +01:00
parent f572335a2f
commit 1ddc9be9fa
10 changed files with 238 additions and 197 deletions

View File

@ -4,6 +4,8 @@ public class WorkflowsConstants {
// TODO (LOW PRIORITY): remove unused constants
public static final String SUCCESS_NODE = "success";
public static final String DATASOURCE_PREFIX = "datasource:";
public static final String LOG_WF_NAME = "system:wfName";

View File

@ -0,0 +1,37 @@
package eu.dnetlib.domain.wfs.graph.runtime;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Graph implements Serializable {
private static final long serialVersionUID = 6472378218385082376L;
private Map<String, GraphNode> nodes = new HashMap<>();
private List<GraphArc> arcs = new ArrayList<>();
public Map<String, GraphNode> getNodes() {
return this.nodes;
}
public void setNodes(final Map<String, GraphNode> nodes) {
this.nodes = nodes;
}
public List<GraphArc> getArcs() {
return this.arcs;
}
public void setArcs(final List<GraphArc> arcs) {
this.arcs = arcs;
}
@Override
public String toString() {
return "\n************************\nNodes: " + this.nodes + "\nArcs: " + this.arcs + "\n************************\n";
}
}

View File

@ -0,0 +1,45 @@
package eu.dnetlib.domain.wfs.graph.runtime;
public class GraphArc {
private String from;
private String to;
private String condition;
public GraphArc() {}
public GraphArc(final String from, final String to, final String condition) {
this.from = from;
this.to = to;
this.condition = condition;
}
public String getFrom() {
return this.from;
}
public void setFrom(final String from) {
this.from = from;
}
public String getTo() {
return this.to;
}
public void setTo(final String to) {
this.to = to;
}
public String getCondition() {
return this.condition;
}
public void setCondition(final String condition) {
this.condition = condition;
}
@Override
public String toString() {
return String.format("[ %s -> %s ]", this.from, this.to);
}
}

View File

@ -1,23 +1,28 @@
package eu.dnetlib.wfs.graph;
package eu.dnetlib.domain.wfs.graph.runtime;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
public class GraphNode {
import eu.dnetlib.domain.wfs.WorkflowsConstants;
public static final String SUCCESS_NODE = "success";
public class GraphNode implements Serializable {
private final String name;
private final String type;
private final boolean isStart;
private final boolean isJoin;
private final boolean isSuccessNode;
private final Map<String, Object> params;
private final Map<String, Object> envParams;
private final Map<String, String> outputEnvMap;
private static final long serialVersionUID = 2182058793660686197L;
private String name;
private String type;
private boolean isStart;
private boolean isJoin;
private boolean isSuccessNode;
private Map<String, Object> params;
private Map<String, Object> envParams;
private Map<String, String> outputEnvMap;
public GraphNode() {}
private GraphNode(final String name,
final String type,
@ -62,37 +67,17 @@ public class GraphNode {
}
public static GraphNode newSuccessNode() {
return new GraphNode(SUCCESS_NODE, null, false, true, true, new HashMap<>(), new HashMap<>(), new HashMap<>());
}
public String getName() {
return name;
}
public String getType() {
return type;
}
public boolean isStart() {
return isStart;
}
public boolean isJoin() {
return isJoin;
}
public boolean isSuccessNode() {
return isSuccessNode;
return new GraphNode(WorkflowsConstants.SUCCESS_NODE, null, false, true, true, new HashMap<>(), new HashMap<>(), new HashMap<>());
}
@Override
public String toString() {
final StringWriter sw = new StringWriter();
sw.append("[ name: ");
sw.append(name);
if (StringUtils.isNotBlank(type)) {
sw.append(this.name);
if (StringUtils.isNotBlank(this.type)) {
sw.append(", type: ");
sw.append(type);
sw.append(this.type);
}
if (isStart()) {
sw.append(" - isStart");
@ -104,16 +89,68 @@ public class GraphNode {
return sw.toString();
}
public String getName() {
return this.name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return this.type;
}
public void setType(final String type) {
this.type = type;
}
public boolean isStart() {
return this.isStart;
}
public void setStart(final boolean isStart) {
this.isStart = isStart;
}
public boolean isJoin() {
return this.isJoin;
}
public void setJoin(final boolean isJoin) {
this.isJoin = isJoin;
}
public boolean isSuccessNode() {
return this.isSuccessNode;
}
public void setSuccessNode(final boolean isSuccessNode) {
this.isSuccessNode = isSuccessNode;
}
public Map<String, Object> getParams() {
return params;
return this.params;
}
public void setParams(final Map<String, Object> params) {
this.params = params;
}
public Map<String, Object> getEnvParams() {
return envParams;
return this.envParams;
}
public void setEnvParams(final Map<String, Object> envParams) {
this.envParams = envParams;
}
public Map<String, String> getOutputEnvMap() {
return outputEnvMap;
return this.outputEnvMap;
}
public void setOutputEnvMap(final Map<String, String> outputEnvMap) {
this.outputEnvMap = outputEnvMap;
}
}

View File

@ -1,84 +0,0 @@
package eu.dnetlib.wfs.graph;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.wfs.procs.Token;
public class Graph {
private final Map<String, GraphNode> nodes = new HashMap<>();
private List<GraphArc> arcs = new ArrayList<>();
public void addArc(final GraphArc arc) {
arcs.add(arc);
}
public void addNode(final GraphNode node) {
nodes.put(node.getName(), node);
}
public Set<String> nodeNames() {
return nodes.keySet();
}
public Collection<GraphNode> nodes() {
return nodes.values();
}
public GraphNode getNode(final String name) {
return nodes.get(name);
}
public List<GraphArc> getArcs() {
return arcs;
}
public void setArcs(final List<GraphArc> arcs) {
this.arcs = arcs;
}
public Set<GraphNode> startNodes() {
final Set<GraphNode> res = new HashSet<>();
for (final GraphNode n : nodes.values()) {
if (n.isStart()) {
res.add(n);
}
}
return res;
}
public Set<GraphNode> nextNodes(final GraphNode current, final Token token) {
return arcs.stream()
.filter(arc -> StringUtils.equals(arc.getFrom(), current.getName()))
.filter(arc -> arc.isValid(token))
.map(GraphArc::getTo)
.distinct()
.map(to -> nodes.get(to))
.collect(Collectors.toSet());
}
public int getNumberOfIncomingArcs(final GraphNode node) {
int count = 0;
for (final GraphArc arc : arcs) {
if (arc.getTo().equals(node.getName())) {
count++;
}
}
return count;
}
@Override
public String toString() {
return "\n************************\nNodes: " + nodes + "\nArcs: " + arcs + "\n************************\n";
}
}

View File

@ -1,37 +0,0 @@
package eu.dnetlib.wfs.graph;
import java.util.function.Function;
import eu.dnetlib.wfs.procs.Token;
public class GraphArc {
private final String from;
private final String to;
private final Function<Token, Boolean> condFunction;
public GraphArc(final String from, final String to, final Function<Token, Boolean> condFunction) {
this.from = from;
this.to = to;
this.condFunction = condFunction;
}
public String getFrom() {
return from;
}
public String getTo() {
return to;
}
public boolean isValid(final Token token) {
if (condFunction != null) { return condFunction.apply(token); }
return true;
}
@Override
public String toString() {
return String.format("[ %s -> %s ]", from, to);
}
}

View File

@ -5,7 +5,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.PostConstruct;
@ -15,19 +14,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.env.Environment;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.graph.Arc;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.graph.Node;
import eu.dnetlib.domain.wfs.graph.runtime.Graph;
import eu.dnetlib.domain.wfs.graph.runtime.GraphArc;
import eu.dnetlib.domain.wfs.graph.runtime.GraphNode;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.procs.Token;
@Service
public class GraphLoader {
@ -65,7 +62,7 @@ public class GraphLoader {
throws WorkflowManagerException {
final Graph graph = new Graph();
graph.addNode(GraphNode.newSuccessNode());
graph.getNodes().put(WorkflowsConstants.SUCCESS_NODE, GraphNode.newSuccessNode());
for (final Node node : nodes) {
final String nodeName = node.getName();
@ -78,21 +75,17 @@ public class GraphLoader {
final Map<String, String> outputEnvMap = node.outputToEnvMap();
if (isStart) {
graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params, envParams, outputEnvMap));
graph.getNodes().put(nodeName, GraphNode.newStartNode(nodeName, nodeType, params, envParams, outputEnvMap));
} else if (isJoin) {
graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params, envParams, outputEnvMap));
graph.getNodes().put(nodeName, GraphNode.newJoinNode(nodeName, nodeType, params, envParams, outputEnvMap));
} else {
graph.addNode(GraphNode.newNode(nodeName, nodeType, params, envParams, outputEnvMap));
graph.getNodes().put(nodeName, GraphNode.newNode(nodeName, nodeType, params, envParams, outputEnvMap));
}
if ((node.getArcs() == null) || node.getArcs().isEmpty()) {
graph.addArc(new GraphArc(nodeName, GraphNode.SUCCESS_NODE, generateFunction(null)));
graph.getArcs().add(new GraphArc(nodeName, WorkflowsConstants.SUCCESS_NODE, null));
} else {
for (final Arc a : node.getArcs()) {
final String to = a.getTo();
final Function<Token, Boolean> condFunction = generateFunction(a.getCondition());
graph.addArc(new GraphArc(nodeName, to, condFunction));
}
node.getArcs().forEach(a -> graph.getArcs().add(new GraphArc(nodeName, a.getTo(), a.getCondition())));
}
}
@ -102,18 +95,6 @@ public class GraphLoader {
return graph;
}
private Function<Token, Boolean> generateFunction(final String condition) {
if (StringUtils.isBlank(condition)) { return env -> true; }
return token -> {
final ExpressionParser parser = new SpelExpressionParser();
final StandardEvaluationContext context = new StandardEvaluationContext(token.getEnv());
context.addPropertyAccessor(new MapAccessor());
return parser.parseExpression(condition).getValue(context, Boolean.class);
};
}
private void checkValidity(final Graph graph) throws WorkflowManagerException {
final Set<String> nodesFromArcs = new HashSet<>();
@ -125,7 +106,7 @@ public class GraphLoader {
if (StringUtils.isBlank(arc.getFrom()) || StringUtils.isBlank(arc.getFrom())) {
throw new WorkflowManagerException("Invalid arc: missing from e/o to");
}
if (StringUtils.equals(arc.getTo(), GraphNode.SUCCESS_NODE)) {
if (StringUtils.equals(arc.getTo(), WorkflowsConstants.SUCCESS_NODE)) {
foundSuccess = true;
}
nodesFromArcs.add(arc.getFrom());
@ -134,11 +115,11 @@ public class GraphLoader {
if (!foundSuccess) { throw new WorkflowManagerException("Arc to success not found"); }
final Collection<String> diff = CollectionUtils.disjunction(graph.nodeNames(), nodesFromArcs);
final Collection<String> diff = CollectionUtils.disjunction(graph.getNodes().keySet(), nodesFromArcs);
if (!diff.isEmpty()) { throw new WorkflowManagerException("Missing or invalid nodes in arcs: " + diff); }
for (final GraphNode n : graph.nodes()) {
for (final GraphNode n : graph.getNodes().values()) {
if (StringUtils.isBlank(n.getName())) { throw new WorkflowManagerException("Invalid node: missing name"); }
if (n.isStart()) {
foundStart = true;

View File

@ -13,11 +13,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import eu.dnetlib.domain.wfs.graph.runtime.GraphNode;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.procs.WorkflowProcess;

View File

@ -4,11 +4,15 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@ -19,18 +23,24 @@ import org.postgresql.PGNotification;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.graph.runtime.Graph;
import eu.dnetlib.domain.wfs.graph.runtime.GraphArc;
import eu.dnetlib.domain.wfs.graph.runtime.GraphNode;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.graph.GraphLoader;
import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.nodes.DefaultJobNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.nodes.SuccessNode;
@ -182,7 +192,7 @@ public class ProcessEngine {
updateRunningJob(process, null);
try {
for (final GraphNode graphNode : process.getGraph().startNodes()) {
for (final GraphNode graphNode : startNodes(process.getGraph())) {
final Token token = process.newToken(process.getJobDetails().getInputParams());
executeNode(process, graphNode, token);
}
@ -197,7 +207,7 @@ public class ProcessEngine {
updateRunningJob(process, oldToken);
try {
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken)) {
for (final GraphNode graphNode : nextNodes(process.getGraph(), oldGraphNode, oldToken)) {
if (graphNode.isJoin() || graphNode.isSuccessNode()) {
if (!process.getPausedJoinNodeTokens().containsKey(graphNode.getName())) {
process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList<>());
@ -207,7 +217,7 @@ public class ProcessEngine {
list.add(oldToken);
if (list.size() == process.getGraph().getNumberOfIncomingArcs(graphNode)) {
if (list.size() == getNumberOfIncomingArcs(process.getGraph(), graphNode)) {
final Token token = process.newToken(mergeEnvParams(list.toArray(new Token[list.size()])));
if (graphNode.isSuccessNode()) {
@ -229,6 +239,56 @@ public class ProcessEngine {
}
public Set<GraphNode> startNodes(final Graph graph) {
final Set<GraphNode> res = new HashSet<>();
for (final GraphNode n : graph.getNodes().values()) {
if (n.isStart()) {
res.add(n);
}
}
return res;
}
private int getNumberOfIncomingArcs(final Graph graph, final GraphNode node) {
int count = 0;
for (final GraphArc arc : graph.getArcs()) {
if (arc.getTo().equals(node.getName())) {
count++;
}
}
return count;
}
public Set<GraphNode> nextNodes(final Graph graph, final GraphNode current, final Token token) {
return graph.getArcs()
.stream()
.filter(arc -> StringUtils.equals(arc.getFrom(), current.getName()))
.filter(arc -> isValidArc(arc, token))
.map(GraphArc::getTo)
.distinct()
.map(to -> graph.getNodes().get(to))
.collect(Collectors.toSet());
}
private boolean isValidArc(final GraphArc arc, final Token token) {
final Function<Token, Boolean> condFunction = generateFunction(arc.getCondition());
if (condFunction != null) { return condFunction.apply(token); }
return true;
}
private Function<Token, Boolean> generateFunction(final String condition) {
if (StringUtils.isBlank(condition)) { return token -> true; }
return token -> {
final ExpressionParser parser = new SpelExpressionParser();
final StandardEvaluationContext context = new StandardEvaluationContext(token.getEnv());
context.addPropertyAccessor(new MapAccessor());
return parser.parseExpression(condition).getValue(context, Boolean.class);
};
}
private void executeNode(final WorkflowProcess process, final GraphNode graphNode, final Token token) throws WorkflowManagerException {
final Map<String, Object> params = new HashMap<>();

View File

@ -8,9 +8,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.domain.wfs.graph.runtime.Graph;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.wfs.graph.Graph;
import eu.dnetlib.wfs.utils.ProcessCallback;
/**