wf executor accept

This commit is contained in:
Michele Artini 2023-12-04 14:49:49 +01:00
parent fbc618af83
commit 7a6127b1f7
13 changed files with 75 additions and 24 deletions

View File

@ -26,7 +26,6 @@ import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.VocabularyClient;
@ -58,6 +57,7 @@ import eu.dnetlib.services.dsm.utils.DsmBrowsableFields;
import eu.dnetlib.services.dsm.utils.DsmMappingUtils;
import eu.dnetlib.services.dsm.utils.ResponseUtils;
import jakarta.persistence.EntityNotFoundException;
import jakarta.transaction.Transactional;
@Service
public class DsmService {

View File

@ -18,7 +18,7 @@ import eu.dnetlib.common.index.solr.SolrService;
import eu.dnetlib.domain.index.IndexConfiguration;
import eu.dnetlib.errors.DnetException;
import eu.dnetlib.errors.InformationServiceException;
import eu.dnetlib.services.index.repository.service.IndexConfigurationService;
import eu.dnetlib.services.index.service.IndexConfigurationService;
@RestController
@RequestMapping("/api/index")

View File

@ -1,16 +1,16 @@
package eu.dnetlib.services.index.repository.service;
package eu.dnetlib.services.index.service;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.domain.index.IndexConfiguration;
import eu.dnetlib.errors.InformationServiceException;
import eu.dnetlib.services.index.repository.IndexConfigurationRepository;
import eu.dnetlib.services.index.repository.IndexFieldRepository;
import jakarta.transaction.Transactional;
@Service
public class IndexConfigurationService {

View File

@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import eu.dnetlib.common.app.AbstractDnetApp;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
@ -27,6 +28,7 @@ import eu.dnetlib.domain.wfs.WfSubscription;
WfSection.class,
WfSubscription.class
})
@EnableScheduling
public class WfExecutorApplication extends AbstractDnetApp {
@Autowired(required = false)

View File

@ -118,7 +118,7 @@ public class ScheduledWorkflowLauncher {
}
private boolean isNotRunning(final WfConfiguration conf) {
return !(hasStatus(conf, JobStatus.pending) || hasStatus(conf, JobStatus.created) || hasStatus(conf, JobStatus.running));
return !(hasStatus(conf, JobStatus.created) || hasStatus(conf, JobStatus.accepted) || hasStatus(conf, JobStatus.running));
}
private boolean hasStatus(final WfConfiguration conf, final JobStatus status) {

View File

@ -13,7 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
@ -33,6 +32,7 @@ import eu.dnetlib.wfs.repository.WfSectionRepository;
import eu.dnetlib.wfs.repository.WfSubscriptionRepository;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
import eu.dnetlib.wfs.utils.WfProcessUtils;
import jakarta.transaction.Transactional;
@Service
public class WorkflowManagerService {
@ -121,7 +121,7 @@ public class WorkflowManagerService {
job.setDsName(conf.getDsName());
job.setDsApi(conf.getApiId());
job.setStatus(JobStatus.pending);
job.setStatus(JobStatus.created);
job.setDetails(new LinkedHashMap<>());
job.setStartDate(null);
@ -150,7 +150,7 @@ public class WorkflowManagerService {
job.setDsName(dsName);
job.setDsApi(apiId);
job.setStatus(JobStatus.pending);
job.setStatus(JobStatus.created);
job.setDetails(new LinkedHashMap<>());
job.setStartDate(null);

View File

@ -41,7 +41,7 @@
<span class="badge-label" [ngClass]="{
'badge-success' : element.status === 'success',
'badge-failure' : element.status === 'failure',
'badge-pending' : element.status === 'pending'
'badge-pending' : element.status === 'created' || element.status === 'accepted'
}">{{element.status}}</span>
</td>
</ng-container>

View File

@ -1,5 +1,10 @@
package eu.dnetlib.domain.wfs;
public enum JobStatus {
pending, created, running, success, failure, killed;
created,
accepted,
running,
success,
failure,
killed;
}

View File

@ -6,13 +6,13 @@ import java.util.stream.Stream;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.common.mdstores.MDStoreBackend;
import eu.dnetlib.domain.mdstore.MDStore;
import eu.dnetlib.domain.mdstore.MDStoreVersion;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.errors.MDStoreManagerException;
import jakarta.transaction.Transactional;
public class MDStoreSqlBackend implements MDStoreBackend {

View File

@ -33,7 +33,7 @@ public interface WfJournalEntryRepository extends JpaRepository<WfJournalEntry,
Optional<WfJournalEntry> findFirstByWfConfigurationIdOrderByEndDateDesc(String id);
@Modifying
@Query(value = "update WfJournalEntry set wfExecutor = ?2 where processId = ?1 and wfExecutor is NULL")
void tryAssegnment(String id, String workerId);
@Query(value = "update WfJournalEntry set wfExecutor = ?2, status = ?3, startDate = ?4 where processId = ?1 and wfExecutor is NULL")
void tryAssegnment(String id, String workerId, JobStatus status, LocalDateTime startDate);
}

View File

@ -14,6 +14,13 @@
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>dnet-wf-common</artifactId>

View File

@ -1,18 +1,21 @@
package eu.dnetlib.wfs.service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
@ -36,6 +39,7 @@ import eu.dnetlib.wfs.repository.WfConfigurationRepository;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
import jakarta.transaction.Transactional;
@Service
public class WfExecutorService implements Stoppable {
@ -67,35 +71,67 @@ public class WfExecutorService implements Stoppable {
@Transactional
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES)
public synchronized void startWorkflow() throws Exception {
if (isPaused() || (processRegistry.countRunningWfs() >= maxSize)) {}
public synchronized void catchNewWorkflows() throws Exception {
if (isPaused() || (processRegistry.countRunningWfs() >= maxSize)) {
if (log.isDebugEnabled()) {
log.debug("Checking pending wfs - SKIP");
}
return;
}
final ServiceStatus status = ServiceStatusRegistry.getStatus();
if ((status != null) && StringUtils.isNotBlank(status.getName())) {
for (final WfJournalEntry job : wfJournalEntryRepository.findByStatus(JobStatus.pending)) {
if ((processRegistry.countRunningWfs() < maxSize) && tryAssegnment(job, status.getName())) {
final List<WfJournalEntry> jobs = tryAssignments(status.getName());
for (final WfJournalEntry job : jobs) {
try {
startWorkflowJob(job);
} catch (final Throwable e) {
final Map<String, String> details = new LinkedHashMap<>();
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, e.getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(e));
job.setDetails(details);
job.setStatus(JobStatus.failure);
job.setEndDate(LocalDateTime.now());
wfJournalEntryRepository.save(job);
}
}
}
}
private boolean tryAssegnment(final WfJournalEntry job, final String executorId) {
@Transactional
private List<WfJournalEntry> tryAssignments(final String executor) {
log.debug("Checking pending wfs");
final String id = job.getProcessId();
wfJournalEntryRepository.tryAssegnment(id, executorId);
return wfJournalEntryRepository.findByStatus(JobStatus.created)
.stream()
.filter(job -> trySingleAssignment(job.getProcessId()))
.toList();
}
return wfJournalEntryRepository.findById(id)
@Transactional
private boolean trySingleAssignment(final String processId) {
if (ServiceStatusRegistry.getStatus() == null) { return false; }
final String executor = ServiceStatusRegistry.getStatus().getName();
if (StringUtils.isBlank(executor)) { return false; }
wfJournalEntryRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
return wfJournalEntryRepository.findById(processId)
.map(WfJournalEntry::getWfExecutor)
.filter(s -> executorId.equals(s))
.filter(s -> executor.equals(s))
.isPresent();
}
private void startWorkflowJob(final WfJournalEntry job) throws WorkflowManagerException {
final WfConfiguration conf;
log.debug("Starting job: " + job.getProcessId());
if (StringUtils.isNotBlank(job.getWfConfigurationId())) {
conf = wfConfigurationRepository.findById(job.getWfConfigurationId())
.orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + job.getWfConfigurationId()));

View File

@ -149,6 +149,7 @@ services:
- db-mdstores-data
- is-manager
environment:
- LOGGING_LEVEL_EU_DNETLIB=DEBUG
- DNET_IS_URL=http://is-manager:${SPRING_BOOT_PORT}
- SPRING_DATASOURCE_URL=jdbc:postgresql://db-main:${PG_PORT}/${PG_WFS_DB}
- SPRING_DATASOURCE_USERNAME=${PG_USER}