refactoring

This commit is contained in:
Michele Artini 2023-03-06 14:04:01 +01:00
parent 0bd5e32eed
commit e4443ef2c8
14 changed files with 417 additions and 560 deletions

View File

@ -1,28 +0,0 @@
package eu.dnetlib.manager.wf.model;
import java.io.Serializable;
public class GraphArc implements Serializable {
private static final long serialVersionUID = 7866138976929522262L;
private String name;
private String to;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getTo() {
return to;
}
public void setTo(final String to) {
this.to = to;
}
}

View File

@ -1,65 +0,0 @@
package eu.dnetlib.manager.wf.model;
import java.io.Serializable;
import java.util.List;
public class GraphNode implements Serializable {
private static final long serialVersionUID = -3695762832959801906L;
private String name;
private String type;
private boolean isStart = false;
private boolean isJoin = false;
private List<GraphArc> arcs;
private List<GraphParameter> params;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
public boolean isStart() {
return isStart;
}
public void setStart(final boolean isStart) {
this.isStart = isStart;
}
public boolean isJoin() {
return isJoin;
}
public void setJoin(final boolean isJoin) {
this.isJoin = isJoin;
}
public List<GraphArc> getArcs() {
return arcs;
}
public void setArcs(final List<GraphArc> arcs) {
this.arcs = arcs;
}
public List<GraphParameter> getParams() {
return params;
}
public void setParams(final List<GraphParameter> params) {
this.params = params;
}
}

View File

@ -1,39 +0,0 @@
package eu.dnetlib.manager.wf.model;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
public class GraphParameter extends GraphParameterValue implements Serializable {
private static final long serialVersionUID = 1894419948433994453L;
private String name;
private List<GraphParameterValue> values;
private Map<String, GraphParameterValue> map;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public List<GraphParameterValue> getValues() {
return values;
}
public void setValues(final List<GraphParameterValue> values) {
this.values = values;
}
public Map<String, GraphParameterValue> getMap() {
return map;
}
public void setMap(final Map<String, GraphParameterValue> map) {
this.map = map;
}
}

View File

@ -1,45 +0,0 @@
package eu.dnetlib.manager.wf.model;
import java.io.Serializable;
public class GraphParameterValue implements Serializable {
private static final long serialVersionUID = 7815785723401725707L;
private String value;
private String ref;
private String property;
private String env;
public String getValue() {
return value;
}
public void setValue(final String value) {
this.value = value;
}
public String getRef() {
return ref;
}
public void setRef(final String ref) {
this.ref = ref;
}
public String getProperty() {
return property;
}
public void setProperty(final String property) {
this.property = property;
}
public String getEnv() {
return env;
}
public void setEnv(final String env) {
this.env = env;
}
}

View File

