renaming classes

This commit is contained in:
Michele Artini 2023-03-22 10:42:33 +01:00
parent f168ca9a2b
commit e40094a0b8
20 changed files with 189 additions and 192 deletions

View File

@ -13,49 +13,49 @@ import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.is.info.KeyValue;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.model.WorkflowSubscription;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
@RestController
@RequestMapping("/ajax/wf_instances")
public class WfInstancesController extends AbstractDnetController {
@RequestMapping("/ajax/wfs")
public class WfConfigurationsController extends AbstractDnetController {
private WorkflowManagerService wfManagerService;
@GetMapping("/sections")
public List<KeyValue<Long>> listWfFamilies() throws Exception {
public List<KeyValue<Long>> listWfSections() throws Exception {
return wfManagerService.streamSections()
.map(x -> new KeyValue<>(x.getValue(), x.getCount()))
.collect(Collectors.toList());
}
@GetMapping("/search")
public List<KeyValue<String>> listWfInstances(@RequestParam final String section) throws Exception {
return wfManagerService.streamWfInstancesBySection(section)
public List<KeyValue<String>> listWfConfigurations(@RequestParam final String section) throws Exception {
return wfManagerService.streamWfConfigurationsBySection(section)
.map(x -> new KeyValue<>(x.getId(), x.getName()))
.collect(Collectors.toList());
}
@GetMapping("/instance/{id}")
public WorkflowInstance getWfInstance(@PathVariable final String id) throws Exception {
return wfManagerService.findWorkflowInstance(id);
@GetMapping("/conf/{id}")
public WorkflowConfiguration getWfConfiguration(@PathVariable final String id) throws Exception {
return wfManagerService.findWorkflowConfiguration(id);
}
@PostMapping("/instance")
public void saveWfInstance(@RequestBody final WorkflowInstance instance) throws Exception {
wfManagerService.saveWfInstance(instance);
@PostMapping("/conf")
public void saveWfConfiguration(@RequestBody final WorkflowConfiguration conf) throws Exception {
wfManagerService.saveWfConfiguration(conf);
}
@DeleteMapping("/instance/{id}")
public void deleteWfInstance(@PathVariable final String id) throws Exception {
wfManagerService.deleteWfInstance(id);
@DeleteMapping("/conf/{id}")
public void deleteWfConfiguration(@PathVariable final String id) throws Exception {
wfManagerService.deleteWfConfiguration(id);
}
@GetMapping("/instance/{id}/start")
public ExecutionStatus startWorkflowInstance(@PathVariable final String id) throws Exception {
return wfManagerService.startWorkflowInstance(id, null, null);
@GetMapping("/conf/{id}/start")
public ExecutionStatus startWorkflowConfiguration(@PathVariable final String id) throws Exception {
return wfManagerService.startWorkflowConfiguration(id, null, null);
}
@GetMapping("/process/{id}")
@ -68,12 +68,12 @@ public class WfInstancesController extends AbstractDnetController {
wfManagerService.killProcess(id);
}
@GetMapping("/instance/{id}/subscriptions")
@GetMapping("/conf/{id}/subscriptions")
public List<WorkflowSubscription> listWorkflowSubscriptions(@PathVariable final String id) throws Exception {
return wfManagerService.listSubscriptions(id);
}
@PostMapping("/instance/{id}/subscriptions")
@PostMapping("/conf/{id}/subscriptions")
public void saveWorkflowSubscriptions(@PathVariable final String id, @RequestBody final List<WorkflowSubscription> subscriptions) throws Exception {
wfManagerService.saveSubscriptions(id, subscriptions);
}

View File

@ -208,7 +208,7 @@ export interface EmailTemplate {
}
export interface WfInstance {
export interface WfConf {
id: string,
details: Map<string, string>,
priority: number,
@ -226,7 +226,7 @@ export interface WfInstance {
userParams: Map<string, string>
}
export interface WfInstanceSubscription {
export interface WfSubscription {
}

View File

@ -1,6 +1,6 @@
import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http';
import { Page, DsmConf, ResourceType, Protocol, WfHistoryEntry, SimpleResource, Context, ContextNode, Vocabulary, VocabularyTerm, KeyValue, BrowseTerm, Datasource, MDStore, MDStoreVersion, MDStoreRecord, EmailTemplate, WfInstance, WfInstanceSubscription, WfProcessStatus } from './is.model';
import { Page, DsmConf, ResourceType, Protocol, WfHistoryEntry, SimpleResource, Context, ContextNode, Vocabulary, VocabularyTerm, KeyValue, BrowseTerm, Datasource, MDStore, MDStoreVersion, MDStoreRecord, EmailTemplate, WfConf, WfSubscription, WfProcessStatus } from './is.model';
import { FormGroup } from '@angular/forms';
import { MatSnackBar } from '@angular/material/snack-bar';
@ -351,70 +351,70 @@ export class ISService {
}
loadWfIntancesSections(onSuccess: Function): void {
this.client.get<KeyValue[]>('./ajax/wf_instances/sections').subscribe({
this.client.get<KeyValue[]>('./ajax/wfs/sections').subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
loadWfIntances(section: string, onSuccess: Function): void {
this.client.get<KeyValue[]>('./ajax/wf_instances/search?section=' + encodeURIComponent(section)).subscribe({
this.client.get<KeyValue[]>('./ajax/wfs/search?section=' + encodeURIComponent(section)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
loadWfIntance(id: string, onSuccess: Function): void {
this.client.get<WfInstance>('./ajax/wf_instances/instance/' + encodeURIComponent(id)).subscribe({
this.client.get<WfConf>('./ajax/wfs/conf/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
saveWfIntance(instance: WfInstance, onSuccess: Function, relatedForm?: FormGroup): void {
this.client.post<void>('./ajax/wf_instances/instance', instance).subscribe({
saveWfIntance(conf: WfConf, onSuccess: Function, relatedForm?: FormGroup): void {
this.client.post<void>('./ajax/wfs/conf', conf).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error, relatedForm)
});
}
deleteWfIntance(id: string, onSuccess: Function): void {
this.client.delete<void>('./ajax/wf_instances/instance/' + encodeURIComponent(id)).subscribe({
this.client.delete<void>('./ajax/wfs/conf/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
startWfIntance(id: string, onSuccess: Function): void {
this.client.get<WfProcessStatus>('./ajax/wf_instances/instance/' + encodeURIComponent(id) + '/start').subscribe({
this.client.get<WfProcessStatus>('./ajax/wfs/conf/' + encodeURIComponent(id) + '/start').subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
findProcess(id: string, onSuccess: Function): void {
this.client.get<WfProcessStatus>('./ajax/wf_instances/process/' + encodeURIComponent(id)).subscribe({
this.client.get<WfProcessStatus>('./ajax/wfs/process/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
killProcess(id: string, onSuccess: Function): void {
this.client.delete<void>('./ajax/wf_instances/process/' + encodeURIComponent(id)).subscribe({
this.client.delete<void>('./ajax/wfs/process/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
findWfInstanceSubscriptions(id: string, onSuccess: Function): void {
this.client.get<WfInstanceSubscription[]>('./ajax/wf_instances/instance/' + encodeURIComponent(id) + '/subscriptions').subscribe({
findWfSubscriptions(id: string, onSuccess: Function): void {
this.client.get<WfSubscription[]>('./ajax/wfs/conf/' + encodeURIComponent(id) + '/subscriptions').subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
saveWfInstanceSubscriptions(id: string, subscriptions: WfInstanceSubscription[], onSuccess: Function, relatedForm?: FormGroup): void {
this.client.post<void>('./ajax/wf_instances/instance/' + encodeURIComponent(id) + '/subscriptions', subscriptions).subscribe({
saveWfSubscriptions(id: string, subscriptions: WfSubscription[], onSuccess: Function, relatedForm?: FormGroup): void {
this.client.post<void>('./ajax/wfs/conf/' + encodeURIComponent(id) + '/subscriptions', subscriptions).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error, relatedForm)
});

View File

@ -30,8 +30,8 @@ public class WfHistoryEntry implements Serializable {
@Column(name = "process_id")
private String processId;
@Column(name = "wf_instance_id")
private String wfInstanceId;
@Column(name = "wf_conf_id")
private String wfConfigurationId;
@Column(name = "name")
private String name;
@ -69,12 +69,12 @@ public class WfHistoryEntry implements Serializable {
this.processId = processId;
}
public String getWfInstanceId() {
return wfInstanceId;
public String getWfConfigurationId() {
return wfConfigurationId;
}
public void setWfInstanceId(final String wfInstanceId) {
this.wfInstanceId = wfInstanceId;
public void setWfConfigurationId(final String wfConfigurationId) {
this.wfConfigurationId = wfConfigurationId;
}
public String getName() {

View File

@ -18,12 +18,12 @@ import com.vladmihalcea.hibernate.type.json.JsonBinaryType;
import com.vladmihalcea.hibernate.type.json.JsonStringType;
@Entity
@Table(name = "workflow_instances")
@Table(name = "wf_configurations")
@TypeDefs({
@TypeDef(name = "json", typeClass = JsonStringType.class),
@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
})
public class WorkflowInstance implements Serializable {
public class WorkflowConfiguration implements Serializable {
private static final long serialVersionUID = 7503841966138333044L;

View File

@ -11,15 +11,15 @@ import javax.persistence.IdClass;
import javax.persistence.Table;
@Entity
@Table(name = "workflow_subscriptions")
@Table(name = "wf_subscriptions")
@IdClass(WorkflowSubscriptionPK.class)
public class WorkflowSubscription implements Serializable {
private static final long serialVersionUID = -3662770213782581404L;
@Id
@Column(name = "wf_instance_id")
private String wfInstanceId;
@Column(name = "wf_conf_id")
private String wfConfigurationId;
@Id
@Column(name = "condition")
@ -33,12 +33,12 @@ public class WorkflowSubscription implements Serializable {
@Column(name = "message_id")
private String messageId;
public String getWfInstanceId() {
return wfInstanceId;
public String getWfConfigurationId() {
return wfConfigurationId;
}
public void setWfInstanceId(final String wfInstanceId) {
this.wfInstanceId = wfInstanceId;
public void setWfConfigurationId(final String wfConfigurationId) {
this.wfConfigurationId = wfConfigurationId;
}
public NotificationCondition getCondition() {

View File

@ -6,18 +6,18 @@ public class WorkflowSubscriptionPK implements Serializable {
private static final long serialVersionUID = -7569690774071644848L;
private String wfInstanceId;
private String wfConfigurationId;
private NotificationCondition condition;
private String email;
public String getWfInstanceId() {
return wfInstanceId;
public String getWfConfigurationId() {
return wfConfigurationId;
}
public void setWfInstanceId(final String wfInstanceId) {
this.wfInstanceId = wfInstanceId;
public void setWfConfigurationId(final String wfConfigurationId) {
this.wfConfigurationId = wfConfigurationId;
}
public NotificationCondition getCondition() {

View File

@ -106,7 +106,7 @@ CREATE INDEX ON context_cat_concepts_lvl_2 (parent);
CREATE TABLE wf_history (
process_id text PRIMARY KEY,
wf_instance_id text NOT NULL,
wf_conf_id text NOT NULL,
name text NOT NULL,
family text NOT NULL,
status text NOT NULL,
@ -132,7 +132,7 @@ INSERT INTO resource_types(id, name, content_type) VALUES
('cleaning_rule', 'Cleaning Rules', 'application/xml'),
('hadoop_job_configuration', 'Hadoop Job Configurations', 'application/xml')
('dedup_configuration', 'Dedup Configurations', 'application/json')
('workflow', 'Workflows', 'application/json');
('wf_template', 'Workflow Templates', 'application/json');
CREATE TABLE resources (
id text PRIMARY KEY,
@ -262,7 +262,7 @@ CREATE TABLE emails (
-- Workflows
CREATE TABLE workflow_instances (
CREATE TABLE wf_configurations (
id text PRIMARY KEY,
name text NOT NULL,
section text,
@ -282,10 +282,10 @@ CREATE TABLE workflow_instances (
user_params jsonb NOT NULL DEFAULT '{}'
);
CREATE TABLE workflow_subscriptions (
wf_instance_id text NOT NULL REFERENCES workflow_instances(id),
CREATE TABLE wf_subscriptions (
wf_conf_id text NOT NULL REFERENCES wf_configurations(id),
condition text NOT NULL,
email text NOT NULL,
message_id text NOT NULL REFERENCES emails(id),
PRIMARY KEY (wf_instance_id, condition, email)
PRIMARY KEY (wf_conf_id, condition, email)
);

View File

@ -44,8 +44,8 @@ public class WorkflowLogger {
wfHistoryEntryRepository.save(pe);
}
public Optional<WfHistoryEntry> getLastLogForInstance(final String id) {
return wfHistoryEntryRepository.findOneByWfInstanceIdOrderByEndDateAsc(id);
public Optional<WfHistoryEntry> getLastLogForConfiguration(final String id) {
return wfHistoryEntryRepository.findOneByWfConfigurationIdOrderByEndDateAsc(id);
}
}

View File

@ -12,5 +12,5 @@ public interface WfHistoryEntryRepository extends JpaRepository<WfHistoryEntry,
List<WfHistoryEntry> findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end);
Optional<WfHistoryEntry> findOneByWfInstanceIdOrderByEndDateAsc(String id);
Optional<WfHistoryEntry> findOneByWfConfigurationIdOrderByEndDateAsc(String id);
}

View File

@ -5,17 +5,17 @@ import java.util.stream.Stream;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.utils.CountedValue;
public interface WorkflowInstanceRepository extends JpaRepository<WorkflowInstance, String> {
public interface WorkflowConfigurationRepository extends JpaRepository<WorkflowConfiguration, String> {
@Query(value = "select section as value, count(*) as count "
+ "from workflow_instances "
+ "from wf_configurations "
+ "group by section "
+ "order by count desc;", nativeQuery = true)
Stream<CountedValue> streamSections();
Stream<WorkflowInstance> findBySection(String section);
Stream<WorkflowConfiguration> findBySection(String section);
}

View File

@ -9,7 +9,7 @@ import eu.dnetlib.manager.wf.model.WorkflowSubscriptionPK;
public interface WorkflowSubscriptionRepository extends JpaRepository<WorkflowSubscription, WorkflowSubscriptionPK> {
List<WorkflowSubscription> findByWfInstanceId(String wfInstanceId);
List<WorkflowSubscription> findByWfConfigurationId(String wfConfigurationId);
void deleteByWfInstanceId(String wfInstanceId);
void deleteByWfConfigurationId(String wfConfigurationId);
}

View File

@ -23,10 +23,10 @@ import eu.dnetlib.errors.DsmException;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowSubscription;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.repository.WorkflowConfigurationRepository;
import eu.dnetlib.manager.wf.repository.WorkflowSubscriptionRepository;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
@ -56,7 +56,7 @@ public class WorkflowManagerService implements Stoppable {
@Autowired
private SimpleResourceRepository simpleResourceRepository;
@Autowired
private WorkflowInstanceRepository workflowInstanceRepository;
private WorkflowConfigurationRepository workflowConfigurationRepository;
@Autowired
private WorkflowSubscriptionRepository workflowSubscriptionRepository;
@ -76,8 +76,8 @@ public class WorkflowManagerService implements Stoppable {
}, 10, 10, TimeUnit.SECONDS);
}
public WorkflowInstance findWorkflowInstance(final String id) throws WorkflowManagerException {
return workflowInstanceRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + id));
public WorkflowConfiguration findWorkflowConfiguration(final String id) throws WorkflowManagerException {
return workflowConfigurationRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + id));
}
public ExecutionStatus startRepoHiWorkflow(final String wfId,
@ -94,56 +94,56 @@ public class WorkflowManagerService implements Stoppable {
try {
final String dsName = dsmService.getDs(dsId).getOfficialname();
final WorkflowInstance instance = new WorkflowInstance();
instance.setId("REPO_HI_" + UUID.randomUUID());
instance.setDetails(new HashMap<>());
instance.setPriority(100);
instance.setDsId(dsId);
instance.setDsName(dsName);
instance.setApiId(apiId);
instance.setEnabled(true);
instance.setConfigured(true);
instance.setSchedulingEnabled(false);
instance.setCronExpression("");
instance.setCronMinInterval(0);
instance.setWorkflow(wfId);
instance.setDestroyWf(null);
instance.setSystemParams(new HashMap<>());
instance.setUserParams(new HashMap<>());
final WorkflowConfiguration conf = new WorkflowConfiguration();
conf.setId("REPO_HI_" + UUID.randomUUID());
conf.setDetails(new HashMap<>());
conf.setPriority(100);
conf.setDsId(dsId);
conf.setDsName(dsName);
conf.setApiId(apiId);
conf.setEnabled(true);
conf.setConfigured(true);
conf.setSchedulingEnabled(false);
conf.setCronExpression("");
conf.setCronMinInterval(0);
conf.setWorkflow(wfId);
conf.setDestroyWf(null);
conf.setSystemParams(new HashMap<>());
conf.setUserParams(new HashMap<>());
return startWorkflowInstance(instance, callback);
return startWorkflowConfiguration(conf, callback);
} catch (final DsmException e) {
throw new WorkflowManagerException("Invalid datasource: " + dsId, e);
}
}
public ExecutionStatus startWorkflowInstance(final String wfInstanceId,
public ExecutionStatus startWorkflowConfiguration(final String wfConfId,
final String parent,
final ExecutionCallback<WorkflowProcess> callback) throws Exception {
if (isPaused()) {
log.warn("Wf instance " + wfInstanceId + " not launched, because WorkflowExecutor is preparing for shutdown");
log.warn("Wf configuration " + wfConfId + " not launched, because WorkflowExecutor is preparing for shutdown");
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
}
final WorkflowInstance instance = findWorkflowInstance(wfInstanceId);
final WorkflowConfiguration conf = findWorkflowConfiguration(wfConfId);
return startWorkflowInstance(instance, callback);
return startWorkflowConfiguration(conf, callback);
}
public ExecutionStatus startWorkflowInstance(final WorkflowInstance wfInstance,
public ExecutionStatus startWorkflowConfiguration(final WorkflowConfiguration conf,
final ExecutionCallback<WorkflowProcess> callback)
throws WorkflowManagerException {
if (!wfInstance.isEnabled() || !wfInstance.isConfigured()) {
log.warn("Wf instance " + wfInstance.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf instance " + wfInstance.getId() + " is not ready to start");
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
final SimpleResource wfMetadata = simpleResourceRepository
.findById(wfInstance.getWorkflow())
.findById(conf.getWorkflow())
.filter(r -> r.getType().equals("workflows"))
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + wfInstance.getWorkflow()));
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + conf.getWorkflow()));
final WorkflowGraph wfGraph = simpleResourceRepository.findContentById(wfMetadata.getId())
.map(s -> {
@ -157,9 +157,9 @@ public class WorkflowManagerService implements Stoppable {
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId()));
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, callback);
processFactory.newProcess(wfMetadata, wfGraph, conf, callback);
processRegistry.registerProcess(process, wfInstance);
processRegistry.registerProcess(process, conf);
return process.getExecutionStatus();
}
@ -197,31 +197,31 @@ public class WorkflowManagerService implements Stoppable {
}
public Stream<CountedValue> streamSections() {
return workflowInstanceRepository.streamSections();
return workflowConfigurationRepository.streamSections();
}
public Stream<WorkflowInstance> streamWfInstancesBySection(final String section) {
return workflowInstanceRepository.findBySection(section);
public Stream<WorkflowConfiguration> streamWfConfigurationsBySection(final String section) {
return workflowConfigurationRepository.findBySection(section);
}
public void saveWfInstance(final WorkflowInstance instance) {
workflowInstanceRepository.save(instance);
public void saveWfConfiguration(final WorkflowConfiguration conf) {
workflowConfigurationRepository.save(conf);
}
@Transactional
public void deleteWfInstance(final String id) {
workflowSubscriptionRepository.deleteByWfInstanceId(id);
workflowInstanceRepository.deleteById(id);
public void deleteWfConfiguration(final String id) {
workflowSubscriptionRepository.deleteByWfConfigurationId(id);
workflowConfigurationRepository.deleteById(id);
}
public List<WorkflowSubscription> listSubscriptions(final String id) {
return workflowSubscriptionRepository.findByWfInstanceId(id);
return workflowSubscriptionRepository.findByWfConfigurationId(id);
}
@Transactional
public void saveSubscriptions(final String instanceId, final List<WorkflowSubscription> subscriptions) {
subscriptions.forEach(s -> s.setWfInstanceId(instanceId));
workflowSubscriptionRepository.deleteByWfInstanceId(instanceId);
public void saveSubscriptions(final String wfConfId, final List<WorkflowSubscription> subscriptions) {
subscriptions.forEach(s -> s.setWfConfigurationId(wfConfId));
workflowSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
workflowSubscriptionRepository.saveAll(subscriptions);
}

View File

@ -13,8 +13,8 @@ import org.springframework.stereotype.Service;
import eu.dnetlib.manager.history.WorkflowLogger;
import eu.dnetlib.manager.wf.WorkflowManagerService;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.repository.WorkflowConfigurationRepository;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
@ -30,7 +30,7 @@ public class ScheduledWorkflowLauncher {
private ProcessRegistry processRegistry;
@Autowired
private WorkflowInstanceRepository workflowInstanceRepository;
private WorkflowConfigurationRepository workflowConfigurationRepository;
@Autowired
private WorkflowLogger logger;
@ -42,32 +42,32 @@ public class ScheduledWorkflowLauncher {
public void verifySheduledWorkflows() {
log.debug("Verifying scheduled workflows - START");
workflowInstanceRepository.findAll()
workflowConfigurationRepository.findAll()
.stream()
.filter(WorkflowInstance::isEnabled)
.filter(WorkflowInstance::isConfigured)
.filter(WorkflowInstance::isSchedulingEnabled)
.filter(WorkflowConfiguration::isEnabled)
.filter(WorkflowConfiguration::isConfigured)
.filter(WorkflowConfiguration::isSchedulingEnabled)
.filter(this::isNotRunning)
.filter(this::isReady)
.forEach(instance -> {
.forEach(conf -> {
try {
wfManagerService.startWorkflowInstance(instance, null);
wfManagerService.startWorkflowConfiguration(conf, null);
} catch (final Exception e) {
log.error("Error launching scheduled wf instance: " + instance.getId(), e);
log.error("Error launching scheduled wf conf: " + conf.getId(), e);
}
});
log.debug("Verifying scheduled workflows - END");
}
private boolean isReady(final WorkflowInstance instance) {
final LocalDateTime lastExecutionDate = calculateLastExecutionDate(instance.getId());
private boolean isReady(final WorkflowConfiguration conf) {
final LocalDateTime lastExecutionDate = calculateLastExecutionDate(conf.getId());
final LocalDateTime now = LocalDateTime.now();
final String cron = instance.getCronExpression();
final String cron = conf.getCronExpression();
if (CronExpression.isValidExpression(cron)) {
final int minInterval = instance.getCronMinInterval(); // in minutes
final int minInterval = conf.getCronMinInterval(); // in minutes
final boolean res;
if (lastExecutionDate != null) {
@ -79,12 +79,12 @@ public class ScheduledWorkflowLauncher {
if (log.isDebugEnabled()) {
log.debug("**************************************************************");
log.debug("WORKFLOW INSTANCE ID : " + instance.getId());
log.debug("NOW : " + now);
log.debug("LAST EXECUTION DATE : " + lastExecutionDate);
log.debug("MIN INTERVAL (minutes) : " + minInterval);
log.debug("WINDOW SIZE (ms) : " + windowSize);
log.debug("MUST BE EXECUTED : " + res);
log.debug("WORKFLOW CONFIGURATION ID : " + conf.getId());
log.debug("NOW : " + now);
log.debug("LAST EXECUTION DATE : " + lastExecutionDate);
log.debug("MIN INTERVAL (minutes) : " + minInterval);
log.debug("WINDOW SIZE (ms) : " + windowSize);
log.debug("MUST BE EXECUTED : " + res);
log.debug("**************************************************************");
}
@ -95,7 +95,7 @@ public class ScheduledWorkflowLauncher {
}
private LocalDateTime calculateLastExecutionDate(final String id) {
return logger.getLastLogForInstance(id)
return logger.getLastLogForConfiguration(id)
.map(e -> e.getEndDate())
.orElse(LocalDateTime.MIN);
}
@ -118,8 +118,8 @@ public class ScheduledWorkflowLauncher {
}
}
private boolean isNotRunning(final WorkflowInstance instance) {
final WorkflowProcess p = processRegistry.findProcsByInstanceId(instance.getId());
private boolean isNotRunning(final WorkflowConfiguration conf) {
final WorkflowProcess p = processRegistry.findProcsByConfigurationId(conf.getId());
if (p != null) {
switch (p.getStatus()) {

View File

@ -8,7 +8,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.WorkflowManagerService;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.Token;
@ -33,25 +33,25 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
public final void execute(final Token token, final ExecutionCallback<Token> callback) {
try {
final WorkflowInstance instance = new WorkflowInstance();
instance.setId("CHILD_" + UUID.randomUUID());
instance.setParentId(process.getWfInstanceId());
instance.setDetails(new HashMap<>());
instance.setPriority(100);
instance.setDsId(process.getDsId());
instance.setDsName(process.getDsName());
instance.setApiId(process.getDsInterface());
instance.setEnabled(true);
instance.setConfigured(true);
instance.setSchedulingEnabled(false);
instance.setCronExpression("");
instance.setCronMinInterval(0);
instance.setWorkflow(wfId);
instance.setDestroyWf(null);
instance.setSystemParams(process.getGlobalParams());
instance.setUserParams(new HashMap<>());
final WorkflowConfiguration conf = new WorkflowConfiguration();
conf.setId("CHILD_" + UUID.randomUUID());
conf.setParentId(process.getWfConfId());
conf.setDetails(new HashMap<>());
conf.setPriority(100);
conf.setDsId(process.getDsId());
conf.setDsName(process.getDsName());
conf.setApiId(process.getDsInterface());
conf.setEnabled(true);
conf.setConfigured(true);
conf.setSchedulingEnabled(false);
conf.setCronExpression("");
conf.setCronMinInterval(0);
conf.setWorkflow(wfId);
conf.setDestroyWf(null);
conf.setSystemParams(process.getGlobalParams());
conf.setUserParams(new HashMap<>());
final ExecutionStatus info = wfManagerService.startWorkflowInstance(instance, new ExecutionCallback<WorkflowProcess>() {
final ExecutionStatus info = wfManagerService.startWorkflowConfiguration(conf, new ExecutionCallback<WorkflowProcess>() {
@Override
public void onSuccess(final WorkflowProcess t) {
@ -70,7 +70,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
});
if (log.isDebugEnabled()) {
log.debug("The child workflow [instance: " + instance.getId() + "] is starting with procId: " + info.getProcessId());
log.debug("The child workflow [conf: " + conf.getId() + "] is starting with procId: " + info.getProcessId());
}
token.setProgressMessage("Launched sub workflow, proc: " + info.getProcessId());

View File

@ -27,7 +27,7 @@ public class EmailSender {
public void sendMails(final WorkflowProcess proc) {
wfSubscriptionRepository.findByWfInstanceId(proc.getWfInstanceId()).forEach(s -> {
wfSubscriptionRepository.findByWfConfigurationId(proc.getWfConfId()).forEach(s -> {
if (s.getCondition() == NotificationCondition.ALWAYS ||
s.getCondition() == NotificationCondition.ONLY_FAILED && proc.getStatus() == Status.FAILURE ||
s.getCondition() == NotificationCondition.ONLY_SUCCESS && proc.getStatus() == Status.SUCCESS) {

View File

@ -13,7 +13,7 @@ import org.springframework.stereotype.Component;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.graph.GraphLoader;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
@ -32,16 +32,16 @@ public class ProcessFactory {
public WorkflowProcess newProcess(final SimpleResource wfMetadata,
final WorkflowGraph wfGraph,
final WorkflowInstance wfInstance,
final WorkflowConfiguration conf,
final ExecutionCallback<WorkflowProcess> callback) throws WorkflowManagerException {
final Map<String, String> globalParams = new HashMap<>();
globalParams.putAll(wfInstance.getSystemParams());
globalParams.putAll(wfInstance.getUserParams());
globalParams.putAll(conf.getSystemParams());
globalParams.putAll(conf.getUserParams());
final Graph graph = graphLoader.loadGraph(wfGraph, globalParams);
return new WorkflowProcess(generateProcessId(), wfMetadata, wfInstance, graph, globalParams, callback);
return new WorkflowProcess(generateProcessId(), wfMetadata, conf, graph, globalParams, callback);
}

View File

@ -4,7 +4,6 @@ import java.time.LocalDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
@ -13,7 +12,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
@Service
@ -21,7 +20,7 @@ public class ProcessRegistry {
private static final Log log = LogFactory.getLog(ProcessRegistry.class);
private final Map<String, WorkflowProcess> procs = new HashMap<>();
private final Map<String, WorkflowProcess> byInstanceId = new HashMap<>();
private final Map<String, WorkflowProcess> byConfId = new HashMap<>();
private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<>();
@ -47,11 +46,11 @@ public class ProcessRegistry {
return this.procs.values();
}
public WorkflowProcess findProcsByInstanceId(final String id) {
return this.byInstanceId.get(id);
public WorkflowProcess findProcsByConfigurationId(final String id) {
return this.byConfId.get(id);
}
public void registerProcess(final WorkflowProcess process, final WorkflowInstance wfInstance) throws WorkflowManagerException {
public void registerProcess(final WorkflowProcess process, final WorkflowConfiguration conf) throws WorkflowManagerException {
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
log.error("Already registerd process: " + process);
throw new WorkflowManagerException("Already registerd process: " + process);
@ -62,7 +61,7 @@ public class ProcessRegistry {
}
this.procs.put(process.getId(), process);
this.byInstanceId.put(wfInstance.getId(), process);
this.byConfId.put(conf.getId(), process);
synchronized (this.pendingProcs) {
if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
@ -101,16 +100,14 @@ public class ProcessRegistry {
synchronized (this) {
final WorkflowProcess process = this.procs.remove(procId);
if (process != null) {
final Optional<String> instanceId = this.byInstanceId.entrySet()
this.byConfId.entrySet()
.stream()
.filter(e -> e.getValue().getId().equals(process.getId()))
.map(e -> e.getKey())
.findFirst();
if (instanceId.isPresent()) {
this.byInstanceId.remove(instanceId, process);
}
.forEach(confId -> this.byConfId.remove(confId, process));
}
}
}
public WorkflowProcess nextProcessToStart() {

View File

@ -11,7 +11,7 @@ import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.manager.history.model.WfHistoryEntry;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
@ -37,7 +37,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
private final String id;
private final SimpleResource wfMetadata;
private final WorkflowInstance wfInstance;
private final WorkflowConfiguration wfConf;
private final Graph graph;
private final ExecutionCallback<WorkflowProcess> callback;
private final Env env;
@ -55,13 +55,13 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
public WorkflowProcess(
final String id,
final SimpleResource wfMetadata,
final WorkflowInstance wfInstance,
final WorkflowConfiguration wfConf,
final Graph graph,
final Map<String, String> globalParams,
final ExecutionCallback<WorkflowProcess> callback) {
this.id = id;
this.wfMetadata = wfMetadata;
this.wfInstance = wfInstance;
this.wfConf = wfConf;
this.graph = graph;
this.callback = callback;
this.status = Status.CREATED;
@ -75,7 +75,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public String getName() {
return wfInstance.getName();
return wfConf.getName();
}
public String getFamily() {
@ -86,28 +86,28 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return wfMetadata.getId();
}
public String getWfInstanceId() {
return wfInstance.getId();
public String getWfConfId() {
return wfConf.getId();
}
public String getParentId() {
return wfInstance.getParentId();
return wfConf.getParentId();
}
public int getPriority() {
return wfInstance.getPriority();
return wfConf.getPriority();
}
public String getDsId() {
return wfInstance.getId();
return wfConf.getId();
}
public String getDsName() {
return wfInstance.getDsName();
return wfConf.getDsName();
}
public String getDsInterface() {
return wfInstance.getApiId();
return wfConf.getApiId();
}
public Map<String, List<Token>> getPausedJoinNodeTokens() {
@ -235,7 +235,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
details.putAll(getOutputParams());
details.put(WorkflowsConstants.LOG_WF_PRIORITY, "" + getPriority());
details.put(WorkflowsConstants.LOG_WF_ID, getWfId());
details.put(WorkflowsConstants.LOG_WF_ID, getWfInstanceId());
details.put(WorkflowsConstants.LOG_WF_CONF_ID, getWfConfId());
if (getError() != null) {
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, getError());

View File

@ -15,7 +15,7 @@ public class WorkflowsConstants {
public static final String LOG_WF_NAME = "system:wfName";
public static final String LOG_WF_ID = "system:wfId";
public static final String LOG_WF_INSTANCE_ID = "system:wfInstanceId";
public static final String LOG_WF_CONF_ID = "system:wfConfigurationId";
public static final String LOG_WF_FAMILY = "system:family";
public static final String LOG_WF_PRIORITY = "system:priority";