refactoring

This commit is contained in:
Michele Artini 2023-10-19 09:05:36 +02:00
parent 0890350a8d
commit 9638978a1e
11 changed files with 36 additions and 20 deletions

View File

@ -10,6 +10,7 @@ BEGIN;
CREATE TABLE wf_history (
process_id text PRIMARY KEY,
wf_conf_id text NOT NULL,
wf_executor text,
name text NOT NULL,
family text NOT NULL,
status text NOT NULL,

View File

@ -23,14 +23,14 @@ import eu.dnetlib.domain.wfs.WorkflowSection;
import eu.dnetlib.domain.wfs.WorkflowSubscription;
import eu.dnetlib.wfs.common.ExecutionStatus;
import eu.dnetlib.wfs.manager.service.WorkflowManagerService;
import eu.dnetlib.wfs.service.WorkflowLogger;
import eu.dnetlib.wfs.service.WorkflowJobManager;
@RestController
@RequestMapping({ "/ajax", "/api" })
public class ApiController extends DnetRestController {
@Autowired
private WorkflowLogger logger;
private WorkflowJobManager jobManager;
@Autowired
private WorkflowManagerService wfManagerService;
@ -43,17 +43,17 @@ public class ApiController extends DnetRestController {
@RequestParam(required = true) final int total,
@RequestParam(required = false) final Long from,
@RequestParam(required = false) final Long to) {
return logger.history(total, from, to);
return jobManager.history(total, from, to);
}
@GetMapping("/history/byConf/{wfConfId}")
public List<WfHistoryEntry> history(@PathVariable final String wfConfId) {
return logger.history(wfConfId);
return jobManager.history(wfConfId);
}
@GetMapping("/proc/{processId}")
public WfHistoryEntry getProcessExecution(@PathVariable final String processId) {
return logger.getLog(processId);
return jobManager.getLog(processId);
}
@GetMapping("/sections")

View File

@ -16,7 +16,7 @@ import eu.dnetlib.common.clients.WfExecutorClient;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.wfs.repository.WorkflowConfigurationRepository;
import eu.dnetlib.wfs.service.WorkflowLogger;
import eu.dnetlib.wfs.service.WorkflowJobManager;
@Service
public class ScheduledWorkflowLauncher {
@ -30,7 +30,7 @@ public class ScheduledWorkflowLauncher {
private WorkflowConfigurationRepository workflowConfigurationRepository;
@Autowired
private WorkflowLogger logger;
private WorkflowJobManager jobManager;
@Value("${dnet.workflow.scheduler.windowSize:1800000}")
private int windowSize; // 1800000 are 30 minutes
@ -92,7 +92,7 @@ public class ScheduledWorkflowLauncher {
}
private LocalDateTime calculateLastExecutionDate(final String id) {
return logger.getLastLogForConfiguration(id)
return jobManager.getLastLogForConfiguration(id)
.map(WfHistoryEntry::getEndDate)
.orElse(LocalDateTime.MIN);
}
@ -104,20 +104,22 @@ public class ScheduledWorkflowLauncher {
final LocalDateTime date = now.minus(windowSize, ChronoUnit.MINUTES);
final LocalDateTime nextDate = cron.next(date);
if (log.isDebugEnabled()) {
log.debug("NEXT EXECUTION DATE: " + nextDate);
log.debug("FIRED : " + nextDate.isBefore(now));
if (nextDate != null) {
if (log.isDebugEnabled()) {
log.debug("NEXT EXECUTION DATE: " + nextDate);
log.debug("FIRED : " + nextDate.isBefore(now));
}
return nextDate.isBefore(now);
}
return nextDate.isBefore(now);
} catch (final Exception e) {
log.error("Error calculating next cron event: " + cronExpression, e);
return false;
}
return false;
}
private boolean isNotRunning(final WorkflowConfiguration conf) {
//TODO (HIGH PRIORITY)
// TODO (HIGH PRIORITY)
// final WorkflowProcess p = processRegistry.findProcsByConfigurationId(conf.getId());
//
// if (p != null) {

View File

@ -25,6 +25,9 @@ public class WfHistoryEntry implements Serializable {
@Column(name = "wf_conf_id")
private String wfConfigurationId;
@Column(name = "wf_executor")
private String wf_executor;
@Column(name = "name")
private String name;
@ -69,6 +72,14 @@ public class WfHistoryEntry implements Serializable {
this.wfConfigurationId = wfConfigurationId;
}
public String getWf_executor() {
return wf_executor;
}
public void setWf_executor(final String wf_executor) {
this.wf_executor = wf_executor;
}
public String getName() {
return name;
}

View File

@ -15,7 +15,7 @@ import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
@Service
public class WorkflowLogger {
public class WorkflowJobManager {
@Autowired
private WfHistoryEntryRepository wfHistoryEntryRepository;

View File

@ -21,7 +21,7 @@ import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.nodes.DefaultJobNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.nodes.SuccessNode;
import eu.dnetlib.wfs.service.WorkflowLogger;
import eu.dnetlib.wfs.service.WorkflowJobManager;
import eu.dnetlib.wfs.utils.EmailSender;
import eu.dnetlib.wfs.utils.NodeCallback;
@ -34,7 +34,7 @@ public class ProcessEngine {
private EmailSender emailSender;
@Autowired
private WorkflowLogger wfLogger;
private WorkflowJobManager jobManager;
@Autowired
private ApplicationContext applicationContext;
@ -84,7 +84,7 @@ public class ProcessEngine {
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken.getEnv())) {
if (graphNode.isJoin() || graphNode.isSucessNode()) {
if (!process.getPausedJoinNodeTokens().containsKey(graphNode.getName())) {
process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList<Token>());
process.getPausedJoinNodeTokens().put(graphNode.getName(), new ArrayList<>());
}
final List<Token> list = process.getPausedJoinNodeTokens().get(graphNode.getName());
@ -166,7 +166,7 @@ public class ProcessEngine {
private void completeProcess(final WorkflowProcess process, final Token token) {
token.checkStatus();
process.complete(token);
wfLogger.saveLog(process.asLog());
jobManager.saveLog(process.asLog());
emailSender.sendMails(process);
}

View File

@ -19,6 +19,8 @@ import eu.dnetlib.wfs.utils.WorkflowsConstants;
@Service
public class ProcessRegistry {
// TODO (HIGH PRIORITY) : lo stato e' mantenuto nel database, deve usare JobManager (new field wf_executor)
private static final Log log = LogFactory.getLog(ProcessRegistry.class);
private final Map<String, WorkflowProcess> procs = new HashMap<>();
private final Map<String, WorkflowProcess> byConfId = new HashMap<>();