@ -0,0 +1,270 @@
package eu.dnetlib.manager.wf.model;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
public class WorkflowGraph implements Serializable {
private static final long serialVersionUID = 5919290887480115842L;
public List<WfParam> parameters;
public List<Node> graph;
public List<Node> getGraph() {
return graph;
}
public void setGraph(final List<Node> graph) {
this.graph = graph;
}
public List<WfParam> getParameters() {
return parameters;
}
public void setParameters(final List<WfParam> parameters) {
this.parameters = parameters;
}
class WfParam implements Serializable {
private static final long serialVersionUID = 5885589803738655166L;
private String name;
private String description;
private String type;
private String defaultValue;
private boolean required;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getDescription() {
return description;
}
public void setDescription(final String description) {
this.description = description;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
public String getDefaultValue() {
return defaultValue;
}
public void setDefaultValue(final String defaultValue) {
this.defaultValue = defaultValue;
}
public boolean isRequired() {
return required;
}
public void setRequired(final boolean required) {
this.required = required;
}
}
public class Node implements Serializable {
private static final long serialVersionUID = -3695762832959801906L;
private static final String regExRef = "\\$\\{(\\w*)\\}";
private String name;
private String type;
private boolean isStart = false;
private boolean isJoin = false;
private List<Arc> arcs;
private List<NodeParam> input;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
public boolean isStart() {
return isStart;
}
public void setStart(final boolean isStart) {
this.isStart = isStart;
}
public boolean isJoin() {
return isJoin;
}
public void setJoin(final boolean isJoin) {
this.isJoin = isJoin;
}
public List<Arc> getArcs() {
return arcs;
}
public void setArcs(final List<Arc> arcs) {
this.arcs = arcs;
}
public List<NodeParam> getInput() {
return input;
}
public void setInput(final List<NodeParam> input) {
this.input = input;
}
public Map<String, String> findEnvParams() {
return input.stream()
.filter(p -> StringUtils.isNotBlank(p.getEnv()))
.collect(Collectors.toMap(NodeParam::getName, NodeParam::getEnv));
}
public Map<String, Object> calculateInitialParams(final Map<String, String> globalParams, final Environment environment) {
final Map<String, Object> map = new HashMap<>();
input.stream()
.filter(p -> StringUtils.isBlank(p.getEnv()))
.forEach(p -> map.put(p.getName(), calculateSimpleValue(p, globalParams, environment)));
return map;
}
private Object calculateSimpleValue(final NodeParam p, final Map<String, String> globalParams, final Environment environment) {
String value = p.getValue();
final String ref = p.getRef();
final String prop = p.getProperty();
if (StringUtils.isNotBlank(ref) && StringUtils.isNotBlank(globalParams.get(ref))) {
return globalParams.get(ref);
} else if (StringUtils.isNotBlank(value)) {
final Matcher matcher = Pattern.compile(regExRef, Pattern.MULTILINE).matcher(value);
while (matcher.find()) {
final String rName = matcher.group(1);
final String rValue = globalParams.get(rName);
if (StringUtils.isBlank(rValue)) { return null; }
value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue);
System.out.println("NEW VALUE " + value);
}
return value;
} else if (StringUtils.isNotBlank(prop)) {
return environment.getProperty(prop);
} else {
return null;
}
}
}
public class Arc implements Serializable {
private static final long serialVersionUID = 7866138976929522262L;
private String name;
private String to;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getTo() {
return to;
}
public void setTo(final String to) {
this.to = to;
}
}
class NodeParam implements Serializable {
private static final long serialVersionUID = 7815785723401725707L;
private String name;
private String value;
private String ref;
private String property;
private String env;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getValue() {
return value;
}
public void setValue(final String value) {
this.value = value;
}
public String getRef() {
return ref;
}
public void setRef(final String ref) {
this.ref = ref;
}
public String getProperty() {
return property;
}
public void setProperty(final String property) {
this.property = property;
}
public String getEnv() {
return env;
}
public void setEnv(final String env) {
this.env = env;
}
}
}

View File

