new-is-app-with-new-msro #13

Merged
michele.artini merged 19 commits from new-is-app-with-new-msro into new-is-app 2023-03-21 12:15:25 +01:00
9 changed files with 48 additions and 59 deletions
Showing only changes of commit 30991cabbf - Show all commits

View File

@ -194,16 +194,8 @@ public class WorkflowGraph implements Serializable {
private static final long serialVersionUID = 7866138976929522262L;
private String name;
private String to;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
private String condition;
public String getTo() {
return to;
@ -213,6 +205,13 @@ public class WorkflowGraph implements Serializable {
this.to = to;
}
public String getCondition() {
return condition;
}
public void setCondition(final String condition) {
this.condition = condition;
}
}
class NodeParam implements Serializable {

View File

@ -14,12 +14,9 @@ public abstract class AbstractJobNode extends ProcessNode {
try {
log.debug("START NODE: " + getBeanName());
beforeStart(token);
final String arc = execute(token.getEnv());
execute(token.getEnv());
beforeCompleted(token);
log.debug("END NODE (SUCCESS): " + getBeanName());
token.release(arc);
} catch (final Throwable e) {
log.error("got exception while executing workflow node", e);
log.debug("END NODE (FAILED): " + getBeanName());
@ -28,7 +25,7 @@ public abstract class AbstractJobNode extends ProcessNode {
}
}
abstract protected String execute(final Env env) throws Exception;
abstract protected void execute(final Env env) throws Exception;
protected void beforeStart(final Token token) {
// For optional overwrites

View File

@ -1,6 +1,5 @@
package eu.dnetlib.manager.wf.nodes;
import eu.dnetlib.manager.wf.workflows.graph.Arc;
import eu.dnetlib.manager.wf.workflows.procs.Token;
/**
@ -15,7 +14,6 @@ public final class DefaultJobNode extends ProcessNode {
@Override
public void execute(final Token token) {
token.setNextArc(Arc.DEFAULT_ARC);
token.release();
}
}

View File

@ -4,7 +4,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.workflows.graph.Arc;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.Token;
@ -39,7 +38,6 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
@Override
public void onSuccess() {
log.debug("Child workflow has been completed successfully");
token.setNextArc(Arc.DEFAULT_ARC);
token.release();
}

View File

@ -1,23 +1,19 @@
package eu.dnetlib.manager.wf.workflows.graph;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Function;
import eu.dnetlib.manager.wf.workflows.procs.Env;
public class Arc {
public static final String DEFAULT_ARC = null;
private final String name;
private final String from;
private final String to;
private final Function<Env, Boolean> condFunction;
public Arc(final String name, final String from, final String to) {
this.name = name;
public Arc(final String from, final String to, final Function<Env, Boolean> condFunction) {
this.from = from;
this.to = to;
}
public String getName() {
return this.name;
this.condFunction = condFunction;
}
public String getFrom() {
@ -28,8 +24,17 @@ public class Arc {
return this.to;
}
public boolean isValid(final Env env) {
if (condFunction != null) {
return condFunction.apply(env);
} else {
return true;
}
}
@Override
public String toString() {
return String.format("[ %s: %s -> %s ]", StringUtils.isBlank(this.name) ? "DEFAULT" : this.name, this.from, this.to);
return String.format("[ %s -> %s, %s ]", this.from, this.to, this.condFunction != null ? "with cond" : "without cond");
}
}

View File

@ -7,9 +7,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.manager.wf.workflows.procs.Env;
public class Graph {
private final Map<String, GraphNode> nodes = new HashMap<>();
@ -53,14 +56,15 @@ public class Graph {
return res;
}
public Set<GraphNode> nextNodes(final GraphNode current, final String arcName) {
final Set<GraphNode> res = new HashSet<>();
for (final Arc arc : this.arcs) {
if (StringUtils.equals(arc.getFrom(), current.getName()) && StringUtils.equals(arc.getName(), arcName)) {
res.add(this.nodes.get(arc.getTo()));
}
}
return res;
public Set<GraphNode> nextNodes(final GraphNode current, final Env env) {
return arcs.stream()
.filter(arc -> StringUtils.equals(arc.getFrom(), current.getName()))
.filter(arc -> arc.isValid(env))
.map(arc -> arc.getTo())
.distinct()
.map(to -> nodes.get(to))
.collect(Collectors.toSet());
}
public int getNumberOfIncomingArcs(final GraphNode node) {

View File

@ -9,10 +9,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.manager.wf.workflows.util.NodeHelper;
@Service
@ -47,9 +49,9 @@ public class GraphLoader {
if (node.getArcs() != null) {
for (final WorkflowGraph.Arc a : node.getArcs()) {
final String arcName = a.getName();
final String to = a.getTo();
graph.addArc(new Arc(StringUtils.isNotBlank(arcName) ? arcName : Arc.DEFAULT_ARC, nodeName, to));
final Function<Env, Boolean> condFunction = generateFunction(a.getCondition());
graph.addArc(new Arc(nodeName, to, condFunction));
}
}
@ -61,6 +63,11 @@ public class GraphLoader {
return graph;
}
private Function<Env, Boolean> generateFunction(final String condition) {
// TODO Auto-generated method stub
return null;
}
private void checkValidity(final Graph graph) throws WorkflowManagerException {
final Set<String> nodesFromArcs = new HashSet<>();

View File

@ -68,7 +68,7 @@ public class ProcessEngine {
process.setLastActivityDate(LocalDateTime.now());
try {
for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getNextArc())) {
for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) {
if (node.isJoin() || node.isSucessNode()) {
if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) {
process.getPausedJoinNodeTokens().put(node.getName(), new ArrayList<Token>());

View File

@ -5,7 +5,6 @@ import java.util.UUID;
import com.google.common.base.Throwables;
import eu.dnetlib.manager.wf.workflows.graph.Arc;
import eu.dnetlib.manager.wf.workflows.util.NodeTokenCallback;
import eu.dnetlib.manager.wf.workflows.util.ProgressProvider;
@ -21,7 +20,6 @@ public class Token {
private final NodeTokenCallback callback;
private boolean failed = false;
private LocalDateTime endDate = LocalDateTime.MIN;
private String nextArc = Arc.DEFAULT_ARC;
private boolean active = true;
private String error = "";
private String errorStackTrace = "";
@ -54,14 +52,6 @@ public class Token {
this.endDate = endDate;
}
public String getNextArc() {
return this.nextArc;
}
public void setNextArc(final String nextArc) {
this.nextArc = nextArc;
}
public boolean isActive() {
return this.active;
}
@ -78,15 +68,6 @@ public class Token {
this.failed = failed;
}
public void release(final String arcName) {
setNextArc(arcName);
setEndDate(LocalDateTime.now());
setActive(false);
if (this.callback != null) {
this.callback.onSuccess(this);
}
}
public void release() {
setEndDate(LocalDateTime.now());
setActive(false);