refactoring

This commit is contained in:
Michele Artini 2024-01-10 10:27:58 +01:00
parent 2422d4fe35
commit 0ef83b9900
6 changed files with 68 additions and 119 deletions

View File

@ -54,7 +54,7 @@ CREATE VIEW wf_journal_view AS SELECT
coalesce(r.ds_name, h.ds_name) AS ds_name,
coalesce(r.ds_api, h.ds_api) AS ds_api,
coalesce(r.details, h.details) AS details
FROM wf_runtime r JOIN wf_history h ON (r.process_id = h.process_id);
FROM wf_runtime r FULL OUTER JOIN wf_history h ON (r.process_id = h.process_id);
-- Workflows

View File

@ -69,7 +69,6 @@ public abstract class ProcessNode implements BeanNameAware {
}
} catch (final Throwable e) {
token.releaseAsFailed(e);
this.engine.updateRunningJob(this.process, token);
this.engine.completeProcess(this.process, token);
}
}

View File

@ -32,9 +32,6 @@ public class LaunchWorkflowJobNode extends ProcessNode {
@WfInputParam
private String wfId;
@WfInputParam(optional = true)
private String wfConfId;
@WfInputParam(optional = true)
private Datasource ds;
@ -57,7 +54,7 @@ public class LaunchWorkflowJobNode extends ProcessNode {
final WfRunningJob job = new WfRunningJob();
job.setProcessId(WfProcessUtils.generateProcessId());
job.setPriority(process.getPriority());
job.setPriority(process.getJobDetails().getPriority());
job.setStartDate(null);
job.setLastUpdate(LocalDateTime.now());
job.setEndDate(null);
@ -86,7 +83,7 @@ public class LaunchWorkflowJobNode extends ProcessNode {
});
if (log.isDebugEnabled()) {
log.debug("The child workflow [conf: " + this.wfConfId + "] is starting with procId: " + job.getProcessId());
log.debug("The child workflow [conf: " + process.getJobDetails().getWfConfId() + "] is starting with procId: " + job.getProcessId());
}
token.setProgressMessage("Launched sub workflow, proc: " + job.getProcessId());

View File

@ -203,9 +203,10 @@ public class ProcessEngine {
log.debug(process.getGraph());
final LocalDateTime now = LocalDateTime.now();
process.setStatus(JobStatus.running);
process.setStartDate(now);
process.setLastActivityDate(now);
process.getJobDetails().setStatus(JobStatus.running);
process.getJobDetails().setStartDate(now);
updateRunningJob(process, null);
try {
for (final GraphNode graphNode : process.getGraph().startNodes()) {
@ -214,12 +215,13 @@ public class ProcessEngine {
}
} catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e);
process.setStatus(JobStatus.failure);
process.setError(e);
updateRunningJob(process, null);
}
}
public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
process.setLastActivityDate(LocalDateTime.now());
updateRunningJob(process, oldToken);
try {
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken)) {
@ -246,20 +248,15 @@ public class ProcessEngine {
executeNode(process, graphNode, token);
}
}
} catch (
final Throwable e) {
} catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e);
process.setStatus(JobStatus.failure);
process.setError(e.getMessage());
process.setErrorStacktrace(ExceptionUtils.getStackTrace(e));
process.setLastActivityDate(LocalDateTime.now());
process.setError(e);
updateRunningJob(process, oldToken);
}
}
private void executeNode(final WorkflowProcess process, final GraphNode graphNode, final Token token) throws WorkflowManagerException {
process.setLastActivityDate(LocalDateTime.now());
final Map<String, Object> params = new HashMap<>();
if (graphNode.getParams() != null) {
@ -310,6 +307,7 @@ public class ProcessEngine {
process.complete(token);
updateRunningJob(process, token);
saveHistoryJob(process);
final String wfConfId = (String) process.getGlobalParams().get("wfConfId");
@ -319,30 +317,25 @@ public class ProcessEngine {
}
private void saveHistoryJob(final WorkflowProcess process) {
final String wfConfId = (String) process.getGlobalParams().get("wfConfId");
final String dsId = (String) process.getGlobalParams().get("dsId");
final String dsName = (String) process.getGlobalParams().get("dsName");
final String apiId = (String) process.getGlobalParams().get("apiId");
log.debug("Process completed " + process.getId());
final WfHistoryJob historyJob = new WfHistoryJob();
// TODO (HIGH PRIORITY): fill the following values
historyJob.setProcessId(process.getId());
historyJob.setName(process.getName());
historyJob.setFamily(process.getFamily());
historyJob.setWfConfId(wfConfId);
historyJob.setDsId(dsId);
historyJob.setDsName(dsName);
historyJob.setApiId(apiId);
historyJob.setStartDate(process.getStartDate());
historyJob.setEndDate(process.getEndDate());
historyJob.setName(process.getJobDetails().getName());
historyJob.setFamily(process.getJobDetails().getFamily());
historyJob.setWfConfId(process.getJobDetails().getWfConfId());
historyJob.setDsId(process.getJobDetails().getDsId());
historyJob.setDsName(process.getJobDetails().getDsName());
historyJob.setApiId(process.getJobDetails().getApiId());
historyJob.setStartDate(process.getJobDetails().getStartDate());
historyJob.setEndDate(process.getJobDetails().getEndDate());
final Map<String, String> details = new LinkedHashMap<>();
if (process.getError() != null) {
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, process.getErrorStacktrace());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
historyJob.setStatus(JobStatus.failure);
} else {
historyJob.setStatus(JobStatus.success);
@ -354,9 +347,27 @@ public class ProcessEngine {
this.historyJobRepository.save(historyJob);
}
public void updateRunningJob(final WorkflowProcess process, final Token token) {
// TODO (HIGH PRIORITY): UPDATE THE DB
private void updateRunningJob(final WorkflowProcess process, final Token token) {
log.debug("UPDATING JOB USING TOKEN " + token.getId());
if (process.isTerminated()) {
final Map<String, String> details = new LinkedHashMap<>();
if (process.getError() != null) {
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
process.getJobDetails().setStatus(JobStatus.failure);
} else {
process.getJobDetails().setStatus(JobStatus.success);
}
} else {
process.getJobDetails().setStatus(JobStatus.running);
}
process.getJobDetails().setLastUpdate(LocalDateTime.now());
this.jobRepository.save(process.getJobDetails());
}
}

View File

@ -6,7 +6,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
@ -26,63 +25,36 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
private final String id;
private final String name;
private final String family;
private final int priority;
private final WfRunningJob jobDetails;
private Graph graph;
private ProcessCallback callback;
private List<Token> tokens = new CopyOnWriteArrayList<>();
private LocalDateTime lastActivityDate;
private JobStatus status;
private LocalDateTime startDate = LocalDateTime.MIN;
private LocalDateTime endDate = LocalDateTime.MIN;
private final Map<String, List<Token>> pausedJoinNodeTokens = new HashMap<>();
private final Map<String, Object> globalParams = new HashMap<>();
private final Map<String, String> outputParams = new HashMap<>();
private String error;
private String errorStacktrace;
private Throwable error;
public WorkflowProcess(final WfRunningJob job) {
this.id = job.getProcessId();
this.name = job.getName();
this.family = job.getFamily();
this.priority = job.getPriority();
this.status = JobStatus.created;
this.lastActivityDate = LocalDateTime.now();
public WorkflowProcess(final WfRunningJob jobDetails) {
this.id = jobDetails.getProcessId();
this.jobDetails = jobDetails;
}
public String getId() {
return this.id;
}
public String getName() {
return this.name;
}
public String getFamily() {
return this.family;
}
public int getPriority() {
return this.priority;
public WfRunningJob getJobDetails() {
return this.jobDetails;
}
public Map<String, List<Token>> getPausedJoinNodeTokens() {
return this.pausedJoinNodeTokens;
}
public JobStatus getStatus() {
return this.status;
}
public void setStatus(final JobStatus status) {
this.status = status;
}
public Graph getGraph() {
return this.graph;
}
@ -92,70 +64,41 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public void kill() {
setStatus(JobStatus.killed);
// TODO (MEDIUM PRIORITY)
this.jobDetails.setStatus(JobStatus.killed);
}
public boolean isTerminated() {
return switch (this.status) {
return switch (this.jobDetails.getStatus()) {
case success, failure, killed -> true;
default -> false;
};
}
public LocalDateTime getLastActivityDate() {
return this.lastActivityDate;
}
public void setLastActivityDate(final LocalDateTime lastActivityDate) {
this.lastActivityDate = lastActivityDate;
}
@Override
public String toString() {
return String.format("[process id='%s']", this.id);
}
@Override
public int compareTo(final WorkflowProcess wp) {
return NumberUtils.compare(getPriority(), wp.getPriority());
public int compareTo(final WorkflowProcess other) {
final int n1 = this.jobDetails.getPriority();
final int n2 = other.jobDetails.getPriority();
return NumberUtils.compare(n1, n2);
}
public Map<String, Object> getGlobalParams() {
return this.globalParams;
}
public void setStartDate(final LocalDateTime startDate) {
this.startDate = startDate;
}
public void setEndDate(final LocalDateTime endDate) {
this.endDate = endDate;
}
public LocalDateTime getStartDate() {
return this.startDate;
}
public LocalDateTime getEndDate() {
return this.endDate;
}
public String getError() {
public Throwable getError() {
return this.error;
}
public void setError(final String error) {
public void setError(final Throwable error) {
this.error = error;
}
public String getErrorStacktrace() {
return this.errorStacktrace;
}
public void setErrorStacktrace(final String errorStacktrace) {
this.errorStacktrace = errorStacktrace;
}
public Map<String, String> getOutputParams() {
return this.outputParams;
}
@ -169,15 +112,14 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
public void complete(final Token token) {
final LocalDateTime now = token.getEndDate();
setLastActivityDate(now);
setEndDate(now);
setStatus(token.isFailed() ? JobStatus.failure : JobStatus.success);
this.jobDetails.setLastUpdate(now);
this.jobDetails.setEndDate(now);
this.jobDetails.setStatus(token.isFailed() ? JobStatus.failure : JobStatus.success);
if (token.isFailed()) {
setStatus(JobStatus.failure);
setError(token.getError().getMessage());
setErrorStacktrace(ExceptionUtils.getStackTrace(token.getError()));
setLastActivityDate(LocalDateTime.now());
setError(token.getError());
}
if (this.callback != null) {

View File

@ -37,8 +37,8 @@ public class EmailSender {
this.wfSubscriptionRepository.findByWfConfigurationId(wfConfId).forEach(s -> {
if ((s.getCondition() == NotificationCondition.ALWAYS) ||
((s.getCondition() == NotificationCondition.ONLY_FAILED) && (proc.getStatus() == JobStatus.failure)) ||
((s.getCondition() == NotificationCondition.ONLY_SUCCESS) && (proc.getStatus() == JobStatus.success))) {
((s.getCondition() == NotificationCondition.ONLY_FAILED) && (proc.getJobDetails().getStatus() == JobStatus.failure)) ||
((s.getCondition() == NotificationCondition.ONLY_SUCCESS) && (proc.getJobDetails().getStatus() == JobStatus.success))) {
try {
final Map<String, Object> params = new HashMap<>();