@ -280,18 +280,6 @@ CREATE TABLE workflow_instances (
user_params jsonb NOT NULL DEFAULT '{}'
);
-- TO DELETE
CREATE TABLE workflow_expected_params (
wf_id text REFERENCES resource(id),
name text,
description text,
type text,
required boolean,
default_value text,
PRIMARY KEY (wf_id, name)
);
-- END
CREATE TABLE workflow_subscriptions (
wf_instance_id text NOT NULL REFERENCES workflow_instances(id),
condition text NOT NULL,

View File

@ -122,7 +122,9 @@ public class ScheduledWorkflowLauncher {
}
private boolean isNotRunning(final WorkflowInstance instance) {
for (final WorkflowProcess p : processRegistry.findProcsByOtherId(instance.getId())) {
final WorkflowProcess p = processRegistry.findProcsByInstanceId(instance.getId());
if (p != null) {
switch (p.getStatus()) {
case CREATED:
return false;
@ -132,6 +134,7 @@ public class ScheduledWorkflowLauncher {
break;
}
}
return true;
}

View File

@ -1,17 +1,10 @@
package eu.dnetlib.manager.wf.workflows.graph;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
@ -19,48 +12,41 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.model.GraphArc;
import eu.dnetlib.manager.wf.model.GraphParameter;
import eu.dnetlib.manager.wf.model.GraphParameterValue;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.workflows.util.NodeHelper;
@Service
public class GraphLoader {
private static final Log log = LogFactory.getLog(GraphLoader.class);
private final String regExRef = "\\$\\{(\\w*)\\}";
final Pattern pattern = Pattern.compile(regExRef, Pattern.MULTILINE);
@Autowired
private NodeHelper nodeHelper;
@Autowired
private Environment env;
private Environment environment;
public Graph loadGraph(final List<eu.dnetlib.manager.wf.model.GraphNode> workflowGraph, final Map<String, String> globalParams)
public Graph loadGraph(final WorkflowGraph workflowGraph, final Map<String, String> globalParams)
throws WorkflowManagerException {
final Graph graph = new Graph();
for (final eu.dnetlib.manager.wf.model.GraphNode node : workflowGraph) {
for (final WorkflowGraph.Node node : workflowGraph.getGraph()) {
final String nodeName = node.getName();
final String nodeType = node.getType();
final boolean isStart = node.isStart();
final boolean isJoin = node.isJoin();
final Map<String, GraphNodeParameter> params = calculateParamsForNode(node, globalParams);
final Map<String, Object> params = node.calculateInitialParams(globalParams, environment);
final Map<String, String> envParams = node.findEnvParams();
if (isStart) {
graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params));
graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params, envParams));
} else if (isJoin) {
graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params));
graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params, envParams));
} else {
graph.addNode(GraphNode.newNode(nodeName, nodeType, params));
graph.addNode(GraphNode.newNode(nodeName, nodeType, params, envParams));
}
if (graph.getArcs() != null) {
for (final GraphArc a : node.getArcs()) {
if (node.getArcs() != null) {
for (final WorkflowGraph.Arc a : node.getArcs()) {
final String arcName = a.getName();
final String to = a.getTo();
graph.addArc(new Arc(StringUtils.isNotBlank(arcName) ? arcName : Arc.DEFAULT_ARC, nodeName, to));
@ -75,75 +61,6 @@ public class GraphLoader {
return graph;
}
public Map<String, GraphNodeParameter> calculateParamsForNode(final eu.dnetlib.manager.wf.model.GraphNode node, final Map<String, String> globalParams) {
final Map<String, GraphNodeParameter> params = new HashMap<>();
if (node.getParams() != null) {
for (final GraphParameter p : node.getParams()) {
final String pName = p.getName();
final GraphNodeParameter pValue = calculateSimpleValue(p, globalParams);
if (pValue != null) {
params.put(pName, pValue);
} else if (p.getMap() != null) {
final Map<String, GraphNodeParameter> map = p.getMap()
.entrySet()
.stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> {
final GraphNodeParameter gnp = calculateSimpleValue(e.getValue(), globalParams);
if (gnp == null) {
final String msg = String.format("missing value for param: \"%s\"", e.getKey());
log.debug(msg);
return GraphNodeParameter.newNullParam();
}
return gnp;
}));
params.put(pName, GraphNodeParameter.newMapParam(map));
} else if (p.getValues() != null) {
final List<GraphNodeParameter> list = p.getValues()
.stream()
.map(e -> calculateSimpleValue(e, globalParams))
.collect(Collectors.toList());
params.put(pName, GraphNodeParameter.newListParam(list));
}
}
}
return params;
}
private GraphNodeParameter calculateSimpleValue(final GraphParameterValue graphValue, final Map<String, String> globalParams) {
String value = graphValue.getValue();
final String ref = graphValue.getRef();
final String prop = graphValue.getProperty();
final String envRef = graphValue.getEnv();
if (StringUtils.isNotBlank(ref) && StringUtils.isNotBlank(globalParams.get(ref))) {
return GraphNodeParameter.newSimpleParam(globalParams.get(ref));
} else if (StringUtils.isNotBlank(envRef)) {
return GraphNodeParameter.newEnvParam(envRef);
} else if (StringUtils.isNotBlank(value)) {
final Matcher matcher = pattern.matcher(value);
while (matcher.find()) {
final String rName = matcher.group(1);
final String rValue = globalParams.get(rName);
if (StringUtils.isBlank(rValue)) { return null; }
value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue);
System.out.println("NEW VALUE " + value);
}
return GraphNodeParameter.newSimpleParam(value);
} else if (StringUtils.isNotBlank(prop)) {
return GraphNodeParameter.newSimpleParam(env.getProperty(prop));
} else {
return null;
}
}
private void checkValidity(final Graph graph) throws WorkflowManagerException {
final Set<String> nodesFromArcs = new HashSet<>();

View File

@ -2,9 +2,7 @@ package eu.dnetlib.manager.wf.workflows.graph;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@ -19,42 +17,48 @@ public class GraphNode {
private final boolean isStart;
private final boolean isJoin;
private final boolean isSucessNode;
private final Map<String, GraphNodeParameter> nodeParams;
private final Map<String, Object> params;
private final Map<String, String> envParams;
private GraphNode(final String name,
final String type,
final boolean isStart,
final boolean isJoin,
final boolean isSuccessNode,
final Map<String, GraphNodeParameter> nodeParams) {
final Map<String, Object> params,
final Map<String, String> envParams) {
this.name = name;
this.type = type;
this.isStart = isStart;
this.isJoin = isJoin;
this.isSucessNode = isSuccessNode;
this.nodeParams = nodeParams;
this.params = params;
this.envParams = envParams;
}
public static GraphNode newNode(final String name,
final String type,
final Map<String, GraphNodeParameter> nodeParams) {
return new GraphNode(name, type, false, false, false, nodeParams);
final Map<String, Object> params,
final Map<String, String> envParams) {
return new GraphNode(name, type, false, false, false, params, envParams);
}
public static GraphNode newStartNode(final String name,
final String type,
final Map<String, GraphNodeParameter> nodeParams) {
return new GraphNode(name, type, true, false, false, nodeParams);
final Map<String, Object> params,
final Map<String, String> envParams) {
return new GraphNode(name, type, true, false, false, params, envParams);
}
public static GraphNode newJoinNode(final String name,
final String type,
final Map<String, GraphNodeParameter> nodeParams) {
return new GraphNode(name, type, false, true, false, nodeParams);
final Map<String, Object> params,
final Map<String, String> envParams) {
return new GraphNode(name, type, false, true, false, params, envParams);
}
public static GraphNode newSuccessNode() {
return new GraphNode(SUCCESS_NODE, null, false, true, true, null);
return new GraphNode(SUCCESS_NODE, null, false, true, true, new HashMap<>(), new HashMap<>());
}
public String getName() {
@ -96,54 +100,30 @@ public class GraphNode {
return sw.toString();
}
public Map<String, GraphNodeParameter> getNodeParams() {
return this.nodeParams;
public Map<String, Object> getParams() {
return this.params;
}
public Map<String, String> getEnvParams() {
return this.envParams;
}
public Map<String, Object> resolveParamsWithNoEnv() {
return resolveParams(null);
}
@SuppressWarnings("unchecked")
public Map<String, Object> resolveParams(final Env env) {
final Map<String, Object> params = new HashMap<>();
final Map<String, Object> map = new HashMap<>();
if (this.nodeParams != null) {
for (final Map.Entry<String, GraphNodeParameter> e : this.nodeParams.entrySet()) {
final String pName = e.getKey();
final GraphNodeParameter param = e.getValue();
if (param.isEnvParam()) {
params.put(pName, resolveFromEnv(param, env));
} else if (param.isMap()) {
final Map<String, Object> map = new HashMap<>();
for (final Map.Entry<String, GraphNodeParameter> e1 : ((Map<String, GraphNodeParameter>) param.getValue()).entrySet()) {
map.put(e1.getKey(), e1.getValue().isEnvParam() ? resolveFromEnv(e1.getValue(), env) : e1.getValue().getValue());
}
params.put(pName, map);
} else if (param.isList()) {
params.put(pName, ((List<GraphNodeParameter>) param.getValue())
.stream()
.map(p -> p.isEnvParam() ? resolveFromEnv(p, env) : p.getValue())
.collect(Collectors.toList()));
} else {
params.put(pName, param.getValue());
}
}
if (this.params != null) {
this.params.forEach((k, v) -> map.put(k, v));
}
return params;
}
if (this.envParams != null && env != null) {
this.envParams.forEach((k, v) -> map.put(k, env.getAttribute(v)));
}
private Object resolveFromEnv(final GraphNodeParameter param, final Env env) {
return env != null ? env.getAttribute(param.getEnvVariable()) : "[this value will be resolved using the runtime ENV]";
return map;
}
}

View File

@ -1,56 +0,0 @@
package eu.dnetlib.manager.wf.workflows.graph;
import java.util.List;
import java.util.Map;
public class GraphNodeParameter {
private final Object value;
private final String envVariable;
private GraphNodeParameter(final Object value, final String envVariable) {
this.value = value;
this.envVariable = envVariable;
}
public static GraphNodeParameter newNullParam() {
return new GraphNodeParameter(null, null);
}
public static GraphNodeParameter newSimpleParam(final Object value) {
return new GraphNodeParameter(value, null);
}
public static GraphNodeParameter newMapParam(final Map<String, GraphNodeParameter> map) {
return new GraphNodeParameter(map, null);
}
public static GraphNodeParameter newListParam(final List<GraphNodeParameter> list) {
return new GraphNodeParameter(list, null);
}
public static GraphNodeParameter newEnvParam(final String envVariable) {
return new GraphNodeParameter(null, envVariable);
}
public Object getValue() {
return this.value;
}
public boolean isEnvParam() {
return this.envVariable != null;
}
public String getEnvVariable() {
return this.envVariable;
}
public boolean isMap() {
return this.value != null && (this.value instanceof Map);
}
public boolean isList() {
return this.value != null && (this.value instanceof List);
}
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.manager.wf.workflows.procs;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@ -12,8 +11,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.model.GraphNode;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.graph.GraphLoader;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
@ -29,30 +30,19 @@ public class ProcessFactory {
@Autowired
private GraphLoader graphLoader;
public WorkflowProcess newProcess(final String wfId,
final String wfName,
final String wfFamily,
final List<GraphNode> graph,
final WorkflowInstance instance,
public WorkflowProcess newProcess(final SimpleResource wfMetadata,
final WorkflowGraph wfGraph,
final WorkflowInstance wfInstance,
final ProcessCallback processCallback,
final String parent) throws WorkflowManagerException {
final Map<String, String> globalParams = new HashMap<>();
globalParams.putAll(instance.getSystemParams());
globalParams.putAll(instance.getUserParams());
globalParams.putAll(wfInstance.getSystemParams());
globalParams.putAll(wfInstance.getUserParams());
return new WorkflowProcess(generateProcessId(),
wfName,
wfFamily,
instance.getDsId(),
instance.getDsName(),
instance.getApiId(),
graphLoader.loadGraph(graph, globalParams),
instance.getPriority(),
wfId,
instance.getId(),
globalParams,
processCallback, parent);
final Graph graph = graphLoader.loadGraph(wfGraph, globalParams);
return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, processCallback, parent);
}

View File

@ -1,11 +1,10 @@
package eu.dnetlib.manager.wf.workflows.procs;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
@ -13,18 +12,16 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
@Service
public class ProcessRegistry {
private static final Log log = LogFactory.getLog(ProcessRegistry.class);
private final BiMap<String, WorkflowProcess> procs = HashBiMap.create();
private final Map<String, Collection<WorkflowProcess>> byOtherId = new HashMap<>();
private final Map<String, WorkflowProcess> procs = new HashMap<>();
private final Map<String, WorkflowProcess> byInstanceId = new HashMap<>();
private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<>();
@ -46,18 +43,15 @@ public class ProcessRegistry {
return this.procs.get(procId);
}
public Set<WorkflowProcess> listProcesses() {
public Collection<WorkflowProcess> listProcesses() {
return this.procs.values();
}
public Collection<WorkflowProcess> findProcsByOtherId(final String id) {
synchronized (this) {
final Collection<WorkflowProcess> res = this.byOtherId.get(id);
return res != null ? res : new ArrayList<>();
}
public WorkflowProcess findProcsByInstanceId(final String id) {
return this.byInstanceId.get(id);
}
public String registerProcess(final WorkflowProcess process, final String... ids) throws WorkflowManagerException {
public String registerProcess(final WorkflowProcess process, final WorkflowInstance wfInstance) throws WorkflowManagerException {
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
log.error("Already registerd process: " + process);
throw new WorkflowManagerException("Already registerd process: " + process);
@ -68,14 +62,7 @@ public class ProcessRegistry {
}
this.procs.put(process.getId(), process);
for (final String id : ids) {
synchronized (this) {
if (!this.byOtherId.containsKey(id)) {
this.byOtherId.put(id, new ArrayList<WorkflowProcess>());
}
this.byOtherId.get(id).add(process);
}
}
this.byInstanceId.put(wfInstance.getId(), process);
synchronized (this.pendingProcs) {
if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
@ -116,8 +103,13 @@ public class ProcessRegistry {
synchronized (this) {
final WorkflowProcess process = this.procs.remove(procId);
if (process != null) {
for (final Collection<WorkflowProcess> processes : this.byOtherId.values()) {
processes.remove(process);
final Optional<String> instanceId = this.byInstanceId.entrySet()
.stream()
.filter(e -> e.getValue().getId().equals(process.getId()))
.map(e -> e.getKey())
.findFirst();
if (instanceId.isPresent()) {
this.byInstanceId.remove(instanceId, process);
}
}
}

View File

@ -1,8 +1,7 @@
package eu.dnetlib.manager.wf.workflows.procs;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -14,7 +13,6 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dsm.DsmService;
@ -22,7 +20,7 @@ import eu.dnetlib.errors.DsmException;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.GraphNode;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
@ -111,38 +109,34 @@ public class WorkflowExecutor implements Stoppable {
return startWorkflowInstance(instance, processCallback, parent);
}
public String startWorkflowInstance(final WorkflowInstance instance, final ProcessCallback processCallback, final String parent)
public String startWorkflowInstance(final WorkflowInstance wfInstance, final ProcessCallback processCallback, final String parent)
throws WorkflowManagerException {
if (!instance.isEnabled() || !instance.isConfigured()) {
log.warn("Wf instance " + instance.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf instance " + instance.getId() + " is not ready to start");
if (!wfInstance.isEnabled() || !wfInstance.isConfigured()) {
log.warn("Wf instance " + wfInstance.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf instance " + wfInstance.getId() + " is not ready to start");
}
final SimpleResource wf = simpleResourceRepository
.findById(instance.getWorkflow())
final SimpleResource wfMetadata = simpleResourceRepository
.findById(wfInstance.getWorkflow())
.filter(r -> r.getType().equals("workflows"))
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + instance.getWorkflow()));
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + wfInstance.getWorkflow()));
final List<GraphNode> graph = simpleResourceRepository.findContentById(wf.getId())
final WorkflowGraph wfGraph = simpleResourceRepository.findContentById(wfMetadata.getId())
.map(s -> {
final ObjectMapper mapper = new ObjectMapper();
final TypeReference<List<GraphNode>> type = new TypeReference<List<GraphNode>>() {};
final List<GraphNode> list = new ArrayList<>();
try {
list.addAll(mapper.readValue(s, type));
return new ObjectMapper().readValue(s, WorkflowGraph.class);
} catch (final Exception e) {
log.error("Error parsing json", e);
return (WorkflowGraph) null;
}
return list;
})
.filter(list -> !list.isEmpty())
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wf.getId()));
.filter(Objects::nonNull)
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId()));
final WorkflowProcess process =
processFactory.newProcess(wf.getId(), wf.getName(), wf.getSubtype(), graph, instance, processCallback, parent);
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, processCallback, parent);
return processRegistry.registerProcess(process, instance.getId());
return processRegistry.registerProcess(process, wfInstance);
}
@Override

View File

@ -8,6 +8,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
@ -16,104 +18,96 @@ import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
*/
public class WorkflowProcess implements Comparable<WorkflowProcess> {
public enum Status {
CREATED,
EXECUTING,
SUCCESS,
FAILURE,
KILLED;
}
public enum StartMode {
AUTO,
MANUAL,
DISABLED
}
private final String id;
private final String name;
private final String family;
private String dsId;
private String dsName;
private String dsInterface;
private final SimpleResource wfMetadata;
private final WorkflowInstance wfInstance;
private final Graph graph;
private final ProcessCallback callback;
private final int priority;
private final Env env;
private final List<Token> tokens = new CopyOnWriteArrayList<>();
private LocalDateTime lastActivityDate;
private Status status;
private LocalDateTime startDate = LocalDateTime.MIN;
private LocalDateTime endDate = LocalDateTime.MIN;
private final String wfId;
private final String wfInstanceId;
private Map<String, List<Token>> pausedJoinNodeTokens = new HashMap<>();
private Map<String, String> globalParams;
private final Map<String, List<Token>> pausedJoinNodeTokens = new HashMap<>();
private final Map<String, String> globalParams;
private String error;
private String errorStacktrace;
private Map<String, String> outputParams = new HashMap<>();
private String parentProfileId;
private final Map<String, String> outputParams = new HashMap<>();
private final String parentProfileId;
public WorkflowProcess(
final String id,
final String name,
final String family,
final String dsId,
final String dsName,
final String dsInterface,
final SimpleResource wfMetadata,
final WorkflowInstance wfInstance,
final Graph graph,
final int priority,
final String wfId,
final String wfInstanceId,
final Map<String, String> globalParams,
final ProcessCallback callback,
final String parentProfileId) {
this.id = id;
this.name = name;
this.family = family;
this.dsId = dsId;
this.dsName = dsName;
this.dsInterface = dsInterface;
this.wfMetadata = wfMetadata;
this.wfInstance = wfInstance;
this.graph = graph;
this.priority = priority;
this.callback = callback;
this.status = Status.CREATED;
this.env = new Env();
this.wfId = wfId;
this.wfInstanceId = wfInstanceId;
this.globalParams = globalParams;
this.lastActivityDate = LocalDateTime.now();
this.parentProfileId = parentProfileId;
}
public void setParentProfileId(final String parentProfileId) {
this.parentProfileId = parentProfileId;
}
public String getDsId() {
return dsId;
}
public void setDsId(final String dsId) {
this.dsId = dsId;
}
public String getDsName() {
return dsName;
}
public void setDsName(final String dsName) {
this.dsName = dsName;
}
public String getDsInterface() {
return dsInterface;
}
public void setDsInterface(final String dsInterface) {
this.dsInterface = dsInterface;
}
public Map<String, List<Token>> getPausedJoinNodeTokens() {
return pausedJoinNodeTokens;
}
public void setPausedJoinNodeTokens(final Map<String, List<Token>> pausedJoinNodeTokens) {
this.pausedJoinNodeTokens = pausedJoinNodeTokens;
}
public String getId() {
return id;
}
public String getName() {
return name;
return wfMetadata.getName();
}
public String getFamily() {
return wfMetadata.getSubtype();
}
public String getWfId() {
return wfMetadata.getId();
}
public String getWfInstanceId() {
return wfInstance.getId();
}
public int getPriority() {
return wfInstance.getPriority();
}
public String getDsId() {
return wfInstance.getId();
}
public String getDsName() {
return wfInstance.getDsName();
}
public String getDsInterface() {
return wfInstance.getApiId();
}
public Map<String, List<Token>> getPausedJoinNodeTokens() {
return pausedJoinNodeTokens;
}
public Env getEnv() {
@ -144,10 +138,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return callback;
}
public int getPriority() {
return priority;
}
public boolean isTerminated() {
switch (status) {
case SUCCESS:
@ -169,7 +159,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
@Override
public String toString() {
return String.format("[process id='%s' name='%s']", id, name);
return String.format("[process id='%s' name='%s']", id, wfMetadata.getName());
}
@Override
@ -181,22 +171,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return globalParams;
}
public void setGlobalParams(final Map<String, String> globalParams) {
this.globalParams = globalParams;
}
public String getFamily() {
return family;
}
public String getWfId() {
return wfId;
}
public String getWfInstanceId() {
return wfInstanceId;
}
public void setStartDate(final LocalDateTime startDate) {
this.startDate = startDate;
}
@ -213,20 +187,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return endDate;
}
public enum Status {
CREATED,
EXECUTING,
SUCCESS,
FAILURE,
KILLED;
}
public enum StartMode {
AUTO,
MANUAL,
DISABLED
}
public String getError() {
return error;
}
@ -243,10 +203,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
this.errorStacktrace = errorStacktrace;
}
public void setOutputParams(final Map<String, String> outputParams) {
this.outputParams = outputParams;
}
public Map<String, String> getOutputParams() {
return outputParams;
}