diff --git a/apps/dnet-is-application/src/main/resources/application.properties b/apps/dnet-is-application/src/main/resources/application.properties index a2a29b36..923a4863 100644 --- a/apps/dnet-is-application/src/main/resources/application.properties +++ b/apps/dnet-is-application/src/main/resources/application.properties @@ -2,6 +2,7 @@ server.port=8280 server.public_url = server.public_desc = API Base URL +spring.thymeleaf.check-template-location=false dnet.configuration.infrastructure = LOCAL DEV diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscriptionPK.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscriptionPK.java index b227f7df..07a14b85 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscriptionPK.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscriptionPK.java @@ -1,6 +1,7 @@ package eu.dnetlib.manager.wf.model; import java.io.Serializable; +import java.util.Objects; public class WorkflowSubscriptionPK implements Serializable { @@ -36,4 +37,17 @@ public class WorkflowSubscriptionPK implements Serializable { this.email = email; } + @Override + public int hashCode() { + return Objects.hash(condition, email, wfConfigurationId); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { return true; } + if (obj == null) { return false; } + if (getClass() != obj.getClass()) { return false; } + final WorkflowSubscriptionPK other = (WorkflowSubscriptionPK) obj; + return condition == other.condition && Objects.equals(email, other.email) && Objects.equals(wfConfigurationId, other.wfConfigurationId); + } } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowTemplate.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowTemplate.java index 2da4404c..772cbea0 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowTemplate.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowTemplate.java @@ -2,7 +2,6 @@ package eu.dnetlib.manager.wf.model; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Matcher; @@ -205,17 +204,9 @@ public class WorkflowTemplate implements Serializable { } public Map calculateInitialParams(final Map globalParams, final Environment environment) { - final Map map = new HashMap<>(); - - System.out.println("****** " + input.size()); - input.forEach(System.out::println); - - input.stream() + return input.stream() .filter(p -> StringUtils.isBlank(p.getEnv())) - .forEach(p -> map.put(p.getName(), calculateSimpleValue(p, globalParams, environment))); - System.out.println("******"); - - return map; + .collect(Collectors.toMap(p -> p.getName(), p -> calculateSimpleValue(p, globalParams, environment))); } private Object calculateSimpleValue(final NodeParam p, final Map globalParams, final Environment environment) { @@ -232,7 +223,6 @@ public class WorkflowTemplate implements Serializable { final String rValue = globalParams.get(rName); if (StringUtils.isBlank(rValue)) { return null; } value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue); - System.out.println("NEW VALUE " + value); } return value; } else if (StringUtils.isNotBlank(prop)) { diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java index 125c43c2..c38ce9d0 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java @@ -35,7 +35,6 @@ public abstract class ProcessNode implements BeanNameAware { } else { fieldValue(field, env.getAttribute(field.getName(), field.getType())); } - }); } @@ -47,12 +46,12 @@ public abstract class ProcessNode implements BeanNameAware { } else { env.setAttribute(field.getName(), fieldValue(field)); } - }); } private final void fieldValue(final Field field, final Object object) { try { + field.setAccessible(true); field.set(this, object); } catch (IllegalArgumentException | IllegalAccessException e) { log.error("Error setting field " + field.getName(), e); @@ -62,6 +61,7 @@ public abstract class ProcessNode implements BeanNameAware { private Object fieldValue(final Field field) { try { + field.setAccessible(true); return field.get(this); } catch (IllegalArgumentException | IllegalAccessException e) { log.error("Error getting field " + field.getName(), e); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/test/Test01Node.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/test/Test01Node.java new file mode 100644 index 00000000..a2dea432 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/test/Test01Node.java @@ -0,0 +1,35 @@ +package eu.dnetlib.manager.wf.nodes.test; + +import eu.dnetlib.manager.wf.annotations.WfInputParam; +import eu.dnetlib.manager.wf.annotations.WfNode; +import eu.dnetlib.manager.wf.annotations.WfOutputParam; +import eu.dnetlib.manager.wf.nodes.AbstractJobNode; + +@WfNode("test01") +public class Test01Node extends AbstractJobNode { + + @WfInputParam + private String message; + + @WfInputParam + private int times; + + @WfOutputParam + private String response; + + public Test01Node() { + super(false); + } + + @Override + protected void execute() { + System.out.println("************************"); + System.out.println("* Instance: " + toString()); + for (int i = 0; i < times; i++) { + System.out.println("* " + i + " - " + message); + } + System.out.println("************************"); + response = "I printed the message [" + message + "]"; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java index e181ea1f..d0bd623b 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java @@ -5,8 +5,13 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import javax.annotation.PostConstruct; + import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.context.expression.MapAccessor; import org.springframework.core.env.Environment; import org.springframework.expression.ExpressionParser; @@ -17,19 +22,43 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.model.WorkflowTemplate; +import eu.dnetlib.manager.wf.nodes.ProcessNode; import eu.dnetlib.manager.wf.workflows.procs.Env; -import eu.dnetlib.manager.wf.workflows.util.NodeHelper; @Service public class GraphLoader { - @Autowired - private NodeHelper nodeHelper; + private static final Log log = LogFactory.getLog(GraphLoader.class); @Autowired private Environment environment; + @Autowired + private ApplicationContext applicationContext; + + private final Set validTypes = new HashSet<>(); + + @PostConstruct + private void init() { + log.info("************************************"); + log.info("* Checking workflow nodes:"); + applicationContext.getBeansWithAnnotation(WfNode.class).forEach((key, bean) -> { + if (ProcessNode.class.isAssignableFrom(bean.getClass())) { + log.info("* Type: " + key + " -> " + bean.getClass()); + validTypes.add(key); + } else { + log.warn("* Type: " + key + " -> " + bean.getClass() + "(ERROR: it is not a ProcessNode)"); + } + + }); + if (validTypes.isEmpty()) { + log.warn("* 0 nodes available"); + } + log.info("************************************"); + } + public Graph loadGraph(final WorkflowTemplate wfTemplate, final Map globalParams) throws WorkflowManagerException { final Graph graph = new Graph(); @@ -110,7 +139,7 @@ public class GraphLoader { if (n.isStart()) { foundStart = true; } - if (!this.nodeHelper.isValid(n)) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); } + if (n.getType() != null && !validTypes.contains(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); } } if (!foundStart) { throw new WorkflowManagerException("Start node not found"); } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java index d5776c8c..f621b045 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java @@ -7,32 +7,38 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import com.google.common.base.Throwables; +import eu.dnetlib.errors.WorkflowManagerException; import eu.dnetlib.manager.history.WorkflowLogger; +import eu.dnetlib.manager.wf.nodes.DefaultJobNode; import eu.dnetlib.manager.wf.nodes.ProcessNode; +import eu.dnetlib.manager.wf.nodes.SuccessNode; import eu.dnetlib.manager.wf.notification.EmailSender; import eu.dnetlib.manager.wf.workflows.graph.GraphNode; import eu.dnetlib.manager.wf.workflows.util.NodeCallback; -import eu.dnetlib.manager.wf.workflows.util.NodeHelper; @Service public class ProcessEngine { private static final Log log = LogFactory.getLog(ProcessEngine.class); - @Autowired - private NodeHelper nodeHelper; @Autowired private EmailSender emailSender; + @Autowired private WorkflowLogger wfLogger; + @Autowired + private ApplicationContext applicationContext; + public void startProcess(final WorkflowProcess process) { log.info(process.getGraph()); @@ -45,13 +51,8 @@ public class ProcessEngine { try { for (final GraphNode graphNode : process.getGraph().startNodes()) { - final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process); - final Token token = new Token(); - token.getEnv().addAttributes(process.getEnv().getAttributes()); - process.getTokens().add(token); - - pNode.initInputParams(token.getEnv()); - pNode.execute(token, newNodeCallback(process, pNode, graphNode)); + final Token token = process.newToken(process.getEnv().getAttributes()); + executeNode(process, graphNode, token); } } catch (final Throwable e) { log.error("WorkflowProcess node instantiation failed", e); @@ -91,30 +92,22 @@ public class ProcessEngine { list.add(oldToken); if (list.size() == process.getGraph().getNumberOfIncomingArcs(graphNode)) { - final Token token = new Token(); - token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()]))); - final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process); - - process.getTokens().add(token); - process.setLastActivityDate(LocalDateTime.now()); + final Token token = process.newToken(mergeEnvParams(list.toArray(new Token[list.size()]))); if (graphNode.isSucessNode()) { completeProcess(process, token); } else { - pNode.execute(token, newNodeCallback(process, pNode, graphNode)); + executeNode(process, graphNode, token); } } } else { - final Token token = new Token(); - token.getEnv().addAttributes(oldToken.getEnv().getAttributes()); - final ProcessNode pNode = nodeHelper.newProcessNode(graphNode, process); - - process.getTokens().add(token); - process.setLastActivityDate(LocalDateTime.now()); - pNode.execute(token, newNodeCallback(process, pNode, graphNode)); + final Token token = process.newToken(oldToken.getEnv().getAttributes()); + executeNode(process, graphNode, token); } } - } catch (final Throwable e) { + } catch ( + + final Throwable e) { log.error("WorkflowProcess node instantiation failed", e); process.setStatus(WorkflowProcess.Status.failure); process.setError(e.getMessage()); @@ -124,6 +117,39 @@ public class ProcessEngine { } + private void executeNode(final WorkflowProcess process, final GraphNode graphNode, final Token token) throws WorkflowManagerException { + process.setLastActivityDate(LocalDateTime.now()); + + final ProcessNode pNode = newProcessNode(graphNode, process); + + // TODO Is it correct? + token.getEnv().addAttributes(graphNode.resolveParams(token.getEnv())); + + pNode.initInputParams(token.getEnv()); + pNode.execute(token, newNodeCallback(process, pNode, graphNode)); + } + + private ProcessNode newProcessNode(final GraphNode graphNode, final WorkflowProcess process) throws WorkflowManagerException { + if (graphNode.isSucessNode()) { + return new SuccessNode(); + } else if (StringUtils.isBlank(graphNode.getType())) { + return new DefaultJobNode(graphNode.getName()); + } else { + final ProcessNode pnode = (ProcessNode) applicationContext.getBean(graphNode.getType()); + if (pnode != null) { + pnode.setNodeName(graphNode.getName()); + if (pnode instanceof ProcessAware) { + ((ProcessAware) pnode).setProcess(process); + } + return pnode; + } else { + log.error("cannot find bean of type " + graphNode.getType()); + throw new WorkflowManagerException("cannot find bean of type " + graphNode.getType()); + } + } + + } + private Map mergeEnvParams(final Token... tokens) { final Map map = new HashMap<>(); Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes())); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java index 3a260feb..68685ac5 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java @@ -207,6 +207,13 @@ public class WorkflowProcess implements Comparable { return outputParams; } + public Token newToken(final Map attrs) { + final Token token = new Token(); + token.getEnv().addAttributes(attrs); + tokens.add(token); + return token; + } + public void complete(final Token token) { final LocalDateTime now = token.getEndDate(); setLastActivityDate(now); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java deleted file mode 100644 index cfb92be4..00000000 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java +++ /dev/null @@ -1,104 +0,0 @@ -package eu.dnetlib.manager.wf.workflows.util; - -import java.util.Map; -import java.util.Set; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition; -import org.springframework.beans.factory.config.BeanDefinition; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; -import org.springframework.core.type.filter.AnnotationTypeFilter; -import org.springframework.stereotype.Component; - -import eu.dnetlib.errors.WorkflowManagerException; -import eu.dnetlib.manager.wf.annotations.WfNode; -import eu.dnetlib.manager.wf.nodes.DefaultJobNode; -import eu.dnetlib.manager.wf.nodes.ProcessNode; -import eu.dnetlib.manager.wf.nodes.SuccessNode; -import eu.dnetlib.manager.wf.workflows.graph.GraphNode; -import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; -import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; - -@Component -public class NodeHelper implements ApplicationContextAware { - - private static final Log log = LogFactory.getLog(NodeHelper.class); - - private ApplicationContext applicationContext; - - public ProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process) throws WorkflowManagerException { - if (node.isSucessNode()) { - return new SuccessNode(); - } else if (StringUtils.isBlank(node.getType())) { - return new DefaultJobNode(node.getName()); - } else { - try { - Class clazz; - - clazz = findClassForNodeType(node.getType()); - - final ProcessNode pnode = this.applicationContext.getBean(clazz); - - if (pnode != null) { - pnode.setNodeName(node.getName()); - if (pnode instanceof ProcessAware) { - ((ProcessAware) pnode).setProcess(process); - } - return pnode; - } else { - log.error("cannot find bean of type " + node.getType()); - throw new WorkflowManagerException("cannot find bean of type " + node.getType()); - } - } catch (final ClassNotFoundException e) { - throw new WorkflowManagerException("Class not found for node " + node.getType()); - } - } - } - - private Class findClassForNodeType(final String type) throws ClassNotFoundException { - final ClassPathScanningCandidateComponentProvider provider = - new ClassPathScanningCandidateComponentProvider(false); - - provider.addIncludeFilter(new AnnotationTypeFilter(WfNode.class)); - - final Set beanDefs = provider.findCandidateComponents("eu.dnetlib.wf.nodes"); - - for (final BeanDefinition bd : beanDefs) { - if (bd instanceof AnnotatedBeanDefinition) { - final Map annotAttributeMap = ((AnnotatedBeanDefinition) bd) - .getMetadata() - .getAnnotationAttributes(WfNode.class.getCanonicalName()); - - if (type.equals(annotAttributeMap.get("value"))) { - final Class clazz = Class.forName(bd.getBeanClassName()); - if (ProcessNode.class.isAssignableFrom(clazz)) { return (Class) Class.forName(bd.getBeanClassName()); } - } - } - } - - throw new ClassNotFoundException("Class not found for type " + type); - } - - public boolean isValid(final GraphNode n) { - if (n.getType() == null) { - return true; - } else { - try { - return findClassForNodeType(n.getType()) != null; - } catch (final ClassNotFoundException e) { - return false; - } - } - } - - @Override - public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } - -}