start wfs using postgres LISTEN/NOTIFY

This commit is contained in:
Michele Artini 2023-12-20 15:35:25 +01:00
parent cc7052ade7
commit aadd9c6871
6 changed files with 105 additions and 50 deletions

View File

@ -136,7 +136,8 @@ public class ApiController extends DnetRestController {
@PostMapping("/repo-hi/{id}/start")
public WfJournalEntry startRepoHi(@PathVariable final String id, @RequestBody final WfRepoHiParams params) {
return wfManagerService
.prepareNewJob(id, WorkflowsConstants.REPO_HI_JOB, WorkflowsConstants.REPO_HI_JOB, params.getDsId(), params.getDsName(), params.getApiId());
.prepareNewJob(id, null, WorkflowsConstants.REPO_HI_JOB, WorkflowsConstants.REPO_HI_JOB, params.getDsId(), params.getDsName(), params
.getApiId());
}
@GetMapping("/template/{id}")

View File

@ -12,6 +12,7 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
@ -49,6 +50,8 @@ public class WorkflowManagerService {
private WfSubscriptionRepository wfSubscriptionRepository;
@Autowired
private DnetServiceClientFactory clientFactory;
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public WfConfiguration findWorkflowConfiguration(final String id) throws WorkflowManagerException {
@ -107,39 +110,29 @@ public class WorkflowManagerService {
wfSubscriptionRepository.saveAll(subscriptions);
}
@Transactional
public WfJournalEntry prepareNewJob(final WfConfiguration conf, final boolean destroy) {
final WfJournalEntry job = new WfJournalEntry();
job.setProcessId(WfProcessUtils.generateProcessId());
job.setWfConfigurationId(conf.getId());
job.setName(conf.getName());
job.setFamily(conf.getSection());
final String wfTemplateId = destroy ? conf.getDestroyWf() : conf.getWorkflow();
final String wfConfId = conf.getId();
final String name = conf.getName();
final String family = conf.getSection();
final String dsId = conf.getDsId();
final String dsName = conf.getDsName();
final String apiId = conf.getApiId();
job.setWfTemplateId(destroy ? conf.getDestroyWf() : conf.getWorkflow());
job.setDsId(conf.getDsId());
job.setDsName(conf.getDsName());
job.setDsApi(conf.getApiId());
job.setStatus(JobStatus.created);
job.setDetails(new LinkedHashMap<>());
job.setStartDate(null);
job.setEndDate(null);
job.setWfExecutor(null);
job.setLastUpdate(LocalDateTime.now());
return wfJournalEntryRepository.save(job);
return prepareNewJob(wfTemplateId, wfConfId, name, family, dsId, dsName, apiId);
}
@Transactional
public WfJournalEntry prepareNewJob(final String wfTemplateId,
final String wfConfId,
final String name,
final String family,
final String dsId,
final String dsName,
final String apiId) {
final WfJournalEntry job = new WfJournalEntry();
job.setProcessId(WfProcessUtils.generateProcessId());
job.setWfConfigurationId(null);
job.setName(name);
@ -159,7 +152,11 @@ public class WorkflowManagerService {
job.setWfExecutor(null);
job.setLastUpdate(LocalDateTime.now());
return wfJournalEntryRepository.save(job);
final WfJournalEntry saved = wfJournalEntryRepository.save(job);
jdbcTemplate.execute("NOTIFY " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL + ", '" + saved.getProcessId() + "'");
return saved;
}
public List<WfJournalEntry> recentHistory() {

View File

@ -43,6 +43,8 @@ public class WorkflowsConstants {
public static final String REPO_HI_JOB = "REPO-HI";
public static final String REPO_BYE_JOB = "REPO-BYE";
public static final String WF_CREATED_NOTIFICATION_CHANNEL = "CREATED_WF";
public enum WorkflowStatus {
EXECUTABLE("Executable", "icon-ok"),

View File

@ -9,6 +9,7 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfJournalEntry;
@ -30,11 +31,10 @@ public interface WfJournalEntryRepository extends JpaRepository<WfJournalEntry,
List<WfJournalEntry> findByStatus(JobStatus status);
List<WfJournalEntry> findByStatusAndWfExecutor(JobStatus status, String wfExecutor);
Optional<WfJournalEntry> findFirstByWfConfigurationIdAndEndDateNotNullOrderByEndDateDesc(String id);
@Modifying
@Transactional
@Query(value = "update WfJournalEntry set wfExecutor = ?2, status = ?3, startDate = ?4, lastUpdate = ?4 where processId = ?1 and wfExecutor is NULL")
void tryAssegnment(String id, String workerId, JobStatus status, LocalDateTime startDate);

View File

@ -50,13 +50,25 @@
<version>1.0.11</version>
</dependency>
<!-- for jpa annotations -->
<!-- for jpa -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jpa</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>provided</scope>
</dependency>
<!-- for /metrics and /health controllers -->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -7,23 +7,26 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.domain.resource.SimpleResource;
import eu.dnetlib.domain.service.ServiceStatus;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
@ -41,6 +44,7 @@ import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.utils.EmailSender;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
import jakarta.annotation.PostConstruct;
import jakarta.transaction.Transactional;
@Service
@ -57,6 +61,9 @@ public class ProcessEngine {
@Autowired
private WfConfigurationRepository wfConfigurationRepository;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private ApplicationContext applicationContext;
@ -72,43 +79,79 @@ public class ProcessEngine {
@Value("${dnet.wf.procs.size:20}")
private int maxSize;
@Transactional
@Scheduled(initialDelay = 10, fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public synchronized void acceptNewWorkflows() throws Exception {
private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
if (processRegistry.countRunningWfs() >= maxSize) {
log.debug("Checking pending wfs - SKIP");
return;
@PostConstruct
public void createNotificationHandler() {
final Thread t1 = new Thread(this::notificationHandler, "wf-created-listener");
t1.start();
final Thread t2 = new Thread(this::consumeQueue, "wf-queue-consumer");
t2.start();
}
private void notificationHandler() {
while ((ServiceStatusRegistry.getStatus() == null) || StringUtils.isBlank(ServiceStatusRegistry.getStatus().getName())) {
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
final ServiceStatus status = ServiceStatusRegistry.getStatus();
final ConnectionCallback<Integer> action = c -> {
c.createStatement().execute("LISTEN " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL);
final PGConnection pgconn = c.unwrap(PGConnection.class);
while (!Thread.currentThread().isInterrupted()) {
final PGNotification[] nts = pgconn.getNotifications(10000);
if (nts != null) {
for (final PGNotification n : nts) {
final String processId = n.getParameter();
log.debug("NOTIFICATION RECEIVED: " + processId);
queue.add(processId);
}
}
}
return 0;
};
if ((status != null) && StringUtils.isNotBlank(status.getName())) {
wfJournalEntryRepository
.findByStatus(JobStatus.created)
.forEach(job -> wfJournalEntryRepository.tryAssegnment(job.getProcessId(), status.getName(), JobStatus.accepted, LocalDateTime.now()));
jdbcTemplate.execute(action);
}
private void consumeQueue() {
while ((ServiceStatusRegistry.getStatus() == null) || StringUtils.isBlank(ServiceStatusRegistry.getStatus().getName())) {
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
while (true) {
try {
tryToStartWf(queue.take(), ServiceStatusRegistry.getStatus().getName());
} catch (final Throwable e) {
log.error("Error accepting new wfs", e);
}
}
}
@Transactional
@Scheduled(initialDelay = 20, fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public synchronized void startAcceptedWorkflows() throws Exception {
private void tryToStartWf(final String processId, final String executor) {
log.debug("Trying to start " + processId);
if (processRegistry.countRunningWfs() >= maxSize) {
log.debug("Checking accepted wfs - SKIP");
return;
}
wfJournalEntryRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
final ServiceStatus status = ServiceStatusRegistry.getStatus();
final Optional<WfJournalEntry> job = wfJournalEntryRepository.findById(processId);
if ((status != null) && StringUtils.isNotBlank(status.getName())) {
wfJournalEntryRepository.findByStatusAndWfExecutor(JobStatus.accepted, status.getName())
.forEach(job -> startWorkflowJob(job, null, null));
if (job.isPresent() && (job.get().getStatus() == JobStatus.accepted) && job.get().getWfExecutor().equals(executor)) {
startWorkflowJob(job.get(), null, null);
}
}
@Transactional
public void startWorkflowJob(final WfJournalEntry job, final WfConfiguration passedConf, final ProcessCallback callback) {
try {