simple wf execution

This commit is contained in:
Michele Artini 2023-04-14 12:48:33 +02:00
parent c4608e68be
commit f4399909a6
8 changed files with 36 additions and 31 deletions

View File

@ -45,7 +45,7 @@ public class WorkflowLogger {
} }
public Optional<WfHistoryEntry> getLastLogForConfiguration(final String id) { public Optional<WfHistoryEntry> getLastLogForConfiguration(final String id) {
return wfHistoryEntryRepository.findOneByWfConfigurationIdOrderByEndDateAsc(id); return wfHistoryEntryRepository.findFirstByWfConfigurationIdOrderByEndDateDesc(id);
} }
} }

View File

@ -12,5 +12,5 @@ public interface WfHistoryEntryRepository extends JpaRepository<WfHistoryEntry,
List<WfHistoryEntry> findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end); List<WfHistoryEntry> findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end);
Optional<WfHistoryEntry> findOneByWfConfigurationIdOrderByEndDateAsc(String id); Optional<WfHistoryEntry> findFirstByWfConfigurationIdOrderByEndDateDesc(String id);
} }

View File

@ -123,9 +123,9 @@ public class ScheduledWorkflowLauncher {
if (p != null) { if (p != null) {
switch (p.getStatus()) { switch (p.getStatus()) {
case CREATED: case created:
return false; return false;
case EXECUTING: case running:
return false; return false;
default: default:
break; break;

View File

@ -29,8 +29,8 @@ public class EmailSender {
wfSubscriptionRepository.findByWfConfigurationId(proc.getWfConfId()).forEach(s -> { wfSubscriptionRepository.findByWfConfigurationId(proc.getWfConfId()).forEach(s -> {
if (s.getCondition() == NotificationCondition.ALWAYS || if (s.getCondition() == NotificationCondition.ALWAYS ||
s.getCondition() == NotificationCondition.ONLY_FAILED && proc.getStatus() == Status.FAILURE || s.getCondition() == NotificationCondition.ONLY_FAILED && proc.getStatus() == Status.failure ||
s.getCondition() == NotificationCondition.ONLY_SUCCESS && proc.getStatus() == Status.SUCCESS) { s.getCondition() == NotificationCondition.ONLY_SUCCESS && proc.getStatus() == Status.success) {
try { try {
final Map<String, Object> params = new HashMap<>(); final Map<String, Object> params = new HashMap<>();

View File

@ -1,6 +1,6 @@
package eu.dnetlib.manager.wf.workflows.graph; package eu.dnetlib.manager.wf.workflows.graph;
import com.google.common.base.Function; import java.util.function.Function;
import eu.dnetlib.manager.wf.workflows.procs.Env; import eu.dnetlib.manager.wf.workflows.procs.Env;
@ -34,7 +34,7 @@ public class Arc {
@Override @Override
public String toString() { public String toString() {
return String.format("[ %s -> %s, %s ]", this.from, this.to, this.condFunction != null ? "with cond" : "without cond"); return String.format("[ %s -> %s ]", this.from, this.to);
} }
} }

View File

@ -3,6 +3,7 @@ package eu.dnetlib.manager.wf.workflows.graph;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -13,7 +14,6 @@ import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.google.common.base.Function;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.errors.WorkflowManagerException; import eu.dnetlib.errors.WorkflowManagerException;
@ -68,14 +68,18 @@ public class GraphLoader {
} }
private Function<Env, Boolean> generateFunction(final String condition) { private Function<Env, Boolean> generateFunction(final String condition) {
return env -> { if (StringUtils.isBlank(condition)) {
final ExpressionParser parser = new SpelExpressionParser(); return (env) -> true;
} else {
return env -> {
final ExpressionParser parser = new SpelExpressionParser();
final StandardEvaluationContext context = new StandardEvaluationContext(env.getAttributes()); final StandardEvaluationContext context = new StandardEvaluationContext(env.getAttributes());
context.addPropertyAccessor(new MapAccessor()); context.addPropertyAccessor(new MapAccessor());
return parser.parseExpression(condition).getValue(context, Boolean.class); return parser.parseExpression(condition).getValue(context, Boolean.class);
}; };
}
} }
private void checkValidity(final Graph graph) throws WorkflowManagerException { private void checkValidity(final Graph graph) throws WorkflowManagerException {

View File

@ -39,7 +39,7 @@ public class ProcessEngine {
log.info("Starting workflow: " + process); log.info("Starting workflow: " + process);
final LocalDateTime now = LocalDateTime.now(); final LocalDateTime now = LocalDateTime.now();
process.setStatus(WorkflowProcess.Status.EXECUTING); process.setStatus(WorkflowProcess.Status.running);
process.setStartDate(now); process.setStartDate(now);
process.setLastActivityDate(now); process.setLastActivityDate(now);
@ -54,12 +54,12 @@ public class ProcessEngine {
} }
} catch (final Throwable e) { } catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e); log.error("WorkflowProcess node instantiation failed", e);
process.setStatus(WorkflowProcess.Status.FAILURE); process.setStatus(WorkflowProcess.Status.failure);
} }
} }
private ExecutionCallback<Token> newNodeCallback(final WorkflowProcess process, final GraphNode node) { private ExecutionCallback<Token> newNodeCallback(final WorkflowProcess process, final GraphNode node) {
return new ExecutionCallback<Token>() { return new ExecutionCallback<>() {
@Override @Override
public void onSuccess(final Token t) { public void onSuccess(final Token t) {
@ -113,7 +113,7 @@ public class ProcessEngine {
} }
} catch (final Throwable e) { } catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e); log.error("WorkflowProcess node instantiation failed", e);
process.setStatus(WorkflowProcess.Status.FAILURE); process.setStatus(WorkflowProcess.Status.failure);
process.setError(e.getMessage()); process.setError(e.getMessage());
process.setErrorStacktrace(Throwables.getStackTraceAsString(e)); process.setErrorStacktrace(Throwables.getStackTraceAsString(e));
process.setLastActivityDate(LocalDateTime.now()); process.setLastActivityDate(LocalDateTime.now());

View File

@ -22,11 +22,11 @@ import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
public class WorkflowProcess implements Comparable<WorkflowProcess> { public class WorkflowProcess implements Comparable<WorkflowProcess> {
public enum Status { public enum Status {
CREATED, created,
EXECUTING, running,
SUCCESS, success,
FAILURE, failure,
KILLED; killed;
} }
public enum StartMode { public enum StartMode {
@ -64,7 +64,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
this.wfConf = wfConf; this.wfConf = wfConf;
this.graph = graph; this.graph = graph;
this.callback = callback; this.callback = callback;
this.status = Status.CREATED; this.status = Status.created;
this.env = new Env(); this.env = new Env();
this.globalParams = globalParams; this.globalParams = globalParams;
this.lastActivityDate = LocalDateTime.now(); this.lastActivityDate = LocalDateTime.now();
@ -135,14 +135,14 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
} }
public void kill() { public void kill() {
setStatus(Status.KILLED); setStatus(Status.killed);
} }
public boolean isTerminated() { public boolean isTerminated() {
switch (status) { switch (status) {
case SUCCESS: case success:
case FAILURE: case failure:
case KILLED: case killed:
return true; return true;
default: default:
return false; return false;
@ -211,10 +211,10 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
final LocalDateTime now = token.getEndDate(); final LocalDateTime now = token.getEndDate();
setLastActivityDate(now); setLastActivityDate(now);
setEndDate(now); setEndDate(now);
setStatus(token.isFailed() ? WorkflowProcess.Status.FAILURE : WorkflowProcess.Status.SUCCESS); setStatus(token.isFailed() ? WorkflowProcess.Status.failure : WorkflowProcess.Status.success);
if (token.isFailed()) { if (token.isFailed()) {
setStatus(Status.FAILURE); setStatus(Status.failure);
setError(token.getError()); setError(token.getError());
setErrorStacktrace(token.getErrorStackTrace()); setErrorStacktrace(token.getErrorStackTrace());
setLastActivityDate(LocalDateTime.now()); setLastActivityDate(LocalDateTime.now());
@ -244,6 +244,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
final WfHistoryEntry pe = new WfHistoryEntry(); final WfHistoryEntry pe = new WfHistoryEntry();
pe.setProcessId(getId()); pe.setProcessId(getId());
pe.setWfConfigurationId(getWfConfId());
pe.setName(getName()); pe.setName(getName());
pe.setFamily(getFamily()); pe.setFamily(getFamily());