refactoring

This commit is contained in:
Michele Artini 2023-03-08 10:10:03 +01:00
parent cd5e1ab957
commit 0782c2f536
6 changed files with 55 additions and 33 deletions

View File

@ -8,6 +8,7 @@ import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Transient;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
@ -75,6 +76,9 @@ public class WorkflowInstance implements Serializable {
@Column(name = "user_params", columnDefinition = "jsonb")
private Map<String, String> userParams = new LinkedHashMap<>();
@Transient
private String parentId;
public String getId() {
return id;
}
@ -194,4 +198,12 @@ public class WorkflowInstance implements Serializable {
public void setUserParams(final Map<String, String> userParams) {
this.userParams = userParams;
}
public String getParentId() {
return parentId;
}
public void setParentId(final String parentId) {
this.parentId = parentId;
}
}

View File

@ -51,7 +51,7 @@ public class ScheduledWorkflowLauncher {
.filter(this::isReady)
.forEach(instance -> {
try {
workflowExecutor.startWorkflowInstance(instance, null, null, null);
workflowExecutor.startWorkflowInstance(instance, null, null);
} catch (final Exception e) {
log.error("Error launching scheduled wf instance: " + instance.getId(), e);
}

View File

@ -1,11 +1,14 @@
package eu.dnetlib.manager.wf.nodes;
import java.util.HashMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
@ -17,21 +20,36 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
private static final Log log = LogFactory.getLog(LaunchWorkflowJobNode.class);
private String wfInstanceId;
private String wfId;
@Autowired
private WorkflowExecutor executor;
@Autowired
private ProcessRegistry processRegistry;
private WorkflowProcess process;
@Override
public final void execute(final Token token) {
try {
final String procId = executor.startWorkflowInstance(getWfInstanceId(), process.getWfInstanceId(), (proc) -> {
final WorkflowInstance instance = new WorkflowInstance();
instance.setId("CHILD_" + UUID.randomUUID());
instance.setParentId(process.getWfInstanceId());
instance.setDetails(new HashMap<>());
instance.setPriority(100);
instance.setDsId(process.getDsId());
instance.setDsName(process.getDsName());
instance.setApiId(process.getDsInterface());
instance.setEnabled(true);
instance.setConfigured(true);
instance.setSchedulingEnabled(false);
instance.setCronExpression("");
instance.setCronMinInterval(0);
instance.setWorkflow(wfId);
instance.setDestroyWf(null);
instance.setSystemParams(process.getGlobalParams());
instance.setUserParams(new HashMap<>());
final String procId = executor.startWorkflowInstance(instance, (proc) -> {
log.debug("Child workflow has been completed successfully");
token.release();
}, (proc) -> {
@ -40,30 +58,28 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
});
if (log.isDebugEnabled()) {
log.debug("The child workflow [instance: " + getWfInstanceId() + "] is starting with procId: " + procId);
log.debug("The child workflow [instance: " + instance.getId() + "] is starting with procId: " + procId);
}
setProgressMessage("Launched sub workflow, proc: " + procId);
} catch (
final Throwable e) {
} catch (final Throwable e) {
log.error("got exception while launching child workflow", e);
token.releaseAsFailed(e);
}
}
public String getWfInstanceId() {
return wfInstanceId;
}
public void setWfInstanceId(final String wfInstanceId) {
this.wfInstanceId = wfInstanceId;
}
@Override
public void setProcess(final WorkflowProcess process) {
this.process = process;
}
public String getWfId() {
return wfId;
}
public void setWfId(final String wfId) {
this.wfId = wfId;
}
}

View File

@ -33,7 +33,6 @@ public class ProcessFactory {
public WorkflowProcess newProcess(final SimpleResource wfMetadata,
final WorkflowGraph wfGraph,
final WorkflowInstance wfInstance,
final String parent,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail) throws WorkflowManagerException {
@ -43,7 +42,7 @@ public class ProcessFactory {
final Graph graph = graphLoader.loadGraph(wfGraph, globalParams);
return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, parent, onSuccess, onFail);
return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, onSuccess, onFail);
}

View File

@ -66,7 +66,6 @@ public class WorkflowExecutor implements Stoppable {
public String startRepoHiWorkflow(final String wfId,
final String dsId,
final String apiId,
final String parent,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail)
throws WorkflowManagerException {
@ -96,7 +95,7 @@ public class WorkflowExecutor implements Stoppable {
instance.setSystemParams(new HashMap<>());
instance.setUserParams(new HashMap<>());
return startWorkflowInstance(instance, parent, onSuccess, onFail);
return startWorkflowInstance(instance, onSuccess, onFail);
} catch (final DsmException e) {
throw new WorkflowManagerException("Invalid datasource: " + dsId, e);
}
@ -114,11 +113,10 @@ public class WorkflowExecutor implements Stoppable {
final WorkflowInstance instance =
workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId));
return startWorkflowInstance(instance, parent, onSuccess, onFail);
return startWorkflowInstance(instance, onSuccess, onFail);
}
public String startWorkflowInstance(final WorkflowInstance wfInstance,
final String parent,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail)
throws WorkflowManagerException {
@ -145,7 +143,7 @@ public class WorkflowExecutor implements Stoppable {
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId()));
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, parent, onSuccess, onFail);
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, onSuccess, onFail);
return processRegistry.registerProcess(process, wfInstance);
}

View File

@ -52,7 +52,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
private String error;
private String errorStacktrace;
private final Map<String, String> outputParams = new HashMap<>();
private final String parentProfileId;
public WorkflowProcess(
final String id,
@ -60,7 +59,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
final WorkflowInstance wfInstance,
final Graph graph,
final Map<String, String> globalParams,
final String parentProfileId,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail) {
this.id = id;
@ -73,7 +71,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
this.env = new Env();
this.globalParams = globalParams;
this.lastActivityDate = LocalDateTime.now();
this.parentProfileId = parentProfileId;
}
public String getId() {
@ -96,6 +93,10 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return wfInstance.getId();
}
public String getParentId() {
return wfInstance.getParentId();
}
public int getPriority() {
return wfInstance.getPriority();
}
@ -209,10 +210,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return outputParams;
}
public String getParentProfileId() {
return parentProfileId;
}
public void complete(final Token token) {
final LocalDateTime now = token.getEndDate();
setLastActivityDate(now);