// Source: dnet-docker/dnet-app/apps/dnet-wf-manager/src/main/java/eu/dnetlib/wfs/manager/service/WorkflowManagerService.java
// Repository viewer metadata: 259 lines, 8.8 KiB, Java

package eu.dnetlib.wfs.manager.service;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WfRepoHiDesc;
import eu.dnetlib.domain.wfs.WfSection;
import eu.dnetlib.domain.wfs.WfSubscription;
import eu.dnetlib.domain.wfs.WfTemplate;
import eu.dnetlib.domain.wfs.WfTemplate.WfParam;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.repository.WfSectionRepository;
import eu.dnetlib.wfs.repository.WfSubscriptionRepository;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
import eu.dnetlib.wfs.utils.WfProcessUtils;
import jakarta.transaction.Transactional;
/**
 * Spring service that manages workflow configurations and their subscriptions,
 * creates journal entries for new workflow jobs, queries the job journal and
 * lists the repo-hi workflow templates.
 */
@Service
public class WorkflowManagerService {
// Page size used by the recentHistory* queries; also referenced by findHistory.
private static final int MAX_HISTORY_SIZE = 50;
// Repository of workflow journal entries (one entry per prepared job).
@Autowired
private WfJournalEntryRepository wfJournalEntryRepository;
// Repository of workflow sections.
@Autowired
private WfSectionRepository wfSectionRepository;
// Repository of workflow configurations.
@Autowired
private WfConfigurationRepository wfConfigurationRepository;
// Repository of the subscriptions attached to workflow configurations.
@Autowired
private WfSubscriptionRepository wfSubscriptionRepository;
// Factory used to obtain a SimpleResourceClient for reading workflow templates.
@Autowired
private DnetServiceClientFactory clientFactory;
// Used to issue the SQL NOTIFY statement after a new job has been stored.
@Autowired
private JdbcTemplate jdbcTemplate;
/**
 * Loads a workflow configuration by its identifier.
 *
 * @param id identifier of the configuration to load
 * @return the configuration stored under the given identifier
 * @throws WorkflowManagerException if the repository holds no configuration with that identifier
 */
@Transactional
public WfConfiguration findWorkflowConfiguration(final String id) throws WorkflowManagerException {
    return wfConfigurationRepository
            .findById(id)
            .orElseThrow(() -> {
                final String message = "WF configuration not found: " + id;
                return new WorkflowManagerException(message);
            });
}
/**
 * Lists every workflow section known to the section repository.
 *
 * @return all the sections returned by the repository
 */
@Transactional
public Iterable<WfSection> listSections() {
    final Iterable<WfSection> sections = wfSectionRepository.findAll();
    return sections;
}
/**
 * Lists the workflow configurations that belong to a section.
 *
 * @param section the section used as search key
 * @return the configurations the repository returns for that section
 */
@Transactional
public List<WfConfiguration> listWfConfigurationsBySection(final String section) {
    final List<WfConfiguration> configurations = wfConfigurationRepository.findBySection(section);
    return configurations;
}
/**
 * Lists the workflow configurations associated with an API.
 *
 * @param apiId the API identifier used as search key
 * @return the configurations the repository returns for that API identifier
 */
@Transactional
public List<WfConfiguration> listWfConfigurationsByApiId(final String apiId) {
    final List<WfConfiguration> configurations = wfConfigurationRepository.findByApiId(apiId);
    return configurations;
}
/**
 * Stores a workflow configuration.
 * <p>
 * A configuration without an identifier receives a new one of the form
 * {@code wfconf-<uuid>}. The {@code configured} flag is recomputed from the
 * parameters declared by the workflow template the configuration refers to.
 *
 * @param conf the configuration to store; its id and configured flag may be updated in place
 * @return the instance returned by the repository after saving
 */
@Transactional
public WfConfiguration saveWfConfiguration(final WfConfiguration conf) {
    if (StringUtils.isBlank(conf.getId())) {
        conf.setId("wfconf-" + UUID.randomUUID());
    }

    final List<WfParam> wfParams = findWfTemplate(conf.getWorkflow()).getParameters();
    conf.setConfigured(WfConfigurationUtils.isConfigured(wfParams, conf));

    // Return what the repository hands back: save() may return a different
    // (managed) instance than the one passed in.
    return wfConfigurationRepository.save(conf);
}
/**
 * Reads the content of a workflow template through the resource client.
 *
 * @param name the template name passed to the resource client
 * @return the template content deserialized as {@link WfTemplate}
 */
public WfTemplate findWfTemplate(final String name) {
    final SimpleResourceClient resourceClient = clientFactory.getClient(SimpleResourceClient.class);
    return resourceClient.findResourceContent(WorkflowsConstants.WF_TEMPLATE, name, WfTemplate.class);
}
/**
 * Resolves the identifier of a workflow template through the resource client.
 *
 * @param name the template name passed to the resource client
 * @return the id of the resource found for that name
 */
public String findWfTemplateId(final String name) {
    final SimpleResourceClient resourceClient = clientFactory.getClient(SimpleResourceClient.class);
    return resourceClient.findResource(WorkflowsConstants.WF_TEMPLATE, name).getId();
}
/**
 * Deletes a workflow configuration together with its subscriptions.
 *
 * @param id identifier of the configuration to delete
 */
@Transactional
public void deleteWfConfiguration(final String id) {
    // Subscriptions are removed first, then the configuration itself.
    this.wfSubscriptionRepository.deleteByWfConfigurationId(id);
    this.wfConfigurationRepository.deleteById(id);
}
/**
 * Lists the subscriptions attached to a workflow configuration.
 *
 * @param id identifier of the workflow configuration
 * @return the subscriptions the repository returns for that configuration
 */
@Transactional
public List<WfSubscription> listSubscriptions(final String id) {
    final List<WfSubscription> subscriptions = wfSubscriptionRepository.findByWfConfigurationId(id);
    return subscriptions;
}
/**
 * Replaces the subscriptions of a workflow configuration with the given ones.
 *
 * @param wfConfId      identifier of the workflow configuration
 * @param subscriptions the new subscriptions; each one is bound to {@code wfConfId} in place
 */
@Transactional
public void saveSubscriptions(final String wfConfId, final List<WfSubscription> subscriptions) {
    // Bind every incoming subscription to the target configuration.
    for (final WfSubscription subscription : subscriptions) {
        subscription.setWfConfigurationId(wfConfId);
    }

    // Drop the previously stored subscriptions before storing the new set.
    wfSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
    wfSubscriptionRepository.saveAll(subscriptions);
}
/**
 * Prepares a new job for a workflow configuration.
 * <p>
 * The template is the configuration's destroy workflow when {@code destroy} is
 * true, its regular workflow otherwise. The job family is derived from the
 * configuration (see {@code resolveJobFamily}).
 * <p>
 * NOTE(review): the overload is invoked directly on this instance; with
 * proxy-based Spring AOP its {@code @Transactional} annotation is not applied
 * on this call path - confirm this is intended.
 *
 * @param conf    the workflow configuration the job is created for
 * @param destroy whether the job runs the destroy workflow of the configuration
 * @return the journal entry created for the job
 */
public WfJournalEntry prepareNewJob(final WfConfiguration conf, final boolean destroy) {
    final String templateName = destroy ? conf.getDestroyWf() : conf.getWorkflow();
    final String wfTemplateId = findWfTemplateId(templateName);

    return prepareNewJob(
            wfTemplateId,
            conf.getId(),
            conf.getName(),
            resolveJobFamily(conf, destroy),
            conf.getDsId(),
            conf.getDsName(),
            conf.getApiId());
}

/**
 * Chooses the job family: destroy jobs use the repo-bye or generic-destroy
 * family, other jobs use the configuration section when present, and fall back
 * to the aggregation or unknown family depending on whether a datasource id is set.
 */
private static String resolveJobFamily(final WfConfiguration conf, final boolean destroy) {
    final boolean hasDatasource = StringUtils.isNotBlank(conf.getDsId());

    if (destroy) {
        return hasDatasource ? WorkflowsConstants.REPO_BYE_FAMILY : WorkflowsConstants.GENERIC_DESTROY_WF_FAMILY;
    }
    if (StringUtils.isNotBlank(conf.getSection())) {
        return conf.getSection();
    }
    return hasDatasource ? WorkflowsConstants.AGGREGATION_FAMILY : WorkflowsConstants.UNKNOWN_FAMILY;
}
/**
 * Creates and stores the journal entry of a new workflow job, then publishes
 * its process id with an SQL {@code NOTIFY} on the WF-created channel.
 * <p>
 * The entry starts in status {@link JobStatus#created}, with empty details and
 * no start date, end date or executor.
 *
 * @param wfTemplateId identifier of the workflow template to run
 * @param wfConfId     identifier of the workflow configuration the job belongs to
 * @param name         job name
 * @param family       job family
 * @param dsId         datasource identifier
 * @param dsName       datasource name
 * @param apiId        API identifier
 * @return the journal entry as returned by the repository after saving
 */
@Transactional
public WfJournalEntry prepareNewJob(final String wfTemplateId,
        final String wfConfId,
        final String name,
        final String family,
        final String dsId,
        final String dsName,
        final String apiId) {
    final WfJournalEntry job = new WfJournalEntry();
    job.setProcessId(WfProcessUtils.generateProcessId());
    // Record the owning configuration: history lookups by configuration id
    // (recentHistoryForConfiguration) rely on this field.
    job.setWfConfigurationId(wfConfId);
    job.setName(name);
    job.setFamily(family);
    job.setWfTemplateId(wfTemplateId);
    job.setDsId(dsId);
    job.setDsName(dsName);
    job.setDsApi(apiId);
    job.setStatus(JobStatus.created);
    job.setDetails(new LinkedHashMap<>());
    job.setStartDate(null);
    job.setEndDate(null);
    job.setWfExecutor(null);
    job.setLastUpdate(LocalDateTime.now());

    final WfJournalEntry saved = wfJournalEntryRepository.save(job);

    // The process id is generated by WfProcessUtils above, not taken from caller input.
    jdbcTemplate.execute("NOTIFY " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL + ", '" + saved.getProcessId() + "'");

    return saved;
}
/**
 * Returns the most recently updated journal entries.
 *
 * @return the first page (at most {@code MAX_HISTORY_SIZE} entries), newest update first
 */
public List<WfJournalEntry> recentHistory() {
    final PageRequest firstPage = PageRequest.of(0, MAX_HISTORY_SIZE);
    return wfJournalEntryRepository.findAllByOrderByLastUpdateDesc(firstPage).getContent();
}
/**
 * Returns the most recently updated journal entries of a datasource.
 *
 * @param dsId the datasource identifier
 * @return the first page (at most {@code MAX_HISTORY_SIZE} entries), newest update first
 */
public List<WfJournalEntry> recentHistoryForDsId(final String dsId) {
    final PageRequest firstPage = PageRequest.of(0, MAX_HISTORY_SIZE);
    return wfJournalEntryRepository.findByDsIdOrderByLastUpdateDesc(dsId, firstPage).getContent();
}
/**
 * Returns the most recently updated journal entries of an API.
 *
 * @param apiId the API identifier, matched against the entry's {@code dsApi} field
 * @return the first page (at most {@code MAX_HISTORY_SIZE} entries), newest update first
 */
public List<WfJournalEntry> recentHistoryForApiId(final String apiId) {
    final PageRequest firstPage = PageRequest.of(0, MAX_HISTORY_SIZE);
    return wfJournalEntryRepository.findByDsApiOrderByLastUpdateDesc(apiId, firstPage).getContent();
}
/**
 * Returns journal entries ordered by last update (newest first), optionally
 * restricted to a date interval.
 *
 * @param total requested number of entries; values that are not positive or
 *              that reach {@code MAX_HISTORY_SIZE} are replaced by {@code MAX_HISTORY_SIZE}
 * @param from  first day of the interval (from 00:00:00), or {@code null} for no lower bound
 * @param to    last day of the interval (up to 23:59:59), or {@code null} for no upper bound
 * @return the matching entries, at most one page of the computed size
 */
public List<WfJournalEntry> findHistory(final int total, final LocalDate from, final LocalDate to) {
    // Both conditions must hold: a non-positive size is rejected by PageRequest
    // and a size above the cap must not be honoured.
    final int max = ((total > 0) && (total < MAX_HISTORY_SIZE)) ? total : MAX_HISTORY_SIZE;

    if ((from == null) && (to == null)) {
        return wfJournalEntryRepository.findAll(PageRequest.of(0, max, Sort.by("lastUpdate").descending())).toList();
    }

    // A missing bound is replaced by the widest possible value on that side.
    final LocalDateTime fromTime = from != null ? from.atTime(0, 0, 0) : LocalDateTime.MIN;
    final LocalDateTime toTime = to != null ? to.atTime(23, 59, 59) : LocalDateTime.MAX;

    return wfJournalEntryRepository.findByLastUpdateBetweenOrderByLastUpdateDesc(fromTime, toTime, PageRequest.of(0, max)).getContent();
}
/**
 * Returns the most recently updated journal entries of a workflow configuration.
 *
 * @param wfConfId identifier of the workflow configuration
 * @return the first page (at most {@code MAX_HISTORY_SIZE} entries), newest update first
 */
public List<WfJournalEntry> recentHistoryForConfiguration(final String wfConfId) {
    final PageRequest firstPage = PageRequest.of(0, MAX_HISTORY_SIZE);
    return wfJournalEntryRepository.findByWfConfigurationIdOrderByLastUpdateDesc(wfConfId, firstPage).getContent();
}
/**
 * Loads the journal entry of a workflow process.
 *
 * @param processId identifier of the process
 * @return the journal entry stored under that process id
 * @throws NoSuchElementException if no journal entry exists for the given process id
 */
public WfJournalEntry findProcessLog(final String processId) {
    // Same exception type as Optional.get(), but the message names the missing process.
    return wfJournalEntryRepository
            .findById(processId)
            .orElseThrow(() -> new NoSuchElementException("WF journal entry not found: " + processId));
}
/**
 * Lists the workflow templates found by the resource client for
 * {@code WorkflowsConstants.REPO_HI_FAMILY}, described as {@link WfRepoHiDesc}
 * and sorted by name.
 * <p>
 * For each template, the comma-separated default values of the parameters
 * named {@code expectedEoscDsTypes} and {@code expectedCompliances}
 * (case-insensitive) are split, trimmed and collected into the matching sets
 * of the descriptor; blank items are ignored.
 *
 * @return the repo-hi descriptors ordered by name
 */
public List<WfRepoHiDesc> listRepoHis() {
    final SimpleResourceClient client = clientFactory.getClient(SimpleResourceClient.class);

    return client.findResources(WorkflowsConstants.WF_TEMPLATE, WorkflowsConstants.REPO_HI_FAMILY)
            .stream()
            .map(r -> {
                final WfTemplate tmpl = client.findResourceContent(r.getId(), WfTemplate.class);

                final WfRepoHiDesc repohi = new WfRepoHiDesc();
                repohi.setId(r.getId());
                repohi.setName(r.getName());
                repohi.setDescription(r.getDescription());
                repohi.setGraph(tmpl.getGraph());
                repohi.setParameters(tmpl.getParameters());
                repohi.setInfo(tmpl.getInfo());
                repohi.setExpectedEoscDsTypes(new LinkedHashSet<>());
                repohi.setExpectedCompliances(new LinkedHashSet<>());

                tmpl.getParameters().forEach(p -> {
                    final boolean isDsTypes = "expectedEoscDsTypes".equalsIgnoreCase(p.getName());
                    final boolean isCompliances = "expectedCompliances".equalsIgnoreCase(p.getName());
                    if (!isDsTypes && !isCompliances) { return; }

                    // StringUtils.split returns null for a null input: a parameter
                    // without a default value contributes nothing.
                    final String[] values = StringUtils.split(p.getDefaultValue(), ",");
                    if (values == null) { return; }

                    for (final String s : values) {
                        if (StringUtils.isNotBlank(s)) {
                            if (isDsTypes) {
                                repohi.getExpectedEoscDsTypes().add(s.trim());
                            } else {
                                repohi.getExpectedCompliances().add(s.trim());
                            }
                        }
                    }
                });

                return repohi;
            })
            .sorted(Comparator.comparing(WfRepoHiDesc::getName))
            .toList();
}
}