diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArc.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArc.java deleted file mode 100644 index b5f4ac58..00000000 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArc.java +++ /dev/null @@ -1,28 +0,0 @@ -package eu.dnetlib.manager.wf.model; - -import java.io.Serializable; - -public class GraphArc implements Serializable { - - private static final long serialVersionUID = 7866138976929522262L; - - private String name; - private String to; - - public String getName() { - return name; - } - - public void setName(final String name) { - this.name = name; - } - - public String getTo() { - return to; - } - - public void setTo(final String to) { - this.to = to; - } - -} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNode.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNode.java deleted file mode 100644 index c506c8f1..00000000 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNode.java +++ /dev/null @@ -1,65 +0,0 @@ -package eu.dnetlib.manager.wf.model; - -import java.io.Serializable; -import java.util.List; - -public class GraphNode implements Serializable { - - private static final long serialVersionUID = -3695762832959801906L; - - private String name; - private String type; - private boolean isStart = false; - private boolean isJoin = false; - private List arcs; - private List params; - - public String getName() { - return name; - } - - public void setName(final String name) { - this.name = name; - } - - public String getType() { - return type; - } - - public void setType(final String type) { - this.type = type; - } - - public boolean isStart() { - return isStart; - } - - public void setStart(final boolean isStart) { - this.isStart = isStart; - } - - public boolean isJoin() { - return isJoin; - } - - public void setJoin(final boolean isJoin) { - this.isJoin = isJoin; - } - - public List getArcs() { - return arcs; - } - - public void setArcs(final List arcs) { - this.arcs = arcs; - } - - public List getParams() { - return params; - } - - public void setParams(final List params) { - this.params = params; - } - -} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameter.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameter.java deleted file mode 100644 index f24b12c4..00000000 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameter.java +++ /dev/null @@ -1,39 +0,0 @@ -package eu.dnetlib.manager.wf.model; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -public class GraphParameter extends GraphParameterValue implements Serializable { - - private static final long serialVersionUID = 1894419948433994453L; - - private String name; - private List values; - private Map map; - - public String getName() { - return name; - } - - public void setName(final String name) { - this.name = name; - } - - public List getValues() { - return values; - } - - public void setValues(final List values) { - this.values = values; - } - - public Map getMap() { - return map; - } - - public void setMap(final Map map) { - this.map = map; - } - -} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValue.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValue.java deleted file mode 100644 index ee8267f9..00000000 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValue.java +++ /dev/null @@ -1,45 +0,0 @@ -package eu.dnetlib.manager.wf.model; - -import java.io.Serializable; - -public class GraphParameterValue implements Serializable { - - private static final long serialVersionUID = 7815785723401725707L; - - private String value; - private String ref; - private String property; - private String env; - - public String getValue() { - return value; - } - - public void setValue(final String value) { - this.value = value; - } - - public String getRef() { - return ref; - } - - public void setRef(final String ref) { - this.ref = ref; - } - - public String getProperty() { - return property; - } - - public void setProperty(final String property) { - this.property = property; - } - - public String getEnv() { - return env; - } - - public void setEnv(final String env) { - this.env = env; - } -} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowGraph.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowGraph.java new file mode 100644 index 00000000..eae06aa1 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowGraph.java @@ -0,0 +1,270 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.env.Environment; + +public class WorkflowGraph implements Serializable { + + private static final long serialVersionUID = 5919290887480115842L; + + public List parameters; + public List graph; + + public List getGraph() { + return graph; + } + + public void setGraph(final List graph) { + this.graph = graph; + } + + public List getParameters() { + return parameters; + } + + public void setParameters(final List parameters) { + this.parameters = parameters; + } + + class WfParam implements Serializable { + + private static final long serialVersionUID = 5885589803738655166L; + + private String name; + private String description; + private String type; + private String defaultValue; + private boolean required; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getDescription() { + return description; + } + + public void setDescription(final String description) { + this.description = description; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(final String defaultValue) { + this.defaultValue = defaultValue; + } + + public boolean isRequired() { + return required; + } + + public void setRequired(final boolean required) { + this.required = required; + } + + } + + public class Node implements Serializable { + + private static final long serialVersionUID = -3695762832959801906L; + + private static final String regExRef = "\\$\\{(\\w*)\\}"; + + private String name; + private String type; + private boolean isStart = false; + private boolean isJoin = false; + private List arcs; + private List input; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + public boolean isStart() { + return isStart; + } + + public void setStart(final boolean isStart) { + this.isStart = isStart; + } + + public boolean isJoin() { + return isJoin; + } + + public void setJoin(final boolean isJoin) { + this.isJoin = isJoin; + } + + public List getArcs() { + return arcs; + } + + public void setArcs(final List arcs) { + this.arcs = arcs; + } + + public List getInput() { + return input; + } + + public void setInput(final List input) { + this.input = input; + } + + public Map findEnvParams() { + return input.stream() + .filter(p -> StringUtils.isNotBlank(p.getEnv())) + .collect(Collectors.toMap(NodeParam::getName, NodeParam::getEnv)); + } + + public Map calculateInitialParams(final Map globalParams, final Environment environment) { + final Map map = new HashMap<>(); + + input.stream() + .filter(p -> StringUtils.isBlank(p.getEnv())) + .forEach(p -> map.put(p.getName(), calculateSimpleValue(p, globalParams, environment))); + + return map; + } + + private Object calculateSimpleValue(final NodeParam p, final Map globalParams, final Environment environment) { + String value = p.getValue(); + final String ref = p.getRef(); + final String prop = p.getProperty(); + + if (StringUtils.isNotBlank(ref) && StringUtils.isNotBlank(globalParams.get(ref))) { + return globalParams.get(ref); + } else if (StringUtils.isNotBlank(value)) { + final Matcher matcher = Pattern.compile(regExRef, Pattern.MULTILINE).matcher(value); + while (matcher.find()) { + final String rName = matcher.group(1); + final String rValue = globalParams.get(rName); + if (StringUtils.isBlank(rValue)) { return null; } + value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue); + System.out.println("NEW VALUE " + value); + } + return value; + } else if (StringUtils.isNotBlank(prop)) { + return environment.getProperty(prop); + } else { + return null; + } + + } + + } + + public class Arc implements Serializable { + + private static final long serialVersionUID = 7866138976929522262L; + + private String name; + private String to; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getTo() { + return to; + } + + public void setTo(final String to) { + this.to = to; + } + + } + + class NodeParam implements Serializable { + + private static final long serialVersionUID = 7815785723401725707L; + + private String name; + private String value; + private String ref; + private String property; + private String env; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getValue() { + return value; + } + + public void setValue(final String value) { + this.value = value; + } + + public String getRef() { + return ref; + } + + public void setRef(final String ref) { + this.ref = ref; + } + + public String getProperty() { + return property; + } + + public void setProperty(final String property) { + this.property = property; + } + + public String getEnv() { + return env; + } + + public void setEnv(final String env) { + this.env = env; + } + + } + +} diff --git a/libs/dnet-is-common/src/main/resources/sql/schema.sql b/libs/dnet-is-common/src/main/resources/sql/schema.sql index a4f66ba8..2952dc26 100644 --- a/libs/dnet-is-common/src/main/resources/sql/schema.sql +++ b/libs/dnet-is-common/src/main/resources/sql/schema.sql @@ -280,18 +280,6 @@ CREATE TABLE workflow_instances ( user_params jsonb NOT NULL DEFAULT '{}' ); --- TO DELETE -CREATE TABLE workflow_expected_params ( - wf_id text REFERENCES resource(id), - name text, - description text, - type text, - required boolean, - default_value text, - PRIMARY KEY (wf_id, name) -); --- END - CREATE TABLE workflow_subscriptions ( wf_instance_id text NOT NULL REFERENCES workflow_instances(id), condition text NOT NULL, diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java index 3711a3e1..0eec4f58 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java @@ -122,7 +122,9 @@ public class ScheduledWorkflowLauncher { } private boolean isNotRunning(final WorkflowInstance instance) { - for (final WorkflowProcess p : processRegistry.findProcsByOtherId(instance.getId())) { + final WorkflowProcess p = processRegistry.findProcsByInstanceId(instance.getId()); + + if (p != null) { switch (p.getStatus()) { case CREATED: return false; @@ -132,6 +134,7 @@ public class ScheduledWorkflowLauncher { break; } } + return true; } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java index 3a42b8e4..6af1f94d 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java @@ -1,17 +1,10 @@ package eu.dnetlib.manager.wf.workflows.graph; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; @@ -19,48 +12,41 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.manager.wf.model.GraphArc; -import eu.dnetlib.manager.wf.model.GraphParameter; -import eu.dnetlib.manager.wf.model.GraphParameterValue; +import eu.dnetlib.manager.wf.model.WorkflowGraph; import eu.dnetlib.manager.wf.workflows.util.NodeHelper; @Service public class GraphLoader { - private static final Log log = LogFactory.getLog(GraphLoader.class); - - private final String regExRef = "\\$\\{(\\w*)\\}"; - - final Pattern pattern = Pattern.compile(regExRef, Pattern.MULTILINE); - @Autowired private NodeHelper nodeHelper; @Autowired - private Environment env; + private Environment environment; - public Graph loadGraph(final List workflowGraph, final Map globalParams) + public Graph loadGraph(final WorkflowGraph workflowGraph, final Map globalParams) throws WorkflowManagerException { final Graph graph = new Graph(); - for (final eu.dnetlib.manager.wf.model.GraphNode node : workflowGraph) { + for (final WorkflowGraph.Node node : workflowGraph.getGraph()) { final String nodeName = node.getName(); final String nodeType = node.getType(); final boolean isStart = node.isStart(); final boolean isJoin = node.isJoin(); - final Map params = calculateParamsForNode(node, globalParams); + final Map params = node.calculateInitialParams(globalParams, environment); + final Map envParams = node.findEnvParams(); if (isStart) { - graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params)); + graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params, envParams)); } else if (isJoin) { - graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params)); + graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params, envParams)); } else { - graph.addNode(GraphNode.newNode(nodeName, nodeType, params)); + graph.addNode(GraphNode.newNode(nodeName, nodeType, params, envParams)); } - if (graph.getArcs() != null) { - for (final GraphArc a : node.getArcs()) { + if (node.getArcs() != null) { + for (final WorkflowGraph.Arc a : node.getArcs()) { final String arcName = a.getName(); final String to = a.getTo(); graph.addArc(new Arc(StringUtils.isNotBlank(arcName) ? arcName : Arc.DEFAULT_ARC, nodeName, to)); @@ -75,75 +61,6 @@ public class GraphLoader { return graph; } - public Map calculateParamsForNode(final eu.dnetlib.manager.wf.model.GraphNode node, final Map globalParams) { - - final Map params = new HashMap<>(); - - if (node.getParams() != null) { - for (final GraphParameter p : node.getParams()) { - - final String pName = p.getName(); - - final GraphNodeParameter pValue = calculateSimpleValue(p, globalParams); - - if (pValue != null) { - params.put(pName, pValue); - } else if (p.getMap() != null) { - - final Map map = p.getMap() - .entrySet() - .stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> { - final GraphNodeParameter gnp = calculateSimpleValue(e.getValue(), globalParams); - if (gnp == null) { - final String msg = String.format("missing value for param: \"%s\"", e.getKey()); - log.debug(msg); - return GraphNodeParameter.newNullParam(); - } - return gnp; - })); - params.put(pName, GraphNodeParameter.newMapParam(map)); - } else if (p.getValues() != null) { - final List list = p.getValues() - .stream() - .map(e -> calculateSimpleValue(e, globalParams)) - .collect(Collectors.toList()); - params.put(pName, GraphNodeParameter.newListParam(list)); - } - } - } - - return params; - } - - private GraphNodeParameter calculateSimpleValue(final GraphParameterValue graphValue, final Map globalParams) { - String value = graphValue.getValue(); - final String ref = graphValue.getRef(); - final String prop = graphValue.getProperty(); - final String envRef = graphValue.getEnv(); - - if (StringUtils.isNotBlank(ref) && StringUtils.isNotBlank(globalParams.get(ref))) { - return GraphNodeParameter.newSimpleParam(globalParams.get(ref)); - } else if (StringUtils.isNotBlank(envRef)) { - return GraphNodeParameter.newEnvParam(envRef); - } else if (StringUtils.isNotBlank(value)) { - final Matcher matcher = pattern.matcher(value); - while (matcher.find()) { - final String rName = matcher.group(1); - final String rValue = globalParams.get(rName); - if (StringUtils.isBlank(rValue)) { return null; } - value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue); - System.out.println("NEW VALUE " + value); - } - return GraphNodeParameter.newSimpleParam(value); - } else if (StringUtils.isNotBlank(prop)) { - return GraphNodeParameter.newSimpleParam(env.getProperty(prop)); - } else { - return null; - } - - } - private void checkValidity(final Graph graph) throws WorkflowManagerException { final Set nodesFromArcs = new HashSet<>(); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNode.java index db9c5c67..9f3429e2 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNode.java @@ -2,9 +2,7 @@ package eu.dnetlib.manager.wf.workflows.graph; import java.io.StringWriter; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -19,42 +17,48 @@ public class GraphNode { private final boolean isStart; private final boolean isJoin; private final boolean isSucessNode; - private final Map nodeParams; + private final Map params; + private final Map envParams; private GraphNode(final String name, final String type, final boolean isStart, final boolean isJoin, final boolean isSuccessNode, - final Map nodeParams) { + final Map params, + final Map envParams) { this.name = name; this.type = type; this.isStart = isStart; this.isJoin = isJoin; this.isSucessNode = isSuccessNode; - this.nodeParams = nodeParams; + this.params = params; + this.envParams = envParams; } public static GraphNode newNode(final String name, final String type, - final Map nodeParams) { - return new GraphNode(name, type, false, false, false, nodeParams); + final Map params, + final Map envParams) { + return new GraphNode(name, type, false, false, false, params, envParams); } public static GraphNode newStartNode(final String name, final String type, - final Map nodeParams) { - return new GraphNode(name, type, true, false, false, nodeParams); + final Map params, + final Map envParams) { + return new GraphNode(name, type, true, false, false, params, envParams); } public static GraphNode newJoinNode(final String name, final String type, - final Map nodeParams) { - return new GraphNode(name, type, false, true, false, nodeParams); + final Map params, + final Map envParams) { + return new GraphNode(name, type, false, true, false, params, envParams); } public static GraphNode newSuccessNode() { - return new GraphNode(SUCCESS_NODE, null, false, true, true, null); + return new GraphNode(SUCCESS_NODE, null, false, true, true, new HashMap<>(), new HashMap<>()); } public String getName() { @@ -96,54 +100,30 @@ public class GraphNode { return sw.toString(); } - public Map getNodeParams() { - return this.nodeParams; + public Map getParams() { + return this.params; + } + + public Map getEnvParams() { + return this.envParams; } public Map resolveParamsWithNoEnv() { return resolveParams(null); } - @SuppressWarnings("unchecked") public Map resolveParams(final Env env) { - final Map params = new HashMap<>(); + final Map map = new HashMap<>(); - if (this.nodeParams != null) { - - for (final Map.Entry e : this.nodeParams.entrySet()) { - final String pName = e.getKey(); - final GraphNodeParameter param = e.getValue(); - - if (param.isEnvParam()) { - params.put(pName, resolveFromEnv(param, env)); - } else if (param.isMap()) { - - final Map map = new HashMap<>(); - - for (final Map.Entry e1 : ((Map) param.getValue()).entrySet()) { - map.put(e1.getKey(), e1.getValue().isEnvParam() ? resolveFromEnv(e1.getValue(), env) : e1.getValue().getValue()); - } - - params.put(pName, map); - - } else if (param.isList()) { - params.put(pName, ((List) param.getValue()) - .stream() - .map(p -> p.isEnvParam() ? resolveFromEnv(p, env) : p.getValue()) - .collect(Collectors.toList())); - - } else { - params.put(pName, param.getValue()); - } - - } + if (this.params != null) { + this.params.forEach((k, v) -> map.put(k, v)); } - return params; - } + if (this.envParams != null && env != null) { + this.envParams.forEach((k, v) -> map.put(k, env.getAttribute(v))); + } - private Object resolveFromEnv(final GraphNodeParameter param, final Env env) { - return env != null ? env.getAttribute(param.getEnvVariable()) : "[this value will be resolved using the runtime ENV]"; + return map; } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNodeParameter.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNodeParameter.java deleted file mode 100644 index bbe054eb..00000000 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNodeParameter.java +++ /dev/null @@ -1,56 +0,0 @@ -package eu.dnetlib.manager.wf.workflows.graph; - -import java.util.List; -import java.util.Map; - -public class GraphNodeParameter { - - private final Object value; - private final String envVariable; - - private GraphNodeParameter(final Object value, final String envVariable) { - this.value = value; - this.envVariable = envVariable; - } - - public static GraphNodeParameter newNullParam() { - return new GraphNodeParameter(null, null); - } - - public static GraphNodeParameter newSimpleParam(final Object value) { - return new GraphNodeParameter(value, null); - } - - public static GraphNodeParameter newMapParam(final Map map) { - return new GraphNodeParameter(map, null); - } - - public static GraphNodeParameter newListParam(final List list) { - return new GraphNodeParameter(list, null); - } - - public static GraphNodeParameter newEnvParam(final String envVariable) { - return new GraphNodeParameter(null, envVariable); - } - - public Object getValue() { - return this.value; - } - - public boolean isEnvParam() { - return this.envVariable != null; - } - - public String getEnvVariable() { - return this.envVariable; - } - - public boolean isMap() { - return this.value != null && (this.value instanceof Map); - } - - public boolean isList() { - return this.value != null && (this.value instanceof List); - } - -} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java index 2aae0a94..c7c07ace 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java @@ -3,7 +3,6 @@ package eu.dnetlib.manager.wf.workflows.procs; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -12,8 +11,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.manager.wf.model.GraphNode; +import eu.dnetlib.is.model.resource.SimpleResource; +import eu.dnetlib.manager.wf.model.WorkflowGraph; import eu.dnetlib.manager.wf.model.WorkflowInstance; +import eu.dnetlib.manager.wf.workflows.graph.Graph; import eu.dnetlib.manager.wf.workflows.graph.GraphLoader; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; @@ -29,30 +30,19 @@ public class ProcessFactory { @Autowired private GraphLoader graphLoader; - public WorkflowProcess newProcess(final String wfId, - final String wfName, - final String wfFamily, - final List graph, - final WorkflowInstance instance, + public WorkflowProcess newProcess(final SimpleResource wfMetadata, + final WorkflowGraph wfGraph, + final WorkflowInstance wfInstance, final ProcessCallback processCallback, final String parent) throws WorkflowManagerException { final Map globalParams = new HashMap<>(); - globalParams.putAll(instance.getSystemParams()); - globalParams.putAll(instance.getUserParams()); + globalParams.putAll(wfInstance.getSystemParams()); + globalParams.putAll(wfInstance.getUserParams()); - return new WorkflowProcess(generateProcessId(), - wfName, - wfFamily, - instance.getDsId(), - instance.getDsName(), - instance.getApiId(), - graphLoader.loadGraph(graph, globalParams), - instance.getPriority(), - wfId, - instance.getId(), - globalParams, - processCallback, parent); + final Graph graph = graphLoader.loadGraph(wfGraph, globalParams); + + return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, processCallback, parent); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java index 45533072..fd1180b6 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java @@ -1,11 +1,10 @@ package eu.dnetlib.manager.wf.workflows.procs; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Set; +import java.util.Optional; import java.util.concurrent.PriorityBlockingQueue; import org.apache.commons.logging.Log; @@ -13,18 +12,16 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; - import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; @Service public class ProcessRegistry { private static final Log log = LogFactory.getLog(ProcessRegistry.class); - private final BiMap procs = HashBiMap.create(); - private final Map> byOtherId = new HashMap<>(); + private final Map procs = new HashMap<>(); + private final Map byInstanceId = new HashMap<>(); private final PriorityBlockingQueue pendingProcs = new PriorityBlockingQueue<>(); @@ -46,18 +43,15 @@ public class ProcessRegistry { return this.procs.get(procId); } - public Set listProcesses() { + public Collection listProcesses() { return this.procs.values(); } - public Collection findProcsByOtherId(final String id) { - synchronized (this) { - final Collection res = this.byOtherId.get(id); - return res != null ? res : new ArrayList<>(); - } + public WorkflowProcess findProcsByInstanceId(final String id) { + return this.byInstanceId.get(id); } - public String registerProcess(final WorkflowProcess process, final String... ids) throws WorkflowManagerException { + public String registerProcess(final WorkflowProcess process, final WorkflowInstance wfInstance) throws WorkflowManagerException { if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) { log.error("Already registerd process: " + process); throw new WorkflowManagerException("Already registerd process: " + process); @@ -68,14 +62,7 @@ public class ProcessRegistry { } this.procs.put(process.getId(), process); - for (final String id : ids) { - synchronized (this) { - if (!this.byOtherId.containsKey(id)) { - this.byOtherId.put(id, new ArrayList()); - } - this.byOtherId.get(id).add(process); - } - } + this.byInstanceId.put(wfInstance.getId(), process); synchronized (this.pendingProcs) { if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) { @@ -116,8 +103,13 @@ public class ProcessRegistry { synchronized (this) { final WorkflowProcess process = this.procs.remove(procId); if (process != null) { - for (final Collection processes : this.byOtherId.values()) { - processes.remove(process); + final Optional instanceId = this.byInstanceId.entrySet() + .stream() + .filter(e -> e.getValue().getId().equals(process.getId())) + .map(e -> e.getKey()) + .findFirst(); + if (instanceId.isPresent()) { + this.byInstanceId.remove(instanceId, process); } } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java index cee2a1c7..e83add35 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java @@ -1,8 +1,7 @@ package eu.dnetlib.manager.wf.workflows.procs; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -14,7 +13,6 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dsm.DsmService; @@ -22,7 +20,7 @@ import eu.dnetlib.errors.DsmException; import eu.dnetlib.errors.WorkflowManagerException; import eu.dnetlib.is.model.resource.SimpleResource; import eu.dnetlib.is.resource.repository.SimpleResourceRepository; -import eu.dnetlib.manager.wf.model.GraphNode; +import eu.dnetlib.manager.wf.model.WorkflowGraph; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; @@ -111,38 +109,34 @@ public class WorkflowExecutor implements Stoppable { return startWorkflowInstance(instance, processCallback, parent); } - public String startWorkflowInstance(final WorkflowInstance instance, final ProcessCallback processCallback, final String parent) + public String startWorkflowInstance(final WorkflowInstance wfInstance, final ProcessCallback processCallback, final String parent) throws WorkflowManagerException { - if (!instance.isEnabled() || !instance.isConfigured()) { - log.warn("Wf instance " + instance.getId() + " is not ready to start"); - throw new WorkflowManagerException("Wf instance " + instance.getId() + " is not ready to start"); + if (!wfInstance.isEnabled() || !wfInstance.isConfigured()) { + log.warn("Wf instance " + wfInstance.getId() + " is not ready to start"); + throw new WorkflowManagerException("Wf instance " + wfInstance.getId() + " is not ready to start"); } - final SimpleResource wf = simpleResourceRepository - .findById(instance.getWorkflow()) + final SimpleResource wfMetadata = simpleResourceRepository + .findById(wfInstance.getWorkflow()) .filter(r -> r.getType().equals("workflows")) - .orElseThrow(() -> new WorkflowManagerException("WF not found: " + instance.getWorkflow())); + .orElseThrow(() -> new WorkflowManagerException("WF not found: " + wfInstance.getWorkflow())); - final List graph = simpleResourceRepository.findContentById(wf.getId()) + final WorkflowGraph wfGraph = simpleResourceRepository.findContentById(wfMetadata.getId()) .map(s -> { - final ObjectMapper mapper = new ObjectMapper(); - final TypeReference> type = new TypeReference>() {}; - final List list = new ArrayList<>(); try { - list.addAll(mapper.readValue(s, type)); + return new ObjectMapper().readValue(s, WorkflowGraph.class); } catch (final Exception e) { - log.error("Error parsing json", e); + return (WorkflowGraph) null; } - return list; }) - .filter(list -> !list.isEmpty()) - .orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wf.getId())); + .filter(Objects::nonNull) + .orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId())); final WorkflowProcess process = - processFactory.newProcess(wf.getId(), wf.getName(), wf.getSubtype(), graph, instance, processCallback, parent); + processFactory.newProcess(wfMetadata, wfGraph, wfInstance, processCallback, parent); - return processRegistry.registerProcess(process, instance.getId()); + return processRegistry.registerProcess(process, wfInstance); } @Override diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java index 9a479806..a62bc6bd 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java @@ -8,6 +8,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.lang3.math.NumberUtils; +import eu.dnetlib.is.model.resource.SimpleResource; +import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.workflows.graph.Graph; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; @@ -16,104 +18,96 @@ import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; */ public class WorkflowProcess implements Comparable { + public enum Status { + CREATED, + EXECUTING, + SUCCESS, + FAILURE, + KILLED; + } + + public enum StartMode { + AUTO, + MANUAL, + DISABLED + } + private final String id; - private final String name; - private final String family; - private String dsId; - private String dsName; - private String dsInterface; + private final SimpleResource wfMetadata; + private final WorkflowInstance wfInstance; private final Graph graph; private final ProcessCallback callback; - private final int priority; private final Env env; private final List tokens = new CopyOnWriteArrayList<>(); private LocalDateTime lastActivityDate; private Status status; private LocalDateTime startDate = LocalDateTime.MIN; private LocalDateTime endDate = LocalDateTime.MIN; - private final String wfId; - private final String wfInstanceId; - private Map> pausedJoinNodeTokens = new HashMap<>(); - private Map globalParams; + private final Map> pausedJoinNodeTokens = new HashMap<>(); + private final Map globalParams; private String error; private String errorStacktrace; - private Map outputParams = new HashMap<>(); - private String parentProfileId; + private final Map outputParams = new HashMap<>(); + private final String parentProfileId; public WorkflowProcess( final String id, - final String name, - final String family, - final String dsId, - final String dsName, - final String dsInterface, + final SimpleResource wfMetadata, + final WorkflowInstance wfInstance, final Graph graph, - final int priority, - final String wfId, - final String wfInstanceId, final Map globalParams, final ProcessCallback callback, final String parentProfileId) { this.id = id; - this.name = name; - this.family = family; - this.dsId = dsId; - this.dsName = dsName; - this.dsInterface = dsInterface; + this.wfMetadata = wfMetadata; + this.wfInstance = wfInstance; this.graph = graph; - this.priority = priority; this.callback = callback; this.status = Status.CREATED; this.env = new Env(); - this.wfId = wfId; - this.wfInstanceId = wfInstanceId; this.globalParams = globalParams; this.lastActivityDate = LocalDateTime.now(); this.parentProfileId = parentProfileId; } - public void setParentProfileId(final String parentProfileId) { - this.parentProfileId = parentProfileId; - } - - public String getDsId() { - return dsId; - } - - public void setDsId(final String dsId) { - this.dsId = dsId; - } - - public String getDsName() { - return dsName; - } - - public void setDsName(final String dsName) { - this.dsName = dsName; - } - - public String getDsInterface() { - return dsInterface; - } - - public void setDsInterface(final String dsInterface) { - this.dsInterface = dsInterface; - } - - public Map> getPausedJoinNodeTokens() { - return pausedJoinNodeTokens; - } - - public void setPausedJoinNodeTokens(final Map> pausedJoinNodeTokens) { - this.pausedJoinNodeTokens = pausedJoinNodeTokens; - } - public String getId() { return id; } public String getName() { - return name; + return wfMetadata.getName(); + } + + public String getFamily() { + return wfMetadata.getSubtype(); + } + + public String getWfId() { + return wfMetadata.getId(); + } + + public String getWfInstanceId() { + return wfInstance.getId(); + } + + public int getPriority() { + return wfInstance.getPriority(); + } + + public String getDsId() { + return wfInstance.getId(); + } + + public String getDsName() { + return wfInstance.getDsName(); + } + + public String getDsInterface() { + return wfInstance.getApiId(); + } + + public Map> getPausedJoinNodeTokens() { + return pausedJoinNodeTokens; } public Env getEnv() { @@ -144,10 +138,6 @@ public class WorkflowProcess implements Comparable { return callback; } - public int getPriority() { - return priority; - } - public boolean isTerminated() { switch (status) { case SUCCESS: @@ -169,7 +159,7 @@ public class WorkflowProcess implements Comparable { @Override public String toString() { - return String.format("[process id='%s' name='%s']", id, name); + return String.format("[process id='%s' name='%s']", id, wfMetadata.getName()); } @Override @@ -181,22 +171,6 @@ public class WorkflowProcess implements Comparable { return globalParams; } - public void setGlobalParams(final Map globalParams) { - this.globalParams = globalParams; - } - - public String getFamily() { - return family; - } - - public String getWfId() { - return wfId; - } - - public String getWfInstanceId() { - return wfInstanceId; - } - public void setStartDate(final LocalDateTime startDate) { this.startDate = startDate; } @@ -213,20 +187,6 @@ public class WorkflowProcess implements Comparable { return endDate; } - public enum Status { - CREATED, - EXECUTING, - SUCCESS, - FAILURE, - KILLED; - } - - public enum StartMode { - AUTO, - MANUAL, - DISABLED - } - public String getError() { return error; } @@ -243,10 +203,6 @@ public class WorkflowProcess implements Comparable { this.errorStacktrace = errorStacktrace; } - public void setOutputParams(final Map outputParams) { - this.outputParams = outputParams; - } - public Map getOutputParams() { return outputParams; }