dnet-docker/dnet-app/apps/dnet-wf-manager/src/main/java/eu/dnetlib/wfs/manager/service/WorkflowManagerService.java

298 lines
11 KiB
Java

package eu.dnetlib.wfs.manager.service;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.domain.resource.SimpleResource;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.conf.WfConfiguration;
import eu.dnetlib.domain.wfs.conf.WfSection;
import eu.dnetlib.domain.wfs.graph.Node;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.domain.wfs.subscriptions.WfSubscription;
import eu.dnetlib.domain.wfs.templates.WfRepoHiDesc;
import eu.dnetlib.domain.wfs.templates.WfTemplate;
import eu.dnetlib.domain.wfs.templates.WfTemplateDesc;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.repository.WfRunningJobRepository;
import eu.dnetlib.wfs.repository.WfSectionRepository;
import eu.dnetlib.wfs.repository.WfSubscriptionRepository;
import eu.dnetlib.wfs.utils.GraphUtils;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
import eu.dnetlib.wfs.utils.WfProcessUtils;
import jakarta.transaction.Transactional;
@Service
public class WorkflowManagerService {
private static final int MAX_HISTORY_SIZE = 50;
@Autowired
private WfJournalEntryRepository wfJournalEntryRepository;
@Autowired
private WfRunningJobRepository jobRepository;
@Autowired
private WfSectionRepository wfSectionRepository;
@Autowired
private WfConfigurationRepository wfConfigurationRepository;
@Autowired
private WfSubscriptionRepository wfSubscriptionRepository;
@Autowired
private DnetServiceClientFactory clientFactory;
@Autowired
private JdbcTemplate jdbcTemplate;
// NB: the properties should be declared in all the containers (manager and workers)
@Autowired
private Environment environment;
@Transactional
public WfConfiguration findWorkflowConfiguration(final String id) throws WorkflowManagerException {
return this.wfConfigurationRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + id));
}
@Transactional
public Iterable<WfSection> listSections() {
return this.wfSectionRepository.findAll();
}
@Transactional
public List<WfConfiguration> listWfConfigurationsBySection(final String section) {
return this.wfConfigurationRepository.findBySection(section);
}
@Transactional
public List<WfConfiguration> listWfConfigurationsByApiId(final String apiId) {
return this.wfConfigurationRepository.findByApiId(apiId);
}
@Transactional
public WfConfiguration saveWfConfiguration(final WfConfiguration conf) {
if (StringUtils.isBlank(conf.getId())) {
conf.setId("wfconf-" + UUID.randomUUID());
}
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
final List<ImmutablePair<String, WfTemplate>> wfTemplates = conf.getWorkflows()
.stream()
.map(wf -> new ImmutablePair<>(wf, client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, wf, WfTemplate.class)))
.toList();
conf.setConfigured(WfConfigurationUtils.isConfigured(wfTemplates, conf));
return this.wfConfigurationRepository.save(conf);
}
public WfTemplateDesc findWfTemplateDesc(final String name) {
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
final SimpleResource metadata = client.findResource(WorkflowsConstants.WF_TEMPLATE, name);
final WfTemplate tmpl = client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, name, WfTemplate.class);
final WfTemplateDesc res = new WfTemplateDesc();
res.setId(metadata.getName());
res.setName(metadata.getName());
res.setDescription(metadata.getDescription());
res.setGraph(tmpl.getGraph());
res.setParameters(tmpl.getParameters());
res.setInfo(tmpl.getInfo());
return res;
}
@Transactional
public void deleteWfConfiguration(final String id) {
this.wfSubscriptionRepository.deleteByWfConfigurationId(id);
this.wfConfigurationRepository.deleteById(id);
}
@Transactional
public List<WfSubscription> listSubscriptions(final String id) {
return this.wfSubscriptionRepository.findByWfConfigurationId(id);
}
@Transactional
public void saveSubscriptions(final String wfConfId, final List<WfSubscription> subscriptions) {
subscriptions.forEach(s -> s.setWfConfigurationId(wfConfId));
this.wfSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
this.wfSubscriptionRepository.saveAll(subscriptions);
}
public WfRunningJob prepareNewJob(final WfConfiguration conf, final boolean destroy) throws WorkflowManagerException {
final String wfConfId = conf.getId();
final String name = conf.getName();
final String family;
if (destroy) {
if (StringUtils.isNotBlank(conf.getDsId())) {
family = WorkflowsConstants.REPO_BYE_FAMILY;
} else {
family = WorkflowsConstants.GENERIC_DESTROY_WF_FAMILY;
}
} else if (StringUtils.isBlank(conf.getSection())) {
if (StringUtils.isNotBlank(conf.getDsId())) {
family = WorkflowsConstants.AGGREGATION_FAMILY;
} else {
family = WorkflowsConstants.UNKNOWN_FAMILY;
}
} else {
family = conf.getSection();
}
final String dsId = conf.getDsId();
final String dsName = conf.getDsName();
final String apiId = conf.getApiId();
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
final List<ImmutablePair<String, WfTemplate>> wfTemplates = (destroy ? Arrays.asList(conf.getDestroyWf()) : conf.getWorkflows())
.stream()
.map(wf -> new ImmutablePair<>(wf, client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, wf, WfTemplate.class)))
.toList();
final List<Node> graph = WfConfigurationUtils.compositeGraph(wfTemplates);
final Map<String, Object> params = WfConfigurationUtils.allConfiguredParameters(wfTemplates, conf);
return prepareNewJob(wfConfId, name, family, dsId, dsName, apiId, graph, params);
}
@Transactional
public WfRunningJob prepareNewJob(final String wfConfId,
final String name,
final String family,
final String dsId,
final String dsName,
final String apiId,
final List<Node> nodes,
final Map<String, Object> params) throws WorkflowManagerException {
final WfRunningJob job = new WfRunningJob();
job.setProcessId(WfProcessUtils.generateProcessId());
job.setName(name);
job.setFamily(family);
job.setWfConfId(wfConfId);
job.setDsId(dsId);
job.setDsName(dsName);
job.setApiId(apiId);
job.setStatus(JobStatus.created);
job.setOutputParams(new LinkedHashMap<>());
job.setStartDate(null);
job.setEndDate(null);
job.setWfExecutor(null);
job.setLastUpdate(LocalDateTime.now());
job.setGraph(GraphUtils.prepareRuntineGraph(nodes, params, this.environment));
job.setInputParams(params);
final WfRunningJob saved = this.jobRepository.save(job);
this.jdbcTemplate.execute("NOTIFY " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL + ", '" + saved.getProcessId() + "'");
return saved;
}
public List<WfJournalEntry> recentHistory() {
return this.wfJournalEntryRepository.findAllByOrderByLastUpdateDesc(PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public List<WfJournalEntry> recentHistoryForDsId(final String dsId) {
return this.wfJournalEntryRepository.findByDsIdOrderByLastUpdateDesc(dsId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public List<WfJournalEntry> recentHistoryForApiId(final String apiId) {
return this.wfJournalEntryRepository.findByApiIdOrderByLastUpdateDesc(apiId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public List<WfJournalEntry> findHistory(final int total, final LocalDate from, final LocalDate to) {
final int max = ((total > 0) || (total < MAX_HISTORY_SIZE)) ? total : MAX_HISTORY_SIZE;
if ((from == null) && (to == null)) {
return this.wfJournalEntryRepository.findAll(PageRequest.of(0, max, Sort.by("lastUpdate").descending())).toList();
}
final LocalDateTime fromTime = from != null ? from.atTime(0, 0, 0) : LocalDateTime.MIN;
final LocalDateTime toTime = to != null ? to.atTime(23, 59, 59) : LocalDateTime.MAX;
return this.wfJournalEntryRepository.findByLastUpdateBetweenOrderByLastUpdateDesc(fromTime, toTime, PageRequest.of(0, max)).getContent();
}
public List<WfJournalEntry> recentHistoryForConfiguration(final String wfConfId) {
return this.wfJournalEntryRepository.findByWfConfIdOrderByLastUpdateDesc(wfConfId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public WfJournalEntry findProcessLog(final String processId) {
return this.wfJournalEntryRepository.findById(processId).get();
}
public List<WfRepoHiDesc> listRepoHis() {
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
return client.findResources(WorkflowsConstants.WF_TEMPLATE, WorkflowsConstants.REPO_HI_FAMILY)
.stream()
.map(r -> {
final WfTemplate tmpl = client.findResourceContent(r.getId(), WfTemplate.class);
final WfRepoHiDesc repohi = new WfRepoHiDesc();
repohi.setId(r.getName());
repohi.setName(r.getName());
repohi.setDescription(r.getDescription());
repohi.setGraph(tmpl.getGraph());
repohi.setParameters(tmpl.getParameters());
repohi.setInfo(tmpl.getInfo());
repohi.setExpectedEoscDsTypes(new LinkedHashSet<>());
repohi.setExpectedCompliances(new LinkedHashSet<>());
tmpl.getParameters().forEach(p -> {
if ("expectedEoscDsTypes".equalsIgnoreCase(p.getName())) {
for (final String s : StringUtils.split(p.getDefaultValue(), ",")) {
if (StringUtils.isNotBlank(s)) {
repohi.getExpectedEoscDsTypes().add(s.trim());
}
}
}
if ("expectedCompliances".equalsIgnoreCase(p.getName())) {
for (final String s : StringUtils.split(p.getDefaultValue(), ",")) {
if (StringUtils.isNotBlank(s)) {
repohi.getExpectedCompliances().add(s.trim());
}
}
}
});
return repohi;
})
.sorted(Comparator.comparing(WfRepoHiDesc::getName))
.toList();
}
}