new-is-app-with-new-msro #13

Merged
michele.artini merged 19 commits from new-is-app-with-new-msro into new-is-app 2023-03-21 12:15:25 +01:00
8 changed files with 64 additions and 48 deletions
Showing only changes of commit 836c31b6d6 - Show all commits

View File

@ -6,7 +6,6 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
@ -25,6 +24,7 @@ import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
@ -76,8 +76,7 @@ public class WorkflowManagerService implements Stoppable {
public String startRepoHiWorkflow(final String wfId,
final String dsId,
final String apiId,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail)
final ExecutionCallback<WorkflowProcess> callback)
throws WorkflowManagerException {
if (isPaused()) {
@ -105,7 +104,7 @@ public class WorkflowManagerService implements Stoppable {
instance.setSystemParams(new HashMap<>());
instance.setUserParams(new HashMap<>());
return startWorkflowInstance(instance, onSuccess, onFail);
return startWorkflowInstance(instance, callback);
} catch (final DsmException e) {
throw new WorkflowManagerException("Invalid datasource: " + dsId, e);
}
@ -113,8 +112,7 @@ public class WorkflowManagerService implements Stoppable {
public String startWorkflowInstance(final String wfInstanceId,
final String parent,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail) throws Exception {
final ExecutionCallback<WorkflowProcess> callback) throws Exception {
if (isPaused()) {
log.warn("Wf instance " + wfInstanceId + " not launched, because WorkflowExecutor is preparing for shutdown");
@ -123,12 +121,11 @@ public class WorkflowManagerService implements Stoppable {
final WorkflowInstance instance = findWorkflowInstance(wfInstanceId);
return startWorkflowInstance(instance, onSuccess, onFail);
return startWorkflowInstance(instance, callback);
}
public String startWorkflowInstance(final WorkflowInstance wfInstance,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail)
final ExecutionCallback<WorkflowProcess> callback)
throws WorkflowManagerException {
if (!wfInstance.isEnabled() || !wfInstance.isConfigured()) {
@ -153,7 +150,7 @@ public class WorkflowManagerService implements Stoppable {
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId()));
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, onSuccess, onFail);
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, callback);
return processRegistry.registerProcess(process, wfInstance);
}

View File

@ -51,7 +51,7 @@ public class ScheduledWorkflowLauncher {
.filter(this::isReady)
.forEach(instance -> {
try {
wfManagerService.startWorkflowInstance(instance, null, null);
wfManagerService.startWorkflowInstance(instance, null);
} catch (final Exception e) {
log.error("Error launching scheduled wf instance: " + instance.getId(), e);
}

View File

@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.WorkflowManagerService;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
@ -49,12 +50,20 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
instance.setSystemParams(process.getGlobalParams());
instance.setUserParams(new HashMap<>());
final String procId = wfManagerService.startWorkflowInstance(instance, (proc) -> {
log.debug("Child workflow has been completed successfully");
token.release();
}, (proc) -> {
log.error("Child workflow is failed");
token.releaseAsFailed("Child workflow is failed");
final String procId = wfManagerService.startWorkflowInstance(instance, new ExecutionCallback<WorkflowProcess>() {
@Override
public void onSuccess(final WorkflowProcess t) {
log.debug("Child workflow has been completed successfully");
token.release();
}
@Override
public void onFail(final WorkflowProcess t) {
log.error("Child workflow is failed");
token.releaseAsFailed("Child workflow is failed");
}
});
if (log.isDebugEnabled()) {

View File

@ -0,0 +1,9 @@
package eu.dnetlib.manager.wf.workflows.procs;
public interface ExecutionCallback<T> {
void onSuccess(T t);
void onFail(T t);
}

View File

@ -106,9 +106,18 @@ public class ProcessEngine {
}
private Token prepareNewToken(final WorkflowProcess process, final GraphNode node) {
return new Token(
token -> releaseToken(process, node, token),
token -> completeProcess(process, token));
return new Token(new ExecutionCallback<Token>() {
@Override
public void onSuccess(final Token t) {
releaseToken(process, node, t);
}
@Override
public void onFail(final Token t) {
completeProcess(process, t);
}
});
}
private Map<String, Object> mergeEnvParams(final Token... tokens) {

View File

@ -4,7 +4,6 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -33,8 +32,7 @@ public class ProcessFactory {
public WorkflowProcess newProcess(final SimpleResource wfMetadata,
final WorkflowGraph wfGraph,
final WorkflowInstance wfInstance,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail) throws WorkflowManagerException {
final ExecutionCallback<WorkflowProcess> callback) throws WorkflowManagerException {
final Map<String, String> globalParams = new HashMap<>();
globalParams.putAll(wfInstance.getSystemParams());
@ -42,7 +40,7 @@ public class ProcessFactory {
final Graph graph = graphLoader.loadGraph(wfGraph, globalParams);
return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, onSuccess, onFail);
return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, callback);
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.manager.wf.workflows.procs;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
@ -18,8 +17,7 @@ public class Token {
private final LocalDateTime startDate;
private LocalDateTime endDate = LocalDateTime.MIN;
private final Consumer<Token> onSuccess;
private final Consumer<Token> onFail;
private final ExecutionCallback<Token> callback;
private boolean failed = false;
@ -27,11 +25,10 @@ public class Token {
private String error = "";
private String errorStackTrace = "";
public Token(final Consumer<Token> onSuccess, final Consumer<Token> onFail) {
public Token(final ExecutionCallback<Token> callback) {
this.id = "token-" + UUID.randomUUID();
this.startDate = LocalDateTime.now();
this.onSuccess = onSuccess;
this.onFail = onFail;
this.callback = callback;
}
public String getId() {
@ -73,8 +70,8 @@ public class Token {
public void release() {
setEndDate(LocalDateTime.now());
setActive(false);
if (this.onSuccess != null) {
this.onSuccess.accept(this);
if (this.callback != null) {
this.callback.onSuccess(this);
}
}
@ -84,8 +81,8 @@ public class Token {
setFailed(true);
setError(e.getMessage());
setErrorStackTrace(Throwables.getStackTraceAsString(e));
if (this.onFail != null) {
this.onFail.accept(this);
if (this.callback != null) {
this.callback.onFail(this);
}
}
@ -94,8 +91,8 @@ public class Token {
setActive(false);
setFailed(true);
setError(error);
if (this.onFail != null) {
this.onFail.accept(this);
if (this.callback != null) {
this.callback.onFail(this);
}
}

View File

@ -6,7 +6,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.apache.commons.lang3.math.NumberUtils;
@ -39,8 +38,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
private final SimpleResource wfMetadata;
private final WorkflowInstance wfInstance;
private final Graph graph;
private final Consumer<WorkflowProcess> onSuccess;
private final Consumer<WorkflowProcess> onFail;
private final ExecutionCallback<WorkflowProcess> callback;
private final Env env;
private final List<Token> tokens = new CopyOnWriteArrayList<>();
private LocalDateTime lastActivityDate;
@ -59,14 +57,12 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
final WorkflowInstance wfInstance,
final Graph graph,
final Map<String, String> globalParams,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail) {
final ExecutionCallback<WorkflowProcess> callback) {
this.id = id;
this.wfMetadata = wfMetadata;
this.wfInstance = wfInstance;
this.graph = graph;
this.onSuccess = onSuccess;
this.onFail = onFail;
this.callback = callback;
this.status = Status.CREATED;
this.env = new Env();
this.globalParams = globalParams;
@ -223,11 +219,12 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
setLastActivityDate(LocalDateTime.now());
}
if (token.isFailed() && onFail != null) {
onFail.accept(this);
}
if (!token.isFailed() && onSuccess != null) {
onSuccess.accept(this);;
if (callback != null) {
if (token.isFailed()) {
callback.onFail(this);
} else {
callback.onSuccess(this);;
}
}
}