scheduling in wf executor

Michele Artini 2023-12-07 13:06:16 +01:00
parent 2672c9be71
commit 8ddc88074d
3 changed files with 114 additions and 103 deletions

View File

@@ -30,6 +30,8 @@ public interface WfJournalEntryRepository extends JpaRepository<WfJournalEntry,
List<WfJournalEntry> findByStatus(JobStatus status);
List<WfJournalEntry> findByStatusAndWfExecutor(JobStatus status, String wfExecutor);
Optional<WfJournalEntry> findFirstByWfConfigurationIdAndEndDateNotNullOrderByEndDateDesc(String id);
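// Marks the annotated repository method as a data-modifying query (required by Spring Data JPA for update statements)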
@Modifying

View File

@@ -1,6 +1,8 @@
package eu.dnetlib.wfs.nodes.launch;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -8,7 +10,10 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
@@ -39,6 +44,8 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
// Obtain the bean from the appContext to avoid a circular reference at startup
final WfExecutorService wfExecutorService = appContext.getBean(WfExecutorService.class);
final String procId = WfProcessUtils.generateProcessId();
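// Prepare a dedicated configuration for the child workflow, linked to the parent process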
final WfConfiguration conf = new WfConfiguration();
conf.setId("CHILD_" + UUID.randomUUID());
conf.setParentId(process.getWfConfId());
@@ -57,9 +64,27 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
conf.setSystemParams(process.getGlobalParams());
conf.setUserParams(new HashMap<>());
final String procId = WfProcessUtils.generateProcessId();
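// Build the journal entry for the child job (initially in the created status); it is persisted and started by the executor service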
final WfJournalEntry job = new WfJournalEntry();
job.setProcessId(procId);
job.setWfConfigurationId(null);
job.setName(process.getName() + ":" + getNodeName());
job.setFamily(process.getFamily());
wfExecutorService.startWorkflow(procId, wfId, conf, new ProcessCallback() {
job.setWfTemplateId(wfId);
job.setDsId(process.getDsId());
job.setDsName(process.getDsName());
job.setDsApi(process.getDsInterface());
job.setStatus(JobStatus.created);
job.setDetails(new LinkedHashMap<>());
job.setStartDate(null);
job.setEndDate(null);
job.setWfExecutor(ServiceStatusRegistry.getStatus().getName());
job.setLastUpdate(LocalDateTime.now());
wfExecutorService.startWorkflowJob(job, conf, new ProcessCallback() {
@Override
public void onSuccess(final WorkflowProcess t) {

View File

@@ -3,7 +3,6 @@ package eu.dnetlib.wfs.service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -70,119 +69,104 @@ public class WfExecutorService implements Stoppable {
private int maxSize;
@Transactional
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES)
public synchronized void catchNewWorkflows() throws Exception {
@Scheduled(initialDelay = 10, fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public synchronized void acceptNewWorkflows() throws Exception {
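// Assign pending (created) jobs to this executor, marking them as accepted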
if (isPaused() || (processRegistry.countRunningWfs() >= maxSize)) {
if (log.isDebugEnabled()) {
log.debug("Checking pending wfs - SKIP");
}
log.debug("Checking pending wfs - SKIP");
return;
}
final ServiceStatus status = ServiceStatusRegistry.getStatus();
if ((status != null) && StringUtils.isNotBlank(status.getName())) {
final List<WfJournalEntry> jobs = tryAssignments(status.getName());
wfJournalEntryRepository
.findByStatus(JobStatus.created)
.forEach(job -> wfJournalEntryRepository.tryAssegnment(job.getProcessId(), status.getName(), JobStatus.accepted, LocalDateTime.now()));
for (final WfJournalEntry job : jobs) {
try {
startWorkflowJob(job);
} catch (final Throwable e) {
final Map<String, String> details = new LinkedHashMap<>();
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, e.getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(e));
job.setDetails(details);
job.setStatus(JobStatus.failure);
job.setEndDate(LocalDateTime.now());
wfJournalEntryRepository.save(job);
}
}
}
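// Second scheduled pass: start the jobs already accepted by this executor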
@Transactional
@Scheduled(initialDelay = 20, fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public synchronized void startAcceptedWorkflows() throws Exception {
if (isPaused() || (processRegistry.countRunningWfs() >= maxSize)) {
log.debug("Checking accepted wfs - SKIP");
return;
}
final ServiceStatus status = ServiceStatusRegistry.getStatus();
if ((status != null) && StringUtils.isNotBlank(status.getName())) {
wfJournalEntryRepository.findByStatusAndWfExecutor(JobStatus.accepted, status.getName())
.forEach(job -> startWorkflowJob(job, null, null));
}
}
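// Resolves the job configuration (passed in, loaded by id, or built on the fly), launches the workflow process and records the outcome in the journal entry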
public void startWorkflowJob(final WfJournalEntry job, final WfConfiguration passedConf, final ProcessCallback callback) {
try {
log.debug("Starting job: " + job.getProcessId());
final WfConfiguration conf;
if (passedConf != null) {
conf = passedConf;
} else if (StringUtils.isNotBlank(job.getWfConfigurationId())) {
conf = wfConfigurationRepository.findById(job.getWfConfigurationId())
.orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + job.getWfConfigurationId()));
} else {
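// No stored configuration: build a transient one from the journal entry fields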
conf = new WfConfiguration();
conf.setId(job.getFamily() + "::" + UUID.randomUUID());
conf.setDetails(new HashMap<>());
conf.setPriority(100);
conf.setDsId(job.getDsId());
conf.setDsName(job.getDsName());
conf.setApiId(job.getDsApi());
conf.setEnabled(true);
conf.setConfigured(true);
conf.setSchedulingEnabled(false);
conf.setCronExpression("");
conf.setCronMinInterval(0);
conf.setWorkflow(job.getWfTemplateId());
conf.setDestroyWf(null);
conf.setSystemParams(new HashMap<>());
conf.setUserParams(new HashMap<>());
}
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
final SimpleResourceClient simpleResourceClient = clientFactory.getClient(SimpleResourceClient.class);
final SimpleResource wfMetadata = simpleResourceClient.findResource(job.getWfTemplateId());
if (!WorkflowsConstants.WF_TEMPLATE.equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + conf.getWorkflow()); }
final WfTemplate wfTmpl = simpleResourceClient.findResourceContent(job.getWfTemplateId(), WfTemplate.class);
final WorkflowProcess process = prepareNewProcess(job.getProcessId(), wfMetadata, wfTmpl, conf, callback);
processEngine.startProcess(process);
job.setStatus(JobStatus.running);
} catch (final Throwable e) {
final Map<String, String> details = new LinkedHashMap<>();
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, e.getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(e));
job.setDetails(details);
job.setStatus(JobStatus.failure);
job.setEndDate(LocalDateTime.now());
} finally {
job.setLastUpdate(LocalDateTime.now());
wfJournalEntryRepository.save(job);
}
}
@Transactional
private List<WfJournalEntry> tryAssignments(final String executor) {
log.debug("Checking pending wfs");
return wfJournalEntryRepository.findByStatus(JobStatus.created)
.stream()
.filter(job -> trySingleAssignment(job.getProcessId()))
.toList();
}
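// Claim the job, then re-read it to verify that this executor actually obtained the assignment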
@Transactional
private boolean trySingleAssignment(final String processId) {
if (ServiceStatusRegistry.getStatus() == null) { return false; }
final String executor = ServiceStatusRegistry.getStatus().getName();
if (StringUtils.isBlank(executor)) { return false; }
wfJournalEntryRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
return wfJournalEntryRepository.findById(processId)
.map(WfJournalEntry::getWfExecutor)
.filter(s -> executor.equals(s))
.isPresent();
}
private void startWorkflowJob(final WfJournalEntry job) throws WorkflowManagerException {
final WfConfiguration conf;
log.debug("Starting job: " + job.getProcessId());
if (StringUtils.isNotBlank(job.getWfConfigurationId())) {
conf = wfConfigurationRepository.findById(job.getWfConfigurationId())
.orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + job.getWfConfigurationId()));
} else {
conf = new WfConfiguration();
conf.setId(job.getFamily() + "::" + UUID.randomUUID());
conf.setDetails(new HashMap<>());
conf.setPriority(100);
conf.setDsId(job.getDsId());
conf.setDsName(job.getDsName());
conf.setApiId(job.getDsApi());
conf.setEnabled(true);
conf.setConfigured(true);
conf.setSchedulingEnabled(false);
conf.setCronExpression("");
conf.setCronMinInterval(0);
conf.setWorkflow(job.getWfTemplateId());
conf.setDestroyWf(null);
conf.setSystemParams(new HashMap<>());
conf.setUserParams(new HashMap<>());
}
startWorkflow(job.getProcessId(), job.getWfTemplateId(), conf, null);
job.setStatus(JobStatus.running);
wfJournalEntryRepository.save(job);
}
public void startWorkflow(final String processId,
final String wfId,
final WfConfiguration conf,
final ProcessCallback callback)
throws WorkflowManagerException {
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
final SimpleResourceClient simpleResourceClient = clientFactory.getClient(SimpleResourceClient.class);
final SimpleResource wfMetadata = simpleResourceClient.findResource(wfId);
if (!WorkflowsConstants.WF_TEMPLATE.equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + conf.getWorkflow()); }
final WfTemplate wfTmpl = simpleResourceClient.findResourceContent(wfId, WfTemplate.class);
final WorkflowProcess process = prepareNewProcess(processId, wfMetadata, wfTmpl, conf, callback);
processEngine.startProcess(process);
}
private WorkflowProcess prepareNewProcess(final String processId,
final SimpleResource wfMetadata,
final WfTemplate wfTemplate,