refactoring

This commit is contained in:
Michele Artini 2024-01-05 15:38:24 +01:00
parent 5bc7e2e383
commit 912bf32ef6
4 changed files with 179 additions and 119 deletions

View File

@ -42,11 +42,14 @@ public class WfConfigurationUtils {
.filter(p -> StringUtils.isNotBlank(p.getDefaultValue()))
.forEach(p -> all.put(p.getName(), p.getDefaultValue()));
}
if (conf.getUserParams() != null) {
all.putAll(conf.getSystemParams());
}
if (conf.getSystemParams() != null) {
all.putAll(conf.getUserParams());
if (conf != null) {
if (conf.getUserParams() != null) {
all.putAll(conf.getSystemParams());
}
if (conf.getSystemParams() != null) {
all.putAll(conf.getUserParams());
}
}
if (log.isDebugEnabled()) {

View File

@ -33,7 +33,6 @@ import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WfTemplate;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.graph.Graph;
import eu.dnetlib.wfs.graph.GraphLoader;
import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.nodes.DefaultJobNode;
@ -109,14 +108,14 @@ public class ProcessEngine {
for (final PGNotification n : nts) {
final String processId = n.getParameter();
log.debug("NOTIFICATION RECEIVED: " + processId);
queue.add(processId);
this.queue.add(processId);
}
}
}
return 0;
};
jdbcTemplate.execute(action);
this.jdbcTemplate.execute(action);
}
private void consumeQueue() {
@ -130,7 +129,7 @@ public class ProcessEngine {
while (true) {
try {
tryToStartWf(queue.take(), ServiceStatusRegistry.getStatus().getName());
tryToStartWf(this.queue.take(), ServiceStatusRegistry.getStatus().getName());
} catch (final Throwable e) {
log.error("Error accepting new wfs", e);
}
@ -141,9 +140,9 @@ public class ProcessEngine {
private void tryToStartWf(final String processId, final String executor) {
log.debug("Trying to start " + processId);
wfJournalEntryRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
this.wfJournalEntryRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
final Optional<WfJournalEntry> job = wfJournalEntryRepository.findById(processId);
final Optional<WfJournalEntry> job = this.wfJournalEntryRepository.findById(processId);
if (job.isPresent() && (job.get().getStatus() == JobStatus.accepted) && job.get().getWfExecutor().equals(executor)) {
startWorkflowJob(job.get(), null, null);
@ -155,59 +154,62 @@ public class ProcessEngine {
public void startWorkflowJob(final WfJournalEntry job, final WfConfiguration passedConf, final ProcessCallback callback) {
try {
log.debug("Configure process " + job.getProcessId() + " using wf template " + job.getWfTemplateId());
final WfConfiguration conf;
if (passedConf != null) {
conf = passedConf;
} else if (StringUtils.isNotBlank(job.getWfConfigurationId())) {
conf = wfConfigurationRepository.findById(job.getWfConfigurationId())
.orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + job.getWfConfigurationId()));
} else {
conf = new WfConfiguration();
conf.setId(null);
conf.setName(job.getFamily());
conf.setDetails(new HashMap<>());
conf.setPriority(100);
conf.setDsId(job.getDsId());
conf.setDsName(job.getDsName());
conf.setApiId(job.getApiId());
conf.setEnabled(true);
conf.setConfigured(true);
conf.setSchedulingEnabled(false);
conf.setCronExpression("");
conf.setCronMinInterval(0);
conf.setWorkflow(job.getWfTemplateId());
conf.setDestroyWf(null);
conf.setSystemParams(new HashMap<>());
conf.setUserParams(new HashMap<>());
}
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
final SimpleResourceClient simpleResourceClient = clientFactory.getClient(SimpleResourceClient.class);
final SimpleResourceClient simpleResourceClient = this.clientFactory.getClient(SimpleResourceClient.class);
final SimpleResource wfMetadata = simpleResourceClient.findResource(job.getWfTemplateId());
if (!WorkflowsConstants.WF_TEMPLATE.equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + conf.getWorkflow()); }
if (!WorkflowsConstants.WF_TEMPLATE.equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + job.getWfTemplateId()); }
final WfTemplate wfTmpl = simpleResourceClient.findResourceContent(job.getWfTemplateId(), WfTemplate.class);
final Map<String, Object> globalParams = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf);
final Map<String, Object> globalParams = new HashMap<>();
final Graph graph = graphLoader.loadGraph(wfTmpl, globalParams);
final WorkflowProcess process = new WorkflowProcess(job.getProcessId());
final WorkflowProcess process = new WorkflowProcess(job.getProcessId(), wfMetadata, conf, graph, globalParams, callback);
if ((passedConf == null) && StringUtils.isBlank(job.getWfConfigurationId())) {
process.setName(job.getFamily());
process.setWfConfId(null);
process.setParentId(null);
process.setPriority(100);
processRegistry.registerProcess(process, conf);
globalParams.putAll(WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), null));
} else {
final WfConfiguration conf = passedConf != null ? passedConf
: this.wfConfigurationRepository.findById(job.getWfConfigurationId())
.orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + job.getWfConfigurationId()));
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
process.setName(conf.getName());
process.setWfConfId(conf.getId());
process.setParentId(conf.getParentId());
process.setPriority(conf.getPriority());
globalParams.putAll(WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf));
}
process.setWfId(job.getWfTemplateId());
process.setFamily(wfMetadata.getSubtype());
process.setDsId(job.getDsId());
process.setDsName(job.getDsName());
process.setApiId(job.getApiId());
process.getGlobalParams().putAll(globalParams);
process.setGraph(this.graphLoader.loadGraph(wfTmpl, globalParams));
process.setCallback(callback);
this.processRegistry.registerProcess(process);
job.setStatus(JobStatus.running);
job.setLastUpdate(LocalDateTime.now());
wfJournalEntryRepository.save(job);
this.wfJournalEntryRepository.save(job);
startProcess(process);
@ -220,12 +222,12 @@ public class ProcessEngine {
job.setEndDate(LocalDateTime.now());
job.setLastUpdate(LocalDateTime.now());
wfJournalEntryRepository.save(job);
this.wfJournalEntryRepository.save(job);
}
}
public final void killProcess(final String procId) {
processRegistry.findProcess(procId).kill();
this.processRegistry.findProcess(procId).kill();
}
public void startProcess(final WorkflowProcess process) {
@ -309,7 +311,7 @@ public class ProcessEngine {
private ProcessNode newProcessNode(final GraphNode graphNode, final WorkflowProcess process) throws WorkflowManagerException {
if (graphNode.isSuccessNode()) { return new SuccessNode(); }
if (StringUtils.isBlank(graphNode.getType())) { return new DefaultJobNode(graphNode.getName()); }
final ProcessNode pnode = (ProcessNode) applicationContext.getBean(graphNode.getType());
final ProcessNode pnode = (ProcessNode) this.applicationContext.getBean(graphNode.getType());
if (pnode == null) {
log.error("cannot find bean of type " + graphNode.getType());
throw new WorkflowManagerException("cannot find bean of type " + graphNode.getType());
@ -335,9 +337,9 @@ public class ProcessEngine {
log.debug("Process completed " + process.getId());
final WfJournalEntry job = process.asLog();
wfJournalEntryRepository.save(job);
this.wfJournalEntryRepository.save(job);
emailSender.sendMails(process);
this.emailSender.sendMails(process);
}
}

View File

@ -11,7 +11,6 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.errors.WorkflowManagerException;
@Service
@ -23,7 +22,7 @@ public class ProcessRegistry {
synchronized public int countRunningWfs() {
int count = 0;
for (final Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
final WorkflowProcess proc = e.getValue();
if (!proc.isTerminated()) {
count++;
@ -33,29 +32,32 @@ public class ProcessRegistry {
}
public WorkflowProcess findProcess(final String procId) {
return procs.get(procId);
return this.procs.get(procId);
}
public Collection<WorkflowProcess> listProcesses() {
return procs.values();
return this.procs.values();
}
public WorkflowProcess findProcsByConfigurationId(final String id) {
return byConfId.get(id);
return this.byConfId.get(id);
}
public void registerProcess(final WorkflowProcess process, final WfConfiguration conf) throws WorkflowManagerException {
if (procs.containsValue(process) || procs.containsKey(process.getId())) {
public void registerProcess(final WorkflowProcess process) throws WorkflowManagerException {
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
log.error("Already registerd process: " + process);
throw new WorkflowManagerException("Already registered process: " + process);
}
procs.put(process.getId(), process);
byConfId.put(conf.getId(), process);
this.procs.put(process.getId(), process);
if (process.getWfConfId() != null) {
this.byConfId.put(process.getWfConfId(), process);
}
}
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES)
public void removeTerminatedProcess() {
for (final Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
final WorkflowProcess proc = e.getValue();
if (proc.isTerminated()) {
unregisterProcess(proc.getId());
@ -65,13 +67,13 @@ public class ProcessRegistry {
public void unregisterProcess(final String procId) {
synchronized (this) {
final WorkflowProcess process = procs.remove(procId);
final WorkflowProcess process = this.procs.remove(procId);
if (process != null) {
byConfId.entrySet()
this.byConfId.entrySet()
.stream()
.filter(e -> e.getValue().getId().equals(process.getId()))
.map(Entry::getKey)
.forEach(confId -> byConfId.remove(confId, process));
.forEach(confId -> this.byConfId.remove(confId, process));
}
}
}

View File

@ -10,9 +10,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.domain.resource.SimpleResource;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.wfs.graph.Graph;
@ -30,84 +28,87 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
private final String id;
private final SimpleResource wfMetadata;
private final WfConfiguration wfConf;
private final Graph graph;
private final ProcessCallback callback;
private final List<Token> tokens = new CopyOnWriteArrayList<>();
private String family;
private String wfId;
private String name;
private String wfConfId;
private String parentId;
private int priority = 100;
private String dsId;
private String dsName;
private String apiId;
private Graph graph;
private ProcessCallback callback;
private List<Token> tokens = new CopyOnWriteArrayList<>();
private LocalDateTime lastActivityDate;
private JobStatus status;
private LocalDateTime startDate = LocalDateTime.MIN;
private LocalDateTime endDate = LocalDateTime.MIN;
private final Map<String, List<Token>> pausedJoinNodeTokens = new HashMap<>();
private final Map<String, Object> globalParams;
private String error;
private String errorStacktrace;
private final Map<String, Object> globalParams = new HashMap<>();
private final Map<String, String> outputParams = new HashMap<>();
public WorkflowProcess(
final String id,
final SimpleResource wfMetadata,
final WfConfiguration wfConf,
final Graph graph,
final Map<String, Object> globalParams,
final ProcessCallback callback) {
private String error;
private String errorStacktrace;
public WorkflowProcess(final String id) {
this.id = id;
this.wfMetadata = wfMetadata;
this.wfConf = wfConf;
this.graph = graph;
this.callback = callback;
status = JobStatus.created;
this.globalParams = globalParams;
lastActivityDate = LocalDateTime.now();
this.status = JobStatus.created;
this.lastActivityDate = LocalDateTime.now();
}
public String getId() {
return id;
return this.id;
}
public String getName() {
return wfConf.getName();
return this.name;
}
public String getFamily() {
return wfMetadata.getSubtype();
return this.family;
}
public String getWfId() {
return wfMetadata.getId();
return this.wfId;
}
public String getWfConfId() {
return wfConf.getId();
return this.wfConfId;
}
public String getParentId() {
return wfConf.getParentId();
return this.parentId;
}
public int getPriority() {
return wfConf.getPriority();
return this.priority;
}
public String getDsId() {
return wfConf.getDsId();
return this.dsId;
}
public String getDsName() {
return wfConf.getDsName();
return this.dsName;
}
public String getApiId() {
return wfConf.getApiId();
return this.apiId;
}
public Map<String, List<Token>> getPausedJoinNodeTokens() {
return pausedJoinNodeTokens;
return this.pausedJoinNodeTokens;
}
public JobStatus getStatus() {
return status;
return this.status;
}
public void setStatus(final JobStatus status) {
@ -115,11 +116,11 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public Graph getGraph() {
return graph;
return this.graph;
}
public List<Token> getTokens() {
return tokens;
return this.tokens;
}
public void kill() {
@ -127,14 +128,14 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public boolean isTerminated() {
return switch (status) {
return switch (this.status) {
case success, failure, killed -> true;
default -> false;
};
}
public LocalDateTime getLastActivityDate() {
return lastActivityDate;
return this.lastActivityDate;
}
public void setLastActivityDate(final LocalDateTime lastActivityDate) {
@ -143,7 +144,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
@Override
public String toString() {
return String.format("[process id='%s' name='%s']", id, wfMetadata.getName());
return String.format("[process id='%s' name='%s']", this.id, this.name);
}
@Override
@ -152,7 +153,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public Map<String, Object> getGlobalParams() {
return globalParams;
return this.globalParams;
}
public void setStartDate(final LocalDateTime startDate) {
@ -164,15 +165,15 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public LocalDateTime getStartDate() {
return startDate;
return this.startDate;
}
public LocalDateTime getEndDate() {
return endDate;
return this.endDate;
}
public String getError() {
return error;
return this.error;
}
public void setError(final String error) {
@ -180,7 +181,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public String getErrorStacktrace() {
return errorStacktrace;
return this.errorStacktrace;
}
public void setErrorStacktrace(final String errorStacktrace) {
@ -188,13 +189,13 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public Map<String, String> getOutputParams() {
return outputParams;
return this.outputParams;
}
public Token newToken(final Map<String, Object> attrs) {
final Token token = new Token();
token.addEnvAttributes(attrs);
tokens.add(token);
this.tokens.add(token);
return token;
}
@ -211,11 +212,11 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
setLastActivityDate(LocalDateTime.now());
}
if (callback != null) {
if (this.callback != null) {
if (token.isFailed()) {
callback.onFail(this, token.getError());
this.callback.onFail(this, token.getError());
} else {
callback.onSuccess(this);
this.callback.onSuccess(this);
}
}
@ -253,4 +254,56 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return pe;
}
public ProcessCallback getCallback() {
return this.callback;
}
public void setCallback(final ProcessCallback callback) {
this.callback = callback;
}
public void setFamily(final String family) {
this.family = family;
}
public void setWfId(final String wfId) {
this.wfId = wfId;
}
public void setName(final String name) {
this.name = name;
}
public void setWfConfId(final String wfConfId) {
this.wfConfId = wfConfId;
}
public void setParentId(final String parentId) {
this.parentId = parentId;
}
public void setPriority(final int priority) {
this.priority = priority;
}
public void setDsId(final String dsId) {
this.dsId = dsId;
}
public void setDsName(final String dsName) {
this.dsName = dsName;
}
public void setApiId(final String apiId) {
this.apiId = apiId;
}
public void setGraph(final Graph graph) {
this.graph = graph;
}
public void setTokens(final List<Token> tokens) {
this.tokens = tokens;
}
}