refactoring

This commit is contained in:
Michele Artini 2023-10-20 09:55:12 +02:00
parent c5210aa4d7
commit cbe0e8c5c5
8 changed files with 88 additions and 103 deletions

View File

@ -13,7 +13,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.domain.common.KeyValue;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
@ -21,37 +20,30 @@ import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.domain.wfs.WorkflowSection;
import eu.dnetlib.domain.wfs.WorkflowSubscription;
import eu.dnetlib.wfs.manager.service.WorkflowManagerService;
import eu.dnetlib.wfs.service.WorkflowJobManager;
@RestController
@RequestMapping({ "/ajax", "/api" })
public class ApiController extends DnetRestController {
@Autowired
private WorkflowJobManager jobManager;
@Autowired
private WorkflowManagerService wfManagerService;
@Autowired
private DnetServiceClientFactory clientFactory;
@GetMapping("/history")
public List<WfHistoryEntry> history(
@RequestParam(required = true) final int total,
@RequestParam(required = false) final Long from,
@RequestParam(required = false) final Long to) {
return jobManager.history(total, from, to);
return wfManagerService.findHistory(total, from, to);
}
@GetMapping("/history/byConf/{wfConfId}")
public List<WfHistoryEntry> history(@PathVariable final String wfConfId) {
return jobManager.history(wfConfId);
return wfManagerService.findHistoryForConfiguration(wfConfId);
}
@GetMapping("/proc/{processId}")
public WfHistoryEntry getProcessExecution(@PathVariable final String processId) {
return jobManager.getLog(processId);
return wfManagerService.findProcessLog(processId);
}
@GetMapping("/sections")

View File

@ -11,32 +11,28 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
import eu.dnetlib.wfs.repository.WorkflowConfigurationRepository;
import eu.dnetlib.wfs.service.WorkflowJobManager;
@Service
public class ScheduledWorkflowLauncher {
private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
@Autowired
private DnetServiceClientFactory clientFactory;
@Autowired
private WorkflowConfigurationRepository workflowConfigurationRepository;
@Autowired
private WorkflowJobManager jobManager;
@Autowired
private WorkflowManagerService workflowManagerService;
@Value("${dnet.workflow.scheduler.windowSize:1800000}")
private int windowSize; // 1800000 are 30 minutes
@Autowired
private WfHistoryEntryRepository wfHistoryEntryRepository;
@Scheduled(fixedRateString = "${dnet.workflow.scheduler.fixedRate:900000}") // 900000 are 5 minutes
public void verifySheduledWorkflows() {
log.debug("Verifying scheduled workflows - START");
@ -94,7 +90,7 @@ public class ScheduledWorkflowLauncher {
}
private LocalDateTime calculateLastExecutionDate(final String id) {
return jobManager.getLastLogForConfiguration(id)
return wfHistoryEntryRepository.findFirstByWfConfigurationIdOrderByEndDateDesc(id)
.map(WfHistoryEntry::getEndDate)
.orElse(LocalDateTime.MIN);
}

View File

@ -1,21 +1,71 @@
package eu.dnetlib.wfs.manager.service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.TimeZone;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
import eu.dnetlib.wfs.service.AbstractWfService;
import eu.dnetlib.wfs.utils.WorkflowfProcessUtils;
@Service
public class WorkflowManagerService extends AbstractWfService {
private static final Log log = LogFactory.getLog(WorkflowManagerService.class);
@Autowired
private WfHistoryEntryRepository wfHistoryEntryRepository;
@Transactional
public WfHistoryEntry prepareNewJob(final WorkflowConfiguration conf, final boolean destroy) {
// TODO (HIGH PRIORITY): inserisce il job nella history usando WorkflowProcessUtils.generateProcessId()
return null;
final WfHistoryEntry job = new WfHistoryEntry();
job.setProcessId(WorkflowfProcessUtils.generateProcessId());
job.setWfConfigurationId(conf.getId());
job.setName(conf.getName());
job.setFamily(conf.getSection());
job.setWfTemplateId(destroy ? conf.getDestroyWf() : conf.getWorkflow());
job.setDsId(conf.getDsId());
job.setDsName(conf.getDsName());
job.setDsApi(conf.getApiId());
job.setStatus("PENDING");
job.setDetails(new LinkedHashMap<>());
job.setStartDate(null);
job.setEndDate(null);
job.setWfExecutor(null);
return wfHistoryEntryRepository.save(job);
}
public List<WfHistoryEntry> findHistory(final int total, final Long from, final Long to) {
if ((from == null) && (to == null)) { return wfHistoryEntryRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList(); }
final LocalDateTime fromTime = from != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone
.getDefault()
.toZoneId()) : LocalDateTime.MIN;
final LocalDateTime toTime = to != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(to), TimeZone
.getDefault()
.toZoneId()) : LocalDateTime.MAX;
return wfHistoryEntryRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime);
}
public List<WfHistoryEntry> findHistoryForConfiguration(final String wfConfId) {
return wfHistoryEntryRepository.findByWfConfigurationIdOrderByEndDateDesc(wfConfId);
}
public WfHistoryEntry findProcessLog(final String processId) {
return wfHistoryEntryRepository.findById(processId).get();
}
}

View File

