diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArcDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArc.java similarity index 87% rename from libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArcDbEntry.java rename to libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArc.java index 5ec1447f..b5f4ac58 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArcDbEntry.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArc.java @@ -2,7 +2,7 @@ package eu.dnetlib.manager.wf.model; import java.io.Serializable; -public class GraphArcDbEntry implements Serializable { +public class GraphArc implements Serializable { private static final long serialVersionUID = 7866138976929522262L; diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNodeDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNode.java similarity index 71% rename from libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNodeDbEntry.java rename to libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNode.java index ca1e0f91..c506c8f1 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNodeDbEntry.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNode.java @@ -1,8 +1,9 @@ package eu.dnetlib.manager.wf.model; import java.io.Serializable; +import java.util.List; -public class GraphNodeDbEntry implements Serializable { +public class GraphNode implements Serializable { private static final long serialVersionUID = -3695762832959801906L; @@ -10,8 +11,8 @@ public class GraphNodeDbEntry implements Serializable { private String type; private boolean isStart = false; private boolean isJoin = false; - private GraphArcDbEntry[] arcs; - private GraphParameterDbEntry[] params; + private List arcs; + private List params; public String getName() { return name; @@ -45,19 +46,19 @@ public class GraphNodeDbEntry implements Serializable { this.isJoin = isJoin; } - public GraphArcDbEntry[] getArcs() { + public List getArcs() { return arcs; } - public void setArcs(final GraphArcDbEntry[] arcs) { + public void setArcs(final List arcs) { this.arcs = arcs; } - public GraphParameterDbEntry[] getParams() { + public List getParams() { return params; } - public void setParams(final GraphParameterDbEntry[] params) { + public void setParams(final List params) { this.params = params; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameter.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameter.java new file mode 100644 index 00000000..f24b12c4 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameter.java @@ -0,0 +1,39 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public class GraphParameter extends GraphParameterValue implements Serializable { + + private static final long serialVersionUID = 1894419948433994453L; + + private String name; + private List values; + private Map map; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public List getValues() { + return values; + } + + public void setValues(final List values) { + this.values = values; + } + + public Map getMap() { + return map; + } + + public void setMap(final Map map) { + this.map = map; + } + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterDbEntry.java deleted file mode 100644 index 6edcf0f7..00000000 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterDbEntry.java +++ /dev/null @@ -1,39 +0,0 @@ -package eu.dnetlib.manager.wf.model; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -public class GraphParameterDbEntry extends GraphParameterValueDbEntry implements Serializable { - - private static final long serialVersionUID = 1894419948433994453L; - - private String name; - private List values; - private Map map; - - public String getName() { - return name; - } - - public void setName(final String name) { - this.name = name; - } - - public List getValues() { - return values; - } - - public void setValues(final List values) { - this.values = values; - } - - public Map getMap() { - return map; - } - - public void setMap(final Map map) { - this.map = map; - } - -} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValueDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValue.java similarity index 91% rename from libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValueDbEntry.java rename to libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValue.java index c04a6e30..ee8267f9 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValueDbEntry.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValue.java @@ -2,7 +2,7 @@ package eu.dnetlib.manager.wf.model; import java.io.Serializable; -public class GraphParameterValueDbEntry implements Serializable { +public class GraphParameterValue implements Serializable { private static final long serialVersionUID = 7815785723401725707L; diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowDbEntry.java deleted file mode 100644 index f991bec7..00000000 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowDbEntry.java +++ /dev/null @@ -1,83 +0,0 @@ -package eu.dnetlib.manager.wf.model; - -import java.io.Serializable; - -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.Table; - -import org.hibernate.annotations.Type; -import org.hibernate.annotations.TypeDef; -import org.hibernate.annotations.TypeDefs; - -import com.vladmihalcea.hibernate.type.json.JsonBinaryType; -import com.vladmihalcea.hibernate.type.json.JsonStringType; - -@Entity -@Table(name = "workflows") -@TypeDefs({ - @TypeDef(name = "json", typeClass = JsonStringType.class), - @TypeDef(name = "jsonb", typeClass = JsonBinaryType.class) -}) -public class WorkflowDbEntry implements Serializable { - - private static final long serialVersionUID = -4684952453322136556L; - - @Id - @Column(name = "id") - private String id; - - @Column(name = "name") - private String name; - - @Column(name = "family") - private String family; - - @Column(name = "description") - private String description; - - @Type(type = "jsonb") - @Column(name = "data", columnDefinition = "jsonb") - private GraphNodeDbEntry[] graph; - - public String getId() { - return id; - } - - public void setId(final String id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(final String name) { - this.name = name; - } - - public String getFamily() { - return family; - } - - public void setFamily(final String family) { - this.family = family; - } - - public String getDescription() { - return description; - } - - public void setDescription(final String description) { - this.description = description; - } - - public GraphNodeDbEntry[] getGraph() { - return graph; - } - - public void setGraph(final GraphNodeDbEntry[] graph) { - this.graph = graph; - } -} diff --git a/libs/dnet-is-common/src/main/resources/sql/schema.sql b/libs/dnet-is-common/src/main/resources/sql/schema.sql index 58989441..a4f66ba8 100644 --- a/libs/dnet-is-common/src/main/resources/sql/schema.sql +++ b/libs/dnet-is-common/src/main/resources/sql/schema.sql @@ -131,7 +131,8 @@ INSERT INTO resource_types(id, name, content_type) VALUES ('transformation_rule_legacy', 'Transformation Rules (legacy)', 'text/plain'), ('cleaning_rule', 'Cleaning Rules', 'application/xml'), ('hadoop_job_configuration', 'Hadoop Job Configurations', 'application/xml') - ('dedup_configuration', 'Dedup Configurations', 'application/json'); + ('dedup_configuration', 'Dedup Configurations', 'application/json') + ('workflow', 'Workflows', 'application/json'); CREATE TABLE resources ( id text PRIMARY KEY, @@ -261,14 +262,6 @@ CREATE TABLE emails ( -- Workflows -CREATE TABLE workflows ( - id text PRIMARY KEY, - name text NOT NULL, - family text NOT NULL, - description text, - data jsonb NOT NULL -); - CREATE TABLE workflow_instances ( id text PRIMARY KEY, details jsonb NOT NULL DEFAULT '{}', @@ -281,14 +274,15 @@ CREATE TABLE workflow_instances ( scheduling_enabled boolean NOT NULL DEFAULT false, scheduling_cron text, scheduling_min_interval int, - workflow text REFERENCES workflows(id), - destroy_wf text REFERENCES workflows(id), + workflow text REFERENCES resource(id), + destroy_wf text REFERENCES resource(id), system_params jsonb NOT NULL DEFAULT '{}', user_params jsonb NOT NULL DEFAULT '{}' ); +-- TO DELETE CREATE TABLE workflow_expected_params ( - wf_id text REFERENCES workflows(id), + wf_id text REFERENCES resource(id), name text, description text, type text, @@ -296,6 +290,7 @@ CREATE TABLE workflow_expected_params ( default_value text, PRIMARY KEY (wf_id, name) ); +-- END CREATE TABLE workflow_subscriptions ( wf_instance_id text NOT NULL REFERENCES workflow_instances(id), diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowDbEntryRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowDbEntryRepository.java deleted file mode 100644 index 9e007e54..00000000 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowDbEntryRepository.java +++ /dev/null @@ -1,9 +0,0 @@ -package eu.dnetlib.manager.wf.repository; - -import org.springframework.data.jpa.repository.JpaRepository; - -import eu.dnetlib.manager.wf.model.WorkflowDbEntry; - -public interface WorkflowDbEntryRepository extends JpaRepository { - -} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java index 18f5d2d6..3a42b8e4 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java @@ -19,10 +19,9 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.manager.wf.model.GraphArcDbEntry; -import eu.dnetlib.manager.wf.model.GraphNodeDbEntry; -import eu.dnetlib.manager.wf.model.GraphParameterDbEntry; -import eu.dnetlib.manager.wf.model.GraphParameterValueDbEntry; +import eu.dnetlib.manager.wf.model.GraphArc; +import eu.dnetlib.manager.wf.model.GraphParameter; +import eu.dnetlib.manager.wf.model.GraphParameterValue; import eu.dnetlib.manager.wf.workflows.util.NodeHelper; @Service @@ -40,10 +39,11 @@ public class GraphLoader { @Autowired private Environment env; - public Graph loadGraph(final GraphNodeDbEntry[] workflowGraph, final Map globalParams) throws WorkflowManagerException { + public Graph loadGraph(final List workflowGraph, final Map globalParams) + throws WorkflowManagerException { final Graph graph = new Graph(); - for (final GraphNodeDbEntry node : workflowGraph) { + for (final eu.dnetlib.manager.wf.model.GraphNode node : workflowGraph) { final String nodeName = node.getName(); final String nodeType = node.getType(); final boolean isStart = node.isStart(); @@ -60,7 +60,7 @@ public class GraphLoader { } if (graph.getArcs() != null) { - for (final GraphArcDbEntry a : node.getArcs()) { + for (final GraphArc a : node.getArcs()) { final String arcName = a.getName(); final String to = a.getTo(); graph.addArc(new Arc(StringUtils.isNotBlank(arcName) ? arcName : Arc.DEFAULT_ARC, nodeName, to)); @@ -75,12 +75,12 @@ public class GraphLoader { return graph; } - public Map calculateParamsForNode(final GraphNodeDbEntry node, final Map globalParams) { + public Map calculateParamsForNode(final eu.dnetlib.manager.wf.model.GraphNode node, final Map globalParams) { final Map params = new HashMap<>(); if (node.getParams() != null) { - for (final GraphParameterDbEntry p : node.getParams()) { + for (final GraphParameter p : node.getParams()) { final String pName = p.getName(); @@ -116,7 +116,7 @@ public class GraphLoader { return params; } - private GraphNodeParameter calculateSimpleValue(final GraphParameterValueDbEntry graphValue, final Map globalParams) { + private GraphNodeParameter calculateSimpleValue(final GraphParameterValue graphValue, final Map globalParams) { String value = graphValue.getValue(); final String ref = graphValue.getRef(); final String prop = graphValue.getProperty(); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java index 8cf041e3..2aae0a94 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java @@ -3,6 +3,7 @@ package eu.dnetlib.manager.wf.workflows.procs; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -11,7 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.manager.wf.model.WorkflowDbEntry; +import eu.dnetlib.manager.wf.model.GraphNode; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.workflows.graph.GraphLoader; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; @@ -28,7 +29,10 @@ public class ProcessFactory { @Autowired private GraphLoader graphLoader; - public WorkflowProcess newProcess(final WorkflowDbEntry wf, + public WorkflowProcess newProcess(final String wfId, + final String wfName, + final String wfFamily, + final List graph, final WorkflowInstance instance, final ProcessCallback processCallback, final String parent) throws WorkflowManagerException { @@ -38,17 +42,16 @@ public class ProcessFactory { globalParams.putAll(instance.getUserParams()); return new WorkflowProcess(generateProcessId(), - wf.getName(), - wf.getFamily(), + wfName, + wfFamily, instance.getDsId(), instance.getDsName(), instance.getApiId(), - graphLoader.loadGraph(wf.getGraph(), globalParams), + graphLoader.loadGraph(graph, globalParams), instance.getPriority(), - wf.getId(), + wfId, instance.getId(), globalParams, - processCallback, parent); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java index 44ea7223..cee2a1c7 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java @@ -1,6 +1,8 @@ package eu.dnetlib.manager.wf.workflows.procs; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -12,12 +14,16 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dsm.DsmService; import eu.dnetlib.errors.DsmException; import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.manager.wf.model.WorkflowDbEntry; +import eu.dnetlib.is.model.resource.SimpleResource; +import eu.dnetlib.is.resource.repository.SimpleResourceRepository; +import eu.dnetlib.manager.wf.model.GraphNode; import eu.dnetlib.manager.wf.model.WorkflowInstance; -import eu.dnetlib.manager.wf.repository.WorkflowDbEntryRepository; import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; @@ -39,7 +45,7 @@ public class WorkflowExecutor implements Stoppable { private DsmService dsmService; @Autowired - private WorkflowDbEntryRepository workflowDbEntryRepository; + private SimpleResourceRepository simpleResourceRepository; @Autowired private WorkflowInstanceRepository workflowInstanceRepository; @@ -107,16 +113,34 @@ public class WorkflowExecutor implements Stoppable { public String startWorkflowInstance(final WorkflowInstance instance, final ProcessCallback processCallback, final String parent) throws WorkflowManagerException { - final WorkflowDbEntry wf = workflowDbEntryRepository.findById(instance.getWorkflow()) - .orElseThrow(() -> new WorkflowManagerException("WF not found: " + instance.getWorkflow())); if (!instance.isEnabled() || !instance.isConfigured()) { - log.warn("Wf instance " + instance.getId() + " not launched, because it is not ready to start or it is disabled"); - throw new WorkflowManagerException("Workflow " + instance.getId() + " is not ready to start"); + log.warn("Wf instance " + instance.getId() + " is not ready to start"); + throw new WorkflowManagerException("Wf instance " + instance.getId() + " is not ready to start"); } + final SimpleResource wf = simpleResourceRepository + .findById(instance.getWorkflow()) + .filter(r -> r.getType().equals("workflows")) + .orElseThrow(() -> new WorkflowManagerException("WF not found: " + instance.getWorkflow())); + + final List graph = simpleResourceRepository.findContentById(wf.getId()) + .map(s -> { + final ObjectMapper mapper = new ObjectMapper(); + final TypeReference> type = new TypeReference>() {}; + final List list = new ArrayList<>(); + try { + list.addAll(mapper.readValue(s, type)); + } catch (final Exception e) { + log.error("Error parsing json", e); + } + return list; + }) + .filter(list -> !list.isEmpty()) + .orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wf.getId())); + final WorkflowProcess process = - processFactory.newProcess(wf, instance, processCallback, parent); + processFactory.newProcess(wf.getId(), wf.getName(), wf.getSubtype(), graph, instance, processCallback, parent); return processRegistry.registerProcess(process, instance.getId()); }