Merge pull request 'new_history_table' (#1) from new_history_table into main
Reviewed-on: #1
This commit is contained in:
commit
c709559c06
|
@ -7,11 +7,11 @@ BEGIN;
|
|||
|
||||
-- WF History
|
||||
|
||||
CREATE TABLE wf_journal (
|
||||
process_id text PRIMARY KEY,
|
||||
wf_template_id text NOT NULL,
|
||||
wf_conf_id text,
|
||||
wf_executor text,
|
||||
CREATE TABLE wf_runtime (
|
||||
process_id text PRIMARY KEY,
|
||||
wf_conf_id text,
|
||||
wf_executor text,
|
||||
priority int NOT NULL DEFAULT 100,
|
||||
name text NOT NULL,
|
||||
family text NOT NULL,
|
||||
status text NOT NULL,
|
||||
|
@ -21,9 +21,41 @@ CREATE TABLE wf_journal (
|
|||
ds_id text,
|
||||
ds_name text,
|
||||
ds_api text,
|
||||
graph jsonb,
|
||||
params jsonb,
|
||||
details jsonb
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE wf_history (
|
||||
process_id text PRIMARY KEY,
|
||||
wf_conf_id text,
|
||||
name text NOT NULL,
|
||||
family text NOT NULL,
|
||||
status text NOT NULL,
|
||||
start_date timestamp,
|
||||
end_date timestamp,
|
||||
ds_id text,
|
||||
ds_name text,
|
||||
ds_api text,
|
||||
details jsonb
|
||||
);
|
||||
|
||||
CREATE VIEW wf_journal_view AS SELECT
|
||||
coalesce(r.process_id, h.process_id) AS process_id,
|
||||
coalesce(r.wf_conf_id, h.wf_conf_id) AS wf_conf_id,
|
||||
coalesce(r.name, h.name) AS name,
|
||||
coalesce(r.family, h.family) AS family,
|
||||
coalesce(r.status, h.status) AS status,
|
||||
coalesce(r.start_date, h.start_date) AS start_date,
|
||||
coalesce(r.end_date, h.end_date) AS end_date,
|
||||
coalesce(r.last_update, h.end_date) AS last_update,
|
||||
coalesce(r.ds_id, h.ds_id) AS ds_id,
|
||||
coalesce(r.ds_name, h.ds_name) AS ds_name,
|
||||
coalesce(r.ds_api, h.ds_api) AS ds_api,
|
||||
coalesce(r.details, h.details) AS details
|
||||
FROM wf_runtime r FULL OUTER JOIN wf_history h ON (r.process_id = h.process_id);
|
||||
|
||||
-- Workflows
|
||||
|
||||
CREATE TABLE wf_sections (
|
||||
|
|
|
@ -17,9 +17,9 @@ import eu.dnetlib.common.mapping.xslt.XsltTransformFactory;
|
|||
import eu.dnetlib.common.mdstores.backends.sql.MDStoreSqlBackend;
|
||||
import eu.dnetlib.domain.service.ServiceType;
|
||||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.WfSection;
|
||||
import eu.dnetlib.domain.wfs.WfSubscription;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
|
||||
|
||||
@SpringBootApplication
|
||||
@EntityScan(basePackageClasses = {
|
||||
|
|
|
@ -13,9 +13,9 @@ import eu.dnetlib.common.mapping.xslt.DnetXsltFunction;
|
|||
import eu.dnetlib.common.mapping.xslt.XsltTransformFactory;
|
||||
import eu.dnetlib.domain.service.ServiceType;
|
||||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.WfSection;
|
||||
import eu.dnetlib.domain.wfs.WfSubscription;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
|
||||
|
||||
@SpringBootApplication
|
||||
@EntityScan(basePackageClasses = {
|
||||
|
|
|
@ -19,9 +19,9 @@ import org.springframework.stereotype.Service;
|
|||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
|
||||
import eu.dnetlib.wfs.repository.WfHistoryJobRepository;
|
||||
|
||||
@Service
|
||||
public class WfHistoryImporter {
|
||||
|
@ -29,7 +29,7 @@ public class WfHistoryImporter {
|
|||
private static final Log log = LogFactory.getLog(WfHistoryImporter.class);
|
||||
|
||||
@Autowired
|
||||
private WfJournalEntryRepository wfJournalEntryRepository;
|
||||
private WfHistoryJobRepository historyJobRepository;
|
||||
|
||||
public void load(final String path) throws Exception {
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
|
@ -41,7 +41,7 @@ public class WfHistoryImporter {
|
|||
|
||||
private void saveWf(final JsonNode node) {
|
||||
|
||||
final WfJournalEntry wf = new WfJournalEntry();
|
||||
final WfHistoryJob wf = new WfHistoryJob();
|
||||
wf.setProcessId(node.get("system:processId").asText());
|
||||
wf.setName(node.get("system:wfName").asText());
|
||||
wf.setFamily(node.get("system:profileFamily").asText());
|
||||
|
@ -78,7 +78,7 @@ public class WfHistoryImporter {
|
|||
|
||||
wf.setDetails(details);
|
||||
|
||||
wfJournalEntryRepository.save(wf);
|
||||
this.historyJobRepository.save(wf);
|
||||
|
||||
log.info("Wf saved with id: " + wf.getProcessId());
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.wfs.manager.controller;
|
|||
import java.time.LocalDate;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -16,17 +17,22 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
|||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import eu.dnetlib.common.clients.DnetServiceClientFactory;
|
||||
import eu.dnetlib.common.clients.SimpleResourceClient;
|
||||
import eu.dnetlib.common.controller.DnetRestController;
|
||||
import eu.dnetlib.domain.common.KeyValue;
|
||||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.WfRepoHiDesc;
|
||||
import eu.dnetlib.domain.wfs.WfRepoHiParams;
|
||||
import eu.dnetlib.domain.wfs.WfSection;
|
||||
import eu.dnetlib.domain.wfs.WfSubscription;
|
||||
import eu.dnetlib.domain.wfs.WfTemplate;
|
||||
import eu.dnetlib.domain.wfs.WfTemplate.Node;
|
||||
import eu.dnetlib.domain.wfs.WorkflowsConstants;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
|
||||
import eu.dnetlib.wfs.manager.service.WorkflowManagerService;
|
||||
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api")
|
||||
|
@ -35,42 +41,45 @@ public class ApiController extends DnetRestController {
|
|||
@Autowired
|
||||
private WorkflowManagerService wfManagerService;
|
||||
|
||||
@Autowired
|
||||
private DnetServiceClientFactory clientFactory;
|
||||
|
||||
@GetMapping("/history")
|
||||
public List<WfJournalEntry> history(
|
||||
@RequestParam(required = true) final int total,
|
||||
@RequestParam(required = false) final LocalDate from,
|
||||
@RequestParam(required = false) final LocalDate to) {
|
||||
return wfManagerService.findHistory(total, from, to);
|
||||
return this.wfManagerService.findHistory(total, from, to);
|
||||
}
|
||||
|
||||
@GetMapping("/history/byDsId/{dsId}")
|
||||
public List<WfJournalEntry> historyByDsId(@PathVariable final String dsId) {
|
||||
return wfManagerService.recentHistoryForDsId(dsId);
|
||||
return this.wfManagerService.recentHistoryForDsId(dsId);
|
||||
}
|
||||
|
||||
@GetMapping("/history/byApiId/{apiId}")
|
||||
public List<WfJournalEntry> historyByApiId(@PathVariable final String apiId) {
|
||||
return wfManagerService.recentHistoryForApiId(apiId);
|
||||
return this.wfManagerService.recentHistoryForApiId(apiId);
|
||||
}
|
||||
|
||||
@GetMapping("/history/byConf/{wfConfId}")
|
||||
public List<WfJournalEntry> history(@PathVariable final String wfConfId) {
|
||||
return wfManagerService.recentHistoryForConfiguration(wfConfId);
|
||||
return this.wfManagerService.recentHistoryForConfiguration(wfConfId);
|
||||
}
|
||||
|
||||
@GetMapping("/proc/{processId}")
|
||||
public List<WfJournalEntry> getProcessExecution(@PathVariable final String processId) {
|
||||
return Arrays.asList(wfManagerService.findProcessLog(processId));
|
||||
return Arrays.asList(this.wfManagerService.findProcessLog(processId));
|
||||
}
|
||||
|
||||
@GetMapping("/sections")
|
||||
public Iterable<WfSection> listWfSections() throws Exception {
|
||||
return wfManagerService.listSections();
|
||||
return this.wfManagerService.listSections();
|
||||
}
|
||||
|
||||
@GetMapping("/sections/{section}")
|
||||
public List<KeyValue<String>> listWfConfigurations(@PathVariable final String section) throws Exception {
|
||||
return wfManagerService.listWfConfigurationsBySection(section)
|
||||
return this.wfManagerService.listWfConfigurationsBySection(section)
|
||||
.stream()
|
||||
.map(x -> new KeyValue<>(x.getId(), x.getName()))
|
||||
.collect(Collectors.toList());
|
||||
|
@ -78,76 +87,87 @@ public class ApiController extends DnetRestController {
|
|||
|
||||
@GetMapping("/conf/{id}")
|
||||
public WfConfiguration getWfConfiguration(@PathVariable final String id) throws Exception {
|
||||
return wfManagerService.findWorkflowConfiguration(id);
|
||||
return this.wfManagerService.findWorkflowConfiguration(id);
|
||||
}
|
||||
|
||||
@PostMapping("/conf")
|
||||
public WfConfiguration saveWfConfiguration(@RequestBody final WfConfiguration conf) throws Exception {
|
||||
return wfManagerService.saveWfConfiguration(conf);
|
||||
return this.wfManagerService.saveWfConfiguration(conf);
|
||||
}
|
||||
|
||||
@DeleteMapping("/conf/{id}")
|
||||
public void deleteWfConfiguration(@PathVariable final String id) throws Exception {
|
||||
wfManagerService.deleteWfConfiguration(id);
|
||||
this.wfManagerService.deleteWfConfiguration(id);
|
||||
}
|
||||
|
||||
@GetMapping("/conf/{id}/start")
|
||||
public List<WfJournalEntry> startWorkflowConfiguration(@PathVariable final String id) throws Exception {
|
||||
final WfConfiguration conf = wfManagerService.findWorkflowConfiguration(id);
|
||||
final WfConfiguration conf = this.wfManagerService.findWorkflowConfiguration(id);
|
||||
|
||||
wfManagerService.prepareNewJob(conf, false);
|
||||
this.wfManagerService.prepareNewJob(conf, false);
|
||||
|
||||
if (StringUtils.isNotBlank(conf.getApiId())) { return wfManagerService.recentHistoryForApiId(conf.getApiId()); }
|
||||
if (StringUtils.isNotBlank(conf.getDsId())) { return wfManagerService.recentHistoryForDsId(conf.getDsId()); }
|
||||
if (StringUtils.isNotBlank(conf.getId())) { return wfManagerService.recentHistoryForConfiguration(conf.getId()); }
|
||||
if (StringUtils.isNotBlank(conf.getApiId())) { return this.wfManagerService.recentHistoryForApiId(conf.getApiId()); }
|
||||
if (StringUtils.isNotBlank(conf.getDsId())) { return this.wfManagerService.recentHistoryForDsId(conf.getDsId()); }
|
||||
if (StringUtils.isNotBlank(conf.getId())) { return this.wfManagerService.recentHistoryForConfiguration(conf.getId()); }
|
||||
|
||||
return wfManagerService.recentHistory();
|
||||
return this.wfManagerService.recentHistory();
|
||||
}
|
||||
|
||||
@GetMapping("/conf/{id}/destroy")
|
||||
public List<WfJournalEntry> destroyWorkflowConfiguration(@PathVariable final String id) throws Exception {
|
||||
final WfConfiguration conf = wfManagerService.findWorkflowConfiguration(id);
|
||||
final WfConfiguration conf = this.wfManagerService.findWorkflowConfiguration(id);
|
||||
|
||||
wfManagerService.prepareNewJob(conf, true);
|
||||
this.wfManagerService.prepareNewJob(conf, true);
|
||||
|
||||
if (StringUtils.isNotBlank(conf.getApiId())) { return wfManagerService.recentHistoryForApiId(conf.getApiId()); }
|
||||
if (StringUtils.isNotBlank(conf.getDsId())) { return wfManagerService.recentHistoryForDsId(conf.getDsId()); }
|
||||
if (StringUtils.isNotBlank(conf.getId())) { return wfManagerService.recentHistoryForConfiguration(conf.getId()); }
|
||||
if (StringUtils.isNotBlank(conf.getApiId())) { return this.wfManagerService.recentHistoryForApiId(conf.getApiId()); }
|
||||
if (StringUtils.isNotBlank(conf.getDsId())) { return this.wfManagerService.recentHistoryForDsId(conf.getDsId()); }
|
||||
if (StringUtils.isNotBlank(conf.getId())) { return this.wfManagerService.recentHistoryForConfiguration(conf.getId()); }
|
||||
|
||||
return wfManagerService.recentHistory();
|
||||
return this.wfManagerService.recentHistory();
|
||||
|
||||
}
|
||||
|
||||
@GetMapping("/conf/{id}/subscriptions")
|
||||
public List<WfSubscription> listWorkflowSubscriptions(@PathVariable final String id) throws Exception {
|
||||
return wfManagerService.listSubscriptions(id);
|
||||
return this.wfManagerService.listSubscriptions(id);
|
||||
}
|
||||
|
||||
@PostMapping("/conf/{id}/subscriptions")
|
||||
public void saveWorkflowSubscriptions(@PathVariable final String id, @RequestBody final List<WfSubscription> subscriptions) throws Exception {
|
||||
wfManagerService.saveSubscriptions(id, subscriptions);
|
||||
this.wfManagerService.saveSubscriptions(id, subscriptions);
|
||||
}
|
||||
|
||||
@GetMapping("/repo-his")
|
||||
public List<WfRepoHiDesc> listRepoHIs() {
|
||||
return wfManagerService.listRepoHis();
|
||||
return this.wfManagerService.listRepoHis();
|
||||
}
|
||||
|
||||
@PostMapping("/repo-hi/{id}/start")
|
||||
public WfJournalEntry startRepoHi(@PathVariable final String id, @RequestBody final WfRepoHiParams params) {
|
||||
return wfManagerService
|
||||
.prepareNewJob(id, null, WorkflowsConstants.REPO_HI_FAMILY, WorkflowsConstants.REPO_HI_FAMILY, params.getDsId(), params.getDsName(), params
|
||||
.getApiId());
|
||||
public WfRunningJob startRepoHi(@PathVariable final String id, @RequestBody final WfRepoHiParams params) {
|
||||
final WfTemplate wfTmpl = this.clientFactory
|
||||
.getClient(SimpleResourceClient.class)
|
||||
.findResourceContent(WorkflowsConstants.WF_TEMPLATE, id, WfTemplate.class);
|
||||
|
||||
final List<Node> graph = wfTmpl.getGraph();
|
||||
|
||||
// Prepare the parameters using their default values
|
||||
final Map<String, Object> wfParams = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), null);
|
||||
|
||||
final String wfName = String.format("Aggregation of '%s' (api: %s)", params.getDsName(), params.getApiId());
|
||||
|
||||
return this.wfManagerService
|
||||
.prepareNewJob(null, wfName, WorkflowsConstants.REPO_HI_FAMILY, params.getDsId(), params.getDsName(), params
|
||||
.getApiId(), graph, wfParams);
|
||||
}
|
||||
|
||||
@GetMapping("/template/{id}")
|
||||
public WfTemplate findWfTemplate(@PathVariable final String id) {
|
||||
return wfManagerService.findWfTemplate(id);
|
||||
return this.wfManagerService.findWfTemplate(id);
|
||||
}
|
||||
|
||||
@GetMapping("/confs/byApi/{id}")
|
||||
public List<WfConfiguration> listWfConfigurationsByApi(@PathVariable final String id) {
|
||||
return wfManagerService.listWfConfigurationsByApiId(id);
|
||||
return this.wfManagerService.listWfConfigurationsByApiId(id);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -12,9 +12,9 @@ import org.springframework.scheduling.annotation.Scheduled;
|
|||
import org.springframework.scheduling.support.CronExpression;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
|
||||
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
|
||||
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
|
||||
|
||||
|
@ -39,7 +39,7 @@ public class ScheduledWorkflowLauncher {
|
|||
public void verifySheduledWorkflows() {
|
||||
log.debug("Verifying scheduled workflows - START");
|
||||
|
||||
wfConfigurationRepository.findAll()
|
||||
this.wfConfigurationRepository.findAll()
|
||||
.stream()
|
||||
.filter(WfConfiguration::isEnabled)
|
||||
.filter(WfConfiguration::isConfigured)
|
||||
|
@ -48,7 +48,7 @@ public class ScheduledWorkflowLauncher {
|
|||
.filter(this::isReady)
|
||||
.forEach(conf -> {
|
||||
try {
|
||||
workflowManagerService.prepareNewJob(conf, false);
|
||||
this.workflowManagerService.prepareNewJob(conf, false);
|
||||
} catch (final Exception e) {
|
||||
log.error("Error launching scheduled wf conf: " + conf.getId(), e);
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ public class ScheduledWorkflowLauncher {
|
|||
log.debug("NOW : " + now);
|
||||
log.debug("LAST EXECUTION DATE : " + lastExecutionDate);
|
||||
log.debug("MIN INTERVAL (minutes) : " + minInterval);
|
||||
log.debug("WINDOW SIZE (ms) : " + windowSize);
|
||||
log.debug("WINDOW SIZE (ms) : " + this.windowSize);
|
||||
log.debug("MUST BE EXECUTED : " + res);
|
||||
log.debug("**************************************************************");
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ public class ScheduledWorkflowLauncher {
|
|||
}
|
||||
|
||||
private LocalDateTime calculateLastExecutionDate(final String id) {
|
||||
return wfJournalEntryRepository.findFirstByWfConfigurationIdAndEndDateNotNullOrderByEndDateDesc(id)
|
||||
return this.wfJournalEntryRepository.findFirstByWfConfIdAndEndDateNotNullOrderByEndDateDesc(id)
|
||||
.map(WfJournalEntry::getEndDate)
|
||||
.orElse(LocalDateTime.MIN);
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ public class ScheduledWorkflowLauncher {
|
|||
try {
|
||||
final CronExpression cron = CronExpression.parse(cronExpression);
|
||||
|
||||
final LocalDateTime date = now.minus(windowSize, ChronoUnit.MINUTES);
|
||||
final LocalDateTime date = now.minus(this.windowSize, ChronoUnit.MINUTES);
|
||||
final LocalDateTime nextDate = cron.next(date);
|
||||
|
||||
if (nextDate != null) {
|
||||
|
@ -123,9 +123,9 @@ public class ScheduledWorkflowLauncher {
|
|||
|
||||
private boolean hasStatus(final WfConfiguration conf, final JobStatus status) {
|
||||
final String id = conf.getId();
|
||||
return wfJournalEntryRepository.findByStatus(status)
|
||||
return this.wfJournalEntryRepository.findByStatus(status)
|
||||
.stream()
|
||||
.map(WfJournalEntry::getWfConfigurationId)
|
||||
.map(WfJournalEntry::getWfConfId)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.filter(s -> s.equals(id))
|
||||
.map(s -> true)
|
||||
|
|
|
@ -6,6 +6,7 @@ import java.util.Comparator;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -17,18 +18,21 @@ import org.springframework.stereotype.Service;
|
|||
|
||||
import eu.dnetlib.common.clients.DnetServiceClientFactory;
|
||||
import eu.dnetlib.common.clients.SimpleResourceClient;
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.WfRepoHiDesc;
|
||||
import eu.dnetlib.domain.wfs.WfSection;
|
||||
import eu.dnetlib.domain.wfs.WfSubscription;
|
||||
import eu.dnetlib.domain.wfs.WfTemplate;
|
||||
import eu.dnetlib.domain.wfs.WfTemplate.Node;
|
||||
import eu.dnetlib.domain.wfs.WfTemplate.WfParam;
|
||||
import eu.dnetlib.domain.wfs.WorkflowsConstants;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
|
||||
import eu.dnetlib.errors.WorkflowManagerException;
|
||||
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
|
||||
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
|
||||
import eu.dnetlib.wfs.repository.WfRunningJobRepository;
|
||||
import eu.dnetlib.wfs.repository.WfSectionRepository;
|
||||
import eu.dnetlib.wfs.repository.WfSubscriptionRepository;
|
||||
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
|
||||
|
@ -42,6 +46,9 @@ public class WorkflowManagerService {
|
|||
|
||||
@Autowired
|
||||
private WfJournalEntryRepository wfJournalEntryRepository;
|
||||
|
||||
@Autowired
|
||||
private WfRunningJobRepository jobRepository;
|
||||
@Autowired
|
||||
private WfSectionRepository wfSectionRepository;
|
||||
@Autowired
|
||||
|
@ -55,22 +62,22 @@ public class WorkflowManagerService {
|
|||
|
||||
@Transactional
|
||||
public WfConfiguration findWorkflowConfiguration(final String id) throws WorkflowManagerException {
|
||||
return wfConfigurationRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + id));
|
||||
return this.wfConfigurationRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + id));
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Iterable<WfSection> listSections() {
|
||||
return wfSectionRepository.findAll();
|
||||
return this.wfSectionRepository.findAll();
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public List<WfConfiguration> listWfConfigurationsBySection(final String section) {
|
||||
return wfConfigurationRepository.findBySection(section);
|
||||
return this.wfConfigurationRepository.findBySection(section);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public List<WfConfiguration> listWfConfigurationsByApiId(final String apiId) {
|
||||
return wfConfigurationRepository.findByApiId(apiId);
|
||||
return this.wfConfigurationRepository.findByApiId(apiId);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
|
@ -83,38 +90,38 @@ public class WorkflowManagerService {
|
|||
|
||||
conf.setConfigured(WfConfigurationUtils.isConfigured(wfParams, conf));
|
||||
|
||||
wfConfigurationRepository.save(conf);
|
||||
this.wfConfigurationRepository.save(conf);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
public WfTemplate findWfTemplate(final String name) {
|
||||
return clientFactory.getClient(SimpleResourceClient.class).findResourceContent(WorkflowsConstants.WF_TEMPLATE, name, WfTemplate.class);
|
||||
return this.clientFactory.getClient(SimpleResourceClient.class).findResourceContent(WorkflowsConstants.WF_TEMPLATE, name, WfTemplate.class);
|
||||
}
|
||||
|
||||
public String findWfTemplateId(final String name) {
|
||||
return clientFactory.getClient(SimpleResourceClient.class).findResource(WorkflowsConstants.WF_TEMPLATE, name).getId();
|
||||
return this.clientFactory.getClient(SimpleResourceClient.class).findResource(WorkflowsConstants.WF_TEMPLATE, name).getId();
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void deleteWfConfiguration(final String id) {
|
||||
wfSubscriptionRepository.deleteByWfConfigurationId(id);
|
||||
wfConfigurationRepository.deleteById(id);
|
||||
this.wfSubscriptionRepository.deleteByWfConfigurationId(id);
|
||||
this.wfConfigurationRepository.deleteById(id);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public List<WfSubscription> listSubscriptions(final String id) {
|
||||
return wfSubscriptionRepository.findByWfConfigurationId(id);
|
||||
return this.wfSubscriptionRepository.findByWfConfigurationId(id);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void saveSubscriptions(final String wfConfId, final List<WfSubscription> subscriptions) {
|
||||
subscriptions.forEach(s -> s.setWfConfigurationId(wfConfId));
|
||||
wfSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
|
||||
wfSubscriptionRepository.saveAll(subscriptions);
|
||||
this.wfSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
|
||||
this.wfSubscriptionRepository.saveAll(subscriptions);
|
||||
}
|
||||
|
||||
public WfJournalEntry prepareNewJob(final WfConfiguration conf, final boolean destroy) {
|
||||
public WfRunningJob prepareNewJob(final WfConfiguration conf, final boolean destroy) {
|
||||
final String wfTemplateId = findWfTemplateId(destroy ? conf.getDestroyWf() : conf.getWorkflow());
|
||||
|
||||
final String wfConfId = conf.getId();
|
||||
|
@ -142,26 +149,30 @@ public class WorkflowManagerService {
|
|||
final String dsName = conf.getDsName();
|
||||
final String apiId = conf.getApiId();
|
||||
|
||||
return prepareNewJob(wfTemplateId, wfConfId, name, family, dsId, dsName, apiId);
|
||||
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
|
||||
final WfTemplate wfTmpl = client.findResourceContent(wfTemplateId, WfTemplate.class);
|
||||
|
||||
final Map<String, Object> params = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf);
|
||||
|
||||
return prepareNewJob(wfConfId, name, family, dsId, dsName, apiId, wfTmpl.getGraph(), params);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public WfJournalEntry prepareNewJob(final String wfTemplateId,
|
||||
final String wfConfId,
|
||||
public WfRunningJob prepareNewJob(final String wfConfId,
|
||||
final String name,
|
||||
final String family,
|
||||
final String dsId,
|
||||
final String dsName,
|
||||
final String apiId) {
|
||||
|
||||
final WfJournalEntry job = new WfJournalEntry();
|
||||
final String apiId,
|
||||
final List<Node> graph,
|
||||
final Map<String, Object> params) {
|
||||
|
||||
final WfRunningJob job = new WfRunningJob();
|
||||
job.setProcessId(WfProcessUtils.generateProcessId());
|
||||
job.setWfConfigurationId(wfConfId);
|
||||
job.setName(name);
|
||||
job.setFamily(family);
|
||||
|
||||
job.setWfTemplateId(wfTemplateId);
|
||||
job.setWfConfId(wfConfId);
|
||||
|
||||
job.setDsId(dsId);
|
||||
job.setDsName(dsName);
|
||||
|
@ -175,46 +186,51 @@ public class WorkflowManagerService {
|
|||
job.setWfExecutor(null);
|
||||
job.setLastUpdate(LocalDateTime.now());
|
||||
|
||||
final WfJournalEntry saved = wfJournalEntryRepository.save(job);
|
||||
job.setGraph(graph);
|
||||
job.setParams(params);
|
||||
|
||||
jdbcTemplate.execute("NOTIFY " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL + ", '" + saved.getProcessId() + "'");
|
||||
final WfRunningJob saved = this.jobRepository.save(job);
|
||||
|
||||
this.jdbcTemplate.execute("NOTIFY " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL + ", '" + saved.getProcessId() + "'");
|
||||
|
||||
return saved;
|
||||
}
|
||||
|
||||
public List<WfJournalEntry> recentHistory() {
|
||||
return wfJournalEntryRepository.findAllByOrderByLastUpdateDesc(PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
return this.wfJournalEntryRepository.findAllByOrderByLastUpdateDesc(PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
}
|
||||
|
||||
public List<WfJournalEntry> recentHistoryForDsId(final String dsId) {
|
||||
return wfJournalEntryRepository.findByDsIdOrderByLastUpdateDesc(dsId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
return this.wfJournalEntryRepository.findByDsIdOrderByLastUpdateDesc(dsId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
}
|
||||
|
||||
public List<WfJournalEntry> recentHistoryForApiId(final String apiId) {
|
||||
return wfJournalEntryRepository.findByApiIdOrderByLastUpdateDesc(apiId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
return this.wfJournalEntryRepository.findByApiIdOrderByLastUpdateDesc(apiId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
}
|
||||
|
||||
public List<WfJournalEntry> findHistory(final int total, final LocalDate from, final LocalDate to) {
|
||||
|
||||
final int max = ((total > 0) || (total < MAX_HISTORY_SIZE)) ? total : MAX_HISTORY_SIZE;
|
||||
|
||||
if ((from == null) && (to == null)) { return wfJournalEntryRepository.findAll(PageRequest.of(0, max, Sort.by("lastUpdate").descending())).toList(); }
|
||||
if ((from == null) && (to == null)) {
|
||||
return this.wfJournalEntryRepository.findAll(PageRequest.of(0, max, Sort.by("lastUpdate").descending())).toList();
|
||||
}
|
||||
final LocalDateTime fromTime = from != null ? from.atTime(0, 0, 0) : LocalDateTime.MIN;
|
||||
final LocalDateTime toTime = to != null ? to.atTime(23, 59, 59) : LocalDateTime.MAX;
|
||||
|
||||
return wfJournalEntryRepository.findByLastUpdateBetweenOrderByLastUpdateDesc(fromTime, toTime, PageRequest.of(0, max)).getContent();
|
||||
return this.wfJournalEntryRepository.findByLastUpdateBetweenOrderByLastUpdateDesc(fromTime, toTime, PageRequest.of(0, max)).getContent();
|
||||
}
|
||||
|
||||
public List<WfJournalEntry> recentHistoryForConfiguration(final String wfConfId) {
|
||||
return wfJournalEntryRepository.findByWfConfigurationIdOrderByLastUpdateDesc(wfConfId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
return this.wfJournalEntryRepository.findByWfConfIdOrderByLastUpdateDesc(wfConfId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
|
||||
}
|
||||
|
||||
public WfJournalEntry findProcessLog(final String processId) {
|
||||
return wfJournalEntryRepository.findById(processId).get();
|
||||
return this.wfJournalEntryRepository.findById(processId).get();
|
||||
}
|
||||
|
||||
public List<WfRepoHiDesc> listRepoHis() {
|
||||
final SimpleResourceClient client = clientFactory.getClient(SimpleResourceClient.class);
|
||||
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
|
||||
|
||||
return client.findResources(WorkflowsConstants.WF_TEMPLATE, WorkflowsConstants.REPO_HI_FAMILY)
|
||||
.stream()
|
||||
|
@ -222,7 +238,7 @@ public class WorkflowManagerService {
|
|||
final WfTemplate tmpl = client.findResourceContent(r.getId(), WfTemplate.class);
|
||||
|
||||
final WfRepoHiDesc repohi = new WfRepoHiDesc();
|
||||
repohi.setId(r.getId());
|
||||
repohi.setId(r.getName());
|
||||
repohi.setName(r.getName());
|
||||
repohi.setDescription(r.getDescription());
|
||||
repohi.setGraph(tmpl.getGraph());
|
||||
|
|
|
@ -15,8 +15,8 @@ import eu.dnetlib.domain.dsm.info.AggregationStage;
|
|||
import eu.dnetlib.domain.dsm.info.CollectionInfo;
|
||||
import eu.dnetlib.domain.dsm.info.CollectionMode;
|
||||
import eu.dnetlib.domain.dsm.info.TransformationInfo;
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
|
||||
import eu.dnetlib.errors.DsmException;
|
||||
|
||||
public class WfManagerClient extends DnetServiceClient {
|
||||
|
|
|
@ -2,14 +2,13 @@ package eu.dnetlib.domain.wfs;
|
|||
|
||||
public class WorkflowsConstants {
|
||||
|
||||
// TODO (LOW PRIORITY): remove unused constants
|
||||
|
||||
public static final String DATASOURCE_PREFIX = "datasource:";
|
||||
|
||||
public static final String LOG_WF_NAME = "system:wfName";
|
||||
public static final String LOG_WF_ID = "system:wfId";
|
||||
public static final String LOG_WF_CONF_ID = "system:wfConfigurationId";
|
||||
|
||||
public static final String LOG_WF_FAMILY = "system:family";
|
||||
public static final String LOG_WF_PRIORITY = "system:priority";
|
||||
public static final String LOG_WF_PROCESS_ID = "system:processId";
|
||||
public static final String LOG_WF_PROCESS_STATUS = "system:processStatus";
|
||||
public static final String LOG_WF_PROCESS_START_DATE = "system:startDate";
|
||||
|
|
|
@ -1,40 +1,28 @@
|
|||
package eu.dnetlib.domain.wfs;
|
||||
package eu.dnetlib.domain.wfs.jobs;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.hibernate.annotations.Type;
|
||||
|
||||
import io.hypersistence.utils.hibernate.type.json.JsonType;
|
||||
import jakarta.persistence.Column;
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.EnumType;
|
||||
import jakarta.persistence.Enumerated;
|
||||
import jakarta.persistence.Id;
|
||||
import jakarta.persistence.PrePersist;
|
||||
import jakarta.persistence.PreUpdate;
|
||||
import jakarta.persistence.Table;
|
||||
import jakarta.persistence.MappedSuperclass;
|
||||
|
||||
@Entity
|
||||
@Table(name = "wf_journal")
|
||||
public class WfJournalEntry implements Serializable {
|
||||
@MappedSuperclass
|
||||
public abstract class AbstractJob implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -326994850248506828L;
|
||||
private static final long serialVersionUID = -5402950017251447505L;
|
||||
|
||||
@Id
|
||||
@Column(name = "process_id")
|
||||
private String processId;
|
||||
|
||||
@Column(name = "wf_template_id")
|
||||
private String wfTemplateId;
|
||||
|
||||
@Column(name = "wf_conf_id")
|
||||
private String wfConfigurationId;
|
||||
|
||||
@Column(name = "wf_executor")
|
||||
private String wfExecutor;
|
||||
|
||||
@Column(name = "name")
|
||||
private String name;
|
||||
|
||||
|
@ -51,8 +39,8 @@ public class WfJournalEntry implements Serializable {
|
|||
@Column(name = "end_date")
|
||||
private LocalDateTime endDate;
|
||||
|
||||
@Column(name = "last_update")
|
||||
private LocalDateTime lastUpdate;
|
||||
@Column(name = "wf_conf_id")
|
||||
private String wfConfId;
|
||||
|
||||
@Column(name = "ds_id")
|
||||
private String dsId;
|
||||
|
@ -65,52 +53,18 @@ public class WfJournalEntry implements Serializable {
|
|||
|
||||
@Type(JsonType.class)
|
||||
@Column(name = "details")
|
||||
private Map<String, String> details;
|
||||
|
||||
@PrePersist
|
||||
protected void onCreate() {
|
||||
lastUpdate = LocalDateTime.now();
|
||||
}
|
||||
|
||||
@PreUpdate
|
||||
protected void onUpdate() {
|
||||
lastUpdate = LocalDateTime.now();
|
||||
}
|
||||
private Map<String, String> details = new HashMap<>();
|
||||
|
||||
public String getProcessId() {
|
||||
return processId;
|
||||
return this.processId;
|
||||
}
|
||||
|
||||
public void setProcessId(final String processId) {
|
||||
this.processId = processId;
|
||||
}
|
||||
|
||||
public String getWfTemplateId() {
|
||||
return wfTemplateId;
|
||||
}
|
||||
|
||||
public void setWfTemplateId(final String wfTemplateId) {
|
||||
this.wfTemplateId = wfTemplateId;
|
||||
}
|
||||
|
||||
public String getWfConfigurationId() {
|
||||
return wfConfigurationId;
|
||||
}
|
||||
|
||||
public void setWfConfigurationId(final String wfConfigurationId) {
|
||||
this.wfConfigurationId = wfConfigurationId;
|
||||
}
|
||||
|
||||
public String getWfExecutor() {
|
||||
return wfExecutor;
|
||||
}
|
||||
|
||||
public void setWfExecutor(final String wfExecutor) {
|
||||
this.wfExecutor = wfExecutor;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public void setName(final String name) {
|
||||
|
@ -118,7 +72,7 @@ public class WfJournalEntry implements Serializable {
|
|||
}
|
||||
|
||||
public String getFamily() {
|
||||
return family;
|
||||
return this.family;
|
||||
}
|
||||
|
||||
public void setFamily(final String family) {
|
||||
|
@ -126,7 +80,7 @@ public class WfJournalEntry implements Serializable {
|
|||
}
|
||||
|
||||
public JobStatus getStatus() {
|
||||
return status;
|
||||
return this.status;
|
||||
}
|
||||
|
||||
public void setStatus(final JobStatus status) {
|
||||
|
@ -134,7 +88,7 @@ public class WfJournalEntry implements Serializable {
|
|||
}
|
||||
|
||||
public LocalDateTime getStartDate() {
|
||||
return startDate;
|
||||
return this.startDate;
|
||||
}
|
||||
|
||||
public void setStartDate(final LocalDateTime startDate) {
|
||||
|
@ -142,23 +96,23 @@ public class WfJournalEntry implements Serializable {
|
|||
}
|
||||
|
||||
public LocalDateTime getEndDate() {
|
||||
return endDate;
|
||||
return this.endDate;
|
||||
}
|
||||
|
||||
public void setEndDate(final LocalDateTime endDate) {
|
||||
this.endDate = endDate;
|
||||
}
|
||||
|
||||
public LocalDateTime getLastUpdate() {
|
||||
return lastUpdate;
|
||||
public String getWfConfId() {
|
||||
return this.wfConfId;
|
||||
}
|
||||
|
||||
public void setLastUpdate(final LocalDateTime lastUpdate) {
|
||||
this.lastUpdate = lastUpdate;
|
||||
public void setWfConfId(final String wfConfId) {
|
||||
this.wfConfId = wfConfId;
|
||||
}
|
||||
|
||||
public String getDsId() {
|
||||
return dsId;
|
||||
return this.dsId;
|
||||
}
|
||||
|
||||
public void setDsId(final String dsId) {
|
||||
|
@ -166,7 +120,7 @@ public class WfJournalEntry implements Serializable {
|
|||
}
|
||||
|
||||
public String getDsName() {
|
||||
return dsName;
|
||||
return this.dsName;
|
||||
}
|
||||
|
||||
public void setDsName(final String dsName) {
|
||||
|
@ -174,7 +128,7 @@ public class WfJournalEntry implements Serializable {
|
|||
}
|
||||
|
||||
public String getApiId() {
|
||||
return apiId;
|
||||
return this.apiId;
|
||||
}
|
||||
|
||||
public void setApiId(final String apiId) {
|
||||
|
@ -182,7 +136,7 @@ public class WfJournalEntry implements Serializable {
|
|||
}
|
||||
|
||||
public Map<String, String> getDetails() {
|
||||
return details;
|
||||
return this.details;
|
||||
}
|
||||
|
||||
public void setDetails(final Map<String, String> details) {
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.domain.wfs;
|
||||
package eu.dnetlib.domain.wfs.jobs;
|
||||
|
||||
public enum JobStatus {
|
||||
created,
|
|
@ -0,0 +1,12 @@
|
|||
package eu.dnetlib.domain.wfs.jobs;
|
||||
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.Table;
|
||||
|
||||
@Entity
|
||||
@Table(name = "wf_history")
|
||||
public class WfHistoryJob extends AbstractJob {
|
||||
|
||||
private static final long serialVersionUID = -9122461293116018290L;
|
||||
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package eu.dnetlib.domain.wfs.jobs;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import jakarta.persistence.Column;
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.Table;
|
||||
|
||||
@Entity
|
||||
@Table(name = "wf_journal_view")
|
||||
public class WfJournalEntry extends AbstractJob {
|
||||
|
||||
private static final long serialVersionUID = -326994850248506828L;
|
||||
|
||||
@Column(name = "last_update")
|
||||
private LocalDateTime lastUpdate;
|
||||
|
||||
public LocalDateTime getLastUpdate() {
|
||||
return this.lastUpdate;
|
||||
}
|
||||
|
||||
public void setLastUpdate(final LocalDateTime lastUpdate) {
|
||||
this.lastUpdate = lastUpdate;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
package eu.dnetlib.domain.wfs.jobs;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.hibernate.annotations.Type;
|
||||
|
||||
import eu.dnetlib.domain.wfs.WfTemplate;
|
||||
import io.hypersistence.utils.hibernate.type.json.JsonType;
|
||||
import jakarta.persistence.Column;
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.Table;
|
||||
|
||||
@Entity
|
||||
@Table(name = "wf_runtime")
|
||||
public class WfRunningJob extends AbstractJob {
|
||||
|
||||
private static final long serialVersionUID = -3163064347762609210L;
|
||||
|
||||
@Column(name = "wf_executor")
|
||||
private String wfExecutor;
|
||||
|
||||
@Column(name = "priority")
|
||||
private int priority;
|
||||
|
||||
@Column(name = "last_update")
|
||||
private LocalDateTime lastUpdate;
|
||||
|
||||
@Type(JsonType.class)
|
||||
@Column(name = "graph")
|
||||
private List<WfTemplate.Node> graph = new ArrayList<>();
|
||||
|
||||
@Type(JsonType.class)
|
||||
@Column(name = "params")
|
||||
private Map<String, Object> params = new HashMap<>();
|
||||
|
||||
public String getWfExecutor() {
|
||||
return this.wfExecutor;
|
||||
}
|
||||
|
||||
public void setWfExecutor(final String wfExecutor) {
|
||||
this.wfExecutor = wfExecutor;
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
return this.priority;
|
||||
}
|
||||
|
||||
public void setPriority(final int priority) {
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
public LocalDateTime getLastUpdate() {
|
||||
return this.lastUpdate;
|
||||
}
|
||||
|
||||
public void setLastUpdate(final LocalDateTime lastUpdate) {
|
||||
this.lastUpdate = lastUpdate;
|
||||
}
|
||||
|
||||
public List<WfTemplate.Node> getGraph() {
|
||||
return this.graph;
|
||||
}
|
||||
|
||||
public void setGraph(final List<WfTemplate.Node> graph) {
|
||||
this.graph = graph;
|
||||
}
|
||||
|
||||
public Map<String, Object> getParams() {
|
||||
return this.params;
|
||||
}
|
||||
|
||||
public void setParams(final Map<String, Object> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package eu.dnetlib.wfs.repository;
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
|
||||
|
||||
public interface WfHistoryJobRepository extends JpaRepository<WfHistoryJob, String> {
|
||||
|
||||
}
|
|
@ -6,22 +6,19 @@ import java.util.Optional;
|
|||
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
|
||||
import eu.dnetlib.utils.ReadOnlyRepository;
|
||||
|
||||
public interface WfJournalEntryRepository extends JpaRepository<WfJournalEntry, String> {
|
||||
public interface WfJournalEntryRepository extends ReadOnlyRepository<WfJournalEntry, String> {
|
||||
|
||||
// IMPORTANT: Postgres, by default, should return nulls first when you specify 'ORDER BY DESC'.
|
||||
// Otherwise it will be necessary to implement the native queries specifying 'ORDER BY end_date DESC NULLS FIRST';
|
||||
|
||||
Page<WfJournalEntry> findByLastUpdateBetweenOrderByLastUpdateDesc(LocalDateTime start, LocalDateTime end, Pageable pageable);
|
||||
|
||||
Page<WfJournalEntry> findByWfConfigurationIdOrderByLastUpdateDesc(String id, Pageable pageable);
|
||||
Page<WfJournalEntry> findByWfConfIdOrderByLastUpdateDesc(String id, Pageable pageable);
|
||||
|
||||
Page<WfJournalEntry> findAllByOrderByLastUpdateDesc(Pageable pageable);
|
||||
|
||||
|
@ -31,11 +28,6 @@ public interface WfJournalEntryRepository extends JpaRepository<WfJournalEntry,
|
|||
|
||||
List<WfJournalEntry> findByStatus(JobStatus status);
|
||||
|
||||
Optional<WfJournalEntry> findFirstByWfConfigurationIdAndEndDateNotNullOrderByEndDateDesc(String id);
|
||||
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Query(value = "update WfJournalEntry set wfExecutor = ?2, status = ?3, startDate = ?4, lastUpdate = ?4 where processId = ?1 and wfExecutor is NULL")
|
||||
void tryAssegnment(String id, String workerId, JobStatus status, LocalDateTime startDate);
|
||||
Optional<WfJournalEntry> findFirstByWfConfIdAndEndDateNotNullOrderByEndDateDesc(String id);
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
package eu.dnetlib.wfs.repository;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
|
||||
|
||||
public interface WfRunningJobRepository extends JpaRepository<WfRunningJob, String> {
|
||||
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Query(value = "update WfRunningJob set wfExecutor = ?2, status = ?3, startDate = ?4, lastUpdate = ?4 where processId = ?1 and wfExecutor is NULL")
|
||||
void tryAssegnment(String id, String workerId, JobStatus status, LocalDateTime startDate);
|
||||
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package eu.dnetlib.wfs.controller;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.DeleteMapping;
|
||||
|
@ -8,10 +9,10 @@ import org.springframework.web.bind.annotation.GetMapping;
|
|||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
|
||||
import eu.dnetlib.common.controller.DnetRestController;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
|
||||
import eu.dnetlib.wfs.graph.GraphLoader;
|
||||
import eu.dnetlib.wfs.procs.ProcessEngine;
|
||||
import eu.dnetlib.wfs.procs.ProcessRegistry;
|
||||
import eu.dnetlib.wfs.procs.WorkflowProcess;
|
||||
|
||||
public abstract class AbstractWfExecutorApiController extends DnetRestController {
|
||||
|
||||
|
@ -25,17 +26,22 @@ public abstract class AbstractWfExecutorApiController extends DnetRestController
|
|||
private GraphLoader graphLoader;
|
||||
|
||||
@GetMapping("/process/{id}")
|
||||
public WorkflowProcess findProcess(@PathVariable final String id) throws Exception {
|
||||
return registry.findProcess(id);
|
||||
public WfRunningJob findProcess(@PathVariable final String id) throws Exception {
|
||||
return this.registry.findProcess(id).getJobDetails();
|
||||
}
|
||||
|
||||
@GetMapping("/procs")
|
||||
public Set<String> findProcess() throws Exception {
|
||||
return this.registry.listProcIds();
|
||||
}
|
||||
|
||||
@DeleteMapping("/process/{id}")
|
||||
public void killProcess(@PathVariable final String id) throws Exception {
|
||||
engine.killProcess(id);
|
||||
this.engine.killProcess(id);
|
||||
}
|
||||
|
||||
@GetMapping("/node-types")
|
||||
public List<String> listAvailableNodes() {
|
||||
return graphLoader.getValidTypes().stream().sorted().toList();
|
||||
return this.graphLoader.getValidTypes().stream().sorted().toList();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package eu.dnetlib.wfs.graph;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
@ -44,34 +45,34 @@ public class GraphLoader {
|
|||
private void init() {
|
||||
log.info("************************************");
|
||||
log.info("* Checking workflow nodes:");
|
||||
applicationContext.getBeansWithAnnotation(WfNode.class).forEach((key, bean) -> {
|
||||
this.applicationContext.getBeansWithAnnotation(WfNode.class).forEach((key, bean) -> {
|
||||
if (ProcessNode.class.isAssignableFrom(bean.getClass())) {
|
||||
log.info("* Type: " + key + " -> " + bean.getClass());
|
||||
validTypes.add(key);
|
||||
this.validTypes.add(key);
|
||||
} else {
|
||||
log.warn("* Type: " + key + " -> " + bean.getClass() + "(ERROR: it is not a ProcessNode)");
|
||||
}
|
||||
|
||||
});
|
||||
if (validTypes.isEmpty()) {
|
||||
if (this.validTypes.isEmpty()) {
|
||||
log.warn("* 0 nodes available");
|
||||
}
|
||||
log.info("************************************");
|
||||
}
|
||||
|
||||
public Graph loadGraph(final WfTemplate wfTemplate, final Map<String, Object> globalParams)
|
||||
public Graph loadGraph(final List<WfTemplate.Node> nodes, final Map<String, Object> globalParams)
|
||||
throws WorkflowManagerException {
|
||||
final Graph graph = new Graph();
|
||||
|
||||
graph.addNode(GraphNode.newSuccessNode());
|
||||
|
||||
for (final WfTemplate.Node node : wfTemplate.getGraph()) {
|
||||
for (final WfTemplate.Node node : nodes) {
|
||||
final String nodeName = node.getName();
|
||||
final String nodeType = node.getType();
|
||||
final boolean isStart = node.isStart();
|
||||
final boolean isJoin = node.isJoin();
|
||||
|
||||
final Map<String, Object> params = node.calculateInitialParams(globalParams, environment);
|
||||
final Map<String, Object> params = node.calculateInitialParams(globalParams, this.environment);
|
||||
final Map<String, Object> envParams = node.findEnvParams();
|
||||
final Map<String, String> outputEnvMap = node.outputToEnvMap();
|
||||
|
||||
|
@ -141,13 +142,13 @@ public class GraphLoader {
|
|||
if (n.isStart()) {
|
||||
foundStart = true;
|
||||
}
|
||||
if ((n.getType() != null) && !validTypes.contains(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); }
|
||||
if ((n.getType() != null) && !this.validTypes.contains(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); }
|
||||
}
|
||||
if (!foundStart) { throw new WorkflowManagerException("Start node not found"); }
|
||||
}
|
||||
|
||||
public Set<String> getValidTypes() {
|
||||
return validTypes;
|
||||
return this.validTypes;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,11 +16,12 @@ public class DeleteWfConfigurationNode extends AbstractJobNode {
|
|||
|
||||
@Override
|
||||
protected void execute() throws Exception {
|
||||
final String wfConfId = getProcess().getWfConfId();
|
||||
|
||||
final String wfConfId = getProcess().getJobDetails().getWfConfId();
|
||||
|
||||
if (StringUtils.isBlank(wfConfId)) { throw new WorkflowManagerException("wfConfId is null"); }
|
||||
|
||||
wfConfigurationRepository.deleteById(wfConfId);
|
||||
this.wfConfigurationRepository.deleteById(wfConfId);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -58,22 +58,22 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
|
|||
@Override
|
||||
protected void execute() throws Exception {
|
||||
|
||||
wfConfId = "wf-aggr-" + UUID.randomUUID();
|
||||
this.wfConfId = "wf-aggr-" + UUID.randomUUID();
|
||||
|
||||
final WfConfiguration conf = new WfConfiguration();
|
||||
conf.setId(wfConfId);
|
||||
conf.setId(this.wfConfId);
|
||||
conf.setSection(WorkflowsConstants.AGGREGATION_WF_CONFS_SECTION);
|
||||
conf.setParentId(null);
|
||||
|
||||
conf.setName(String.format("Aggregation of '%s' (api: %s) ", ds.getOfficialname(), api.getId()));
|
||||
conf.setName(String.format("Aggregation of '%s' (api: %s)", this.ds.getOfficialname(), this.api.getId()));
|
||||
conf.setDetails(new HashMap<>());
|
||||
|
||||
conf.setDsId(ds.getId());
|
||||
conf.setDsName(ds.getOfficialname());
|
||||
conf.setApiId(api.getId());
|
||||
conf.setDsId(this.ds.getId());
|
||||
conf.setDsName(this.ds.getOfficialname());
|
||||
conf.setApiId(this.api.getId());
|
||||
|
||||
conf.setWorkflow(wfId);
|
||||
conf.setDestroyWf(destroyWfId);
|
||||
conf.setWorkflow(this.wfId);
|
||||
conf.setDestroyWf(this.destroyWfId);
|
||||
conf.setEnabled(true);
|
||||
conf.setPriority(DEFAULT_AGGR_PRIORITY);
|
||||
|
||||
|
@ -83,19 +83,19 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
|
|||
|
||||
conf.setSystemParams(prepareSystemParams());
|
||||
|
||||
final List<WfParam> wfParams = clientFactory.getClient(SimpleResourceClient.class)
|
||||
final List<WfParam> wfParams = this.clientFactory.getClient(SimpleResourceClient.class)
|
||||
.findResourceContent(WorkflowsConstants.WF_TEMPLATE, conf.getWorkflow(), WfTemplate.class)
|
||||
.getParameters();
|
||||
|
||||
conf.setConfigured(WfConfigurationUtils.isConfigured(wfParams, conf));
|
||||
|
||||
wfConfigurationRepository.save(conf);
|
||||
this.wfConfigurationRepository.save(conf);
|
||||
}
|
||||
|
||||
private Map<String, Object> prepareSystemParams() {
|
||||
final Map<String, Object> map = new LinkedHashMap<>();
|
||||
map.put("nativeMdStoreId", nativeMdStoreId);
|
||||
map.put("cleanedMdStoreId", cleanedMdStoreId);
|
||||
map.put("nativeMdStoreId", this.nativeMdStoreId);
|
||||
map.put("cleanedMdStoreId", this.cleanedMdStoreId);
|
||||
return map;
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
|
||||
import eu.dnetlib.common.clients.DnetServiceClientFactory;
|
||||
import eu.dnetlib.common.clients.DsmClient;
|
||||
import eu.dnetlib.domain.dsm.Api;
|
||||
import eu.dnetlib.wfs.annotations.WfInputParam;
|
||||
import eu.dnetlib.wfs.annotations.WfNode;
|
||||
import eu.dnetlib.wfs.nodes.AbstractJobNode;
|
||||
|
@ -14,29 +15,30 @@ public class ClearApiExtraFieldsNode extends AbstractJobNode {
|
|||
@WfInputParam
|
||||
private String infoType; // COLLECT, TRANSFORM, DOWNLOAD
|
||||
|
||||
@WfInputParam
|
||||
private Api api;
|
||||
|
||||
@Autowired
|
||||
private DnetServiceClientFactory clientFactory;
|
||||
|
||||
@Override
|
||||
protected void execute() throws Exception {
|
||||
|
||||
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
|
||||
final DsmClient dsm = this.clientFactory.getClient(DsmClient.class);
|
||||
|
||||
final String apiId = getProcess().getApiId();
|
||||
|
||||
switch (infoType.toUpperCase()) {
|
||||
switch (this.infoType.toUpperCase()) {
|
||||
case "COLLECT":
|
||||
dsm.updateApiCollectionInfo(apiId, null, 0);
|
||||
dsm.updateApiCollectionInfo(this.api.getId(), null, 0);
|
||||
break;
|
||||
case "AGGREGATOR":
|
||||
case "TRANSFORM":
|
||||
dsm.updateApiAggregationInfo(apiId, null, 0);
|
||||
dsm.updateApiAggregationInfo(this.api.getId(), null, 0);
|
||||
break;
|
||||
case "DOWNLOAD":
|
||||
dsm.updateApiDownloadInfo(apiId, null, 0);
|
||||
dsm.updateApiDownloadInfo(this.api.getId(), null, 0);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Invalid infoType: " + infoType);
|
||||
throw new RuntimeException("Invalid infoType: " + this.infoType);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,14 +24,13 @@ public class LoadDatasourceInfoNode extends AbstractJobNode {
|
|||
|
||||
@Override
|
||||
protected void execute() throws Exception {
|
||||
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
|
||||
final String dsId = getProcess().getJobDetails().getDsId();
|
||||
final String apiId = getProcess().getJobDetails().getApiId();
|
||||
|
||||
final String dsId = getProcess().getDsId();
|
||||
final String apiId = getProcess().getApiId();
|
||||
|
||||
ds = dsm.findDs(dsId);
|
||||
api = dsm.findApi(apiId);
|
||||
final DsmClient dsm = this.clientFactory.getClient(DsmClient.class);
|
||||
|
||||
this.ds = dsm.findDs(dsId);
|
||||
this.api = dsm.findApi(apiId);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,21 +1,23 @@
|
|||
package eu.dnetlib.wfs.nodes.launch;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import eu.dnetlib.common.app.ServiceStatusRegistry;
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.common.clients.DnetServiceClientFactory;
|
||||
import eu.dnetlib.common.clients.SimpleResourceClient;
|
||||
import eu.dnetlib.domain.dsm.Api;
|
||||
import eu.dnetlib.domain.dsm.Datasource;
|
||||
import eu.dnetlib.domain.wfs.WfTemplate;
|
||||
import eu.dnetlib.domain.wfs.WorkflowsConstants;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
|
||||
import eu.dnetlib.wfs.annotations.WfInputParam;
|
||||
import eu.dnetlib.wfs.annotations.WfNode;
|
||||
import eu.dnetlib.wfs.nodes.ProcessNode;
|
||||
import eu.dnetlib.wfs.procs.ProcessEngine;
|
||||
import eu.dnetlib.wfs.procs.Token;
|
||||
import eu.dnetlib.wfs.procs.WorkflowProcess;
|
||||
import eu.dnetlib.wfs.utils.NodeCallback;
|
||||
|
@ -30,54 +32,41 @@ public class LaunchWorkflowJobNode extends ProcessNode {
|
|||
@WfInputParam
|
||||
private String wfId;
|
||||
|
||||
@WfInputParam(optional = true)
|
||||
private Datasource ds;
|
||||
|
||||
@WfInputParam(optional = true)
|
||||
private Api api;
|
||||
|
||||
@Autowired
|
||||
private DnetServiceClientFactory clientFactory;
|
||||
|
||||
@Override
|
||||
public final void execute(final Token token, final NodeCallback nodeCallback) {
|
||||
try {
|
||||
final ProcessEngine engine = getEngine();
|
||||
|
||||
final String procId = WfProcessUtils.generateProcessId();
|
||||
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
|
||||
|
||||
final WfTemplate tmpl = client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, this.wfId, WfTemplate.class);
|
||||
|
||||
final WorkflowProcess process = getProcess();
|
||||
|
||||
final WfConfiguration conf = new WfConfiguration();
|
||||
conf.setId("CHILD_" + UUID.randomUUID());
|
||||
conf.setParentId(process.getWfConfId());
|
||||
conf.setDetails(new HashMap<>());
|
||||
conf.setPriority(100);
|
||||
conf.setDsId(process.getDsId());
|
||||
conf.setDsName(process.getDsName());
|
||||
conf.setApiId(process.getApiId());
|
||||
conf.setEnabled(true);
|
||||
conf.setConfigured(true);
|
||||
conf.setSchedulingEnabled(false);
|
||||
conf.setCronExpression("");
|
||||
conf.setCronMinInterval(0);
|
||||
conf.setWorkflow(this.wfId);
|
||||
conf.setDestroyWf(null);
|
||||
conf.setSystemParams(process.getGlobalParams());
|
||||
conf.setUserParams(new HashMap<>());
|
||||
final WfRunningJob job = new WfRunningJob();
|
||||
|
||||
final WfJournalEntry job = new WfJournalEntry();
|
||||
job.setProcessId(procId);
|
||||
job.setWfConfigurationId(null);
|
||||
job.setName(process.getName() + ":" + getNodeName());
|
||||
job.setFamily(process.getFamily());
|
||||
|
||||
job.setWfTemplateId(this.wfId);
|
||||
|
||||
job.setDsId(process.getDsId());
|
||||
job.setDsName(process.getDsName());
|
||||
job.setApiId(process.getApiId());
|
||||
|
||||
job.setStatus(JobStatus.created);
|
||||
|
||||
job.setDetails(new LinkedHashMap<>());
|
||||
job.setProcessId(WfProcessUtils.generateProcessId());
|
||||
job.setPriority(process.getJobDetails().getPriority());
|
||||
job.setStartDate(null);
|
||||
job.setEndDate(null);
|
||||
job.setWfExecutor(ServiceStatusRegistry.getStatus().getName());
|
||||
job.setLastUpdate(LocalDateTime.now());
|
||||
job.setEndDate(null);
|
||||
job.setGraph(tmpl.getGraph());
|
||||
// TODO (MEDIUM PRIORITY): verify if all necessary parameters are correctly passed
|
||||
job.setParams(process.getJobDetails().getParams());
|
||||
job.setWfExecutor(ServiceStatusRegistry.getStatus().getName());
|
||||
job.setStatus(JobStatus.accepted);
|
||||
|
||||
engine.startWorkflowJob(job, conf, new ProcessCallback() {
|
||||
// TODO (MEDIUM PRIORITY): verify if the job should be saved in the DB
|
||||
|
||||
getEngine().startWorkflowJob(job, new ProcessCallback() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(final WorkflowProcess t) {
|
||||
|
@ -94,10 +83,10 @@ public class LaunchWorkflowJobNode extends ProcessNode {
|
|||
});
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("The child workflow [conf: " + conf.getId() + "] is starting with procId: " + procId);
|
||||
log.debug("The child workflow [conf: " + process.getJobDetails().getWfConfId() + "] is starting with procId: " + job.getProcessId());
|
||||
}
|
||||
|
||||
token.setProgressMessage("Launched sub workflow, proc: " + procId);
|
||||
token.setProgressMessage("Launched sub workflow, proc: " + job.getProcessId());
|
||||
|
||||
} catch (final Throwable e) {
|
||||
log.error("got exception while launching child workflow", e);
|
||||
|
|
|
@ -24,25 +24,20 @@ import org.springframework.jdbc.core.JdbcTemplate;
|
|||
import org.springframework.stereotype.Service;
|
||||
|
||||
import eu.dnetlib.common.app.ServiceStatusRegistry;
|
||||
import eu.dnetlib.common.clients.DnetServiceClientFactory;
|
||||
import eu.dnetlib.common.clients.SimpleResourceClient;
|
||||
import eu.dnetlib.domain.resource.SimpleResource;
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfConfiguration;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.WfTemplate;
|
||||
import eu.dnetlib.domain.wfs.WorkflowsConstants;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
|
||||
import eu.dnetlib.errors.WorkflowManagerException;
|
||||
import eu.dnetlib.wfs.graph.GraphLoader;
|
||||
import eu.dnetlib.wfs.graph.GraphNode;
|
||||
import eu.dnetlib.wfs.nodes.DefaultJobNode;
|
||||
import eu.dnetlib.wfs.nodes.ProcessNode;
|
||||
import eu.dnetlib.wfs.nodes.SuccessNode;
|
||||
import eu.dnetlib.wfs.repository.WfConfigurationRepository;
|
||||
import eu.dnetlib.wfs.repository.WfJournalEntryRepository;
|
||||
import eu.dnetlib.wfs.repository.WfHistoryJobRepository;
|
||||
import eu.dnetlib.wfs.repository.WfRunningJobRepository;
|
||||
import eu.dnetlib.wfs.utils.EmailSender;
|
||||
import eu.dnetlib.wfs.utils.ProcessCallback;
|
||||
import eu.dnetlib.wfs.utils.WfConfigurationUtils;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.transaction.Transactional;
|
||||
|
||||
|
@ -54,24 +49,21 @@ public class ProcessEngine {
|
|||
@Autowired
|
||||
private EmailSender emailSender;
|
||||
|
||||
@Autowired
|
||||
private WfJournalEntryRepository wfJournalEntryRepository;
|
||||
|
||||
@Autowired
|
||||
private WfConfigurationRepository wfConfigurationRepository;
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Autowired
|
||||
private WfRunningJobRepository jobRepository;
|
||||
|
||||
@Autowired
|
||||
private WfHistoryJobRepository historyJobRepository;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Autowired
|
||||
private ProcessRegistry processRegistry;
|
||||
|
||||
@Autowired
|
||||
private DnetServiceClientFactory clientFactory;
|
||||
|
||||
@Autowired
|
||||
private GraphLoader graphLoader;
|
||||
|
||||
|
@ -140,68 +132,23 @@ public class ProcessEngine {
|
|||
private void tryToStartWf(final String processId, final String executor) {
|
||||
log.debug("Trying to start " + processId);
|
||||
|
||||
this.wfJournalEntryRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
|
||||
this.jobRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
|
||||
|
||||
final Optional<WfJournalEntry> job = this.wfJournalEntryRepository.findById(processId);
|
||||
final Optional<WfRunningJob> job = this.jobRepository.findById(processId);
|
||||
|
||||
if (job.isPresent() && (job.get().getStatus() == JobStatus.accepted) && job.get().getWfExecutor().equals(executor)) {
|
||||
startWorkflowJob(job.get(), null, null);
|
||||
startWorkflowJob(job.get(), null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void startWorkflowJob(final WfJournalEntry job, final WfConfiguration passedConf, final ProcessCallback callback) {
|
||||
public void startWorkflowJob(final WfRunningJob job, final ProcessCallback callback) {
|
||||
|
||||
final WorkflowProcess process = new WorkflowProcess(job);
|
||||
|
||||
try {
|
||||
log.debug("Configure process " + job.getProcessId() + " using wf template " + job.getWfTemplateId());
|
||||
|
||||
final SimpleResourceClient simpleResourceClient = this.clientFactory.getClient(SimpleResourceClient.class);
|
||||
|
||||
final SimpleResource wfMetadata = simpleResourceClient.findResource(job.getWfTemplateId());
|
||||
|
||||
if (!WorkflowsConstants.WF_TEMPLATE.equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + job.getWfTemplateId()); }
|
||||
|
||||
final WfTemplate wfTmpl = simpleResourceClient.findResourceContent(job.getWfTemplateId(), WfTemplate.class);
|
||||
|
||||
final Map<String, Object> globalParams = new HashMap<>();
|
||||
|
||||
final WorkflowProcess process = new WorkflowProcess(job.getProcessId());
|
||||
|
||||
if ((passedConf == null) && StringUtils.isBlank(job.getWfConfigurationId())) {
|
||||
process.setName(job.getFamily());
|
||||
process.setWfConfId(null);
|
||||
process.setParentId(null);
|
||||
process.setPriority(100);
|
||||
|
||||
globalParams.putAll(WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), null));
|
||||
} else {
|
||||
final WfConfiguration conf = passedConf != null ? passedConf
|
||||
: this.wfConfigurationRepository.findById(job.getWfConfigurationId())
|
||||
.orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + job.getWfConfigurationId()));
|
||||
|
||||
if (!conf.isEnabled() || !conf.isConfigured()) {
|
||||
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
|
||||
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
|
||||
}
|
||||
|
||||
process.setName(conf.getName());
|
||||
process.setWfConfId(conf.getId());
|
||||
process.setParentId(conf.getParentId());
|
||||
process.setPriority(conf.getPriority());
|
||||
|
||||
globalParams.putAll(WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf));
|
||||
}
|
||||
|
||||
process.setWfId(job.getWfTemplateId());
|
||||
process.setFamily(wfMetadata.getSubtype());
|
||||
|
||||
process.setDsId(job.getDsId());
|
||||
process.setDsName(job.getDsName());
|
||||
process.setApiId(job.getApiId());
|
||||
|
||||
process.getGlobalParams().putAll(globalParams);
|
||||
process.setGraph(this.graphLoader.loadGraph(wfTmpl, globalParams));
|
||||
process.setGraph(this.graphLoader.loadGraph(job.getGraph(), job.getParams()));
|
||||
process.setCallback(callback);
|
||||
|
||||
this.processRegistry.registerProcess(process);
|
||||
|
@ -209,20 +156,13 @@ public class ProcessEngine {
|
|||
job.setStatus(JobStatus.running);
|
||||
job.setLastUpdate(LocalDateTime.now());
|
||||
|
||||
this.wfJournalEntryRepository.save(job);
|
||||
this.jobRepository.save(job);
|
||||
|
||||
startProcess(process);
|
||||
|
||||
} catch (final Throwable e) {
|
||||
final Map<String, String> details = new LinkedHashMap<>();
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, e.getMessage());
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(e));
|
||||
job.setDetails(details);
|
||||
job.setStatus(JobStatus.failure);
|
||||
job.setEndDate(LocalDateTime.now());
|
||||
job.setLastUpdate(LocalDateTime.now());
|
||||
|
||||
this.wfJournalEntryRepository.save(job);
|
||||
process.setError(e);
|
||||
updateRunningJob(process, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -236,23 +176,25 @@ public class ProcessEngine {
|
|||
log.debug(process.getGraph());
|
||||
|
||||
final LocalDateTime now = LocalDateTime.now();
|
||||
process.setStatus(JobStatus.running);
|
||||
process.setStartDate(now);
|
||||
process.setLastActivityDate(now);
|
||||
process.getJobDetails().setStatus(JobStatus.running);
|
||||
process.getJobDetails().setStartDate(now);
|
||||
|
||||
updateRunningJob(process, null);
|
||||
|
||||
try {
|
||||
for (final GraphNode graphNode : process.getGraph().startNodes()) {
|
||||
final Token token = process.newToken(process.getGlobalParams());
|
||||
final Token token = process.newToken(process.getJobDetails().getParams());
|
||||
executeNode(process, graphNode, token);
|
||||
}
|
||||
} catch (final Throwable e) {
|
||||
log.error("WorkflowProcess node instantiation failed", e);
|
||||
process.setStatus(JobStatus.failure);
|
||||
process.setError(e);
|
||||
updateRunningJob(process, null);
|
||||
}
|
||||
}
|
||||
|
||||
public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
|
||||
process.setLastActivityDate(LocalDateTime.now());
|
||||
updateRunningJob(process, oldToken);
|
||||
|
||||
try {
|
||||
for (final GraphNode graphNode : process.getGraph().nextNodes(oldGraphNode, oldToken)) {
|
||||
|
@ -279,20 +221,15 @@ public class ProcessEngine {
|
|||
executeNode(process, graphNode, token);
|
||||
}
|
||||
}
|
||||
} catch (
|
||||
|
||||
final Throwable e) {
|
||||
} catch (final Throwable e) {
|
||||
log.error("WorkflowProcess node instantiation failed", e);
|
||||
process.setStatus(JobStatus.failure);
|
||||
process.setError(e.getMessage());
|
||||
process.setErrorStacktrace(ExceptionUtils.getStackTrace(e));
|
||||
process.setLastActivityDate(LocalDateTime.now());
|
||||
process.setError(e);
|
||||
updateRunningJob(process, oldToken);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void executeNode(final WorkflowProcess process, final GraphNode graphNode, final Token token) throws WorkflowManagerException {
|
||||
process.setLastActivityDate(LocalDateTime.now());
|
||||
|
||||
final Map<String, Object> params = new HashMap<>();
|
||||
if (graphNode.getParams() != null) {
|
||||
|
@ -343,12 +280,72 @@ public class ProcessEngine {
|
|||
|
||||
process.complete(token);
|
||||
|
||||
updateRunningJob(process, token);
|
||||
saveHistoryJob(process);
|
||||
|
||||
this.processRegistry.unregisterProcess(process.getId());
|
||||
|
||||
final String wfConfId = process.getJobDetails().getWfConfId();
|
||||
if (StringUtils.isNotBlank(wfConfId)) {
|
||||
this.emailSender.sendMails(wfConfId, process);
|
||||
}
|
||||
}
|
||||
|
||||
private void saveHistoryJob(final WorkflowProcess process) {
|
||||
|
||||
log.debug("Process completed " + process.getId());
|
||||
|
||||
final WfJournalEntry job = process.asLog();
|
||||
this.wfJournalEntryRepository.save(job);
|
||||
final WfHistoryJob historyJob = new WfHistoryJob();
|
||||
|
||||
this.emailSender.sendMails(process);
|
||||
historyJob.setProcessId(process.getId());
|
||||
historyJob.setName(process.getJobDetails().getName());
|
||||
historyJob.setFamily(process.getJobDetails().getFamily());
|
||||
historyJob.setWfConfId(process.getJobDetails().getWfConfId());
|
||||
historyJob.setDsId(process.getJobDetails().getDsId());
|
||||
historyJob.setDsName(process.getJobDetails().getDsName());
|
||||
historyJob.setApiId(process.getJobDetails().getApiId());
|
||||
historyJob.setStartDate(process.getJobDetails().getStartDate());
|
||||
historyJob.setEndDate(process.getJobDetails().getEndDate());
|
||||
|
||||
final Map<String, String> details = new LinkedHashMap<>();
|
||||
if (process.getError() != null) {
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
|
||||
historyJob.setStatus(JobStatus.failure);
|
||||
} else {
|
||||
historyJob.setStatus(JobStatus.success);
|
||||
}
|
||||
|
||||
details.putAll(process.getOutputParams());
|
||||
historyJob.setDetails(details);
|
||||
|
||||
this.historyJobRepository.save(historyJob);
|
||||
}
|
||||
|
||||
private void updateRunningJob(final WorkflowProcess process, final Token token) {
|
||||
|
||||
log.debug("UPDATING JOB USING TOKEN " + token);
|
||||
|
||||
final LocalDateTime now = LocalDateTime.now();
|
||||
process.getJobDetails().setLastUpdate(LocalDateTime.now());
|
||||
|
||||
if (process.isTerminated()) {
|
||||
final Map<String, String> details = new LinkedHashMap<>();
|
||||
details.putAll(process.getOutputParams());
|
||||
if (process.getError() != null) {
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
|
||||
process.getJobDetails().setStatus(JobStatus.failure);
|
||||
} else {
|
||||
process.getJobDetails().setStatus(JobStatus.success);
|
||||
}
|
||||
process.getJobDetails().setDetails(details);
|
||||
process.getJobDetails().setEndDate(now);
|
||||
} else {
|
||||
process.getJobDetails().setStatus(JobStatus.running);
|
||||
}
|
||||
|
||||
this.jobRepository.save(process.getJobDetails());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,12 +3,10 @@ package eu.dnetlib.wfs.procs;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import eu.dnetlib.errors.WorkflowManagerException;
|
||||
|
@ -18,7 +16,6 @@ public class ProcessRegistry {
|
|||
|
||||
private static final Log log = LogFactory.getLog(ProcessRegistry.class);
|
||||
private final Map<String, WorkflowProcess> procs = new HashMap<>();
|
||||
private final Map<String, WorkflowProcess> byConfId = new HashMap<>();
|
||||
|
||||
synchronized public int countRunningWfs() {
|
||||
int count = 0;
|
||||
|
@ -31,6 +28,10 @@ public class ProcessRegistry {
|
|||
return count;
|
||||
}
|
||||
|
||||
public Set<String> listProcIds() {
|
||||
return this.procs.keySet();
|
||||
}
|
||||
|
||||
public WorkflowProcess findProcess(final String procId) {
|
||||
return this.procs.get(procId);
|
||||
}
|
||||
|
@ -39,43 +40,16 @@ public class ProcessRegistry {
|
|||
return this.procs.values();
|
||||
}
|
||||
|
||||
public WorkflowProcess findProcsByConfigurationId(final String id) {
|
||||
return this.byConfId.get(id);
|
||||
}
|
||||
|
||||
public void registerProcess(final WorkflowProcess process) throws WorkflowManagerException {
|
||||
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
|
||||
log.error("Already registerd process: " + process);
|
||||
throw new WorkflowManagerException("Already registered process: " + process);
|
||||
}
|
||||
this.procs.put(process.getId(), process);
|
||||
|
||||
if (process.getWfConfId() != null) {
|
||||
this.byConfId.put(process.getWfConfId(), process);
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES)
|
||||
public void removeTerminatedProcess() {
|
||||
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
|
||||
final WorkflowProcess proc = e.getValue();
|
||||
if (proc.isTerminated()) {
|
||||
unregisterProcess(proc.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void unregisterProcess(final String procId) {
|
||||
synchronized (this) {
|
||||
final WorkflowProcess process = this.procs.remove(procId);
|
||||
if (process != null) {
|
||||
this.byConfId.entrySet()
|
||||
.stream()
|
||||
.filter(e -> e.getValue().getId().equals(process.getId()))
|
||||
.map(Entry::getKey)
|
||||
.forEach(confId -> this.byConfId.remove(confId, process));
|
||||
}
|
||||
}
|
||||
this.procs.remove(procId);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2,17 +2,14 @@ package eu.dnetlib.wfs.procs;
|
|||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.domain.wfs.WorkflowsConstants;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
|
||||
import eu.dnetlib.wfs.graph.Graph;
|
||||
import eu.dnetlib.wfs.utils.ProcessCallback;
|
||||
|
||||
|
@ -28,93 +25,35 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
}
|
||||
|
||||
private final String id;
|
||||
private String family;
|
||||
private String wfId;
|
||||
|
||||
private String name;
|
||||
private String wfConfId;
|
||||
private String parentId;
|
||||
|
||||
private int priority = 100;
|
||||
|
||||
private String dsId;
|
||||
private String dsName;
|
||||
private String apiId;
|
||||
private final WfRunningJob jobDetails;
|
||||
|
||||
private Graph graph;
|
||||
|
||||
private ProcessCallback callback;
|
||||
private List<Token> tokens = new CopyOnWriteArrayList<>();
|
||||
private LocalDateTime lastActivityDate;
|
||||
private JobStatus status;
|
||||
private LocalDateTime startDate = LocalDateTime.MIN;
|
||||
private LocalDateTime endDate = LocalDateTime.MIN;
|
||||
|
||||
private final Map<String, List<Token>> pausedJoinNodeTokens = new HashMap<>();
|
||||
private final Map<String, Object> globalParams = new HashMap<>();
|
||||
private final Map<String, String> outputParams = new HashMap<>();
|
||||
|
||||
private String error;
|
||||
private String errorStacktrace;
|
||||
private Throwable error;
|
||||
|
||||
public WorkflowProcess(final String id) {
|
||||
this.id = id;
|
||||
this.status = JobStatus.created;
|
||||
this.lastActivityDate = LocalDateTime.now();
|
||||
public WorkflowProcess(final WfRunningJob jobDetails) {
|
||||
this.id = jobDetails.getProcessId();
|
||||
this.jobDetails = jobDetails;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public String getFamily() {
|
||||
return this.family;
|
||||
}
|
||||
|
||||
public String getWfId() {
|
||||
return this.wfId;
|
||||
}
|
||||
|
||||
public String getWfConfId() {
|
||||
return this.wfConfId;
|
||||
}
|
||||
|
||||
public String getParentId() {
|
||||
return this.parentId;
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
return this.priority;
|
||||
}
|
||||
|
||||
public String getDsId() {
|
||||
return this.dsId;
|
||||
}
|
||||
|
||||
public String getDsName() {
|
||||
return this.dsName;
|
||||
}
|
||||
|
||||
public String getApiId() {
|
||||
return this.apiId;
|
||||
public WfRunningJob getJobDetails() {
|
||||
return this.jobDetails;
|
||||
}
|
||||
|
||||
public Map<String, List<Token>> getPausedJoinNodeTokens() {
|
||||
return this.pausedJoinNodeTokens;
|
||||
}
|
||||
|
||||
public JobStatus getStatus() {
|
||||
return this.status;
|
||||
}
|
||||
|
||||
public void setStatus(final JobStatus status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public Graph getGraph() {
|
||||
return this.graph;
|
||||
}
|
||||
|
@ -124,70 +63,37 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
}
|
||||
|
||||
public void kill() {
|
||||
setStatus(JobStatus.killed);
|
||||
// TODO (MEDIUM PRIORITY)
|
||||
this.jobDetails.setStatus(JobStatus.killed);
|
||||
}
|
||||
|
||||
public boolean isTerminated() {
|
||||
return switch (this.status) {
|
||||
return (this.error != null) || switch (this.jobDetails.getStatus()) {
|
||||
case success, failure, killed -> true;
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
public LocalDateTime getLastActivityDate() {
|
||||
return this.lastActivityDate;
|
||||
}
|
||||
|
||||
public void setLastActivityDate(final LocalDateTime lastActivityDate) {
|
||||
this.lastActivityDate = lastActivityDate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("[process id='%s' name='%s']", this.id, this.name);
|
||||
return String.format("[process id='%s']", this.id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(final WorkflowProcess wp) {
|
||||
return NumberUtils.compare(getPriority(), wp.getPriority());
|
||||
public int compareTo(final WorkflowProcess other) {
|
||||
final int n1 = this.jobDetails.getPriority();
|
||||
final int n2 = other.jobDetails.getPriority();
|
||||
return NumberUtils.compare(n1, n2);
|
||||
}
|
||||
|
||||
public Map<String, Object> getGlobalParams() {
|
||||
return this.globalParams;
|
||||
}
|
||||
|
||||
public void setStartDate(final LocalDateTime startDate) {
|
||||
this.startDate = startDate;
|
||||
}
|
||||
|
||||
public void setEndDate(final LocalDateTime endDate) {
|
||||
this.endDate = endDate;
|
||||
}
|
||||
|
||||
public LocalDateTime getStartDate() {
|
||||
return this.startDate;
|
||||
}
|
||||
|
||||
public LocalDateTime getEndDate() {
|
||||
return this.endDate;
|
||||
}
|
||||
|
||||
public String getError() {
|
||||
public Throwable getError() {
|
||||
return this.error;
|
||||
}
|
||||
|
||||
public void setError(final String error) {
|
||||
public void setError(final Throwable error) {
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
public String getErrorStacktrace() {
|
||||
return this.errorStacktrace;
|
||||
}
|
||||
|
||||
public void setErrorStacktrace(final String errorStacktrace) {
|
||||
this.errorStacktrace = errorStacktrace;
|
||||
}
|
||||
|
||||
public Map<String, String> getOutputParams() {
|
||||
return this.outputParams;
|
||||
}
|
||||
|
@ -201,15 +107,14 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
|
||||
public void complete(final Token token) {
|
||||
final LocalDateTime now = token.getEndDate();
|
||||
setLastActivityDate(now);
|
||||
setEndDate(now);
|
||||
setStatus(token.isFailed() ? JobStatus.failure : JobStatus.success);
|
||||
|
||||
this.jobDetails.setLastUpdate(now);
|
||||
|
||||
this.jobDetails.setEndDate(now);
|
||||
this.jobDetails.setStatus(token.isFailed() ? JobStatus.failure : JobStatus.success);
|
||||
|
||||
if (token.isFailed()) {
|
||||
setStatus(JobStatus.failure);
|
||||
setError(token.getError().getMessage());
|
||||
setErrorStacktrace(ExceptionUtils.getStackTrace(token.getError()));
|
||||
setLastActivityDate(LocalDateTime.now());
|
||||
setError(token.getError());
|
||||
}
|
||||
|
||||
if (this.callback != null) {
|
||||
|
@ -222,38 +127,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
|
||||
}
|
||||
|
||||
public WfJournalEntry asLog() {
|
||||
final Map<String, String> details = new LinkedHashMap<>();
|
||||
details.putAll(getOutputParams());
|
||||
details.put(WorkflowsConstants.LOG_WF_PRIORITY, "" + getPriority());
|
||||
details.put(WorkflowsConstants.LOG_WF_ID, getWfId());
|
||||
details.put(WorkflowsConstants.LOG_WF_CONF_ID, getWfConfId());
|
||||
|
||||
if (getError() != null) {
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, getError());
|
||||
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, getErrorStacktrace());
|
||||
}
|
||||
|
||||
final WfJournalEntry pe = new WfJournalEntry();
|
||||
pe.setProcessId(getId());
|
||||
pe.setWfTemplateId(getWfId());
|
||||
pe.setWfConfigurationId(getWfConfId());
|
||||
pe.setName(getName());
|
||||
pe.setFamily(getFamily());
|
||||
|
||||
pe.setDsId(getDsId());
|
||||
pe.setDsName(getDsName());
|
||||
pe.setApiId(getApiId());
|
||||
|
||||
pe.setStartDate(getStartDate());
|
||||
pe.setEndDate(getEndDate());
|
||||
pe.setStatus(getStatus());
|
||||
|
||||
pe.setDetails(details);
|
||||
|
||||
return pe;
|
||||
}
|
||||
|
||||
public ProcessCallback getCallback() {
|
||||
return this.callback;
|
||||
}
|
||||
|
@ -262,42 +135,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
|
|||
this.callback = callback;
|
||||
}
|
||||
|
||||
public void setFamily(final String family) {
|
||||
this.family = family;
|
||||
}
|
||||
|
||||
public void setWfId(final String wfId) {
|
||||
this.wfId = wfId;
|
||||
}
|
||||
|
||||
public void setName(final String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public void setWfConfId(final String wfConfId) {
|
||||
this.wfConfId = wfConfId;
|
||||
}
|
||||
|
||||
public void setParentId(final String parentId) {
|
||||
this.parentId = parentId;
|
||||
}
|
||||
|
||||
public void setPriority(final int priority) {
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
public void setDsId(final String dsId) {
|
||||
this.dsId = dsId;
|
||||
}
|
||||
|
||||
public void setDsName(final String dsName) {
|
||||
this.dsName = dsName;
|
||||
}
|
||||
|
||||
public void setApiId(final String apiId) {
|
||||
this.apiId = apiId;
|
||||
}
|
||||
|
||||
public void setGraph(final Graph graph) {
|
||||
this.graph = graph;
|
||||
}
|
||||
|
|
|
@ -11,8 +11,8 @@ import org.springframework.stereotype.Service;
|
|||
|
||||
import eu.dnetlib.common.clients.DnetServiceClientFactory;
|
||||
import eu.dnetlib.common.clients.EmailClient;
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.NotificationCondition;
|
||||
import eu.dnetlib.domain.wfs.jobs.JobStatus;
|
||||
import eu.dnetlib.wfs.procs.WorkflowProcess;
|
||||
import eu.dnetlib.wfs.repository.WfSubscriptionRepository;
|
||||
|
||||
|
@ -33,16 +33,16 @@ public class EmailSender {
|
|||
@Autowired
|
||||
private DnetServiceClientFactory clientFactory;
|
||||
|
||||
public void sendMails(final WorkflowProcess proc) {
|
||||
public void sendMails(final String wfConfId, final WorkflowProcess proc) {
|
||||
|
||||
wfSubscriptionRepository.findByWfConfigurationId(proc.getWfConfId()).forEach(s -> {
|
||||
this.wfSubscriptionRepository.findByWfConfigurationId(wfConfId).forEach(s -> {
|
||||
if ((s.getCondition() == NotificationCondition.ALWAYS) ||
|
||||
((s.getCondition() == NotificationCondition.ONLY_FAILED) && (proc.getStatus() == JobStatus.failure)) ||
|
||||
((s.getCondition() == NotificationCondition.ONLY_SUCCESS) && (proc.getStatus() == JobStatus.success))) {
|
||||
((s.getCondition() == NotificationCondition.ONLY_FAILED) && (proc.getJobDetails().getStatus() == JobStatus.failure)) ||
|
||||
((s.getCondition() == NotificationCondition.ONLY_SUCCESS) && (proc.getJobDetails().getStatus() == JobStatus.success))) {
|
||||
try {
|
||||
final Map<String, Object> params = new HashMap<>();
|
||||
|
||||
clientFactory.getClient(EmailClient.class).sendStoredMail(s.getEmail(), fromMail, fromName, s.getMessageId(), params);
|
||||
this.clientFactory.getClient(EmailClient.class).sendStoredMail(s.getEmail(), this.fromMail, this.fromName, s.getMessageId(), params);
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Error sending mail to " + s.getEmail(), e);
|
||||
|
|
Loading…
Reference in New Issue