new_history_table #1

Merged
michele.artini merged 6 commits from new_history_table into main 2024-01-10 12:17:16 +01:00
28 changed files with 554 additions and 588 deletions

View File

@ -7,11 +7,11 @@ BEGIN;
-- WF History
CREATE TABLE wf_journal (
process_id text PRIMARY KEY,
wf_template_id text NOT NULL,
wf_conf_id text,
wf_executor text,
CREATE TABLE wf_runtime (
process_id text PRIMARY KEY,
wf_conf_id text,
wf_executor text,
priority int NOT NULL DEFAULT 100,
name text NOT NULL,
family text NOT NULL,
status text NOT NULL,
@ -21,9 +21,41 @@ CREATE TABLE wf_journal (
ds_id text,
ds_name text,
ds_api text,
graph jsonb,
params jsonb,
details jsonb
);
CREATE TABLE wf_history (
process_id text PRIMARY KEY,
wf_conf_id text,
name text NOT NULL,
family text NOT NULL,
status text NOT NULL,
start_date timestamp,
end_date timestamp,
ds_id text,
ds_name text,
ds_api text,
details jsonb
);
CREATE VIEW wf_journal_view AS SELECT
coalesce(r.process_id, h.process_id) AS process_id,
coalesce(r.wf_conf_id, h.wf_conf_id) AS wf_conf_id,
coalesce(r.name, h.name) AS name,
coalesce(r.family, h.family) AS family,
coalesce(r.status, h.status) AS status,
coalesce(r.start_date, h.start_date) AS start_date,
coalesce(r.end_date, h.end_date) AS end_date,
coalesce(r.last_update, h.end_date) AS last_update,
coalesce(r.ds_id, h.ds_id) AS ds_id,
coalesce(r.ds_name, h.ds_name) AS ds_name,
coalesce(r.ds_api, h.ds_api) AS ds_api,
coalesce(r.details, h.details) AS details
FROM wf_runtime r FULL OUTER JOIN wf_history h ON (r.process_id = h.process_id);
-- Workflows
CREATE TABLE wf_sections (

View File

@ -17,9 +17,9 @@ import eu.dnetlib.common.mapping.xslt.XsltTransformFactory;
import eu.dnetlib.common.mdstores.backends.sql.MDStoreSqlBackend;
import eu.dnetlib.domain.service.ServiceType;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WfSection;
import eu.dnetlib.domain.wfs.WfSubscription;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
@SpringBootApplication
@EntityScan(basePackageClasses = {

View File

@ -13,9 +13,9 @@ import eu.dnetlib.common.mapping.xslt.DnetXsltFunction;
import eu.dnetlib.common.mapping.xslt.XsltTransformFactory;
import eu.dnetlib.domain.service.ServiceType;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WfSection;
import eu.dnetlib.domain.wfs.WfSubscription;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
@SpringBootApplication
@EntityScan(basePackageClasses = {

View File

@ -19,9 +19,9 @@ import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
import eu.dnetlib.wfs.repository.WfHistoryJobRepository;
@Service
public class WfHistoryImporter {
@ -29,7 +29,7 @@ public class WfHistoryImporter {
private static final Log log = LogFactory.getLog(WfHistoryImporter.class);
@Autowired
private WfJournalEntryRepository wfJournalEntryRepository;
private WfHistoryJobRepository historyJobRepository;
public void load(final String path) throws Exception {
final ObjectMapper mapper = new ObjectMapper();
@ -41,7 +41,7 @@ public class WfHistoryImporter {
private void saveWf(final JsonNode node) {
final WfJournalEntry wf = new WfJournalEntry();
final WfHistoryJob wf = new WfHistoryJob();
wf.setProcessId(node.get("system:processId").asText());
wf.setName(node.get("system:wfName").asText());
wf.setFamily(node.get("system:profileFamily").asText());
@ -78,7 +78,7 @@ public class WfHistoryImporter {
wf.setDetails(details);
wfJournalEntryRepository.save(wf);
this.historyJobRepository.save(wf);
log.info("Wf saved with id: " + wf.getProcessId());

View File

@ -3,6 +3,7 @@ package eu.dnetlib.wfs.manager.controller;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@ -16,17 +17,22 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.domain.common.KeyValue;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WfRepoHiDesc;
import eu.dnetlib.domain.wfs.WfRepoHiParams;
import eu.dnetlib.domain.wfs.WfSection;
import eu.dnetlib.domain.wfs.WfSubscription;
import eu.dnetlib.domain.wfs.WfTemplate;
import eu.dnetlib.domain.wfs.WfTemplate.Node;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.wfs.manager.service.WorkflowManagerService;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
@RestController
@RequestMapping("/api")
@ -35,42 +41,45 @@ public class ApiController extends DnetRestController {
@Autowired
private WorkflowManagerService wfManagerService;
@Autowired
private DnetServiceClientFactory clientFactory;
@GetMapping("/history")
public List<WfJournalEntry> history(
@RequestParam(required = true) final int total,
@RequestParam(required = false) final LocalDate from,
@RequestParam(required = false) final LocalDate to) {
return wfManagerService.findHistory(total, from, to);
return this.wfManagerService.findHistory(total, from, to);
}
@GetMapping("/history/byDsId/{dsId}")
public List<WfJournalEntry> historyByDsId(@PathVariable final String dsId) {
return wfManagerService.recentHistoryForDsId(dsId);
return this.wfManagerService.recentHistoryForDsId(dsId);
}
@GetMapping("/history/byApiId/{apiId}")
public List<WfJournalEntry> historyByApiId(@PathVariable final String apiId) {
return wfManagerService.recentHistoryForApiId(apiId);
return this.wfManagerService.recentHistoryForApiId(apiId);
}
@GetMapping("/history/byConf/{wfConfId}")
public List<WfJournalEntry> history(@PathVariable final String wfConfId) {
return wfManagerService.recentHistoryForConfiguration(wfConfId);
return this.wfManagerService.recentHistoryForConfiguration(wfConfId);
}
@GetMapping("/proc/{processId}")
public List<WfJournalEntry> getProcessExecution(@PathVariable final String processId) {
return Arrays.asList(wfManagerService.findProcessLog(processId));
return Arrays.asList(this.wfManagerService.findProcessLog(processId));
}
@GetMapping("/sections")
public Iterable<WfSection> listWfSections() throws Exception {
return wfManagerService.listSections();
return this.wfManagerService.listSections();
}
@GetMapping("/sections/{section}")
public List<KeyValue<String>> listWfConfigurations(@PathVariable final String section) throws Exception {
return wfManagerService.listWfConfigurationsBySection(section)
return this.wfManagerService.listWfConfigurationsBySection(section)
.stream()
.map(x -> new KeyValue<>(x.getId(), x.getName()))
.collect(Collectors.toList());
@ -78,76 +87,87 @@ public class ApiController extends DnetRestController {
@GetMapping("/conf/{id}")
public WfConfiguration getWfConfiguration(@PathVariable final String id) throws Exception {
return wfManagerService.findWorkflowConfiguration(id);
return this.wfManagerService.findWorkflowConfiguration(id);
}
@PostMapping("/conf")
public WfConfiguration saveWfConfiguration(@RequestBody final WfConfiguration conf) throws Exception {
return wfManagerService.saveWfConfiguration(conf);
return this.wfManagerService.saveWfConfiguration(conf);
}
@DeleteMapping("/conf/{id}")
public void deleteWfConfiguration(@PathVariable final String id) throws Exception {
wfManagerService.deleteWfConfiguration(id);
this.wfManagerService.deleteWfConfiguration(id);
}
@GetMapping("/conf/{id}/start")
public List<WfJournalEntry> startWorkflowConfiguration(@PathVariable final String id) throws Exception {
final WfConfiguration conf = wfManagerService.findWorkflowConfiguration(id);
final WfConfiguration conf = this.wfManagerService.findWorkflowConfiguration(id);
wfManagerService.prepareNewJob(conf, false);
this.wfManagerService.prepareNewJob(conf, false);
if (StringUtils.isNotBlank(conf.getApiId())) { return wfManagerService.recentHistoryForApiId(conf.getApiId()); }
if (StringUtils.isNotBlank(conf.getDsId())) { return wfManagerService.recentHistoryForDsId(conf.getDsId()); }
if (StringUtils.isNotBlank(conf.getId())) { return wfManagerService.recentHistoryForConfiguration(conf.getId()); }
if (StringUtils.isNotBlank(conf.getApiId())) { return this.wfManagerService.recentHistoryForApiId(conf.getApiId()); }
if (StringUtils.isNotBlank(conf.getDsId())) { return this.wfManagerService.recentHistoryForDsId(conf.getDsId()); }
if (StringUtils.isNotBlank(conf.getId())) { return this.wfManagerService.recentHistoryForConfiguration(conf.getId()); }
return wfManagerService.recentHistory();
return this.wfManagerService.recentHistory();
}
@GetMapping("/conf/{id}/destroy")
public List<WfJournalEntry> destroyWorkflowConfiguration(@PathVariable final String id) throws Exception {
final WfConfiguration conf = wfManagerService.findWorkflowConfiguration(id);
final WfConfiguration conf = this.wfManagerService.findWorkflowConfiguration(id);
wfManagerService.prepareNewJob(conf, true);
this.wfManagerService.prepareNewJob(conf, true);
if (StringUtils.isNotBlank(conf.getApiId())) { return wfManagerService.recentHistoryForApiId(conf.getApiId()); }
if (StringUtils.isNotBlank(conf.getDsId())) { return wfManagerService.recentHistoryForDsId(conf.getDsId()); }
if (StringUtils.isNotBlank(conf.getId())) { return wfManagerService.recentHistoryForConfiguration(conf.getId()); }
if (StringUtils.isNotBlank(conf.getApiId())) { return this.wfManagerService.recentHistoryForApiId(conf.getApiId()); }
if (StringUtils.isNotBlank(conf.getDsId())) { return this.wfManagerService.recentHistoryForDsId(conf.getDsId()); }
if (StringUtils.isNotBlank(conf.getId())) { return this.wfManagerService.recentHistoryForConfiguration(conf.getId()); }
return wfManagerService.recentHistory();
return this.wfManagerService.recentHistory();
}
@GetMapping("/conf/{id}/subscriptions")
public List<WfSubscription> listWorkflowSubscriptions(@PathVariable final String id) throws Exception {
return wfManagerService.listSubscriptions(id);
return this.wfManagerService.listSubscriptions(id);
}
@PostMapping("/conf/{id}/subscriptions")
public void saveWorkflowSubscriptions(@PathVariable final String id, @RequestBody final List<WfSubscription> subscriptions) throws Exception {
wfManagerService.saveSubscriptions(id, subscriptions);
this.wfManagerService.saveSubscriptions(id, subscriptions);
}
@GetMapping("/repo-his")
public List<WfRepoHiDesc> listRepoHIs() {
return wfManagerService.listRepoHis();
return this.wfManagerService.listRepoHis();
}
@PostMapping("/repo-hi/{id}/start")
public WfJournalEntry startRepoHi(@PathVariable final String id, @RequestBody final WfRepoHiParams params) {
return wfManagerService
.prepareNewJob(id, null, WorkflowsConstants.REPO_HI_FAMILY, WorkflowsConstants.REPO_HI_FAMILY, params.getDsId(), params.getDsName(), params
.getApiId());
public WfRunningJob startRepoHi(@PathVariable final String id, @RequestBody final WfRepoHiParams params) {
final WfTemplate wfTmpl = this.clientFactory
.getClient(SimpleResourceClient.class)
.findResourceContent(WorkflowsConstants.WF_TEMPLATE, id, WfTemplate.class);
final List<Node> graph = wfTmpl.getGraph();
// Prepare the parameters using their default values
final Map<String, Object> wfParams = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), null);
final String wfName = String.format("Aggregation of '%s' (api: %s)", params.getDsName(), params.getApiId());
return this.wfManagerService
.prepareNewJob(null, wfName, WorkflowsConstants.REPO_HI_FAMILY, params.getDsId(), params.getDsName(), params
.getApiId(), graph, wfParams);
}
@GetMapping("/template/{id}")
public WfTemplate findWfTemplate(@PathVariable final String id) {
return wfManagerService.findWfTemplate(id);
return this.wfManagerService.findWfTemplate(id);
}
@GetMapping("/confs/byApi/{id}")
public List<WfConfiguration> listWfConfigurationsByApi(@PathVariable final String id) {
return wfManagerService.listWfConfigurationsByApiId(id);
return this.wfManagerService.listWfConfigurationsByApiId(id);
}
}

View File

@ -12,9 +12,9 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
@ -39,7 +39,7 @@ public class ScheduledWorkflowLauncher {
public void verifySheduledWorkflows() {
log.debug("Verifying scheduled workflows - START");
wfConfigurationRepository.findAll()
this.wfConfigurationRepository.findAll()
.stream()
.filter(WfConfiguration::isEnabled)
.filter(WfConfiguration::isConfigured)
@ -48,7 +48,7 @@ public class ScheduledWorkflowLauncher {
.filter(this::isReady)
.forEach(conf -> {
try {
workflowManagerService.prepareNewJob(conf, false);
this.workflowManagerService.prepareNewJob(conf, false);
} catch (final Exception e) {
log.error("Error launching scheduled wf conf: " + conf.getId(), e);
}
@ -80,7 +80,7 @@ public class ScheduledWorkflowLauncher {
log.debug("NOW : " + now);
log.debug("LAST EXECUTION DATE : " + lastExecutionDate);
log.debug("MIN INTERVAL (minutes) : " + minInterval);
log.debug("WINDOW SIZE (ms) : " + windowSize);
log.debug("WINDOW SIZE (ms) : " + this.windowSize);
log.debug("MUST BE EXECUTED : " + res);
log.debug("**************************************************************");
}
@ -92,7 +92,7 @@ public class ScheduledWorkflowLauncher {
}
private LocalDateTime calculateLastExecutionDate(final String id) {
return wfJournalEntryRepository.findFirstByWfConfigurationIdAndEndDateNotNullOrderByEndDateDesc(id)
return this.wfJournalEntryRepository.findFirstByWfConfIdAndEndDateNotNullOrderByEndDateDesc(id)
.map(WfJournalEntry::getEndDate)
.orElse(LocalDateTime.MIN);
}
@ -101,7 +101,7 @@ public class ScheduledWorkflowLauncher {
try {
final CronExpression cron = CronExpression.parse(cronExpression);
final LocalDateTime date = now.minus(windowSize, ChronoUnit.MINUTES);
final LocalDateTime date = now.minus(this.windowSize, ChronoUnit.MINUTES);
final LocalDateTime nextDate = cron.next(date);
if (nextDate != null) {
@ -123,9 +123,9 @@ public class ScheduledWorkflowLauncher {
private boolean hasStatus(final WfConfiguration conf, final JobStatus status) {
final String id = conf.getId();
return wfJournalEntryRepository.findByStatus(status)
return this.wfJournalEntryRepository.findByStatus(status)
.stream()
.map(WfJournalEntry::getWfConfigurationId)
.map(WfJournalEntry::getWfConfId)
.filter(StringUtils::isNotBlank)
.filter(s -> s.equals(id))
.map(s -> true)

View File

@ -6,6 +6,7 @@ import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
@ -17,18 +18,21 @@ import org.springframework.stereotype.Service;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WfRepoHiDesc;
import eu.dnetlib.domain.wfs.WfSection;
import eu.dnetlib.domain.wfs.WfSubscription;
import eu.dnetlib.domain.wfs.WfTemplate;
import eu.dnetlib.domain.wfs.WfTemplate.Node;
import eu.dnetlib.domain.wfs.WfTemplate.WfParam;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.repository.WfRunningJobRepository;
import eu.dnetlib.wfs.repository.WfSectionRepository;
import eu.dnetlib.wfs.repository.WfSubscriptionRepository;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
@ -42,6 +46,9 @@ public class WorkflowManagerService {
@Autowired
private WfJournalEntryRepository wfJournalEntryRepository;
@Autowired
private WfRunningJobRepository jobRepository;
@Autowired
private WfSectionRepository wfSectionRepository;
@Autowired
@ -55,22 +62,22 @@ public class WorkflowManagerService {
@Transactional
public WfConfiguration findWorkflowConfiguration(final String id) throws WorkflowManagerException {
return wfConfigurationRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + id));
return this.wfConfigurationRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + id));
}
@Transactional
public Iterable<WfSection> listSections() {
return wfSectionRepository.findAll();
return this.wfSectionRepository.findAll();
}
@Transactional
public List<WfConfiguration> listWfConfigurationsBySection(final String section) {
return wfConfigurationRepository.findBySection(section);
return this.wfConfigurationRepository.findBySection(section);
}
@Transactional
public List<WfConfiguration> listWfConfigurationsByApiId(final String apiId) {
return wfConfigurationRepository.findByApiId(apiId);
return this.wfConfigurationRepository.findByApiId(apiId);
}
@Transactional
@ -83,38 +90,38 @@ public class WorkflowManagerService {
conf.setConfigured(WfConfigurationUtils.isConfigured(wfParams, conf));
wfConfigurationRepository.save(conf);
this.wfConfigurationRepository.save(conf);
return conf;
}
public WfTemplate findWfTemplate(final String name) {
return clientFactory.getClient(SimpleResourceClient.class).findResourceContent(WorkflowsConstants.WF_TEMPLATE, name, WfTemplate.class);
return this.clientFactory.getClient(SimpleResourceClient.class).findResourceContent(WorkflowsConstants.WF_TEMPLATE, name, WfTemplate.class);
}
public String findWfTemplateId(final String name) {
return clientFactory.getClient(SimpleResourceClient.class).findResource(WorkflowsConstants.WF_TEMPLATE, name).getId();
return this.clientFactory.getClient(SimpleResourceClient.class).findResource(WorkflowsConstants.WF_TEMPLATE, name).getId();
}
@Transactional
public void deleteWfConfiguration(final String id) {
wfSubscriptionRepository.deleteByWfConfigurationId(id);
wfConfigurationRepository.deleteById(id);
this.wfSubscriptionRepository.deleteByWfConfigurationId(id);
this.wfConfigurationRepository.deleteById(id);
}
@Transactional
public List<WfSubscription> listSubscriptions(final String id) {
return wfSubscriptionRepository.findByWfConfigurationId(id);
return this.wfSubscriptionRepository.findByWfConfigurationId(id);
}
@Transactional
public void saveSubscriptions(final String wfConfId, final List<WfSubscription> subscriptions) {
subscriptions.forEach(s -> s.setWfConfigurationId(wfConfId));
wfSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
wfSubscriptionRepository.saveAll(subscriptions);
this.wfSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
this.wfSubscriptionRepository.saveAll(subscriptions);
}
public WfJournalEntry prepareNewJob(final WfConfiguration conf, final boolean destroy) {
public WfRunningJob prepareNewJob(final WfConfiguration conf, final boolean destroy) {
final String wfTemplateId = findWfTemplateId(destroy ? conf.getDestroyWf() : conf.getWorkflow());
final String wfConfId = conf.getId();
@ -142,26 +149,30 @@ public class WorkflowManagerService {
final String dsName = conf.getDsName();
final String apiId = conf.getApiId();
return prepareNewJob(wfTemplateId, wfConfId, name, family, dsId, dsName, apiId);
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
final WfTemplate wfTmpl = client.findResourceContent(wfTemplateId, WfTemplate.class);
final Map<String, Object> params = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf);
return prepareNewJob(wfConfId, name, family, dsId, dsName, apiId, wfTmpl.getGraph(), params);
}
@Transactional
public WfJournalEntry prepareNewJob(final String wfTemplateId,
final String wfConfId,
public WfRunningJob prepareNewJob(final String wfConfId,
final String name,
final String family,
final String dsId,
final String dsName,
final String apiId) {
final WfJournalEntry job = new WfJournalEntry();
final String apiId,
final List<Node> graph,
final Map<String, Object> params) {
final WfRunningJob job = new WfRunningJob();
job.setProcessId(WfProcessUtils.generateProcessId());
job.setWfConfigurationId(wfConfId);
job.setName(name);
job.setFamily(family);
job.setWfTemplateId(wfTemplateId);
job.setWfConfId(wfConfId);
job.setDsId(dsId);
job.setDsName(dsName);
@ -175,46 +186,51 @@ public class WorkflowManagerService {
job.setWfExecutor(null);
job.setLastUpdate(LocalDateTime.now());
final WfJournalEntry saved = wfJournalEntryRepository.save(job);
job.setGraph(graph);
job.setParams(params);
jdbcTemplate.execute("NOTIFY " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL + ", '" + saved.getProcessId() + "'");
final WfRunningJob saved = this.jobRepository.save(job);
this.jdbcTemplate.execute("NOTIFY " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL + ", '" + saved.getProcessId() + "'");
return saved;
}
public List<WfJournalEntry> recentHistory() {
return wfJournalEntryRepository.findAllByOrderByLastUpdateDesc(PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
return this.wfJournalEntryRepository.findAllByOrderByLastUpdateDesc(PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public List<WfJournalEntry> recentHistoryForDsId(final String dsId) {
return wfJournalEntryRepository.findByDsIdOrderByLastUpdateDesc(dsId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
return this.wfJournalEntryRepository.findByDsIdOrderByLastUpdateDesc(dsId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public List<WfJournalEntry> recentHistoryForApiId(final String apiId) {
return wfJournalEntryRepository.findByApiIdOrderByLastUpdateDesc(apiId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
return this.wfJournalEntryRepository.findByApiIdOrderByLastUpdateDesc(apiId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public List<WfJournalEntry> findHistory(final int total, final LocalDate from, final LocalDate to) {
final int max = ((total > 0) || (total < MAX_HISTORY_SIZE)) ? total : MAX_HISTORY_SIZE;
if ((from == null) && (to == null)) { return wfJournalEntryRepository.findAll(PageRequest.of(0, max, Sort.by("lastUpdate").descending())).toList(); }
if ((from == null) && (to == null)) {
return this.wfJournalEntryRepository.findAll(PageRequest.of(0, max, Sort.by("lastUpdate").descending())).toList();
}
final LocalDateTime fromTime = from != null ? from.atTime(0, 0, 0) : LocalDateTime.MIN;
final LocalDateTime toTime = to != null ? to.atTime(23, 59, 59) : LocalDateTime.MAX;
return wfJournalEntryRepository.findByLastUpdateBetweenOrderByLastUpdateDesc(fromTime, toTime, PageRequest.of(0, max)).getContent();
return this.wfJournalEntryRepository.findByLastUpdateBetweenOrderByLastUpdateDesc(fromTime, toTime, PageRequest.of(0, max)).getContent();
}
public List<WfJournalEntry> recentHistoryForConfiguration(final String wfConfId) {
return wfJournalEntryRepository.findByWfConfigurationIdOrderByLastUpdateDesc(wfConfId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
return this.wfJournalEntryRepository.findByWfConfIdOrderByLastUpdateDesc(wfConfId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public WfJournalEntry findProcessLog(final String processId) {
return wfJournalEntryRepository.findById(processId).get();
return this.wfJournalEntryRepository.findById(processId).get();
}
public List<WfRepoHiDesc> listRepoHis() {
final SimpleResourceClient client = clientFactory.getClient(SimpleResourceClient.class);
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
return client.findResources(WorkflowsConstants.WF_TEMPLATE, WorkflowsConstants.REPO_HI_FAMILY)
.stream()
@ -222,7 +238,7 @@ public class WorkflowManagerService {
final WfTemplate tmpl = client.findResourceContent(r.getId(), WfTemplate.class);
final WfRepoHiDesc repohi = new WfRepoHiDesc();
repohi.setId(r.getId());
repohi.setId(r.getName());
repohi.setName(r.getName());
repohi.setDescription(r.getDescription());
repohi.setGraph(tmpl.getGraph());

View File

@ -15,8 +15,8 @@ import eu.dnetlib.domain.dsm.info.AggregationStage;
import eu.dnetlib.domain.dsm.info.CollectionInfo;
import eu.dnetlib.domain.dsm.info.CollectionMode;
import eu.dnetlib.domain.dsm.info.TransformationInfo;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.errors.DsmException;
public class WfManagerClient extends DnetServiceClient {

View File

@ -2,14 +2,13 @@ package eu.dnetlib.domain.wfs;
public class WorkflowsConstants {
// TODO (LOW PRIORITY): remove unused constants
public static final String DATASOURCE_PREFIX = "datasource:";
public static final String LOG_WF_NAME = "system:wfName";
public static final String LOG_WF_ID = "system:wfId";
public static final String LOG_WF_CONF_ID = "system:wfConfigurationId";
public static final String LOG_WF_FAMILY = "system:family";
public static final String LOG_WF_PRIORITY = "system:priority";
public static final String LOG_WF_PROCESS_ID = "system:processId";
public static final String LOG_WF_PROCESS_STATUS = "system:processStatus";
public static final String LOG_WF_PROCESS_START_DATE = "system:startDate";

View File

@ -1,40 +1,28 @@
package eu.dnetlib.domain.wfs;
package eu.dnetlib.domain.wfs.jobs;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import org.hibernate.annotations.Type;
import io.hypersistence.utils.hibernate.type.json.JsonType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
import jakarta.persistence.Table;
import jakarta.persistence.MappedSuperclass;
@Entity
@Table(name = "wf_journal")
public class WfJournalEntry implements Serializable {
@MappedSuperclass
public abstract class AbstractJob implements Serializable {
private static final long serialVersionUID = -326994850248506828L;
private static final long serialVersionUID = -5402950017251447505L;
@Id
@Column(name = "process_id")
private String processId;
@Column(name = "wf_template_id")
private String wfTemplateId;
@Column(name = "wf_conf_id")
private String wfConfigurationId;
@Column(name = "wf_executor")
private String wfExecutor;
@Column(name = "name")
private String name;
@ -51,8 +39,8 @@ public class WfJournalEntry implements Serializable {
@Column(name = "end_date")
private LocalDateTime endDate;
@Column(name = "last_update")
private LocalDateTime lastUpdate;
@Column(name = "wf_conf_id")
private String wfConfId;
@Column(name = "ds_id")
private String dsId;
@ -65,52 +53,18 @@ public class WfJournalEntry implements Serializable {
@Type(JsonType.class)
@Column(name = "details")
private Map<String, String> details;
@PrePersist
protected void onCreate() {
lastUpdate = LocalDateTime.now();
}
@PreUpdate
protected void onUpdate() {
lastUpdate = LocalDateTime.now();
}
private Map<String, String> details = new HashMap<>();
public String getProcessId() {
return processId;
return this.processId;
}
public void setProcessId(final String processId) {
this.processId = processId;
}
public String getWfTemplateId() {
return wfTemplateId;
}
public void setWfTemplateId(final String wfTemplateId) {
this.wfTemplateId = wfTemplateId;
}
public String getWfConfigurationId() {
return wfConfigurationId;
}
public void setWfConfigurationId(final String wfConfigurationId) {
this.wfConfigurationId = wfConfigurationId;
}
public String getWfExecutor() {
return wfExecutor;
}
public void setWfExecutor(final String wfExecutor) {
this.wfExecutor = wfExecutor;
}
public String getName() {
return name;
return this.name;
}
public void setName(final String name) {
@ -118,7 +72,7 @@ public class WfJournalEntry implements Serializable {
}
public String getFamily() {
return family;
return this.family;
}
public void setFamily(final String family) {
@ -126,7 +80,7 @@ public class WfJournalEntry implements Serializable {
}
public JobStatus getStatus() {
return status;
return this.status;
}
public void setStatus(final JobStatus status) {
@ -134,7 +88,7 @@ public class WfJournalEntry implements Serializable {
}
public LocalDateTime getStartDate() {
return startDate;
return this.startDate;
}
public void setStartDate(final LocalDateTime startDate) {
@ -142,23 +96,23 @@ public class WfJournalEntry implements Serializable {
}
public LocalDateTime getEndDate() {
return endDate;
return this.endDate;
}
public void setEndDate(final LocalDateTime endDate) {
this.endDate = endDate;
}
public LocalDateTime getLastUpdate() {
return lastUpdate;
public String getWfConfId() {
return this.wfConfId;
}
public void setLastUpdate(final LocalDateTime lastUpdate) {
this.lastUpdate = lastUpdate;
public void setWfConfId(final String wfConfId) {
this.wfConfId = wfConfId;
}
public String getDsId() {
return dsId;
return this.dsId;
}
public void setDsId(final String dsId) {
@ -166,7 +120,7 @@ public class WfJournalEntry implements Serializable {
}
public String getDsName() {
return dsName;
return this.dsName;
}
public void setDsName(final String dsName) {
@ -174,7 +128,7 @@ public class WfJournalEntry implements Serializable {
}
public String getApiId() {
return apiId;
return this.apiId;
}
public void setApiId(final String apiId) {
@ -182,7 +136,7 @@ public class WfJournalEntry implements Serializable {
}
public Map<String, String> getDetails() {
return details;
return this.details;
}
public void setDetails(final Map<String, String> details) {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.domain.wfs;
package eu.dnetlib.domain.wfs.jobs;
public enum JobStatus {
created,

View File

@ -0,0 +1,12 @@
package eu.dnetlib.domain.wfs.jobs;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
@Entity
@Table(name = "wf_history")
public class WfHistoryJob extends AbstractJob {
private static final long serialVersionUID = -9122461293116018290L;
}

View File

@ -0,0 +1,26 @@
package eu.dnetlib.domain.wfs.jobs;
import java.time.LocalDateTime;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
@Entity
@Table(name = "wf_journal_view")
public class WfJournalEntry extends AbstractJob {
private static final long serialVersionUID = -326994850248506828L;
@Column(name = "last_update")
private LocalDateTime lastUpdate;
public LocalDateTime getLastUpdate() {
return this.lastUpdate;
}
public void setLastUpdate(final LocalDateTime lastUpdate) {
this.lastUpdate = lastUpdate;
}
}

View File

@ -0,0 +1,80 @@
package eu.dnetlib.domain.wfs.jobs;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.hibernate.annotations.Type;
import eu.dnetlib.domain.wfs.WfTemplate;
import io.hypersistence.utils.hibernate.type.json.JsonType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
@Entity
@Table(name = "wf_runtime")
public class WfRunningJob extends AbstractJob {
private static final long serialVersionUID = -3163064347762609210L;
@Column(name = "wf_executor")
private String wfExecutor;
@Column(name = "priority")
private int priority;
@Column(name = "last_update")
private LocalDateTime lastUpdate;
@Type(JsonType.class)
@Column(name = "graph")
private List<WfTemplate.Node> graph = new ArrayList<>();
@Type(JsonType.class)
@Column(name = "params")
private Map<String, Object> params = new HashMap<>();
public String getWfExecutor() {
return this.wfExecutor;
}
public void setWfExecutor(final String wfExecutor) {
this.wfExecutor = wfExecutor;
}
public int getPriority() {
return this.priority;
}
public void setPriority(final int priority) {
this.priority = priority;
}
public LocalDateTime getLastUpdate() {
return this.lastUpdate;
}
public void setLastUpdate(final LocalDateTime lastUpdate) {
this.lastUpdate = lastUpdate;
}
public List<WfTemplate.Node> getGraph() {
return this.graph;
}
public void setGraph(final List<WfTemplate.Node> graph) {
this.graph = graph;
}
public Map<String, Object> getParams() {
return this.params;
}
public void setParams(final Map<String, Object> params) {
this.params = params;
}
}

View File

@ -0,0 +1,9 @@
package eu.dnetlib.wfs.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
public interface WfHistoryJobRepository extends JpaRepository<WfHistoryJob, String> {
}

View File

@ -6,22 +6,19 @@ import java.util.Optional;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.utils.ReadOnlyRepository;
public interface WfJournalEntryRepository extends JpaRepository<WfJournalEntry, String> {
public interface WfJournalEntryRepository extends ReadOnlyRepository<WfJournalEntry, String> {
// IMPORTANT: Postgres, by default, should return nulls first when you specify 'ORDER BY DESC'.
// Otherwise it will be necessary to implement the native queries specifying 'ORDER BY end_date DESC NULLS FIRST';
Page<WfJournalEntry> findByLastUpdateBetweenOrderByLastUpdateDesc(LocalDateTime start, LocalDateTime end, Pageable pageable);
Page<WfJournalEntry> findByWfConfigurationIdOrderByLastUpdateDesc(String id, Pageable pageable);
Page<WfJournalEntry> findByWfConfIdOrderByLastUpdateDesc(String id, Pageable pageable);
Page<WfJournalEntry> findAllByOrderByLastUpdateDesc(Pageable pageable);
@ -31,11 +28,6 @@ public interface WfJournalEntryRepository extends JpaRepository<WfJournalEntry,
List<WfJournalEntry> findByStatus(JobStatus status);
Optional<WfJournalEntry> findFirstByWfConfigurationIdAndEndDateNotNullOrderByEndDateDesc(String id);
@Modifying
@Transactional
@Query(value = "update WfJournalEntry set wfExecutor = ?2, status = ?3, startDate = ?4, lastUpdate = ?4 where processId = ?1 and wfExecutor is NULL")
void tryAssegnment(String id, String workerId, JobStatus status, LocalDateTime startDate);
Optional<WfJournalEntry> findFirstByWfConfIdAndEndDateNotNullOrderByEndDateDesc(String id);
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.wfs.repository;
import java.time.LocalDateTime;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
public interface WfRunningJobRepository extends JpaRepository<WfRunningJob, String> {
@Modifying
@Transactional
@Query(value = "update WfRunningJob set wfExecutor = ?2, status = ?3, startDate = ?4, lastUpdate = ?4 where processId = ?1 and wfExecutor is NULL")
void tryAssegnment(String id, String workerId, JobStatus status, LocalDateTime startDate);
}

View File

@ -1,6 +1,7 @@
package eu.dnetlib.wfs.controller;
import java.util.List;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
@ -8,10 +9,10 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.wfs.graph.GraphLoader;
import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.ProcessRegistry;
import eu.dnetlib.wfs.procs.WorkflowProcess;
public abstract class AbstractWfExecutorApiController extends DnetRestController {
@ -25,17 +26,22 @@ public abstract class AbstractWfExecutorApiController extends DnetRestController
private GraphLoader graphLoader;
@GetMapping("/process/{id}")
public WorkflowProcess findProcess(@PathVariable final String id) throws Exception {
return registry.findProcess(id);
public WfRunningJob findProcess(@PathVariable final String id) throws Exception {
return this.registry.findProcess(id).getJobDetails();
}
@GetMapping("/procs")
public Set<String> findProcess() throws Exception {
return this.registry.listProcIds();
}
@DeleteMapping("/process/{id}")
public void killProcess(@PathVariable final String id) throws Exception {
engine.killProcess(id);
this.engine.killProcess(id);
}
@GetMapping("/node-types")
public List<String> listAvailableNodes() {
return graphLoader.getValidTypes().stream().sorted().toList();
return this.graphLoader.getValidTypes().stream().sorted().toList();
}
}

View File

@ -2,6 +2,7 @@ package eu.dnetlib.wfs.graph;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@ -44,34 +45,34 @@ public class GraphLoader {
private void init() {
log.info("************************************");
log.info("* Checking workflow nodes:");
applicationContext.getBeansWithAnnotation(WfNode.class).forEach((key, bean) -> {
this.applicationContext.getBeansWithAnnotation(WfNode.class).forEach((key, bean) -> {
if (ProcessNode.class.isAssignableFrom(bean.getClass())) {
log.info("* Type: " + key + " -> " + bean.getClass());
validTypes.add(key);
this.validTypes.add(key);
} else {
log.warn("* Type: " + key + " -> " + bean.getClass() + "(ERROR: it is not a ProcessNode)");
}
});
if (validTypes.isEmpty()) {
if (this.validTypes.isEmpty()) {
log.warn("* 0 nodes available");
}
log.info("************************************");
}
public Graph loadGraph(final WfTemplate wfTemplate, final Map<String, Object> globalParams)
public Graph loadGraph(final List<WfTemplate.Node> nodes, final Map<String, Object> globalParams)
throws WorkflowManagerException {
final Graph graph = new Graph();
graph.addNode(GraphNode.newSuccessNode());
for (final WfTemplate.Node node : wfTemplate.getGraph()) {
for (final WfTemplate.Node node : nodes) {
final String nodeName = node.getName();
final String nodeType = node.getType();
final boolean isStart = node.isStart();
final boolean isJoin = node.isJoin();
final Map<String, Object> params = node.calculateInitialParams(globalParams, environment);
final Map<String, Object> params = node.calculateInitialParams(globalParams, this.environment);
final Map<String, Object> envParams = node.findEnvParams();
final Map<String, String> outputEnvMap = node.outputToEnvMap();
@ -141,13 +142,13 @@ public class GraphLoader {
if (n.isStart()) {
foundStart = true;
}
if ((n.getType() != null) && !validTypes.contains(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); }
if ((n.getType() != null) && !this.validTypes.contains(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); }
}
if (!foundStart) { throw new WorkflowManagerException("Start node not found"); }
}
public Set<String> getValidTypes() {
return validTypes;
return this.validTypes;
}
}

View File

@ -16,11 +16,12 @@ public class DeleteWfConfigurationNode extends AbstractJobNode {
@Override
protected void execute() throws Exception {
final String wfConfId = getProcess().getWfConfId();
final String wfConfId = getProcess().getJobDetails().getWfConfId();
if (StringUtils.isBlank(wfConfId)) { throw new WorkflowManagerException("wfConfId is null"); }
wfConfigurationRepository.deleteById(wfConfId);
this.wfConfigurationRepository.deleteById(wfConfId);
}
}

View File

@ -58,22 +58,22 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
@Override
protected void execute() throws Exception {
wfConfId = "wf-aggr-" + UUID.randomUUID();
this.wfConfId = "wf-aggr-" + UUID.randomUUID();
final WfConfiguration conf = new WfConfiguration();
conf.setId(wfConfId);
conf.setId(this.wfConfId);
conf.setSection(WorkflowsConstants.AGGREGATION_WF_CONFS_SECTION);
conf.setParentId(null);
conf.setName(String.format("Aggregation of '%s' (api: %s) ", ds.getOfficialname(), api.getId()));
conf.setName(String.format("Aggregation of '%s' (api: %s)", this.ds.getOfficialname(), this.api.getId()));
conf.setDetails(new HashMap<>());
conf.setDsId(ds.getId());
conf.setDsName(ds.getOfficialname());
conf.setApiId(api.getId());
conf.setDsId(this.ds.getId());
conf.setDsName(this.ds.getOfficialname());
conf.setApiId(this.api.getId());
conf.setWorkflow(wfId);
conf.setDestroyWf(destroyWfId);
conf.setWorkflow(this.wfId);
conf.setDestroyWf(this.destroyWfId);
conf.setEnabled(true);
conf.setPriority(DEFAULT_AGGR_PRIORITY);
@ -83,19 +83,19 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
conf.setSystemParams(prepareSystemParams());
final List<WfParam> wfParams = clientFactory.getClient(SimpleResourceClient.class)
final List<WfParam> wfParams = this.clientFactory.getClient(SimpleResourceClient.class)
.findResourceContent(WorkflowsConstants.WF_TEMPLATE, conf.getWorkflow(), WfTemplate.class)
.getParameters();
conf.setConfigured(WfConfigurationUtils.isConfigured(wfParams, conf));
wfConfigurationRepository.save(conf);
this.wfConfigurationRepository.save(conf);
}
private Map<String, Object> prepareSystemParams() {
final Map<String, Object> map = new LinkedHashMap<>();
map.put("nativeMdStoreId", nativeMdStoreId);
map.put("cleanedMdStoreId", cleanedMdStoreId);
map.put("nativeMdStoreId", this.nativeMdStoreId);
map.put("cleanedMdStoreId", this.cleanedMdStoreId);
return map;
}

View File

@ -4,6 +4,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.DsmClient;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
@ -14,29 +15,30 @@ public class ClearApiExtraFieldsNode extends AbstractJobNode {
@WfInputParam
private String infoType; // COLLECT, TRANSFORM, DOWNLOAD
@WfInputParam
private Api api;
@Autowired
private DnetServiceClientFactory clientFactory;
@Override
protected void execute() throws Exception {
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
final DsmClient dsm = this.clientFactory.getClient(DsmClient.class);
final String apiId = getProcess().getApiId();
switch (infoType.toUpperCase()) {
switch (this.infoType.toUpperCase()) {
case "COLLECT":
dsm.updateApiCollectionInfo(apiId, null, 0);
dsm.updateApiCollectionInfo(this.api.getId(), null, 0);
break;
case "AGGREGATOR":
case "TRANSFORM":
dsm.updateApiAggregationInfo(apiId, null, 0);
dsm.updateApiAggregationInfo(this.api.getId(), null, 0);
break;
case "DOWNLOAD":
dsm.updateApiDownloadInfo(apiId, null, 0);
dsm.updateApiDownloadInfo(this.api.getId(), null, 0);
break;
default:
throw new RuntimeException("Invalid infoType: " + infoType);
throw new RuntimeException("Invalid infoType: " + this.infoType);
}
}

View File

@ -24,14 +24,13 @@ public class LoadDatasourceInfoNode extends AbstractJobNode {
@Override
protected void execute() throws Exception {
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
final String dsId = getProcess().getJobDetails().getDsId();
final String apiId = getProcess().getJobDetails().getApiId();
final String dsId = getProcess().getDsId();
final String apiId = getProcess().getApiId();
ds = dsm.findDs(dsId);
api = dsm.findApi(apiId);
final DsmClient dsm = this.clientFactory.getClient(DsmClient.class);
this.ds = dsm.findDs(dsId);
this.api = dsm.findApi(apiId);
}
}

View File

@ -1,21 +1,23 @@
package eu.dnetlib.wfs.nodes.launch;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.wfs.WfTemplate;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.utils.NodeCallback;
@ -30,54 +32,41 @@ public class LaunchWorkflowJobNode extends ProcessNode {
@WfInputParam
private String wfId;
@WfInputParam(optional = true)
private Datasource ds;
@WfInputParam(optional = true)
private Api api;
@Autowired
private DnetServiceClientFactory clientFactory;
@Override
public final void execute(final Token token, final NodeCallback nodeCallback) {
try {
final ProcessEngine engine = getEngine();
final String procId = WfProcessUtils.generateProcessId();
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
final WfTemplate tmpl = client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, this.wfId, WfTemplate.class);
final WorkflowProcess process = getProcess();
final WfConfiguration conf = new WfConfiguration();
conf.setId("CHILD_" + UUID.randomUUID());
conf.setParentId(process.getWfConfId());
conf.setDetails(new HashMap<>());
conf.setPriority(100);
conf.setDsId(process.getDsId());
conf.setDsName(process.getDsName());
conf.setApiId(process.getApiId());
conf.setEnabled(true);
conf.setConfigured(true);
conf.setSchedulingEnabled(false);
conf.setCronExpression("");
conf.setCronMinInterval(0);
conf.setWorkflow(this.wfId);
conf.setDestroyWf(null);
conf.setSystemParams(process.getGlobalParams());
conf.setUserParams(new HashMap<>());
final WfRunningJob job = new WfRunningJob();
final WfJournalEntry job = new WfJournalEntry();
job.setProcessId(procId);
job.setWfConfigurationId(null);
job.setName(process.getName() + ":" + getNodeName());
job.setFamily(process.getFamily());
job.setWfTemplateId(this.wfId);
job.setDsId(process.getDsId());
job.setDsName(process.getDsName());
job.setApiId(process.getApiId());
job.setStatus(JobStatus.created);
job.setDetails(new LinkedHashMap<>());
job.setProcessId(WfProcessUtils.generateProcessId());
job.setPriority(process.getJobDetails().getPriority());
job.setStartDate(null);
job.setEndDate(null);
job.setWfExecutor(ServiceStatusRegistry.getStatus().getName());
job.setLastUpdate(LocalDateTime.now());
job.setEndDate(null);
job.setGraph(tmpl.getGraph());
// TODO (MEDIUM PRIORITY): verify if all necessary parameters are correctly passed
job.setParams(process.getJobDetails().getParams());
job.setWfExecutor(ServiceStatusRegistry.getStatus().getName());
job.setStatus(JobStatus.accepted);
engine.startWorkflowJob(job, conf, new ProcessCallback() {
// TODO (MEDIUM PRIORITY): verify if the job should be saved in the DB
getEngine().startWorkflowJob(job, new ProcessCallback() {
@Override
public void onSuccess(final WorkflowProcess t) {
@ -94,10 +83,10 @@ public class LaunchWorkflowJobNode extends ProcessNode {
});
if (log.isDebugEnabled()) {
log.debug("The child workflow [conf: " + conf.getId() + "] is starting with procId: " + procId);
log.debug("The child workflow [conf: " + process.getJobDetails().getWfConfId() + "] is starting with procId: " + job.getProcessId());
}
token.setProgressMessage("Launched sub workflow, proc: " + procId);
token.setProgressMessage("Launched sub workflow, proc: " + job.getProcessId());
} catch (final Throwable e) {
log.error("got exception while launching child workflow", e);

View File

@ -24,25 +24,20 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.domain.resource.SimpleResource;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfConfiguration;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WfTemplate;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.graph.GraphLoader;
import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.nodes.DefaultJobNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.nodes.SuccessNode;
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
import eu.dnetlib.wfs.repository.WfHistoryJobRepository;
import eu.dnetlib.wfs.repository.WfRunningJobRepository;
import eu.dnetlib.wfs.utils.EmailSender;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
import jakarta.annotation.PostConstruct;
import jakarta.transaction.Transactional;
@ -54,24 +49,21 @@ public class ProcessEngine {
@Autowired
private EmailSender emailSender;
@Autowired
private WfJournalEntryRepository wfJournalEntryRepository;
@Autowired
private WfConfigurationRepository wfConfigurationRepository;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private WfRunningJobRepository jobRepository;
@Autowired
private WfHistoryJobRepository historyJobRepository;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ProcessRegistry processRegistry;
@Autowired
private DnetServiceClientFactory clientFactory;
@Autowired
private GraphLoader graphLoader;
@ -140,68 +132,23 @@ public class ProcessEngine {
private void tryToStartWf(final String processId, final String executor) {
log.debug("Trying to start " + processId);
this.wfJournalEntryRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
this.jobRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
final Optional<WfJournalEntry> job = this.wfJournalEntryRepository.findById(processId);
final Optional<WfRunningJob> job = this.jobRepository.findById(processId);
if (job.isPresent() && (job.get().getStatus() == JobStatus.accepted) && job.get().getWfExecutor().equals(executor)) {
startWorkflowJob(job.get(), null, null);
startWorkflowJob(job.get(), null);
}
}
@Transactional
public void startWorkflowJob(final WfJournalEntry job, final WfConfiguration passedConf, final ProcessCallback callback) {
public void startWorkflowJob(final WfRunningJob job, final ProcessCallback callback) {
final WorkflowProcess process = new WorkflowProcess(job);
try {
log.debug("Configure process " + job.getProcessId() + " using wf template " + job.getWfTemplateId());
final SimpleResourceClient simpleResourceClient = this.clientFactory.getClient(SimpleResourceClient.class);
final SimpleResource wfMetadata = simpleResourceClient.findResource(job.getWfTemplateId());
if (!WorkflowsConstants.WF_TEMPLATE.equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + job.getWfTemplateId()); }
final WfTemplate wfTmpl = simpleResourceClient.findResourceContent(job.getWfTemplateId(), WfTemplate.class);
final Map<String, Object> globalParams = new HashMap<>();
final WorkflowProcess process = new WorkflowProcess(job.getProcessId());
if ((passedConf == null) && StringUtils.isBlank(job.getWfConfigurationId())) {
process.setName(job.getFamily());
process.setWfConfId(null);
process.setParentId(null);
process.setPriority(100);
globalParams.putAll(WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), null));
} else {
final WfConfiguration conf = passedConf != null ? passedConf
: this.wfConfigurationRepository.findById(job.getWfConfigurationId())
.orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + job.getWfConfigurationId()));
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
process.setName(conf.getName());
process.setWfConfId(conf.getId());
process.setParentId(conf.getParentId());
process.setPriority(conf.getPriority());
globalParams.putAll(WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf));
}
process.setWfId(job.getWfTemplateId());
process.setFamily(wfMetadata.getSubtype());
process.setDsId(job.getDsId());
process.setDsName(job.getDsName());
process.setApiId(job.getApiId());
process.getGlobalParams().putAll(globalParams);
process.setGraph(this.graphLoader.loadGraph(wfTmpl, globalParams));
process.setGraph(this.graphLoader.loadGraph(job.getGraph(), job.getParams()));
process.setCallback(callback);
this.processRegistry.registerProcess(process);
@ -209,20 +156,13 @@ public class ProcessEngine {
job.setStatus(JobStatus.running);
job.setLastUpdate(LocalDateTime.now());
this.wfJournalEntryRepository.save(job);
this.jobRepository.save(job);
startProcess(process);
} catch (final Throwable e) {
final Map<String, String> details = new LinkedHashMap<>();
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, e.getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(e));
job.setDetails(details);
job.setStatus(JobStatus.failure);
job.setEndDate(LocalDateTime.now());
job.setLastUpdate(LocalDateTime.now());
this.wfJournalEntryRepository.save(job);
process.setError(e);
updateRunningJob(process, null);
}
}
@ -236,23 +176,25 @@ public class ProcessEngine {
log.debug(process.getGraph());
final LocalDateTime now = LocalDateTime.now();
process.setStatus(JobStatus.running);
process.setStartDate(now);
process.setLastActivityDate(now);
process.getJobDetails().setStatus(JobStatus.running);
process.getJobDetails().setStartDate(now);
updateRunningJob(process, null);
try {
for (final GraphNode graphNode : process.getGraph().startNodes()) {
final Token token = process.newToken(process.getGlobalParams());
final Token token = process.newToken(process.getJobDetails().getParams());
executeNode(process, graphNode, token);
}
} catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e);
process.setStatus(JobStatus.failure);
process.setError(e);
updateRunningJob(process, null);
}
}
public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
process.setLastActivityDate(LocalDateTime.now());
updateRunningJob(process, oldToken);
try {
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken)) {
@ -279,20 +221,15 @@ public class ProcessEngine {
executeNode(process, graphNode, token);
}
}
} catch (
final Throwable e) {
} catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e);
process.setStatus(JobStatus.failure);
process.setError(e.getMessage());
process.setErrorStacktrace(ExceptionUtils.getStackTrace(e));
process.setLastActivityDate(LocalDateTime.now());
process.setError(e);
updateRunningJob(process, oldToken);
}
}
private void executeNode(final WorkflowProcess process, final GraphNode graphNode, final Token token) throws WorkflowManagerException {
process.setLastActivityDate(LocalDateTime.now());
final Map<String, Object> params = new HashMap<>();
if (graphNode.getParams() != null) {
@ -343,12 +280,72 @@ public class ProcessEngine {
process.complete(token);
updateRunningJob(process, token);
saveHistoryJob(process);
this.processRegistry.unregisterProcess(process.getId());
final String wfConfId = process.getJobDetails().getWfConfId();
if (StringUtils.isNotBlank(wfConfId)) {
this.emailSender.sendMails(wfConfId, process);
}
}
private void saveHistoryJob(final WorkflowProcess process) {
log.debug("Process completed " + process.getId());
final WfJournalEntry job = process.asLog();
this.wfJournalEntryRepository.save(job);
final WfHistoryJob historyJob = new WfHistoryJob();
this.emailSender.sendMails(process);
historyJob.setProcessId(process.getId());
historyJob.setName(process.getJobDetails().getName());
historyJob.setFamily(process.getJobDetails().getFamily());
historyJob.setWfConfId(process.getJobDetails().getWfConfId());
historyJob.setDsId(process.getJobDetails().getDsId());
historyJob.setDsName(process.getJobDetails().getDsName());
historyJob.setApiId(process.getJobDetails().getApiId());
historyJob.setStartDate(process.getJobDetails().getStartDate());
historyJob.setEndDate(process.getJobDetails().getEndDate());
final Map<String, String> details = new LinkedHashMap<>();
if (process.getError() != null) {
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
historyJob.setStatus(JobStatus.failure);
} else {
historyJob.setStatus(JobStatus.success);
}
details.putAll(process.getOutputParams());
historyJob.setDetails(details);
this.historyJobRepository.save(historyJob);
}
private void updateRunningJob(final WorkflowProcess process, final Token token) {
log.debug("UPDATING JOB USING TOKEN " + token);
final LocalDateTime now = LocalDateTime.now();
process.getJobDetails().setLastUpdate(LocalDateTime.now());
if (process.isTerminated()) {
final Map<String, String> details = new LinkedHashMap<>();
details.putAll(process.getOutputParams());
if (process.getError() != null) {
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
process.getJobDetails().setStatus(JobStatus.failure);
} else {
process.getJobDetails().setStatus(JobStatus.success);
}
process.getJobDetails().setDetails(details);
process.getJobDetails().setEndDate(now);
} else {
process.getJobDetails().setStatus(JobStatus.running);
}
this.jobRepository.save(process.getJobDetails());
}
}

View File

@ -3,12 +3,10 @@ package eu.dnetlib.wfs.procs;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import eu.dnetlib.errors.WorkflowManagerException;
@ -18,7 +16,6 @@ public class ProcessRegistry {
private static final Log log = LogFactory.getLog(ProcessRegistry.class);
private final Map<String, WorkflowProcess> procs = new HashMap<>();
private final Map<String, WorkflowProcess> byConfId = new HashMap<>();
synchronized public int countRunningWfs() {
int count = 0;
@ -31,6 +28,10 @@ public class ProcessRegistry {
return count;
}
public Set<String> listProcIds() {
return this.procs.keySet();
}
public WorkflowProcess findProcess(final String procId) {
return this.procs.get(procId);
}
@ -39,43 +40,16 @@ public class ProcessRegistry {
return this.procs.values();
}
public WorkflowProcess findProcsByConfigurationId(final String id) {
return this.byConfId.get(id);
}
public void registerProcess(final WorkflowProcess process) throws WorkflowManagerException {
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
log.error("Already registerd process: " + process);
throw new WorkflowManagerException("Already registered process: " + process);
}
this.procs.put(process.getId(), process);
if (process.getWfConfId() != null) {
this.byConfId.put(process.getWfConfId(), process);
}
}
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES)
public void removeTerminatedProcess() {
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
final WorkflowProcess proc = e.getValue();
if (proc.isTerminated()) {
unregisterProcess(proc.getId());
}
}
}
public void unregisterProcess(final String procId) {
synchronized (this) {
final WorkflowProcess process = this.procs.remove(procId);
if (process != null) {
this.byConfId.entrySet()
.stream()
.filter(e -> e.getValue().getId().equals(process.getId()))
.map(Entry::getKey)
.forEach(confId -> this.byConfId.remove(confId, process));
}
}
this.procs.remove(procId);
}
}

View File

@ -2,17 +2,14 @@ package eu.dnetlib.wfs.procs;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfJournalEntry;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.wfs.graph.Graph;
import eu.dnetlib.wfs.utils.ProcessCallback;
@ -28,93 +25,35 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
private final String id;
private String family;
private String wfId;
private String name;
private String wfConfId;
private String parentId;
private int priority = 100;
private String dsId;
private String dsName;
private String apiId;
private final WfRunningJob jobDetails;
private Graph graph;
private ProcessCallback callback;
private List<Token> tokens = new CopyOnWriteArrayList<>();
private LocalDateTime lastActivityDate;
private JobStatus status;
private LocalDateTime startDate = LocalDateTime.MIN;
private LocalDateTime endDate = LocalDateTime.MIN;
private final Map<String, List<Token>> pausedJoinNodeTokens = new HashMap<>();
private final Map<String, Object> globalParams = new HashMap<>();
private final Map<String, String> outputParams = new HashMap<>();
private String error;
private String errorStacktrace;
private Throwable error;
public WorkflowProcess(final String id) {
this.id = id;
this.status = JobStatus.created;
this.lastActivityDate = LocalDateTime.now();
public WorkflowProcess(final WfRunningJob jobDetails) {
this.id = jobDetails.getProcessId();
this.jobDetails = jobDetails;
}
public String getId() {
return this.id;
}
public String getName() {
return this.name;
}
public String getFamily() {
return this.family;
}
public String getWfId() {
return this.wfId;
}
public String getWfConfId() {
return this.wfConfId;
}
public String getParentId() {
return this.parentId;
}
public int getPriority() {
return this.priority;
}
public String getDsId() {
return this.dsId;
}
public String getDsName() {
return this.dsName;
}
public String getApiId() {
return this.apiId;
public WfRunningJob getJobDetails() {
return this.jobDetails;
}
public Map<String, List<Token>> getPausedJoinNodeTokens() {
return this.pausedJoinNodeTokens;
}
public JobStatus getStatus() {
return this.status;
}
public void setStatus(final JobStatus status) {
this.status = status;
}
public Graph getGraph() {
return this.graph;
}
@ -124,70 +63,37 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public void kill() {
setStatus(JobStatus.killed);
// TODO (MEDIUM PRIORITY)
this.jobDetails.setStatus(JobStatus.killed);
}
public boolean isTerminated() {
return switch (this.status) {
return (this.error != null) || switch (this.jobDetails.getStatus()) {
case success, failure, killed -> true;
default -> false;
};
}
public LocalDateTime getLastActivityDate() {
return this.lastActivityDate;
}
public void setLastActivityDate(final LocalDateTime lastActivityDate) {
this.lastActivityDate = lastActivityDate;
}
@Override
public String toString() {
return String.format("[process id='%s' name='%s']", this.id, this.name);
return String.format("[process id='%s']", this.id);
}
@Override
public int compareTo(final WorkflowProcess wp) {
return NumberUtils.compare(getPriority(), wp.getPriority());
public int compareTo(final WorkflowProcess other) {
final int n1 = this.jobDetails.getPriority();
final int n2 = other.jobDetails.getPriority();
return NumberUtils.compare(n1, n2);
}
public Map<String, Object> getGlobalParams() {
return this.globalParams;
}
public void setStartDate(final LocalDateTime startDate) {
this.startDate = startDate;
}
public void setEndDate(final LocalDateTime endDate) {
this.endDate = endDate;
}
public LocalDateTime getStartDate() {
return this.startDate;
}
public LocalDateTime getEndDate() {
return this.endDate;
}
public String getError() {
public Throwable getError() {
return this.error;
}
public void setError(final String error) {
public void setError(final Throwable error) {
this.error = error;
}
public String getErrorStacktrace() {
return this.errorStacktrace;
}
public void setErrorStacktrace(final String errorStacktrace) {
this.errorStacktrace = errorStacktrace;
}
public Map<String, String> getOutputParams() {
return this.outputParams;
}
@ -201,15 +107,14 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
public void complete(final Token token) {
final LocalDateTime now = token.getEndDate();
setLastActivityDate(now);
setEndDate(now);
setStatus(token.isFailed() ? JobStatus.failure : JobStatus.success);
this.jobDetails.setLastUpdate(now);
this.jobDetails.setEndDate(now);
this.jobDetails.setStatus(token.isFailed() ? JobStatus.failure : JobStatus.success);
if (token.isFailed()) {
setStatus(JobStatus.failure);
setError(token.getError().getMessage());
setErrorStacktrace(ExceptionUtils.getStackTrace(token.getError()));
setLastActivityDate(LocalDateTime.now());
setError(token.getError());
}
if (this.callback != null) {
@ -222,38 +127,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public WfJournalEntry asLog() {
final Map<String, String> details = new LinkedHashMap<>();
details.putAll(getOutputParams());
details.put(WorkflowsConstants.LOG_WF_PRIORITY, "" + getPriority());
details.put(WorkflowsConstants.LOG_WF_ID, getWfId());
details.put(WorkflowsConstants.LOG_WF_CONF_ID, getWfConfId());
if (getError() != null) {
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, getError());
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, getErrorStacktrace());
}
final WfJournalEntry pe = new WfJournalEntry();
pe.setProcessId(getId());
pe.setWfTemplateId(getWfId());
pe.setWfConfigurationId(getWfConfId());
pe.setName(getName());
pe.setFamily(getFamily());
pe.setDsId(getDsId());
pe.setDsName(getDsName());
pe.setApiId(getApiId());
pe.setStartDate(getStartDate());
pe.setEndDate(getEndDate());
pe.setStatus(getStatus());
pe.setDetails(details);
return pe;
}
public ProcessCallback getCallback() {
return this.callback;
}
@ -262,42 +135,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
this.callback = callback;
}
public void setFamily(final String family) {
this.family = family;
}
public void setWfId(final String wfId) {
this.wfId = wfId;
}
public void setName(final String name) {
this.name = name;
}
public void setWfConfId(final String wfConfId) {
this.wfConfId = wfConfId;
}
public void setParentId(final String parentId) {
this.parentId = parentId;
}
public void setPriority(final int priority) {
this.priority = priority;
}
public void setDsId(final String dsId) {
this.dsId = dsId;
}
public void setDsName(final String dsName) {
this.dsName = dsName;
}
public void setApiId(final String apiId) {
this.apiId = apiId;
}
public void setGraph(final Graph graph) {
this.graph = graph;
}

View File

@ -11,8 +11,8 @@ import org.springframework.stereotype.Service;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.EmailClient;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.NotificationCondition;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.repository.WfSubscriptionRepository;
@ -33,16 +33,16 @@ public class EmailSender {
@Autowired
private DnetServiceClientFactory clientFactory;
public void sendMails(final WorkflowProcess proc) {
public void sendMails(final String wfConfId, final WorkflowProcess proc) {
wfSubscriptionRepository.findByWfConfigurationId(proc.getWfConfId()).forEach(s -> {
this.wfSubscriptionRepository.findByWfConfigurationId(wfConfId).forEach(s -> {
if ((s.getCondition() == NotificationCondition.ALWAYS) ||
((s.getCondition() == NotificationCondition.ONLY_FAILED) && (proc.getStatus() == JobStatus.failure)) ||
((s.getCondition() == NotificationCondition.ONLY_SUCCESS) && (proc.getStatus() == JobStatus.success))) {
((s.getCondition() == NotificationCondition.ONLY_FAILED) && (proc.getJobDetails().getStatus() == JobStatus.failure)) ||
((s.getCondition() == NotificationCondition.ONLY_SUCCESS) && (proc.getJobDetails().getStatus() == JobStatus.success))) {
try {
final Map<String, Object> params = new HashMap<>();
clientFactory.getClient(EmailClient.class).sendStoredMail(s.getEmail(), fromMail, fromName, s.getMessageId(), params);
this.clientFactory.getClient(EmailClient.class).sendStoredMail(s.getEmail(), this.fromMail, this.fromName, s.getMessageId(), params);
} catch (final Exception e) {
log.error("Error sending mail to " + s.getEmail(), e);