dnet-docker/dnet-app/apps/dnet-wf-manager/src/main/java/eu/dnetlib/wfs/manager/service/ScheduledWorkflowLauncher.java

140 lines
4.8 KiB
Java

package eu.dnetlib.wfs.manager.service;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.conf.WfConfiguration;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
@Service
public class ScheduledWorkflowLauncher {
private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
@Autowired
private WfConfigurationRepository wfConfigurationRepository;
@Autowired
private WorkflowManagerService workflowManagerService;
@Value("${dnet.workflow.scheduler.windowSize:1800000}")
private int windowSize; // 1800000 are 30 minutes
@Autowired
private WfJournalEntryRepository wfJournalEntryRepository;
@Scheduled(fixedRateString = "${dnet.workflow.scheduler.fixedRate:900000}") // 900000 are 5 minutes
public void verifySheduledWorkflows() {
log.debug("Verifying scheduled workflows - START");
this.wfConfigurationRepository.findAll()
.stream()
.filter(WfConfiguration::isEnabled)
.filter(WfConfiguration::isConfigured)
.filter(WfConfiguration::isSchedulingEnabled)
.filter(this::isNotRunning)
.filter(this::isReady)
.forEach(conf -> {
try {
final String family = WfConfigurationUtils.calculateFamily(conf, false);
final String[] wfTemplateIds = conf.getWorkflows().toArray(new String[conf.getWorkflows().size()]);
this.workflowManagerService.prepareNewJob(conf, family, wfTemplateIds);
} catch (final Exception e) {
log.error("Error launching scheduled wf conf: " + conf.getId(), e);
}
});
log.debug("Verifying scheduled workflows - END");
}
private boolean isReady(final WfConfiguration conf) {
final LocalDateTime lastExecutionDate = calculateLastExecutionDate(conf.getId());
final LocalDateTime now = LocalDateTime.now();
final String cron = conf.getCronExpression();
if (CronExpression.isValidExpression(cron)) {
final int minInterval = conf.getCronMinInterval(); // in minutes
final boolean res;
if (lastExecutionDate != null) {
final long elapsed = ChronoUnit.MINUTES.between(lastExecutionDate, now);
res = (elapsed > minInterval) && verifyCron(cron, now);
} else {
res = verifyCron(cron, now);
}
if (log.isDebugEnabled()) {
log.debug("**************************************************************");
log.debug("WORKFLOW CONFIGURATION ID : " + conf.getId());
log.debug("NOW : " + now);
log.debug("LAST EXECUTION DATE : " + lastExecutionDate);
log.debug("MIN INTERVAL (minutes) : " + minInterval);
log.debug("WINDOW SIZE (ms) : " + this.windowSize);
log.debug("MUST BE EXECUTED : " + res);
log.debug("**************************************************************");
}
return res;
}
return false;
}
private LocalDateTime calculateLastExecutionDate(final String id) {
return this.wfJournalEntryRepository.findFirstByWfConfIdAndEndDateNotNullOrderByEndDateDesc(id)
.map(WfJournalEntry::getEndDate)
.orElse(LocalDateTime.MIN);
}
private boolean verifyCron(final String cronExpression, final LocalDateTime now) {
try {
final CronExpression cron = CronExpression.parse(cronExpression);
final LocalDateTime date = now.minus(this.windowSize, ChronoUnit.MINUTES);
final LocalDateTime nextDate = cron.next(date);
if (nextDate != null) {
if (log.isDebugEnabled()) {
log.debug("NEXT EXECUTION DATE: " + nextDate);
log.debug("FIRED : " + nextDate.isBefore(now));
}
return nextDate.isBefore(now);
}
} catch (final Exception e) {
log.error("Error calculating next cron event: " + cronExpression, e);
}
return false;
}
private boolean isNotRunning(final WfConfiguration conf) {
return !(hasStatus(conf, JobStatus.created) || hasStatus(conf, JobStatus.accepted) || hasStatus(conf, JobStatus.running));
}
private boolean hasStatus(final WfConfiguration conf, final JobStatus status) {
final String id = conf.getId();
return this.wfJournalEntryRepository.findByStatus(status)
.stream()
.map(WfJournalEntry::getWfConfId)
.filter(StringUtils::isNotBlank)
.filter(s -> s.equals(id))
.map(s -> true)
.findFirst()
.orElse(false);
}
}