@ -1,67 +0,0 @@
package eu.dnetlib.wfs.service;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
@Service
public class WorkflowJobManager {
@Autowired
private WfHistoryEntryRepository wfHistoryEntryRepository;
public List<WfHistoryEntry> history(final int total, final Long from, final Long to) {
if ((from == null) && (to == null)) { return wfHistoryEntryRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList(); }
final LocalDateTime fromTime = from != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone
.getDefault()
.toZoneId()) : LocalDateTime.MIN;
final LocalDateTime toTime = to != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(to), TimeZone
.getDefault()
.toZoneId()) : LocalDateTime.MAX;
return wfHistoryEntryRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime);
}
public List<WfHistoryEntry> history(final String wfConfId) {
return wfHistoryEntryRepository.findByWfConfigurationIdOrderByEndDateDesc(wfConfId);
}
public WfHistoryEntry getLog(final String processId) {
return wfHistoryEntryRepository.findById(processId).get();
}
public void saveLog(final WfHistoryEntry pe) {
wfHistoryEntryRepository.save(pe);
}
public Optional<WfHistoryEntry> getLastLogForConfiguration(final String id) {
return wfHistoryEntryRepository.findFirstByWfConfigurationIdOrderByEndDateDesc(id);
}
public List<WfHistoryEntry> findPendingJobs() {
return wfHistoryEntryRepository.findPendingJobs();
}
@Transactional
public boolean tryAssegnment(final WfHistoryEntry job, final String workerId) {
final String id = job.getProcessId();
wfHistoryEntryRepository.tryAssegnment(id, workerId);
return wfHistoryEntryRepository.findById(id)
.map(WfHistoryEntry::getWfExecutor)
.filter(s -> workerId.equals(s))
.isPresent();
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.wfs.service;
package eu.dnetlib.wfs.utils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

View File

@ -15,9 +15,9 @@ import eu.dnetlib.wfs.procs.ProcessAware;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.service.WfExecutorService;
import eu.dnetlib.wfs.service.WorkflowfProcessUtils;
import eu.dnetlib.wfs.utils.NodeCallback;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WorkflowfProcessUtils;
@WfNode("launchWorkflow")
public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {

View File

@ -21,7 +21,7 @@ import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.nodes.DefaultJobNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.nodes.SuccessNode;
import eu.dnetlib.wfs.service.WorkflowJobManager;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
import eu.dnetlib.wfs.utils.EmailSender;
import eu.dnetlib.wfs.utils.NodeCallback;
@ -34,7 +34,7 @@ public class ProcessEngine {
private EmailSender emailSender;
@Autowired
private WorkflowJobManager jobManager;
private WfHistoryEntryRepository wfHistoryEntryRepository;
@Autowired
private ApplicationContext applicationContext;
@ -166,7 +166,7 @@ public class ProcessEngine {
private void completeProcess(final WorkflowProcess process, final Token token) {
token.checkStatus();
process.complete(token);
jobManager.saveLog(process.asLog());
wfHistoryEntryRepository.save(process.asLog());
emailSender.sendMails(process);
}

View File

@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
@ -25,6 +26,7 @@ import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.ProcessFactory;
import eu.dnetlib.wfs.procs.ProcessRegistry;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
import eu.dnetlib.wfs.utils.ProcessCallback;
@Service
@ -44,10 +46,10 @@ public class WfExecutorService extends AbstractWfService implements Stoppable {
@Autowired
private DnetServiceClientFactory clientFactory;
private boolean paused = false;
@Autowired
private WorkflowJobManager jobManager;
private WfHistoryEntryRepository wfHistoryEntryRepository;
private boolean paused = false;
@Value("${dnet.wf.procs.size:20}")
private int maxSize;
@ -58,17 +60,29 @@ public class WfExecutorService extends AbstractWfService implements Stoppable {
executorId = "wf-exec-" + UUID.randomUUID();
}
@Transactional
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES)
public synchronized void startWorkflow() throws Exception {
if (isPaused() || (processRegistry.countRunningWfs() >= maxSize)) {}
for (final WfHistoryEntry job : jobManager.findPendingJobs()) {
if ((processRegistry.countRunningWfs() < maxSize) && jobManager.tryAssegnment(job, executorId)) {
for (final WfHistoryEntry job : wfHistoryEntryRepository.findPendingJobs()) {
if ((processRegistry.countRunningWfs() < maxSize) && tryAssegnment(job)) {
startWorkflowJob(job);
}
}
}
private boolean tryAssegnment(final WfHistoryEntry job) {
final String id = job.getProcessId();
wfHistoryEntryRepository.tryAssegnment(id, executorId);
return wfHistoryEntryRepository.findById(id)
.map(WfHistoryEntry::getWfExecutor)
.filter(s -> executorId.equals(s))
.isPresent();
}
private void startWorkflowJob(final WfHistoryEntry job) throws WorkflowManagerException {
final WorkflowConfiguration conf;
@ -117,7 +131,7 @@ public class WfExecutorService extends AbstractWfService implements Stoppable {
processRegistry.registerProcess(process, conf);
jobManager.saveLog(process.asLog());
wfHistoryEntryRepository.save(process.asLog());
processEngine.startProcess(process);
}