ajax methods for wfs

This commit is contained in:
Michele Artini 2023-03-21 15:28:18 +01:00
parent 8c3ae35dc4
commit f168ca9a2b
17 changed files with 228 additions and 67 deletions

View File

@ -19,8 +19,8 @@ import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.manager.history.model.WfProcessExecution;
import eu.dnetlib.manager.history.repository.WfProcessExecutionRepository;
import eu.dnetlib.manager.history.model.WfHistoryEntry;
import eu.dnetlib.manager.history.repository.WfHistoryEntryRepository;
@Service
public class WfHistoryImporter {
@ -28,7 +28,7 @@ public class WfHistoryImporter {
private static final Log log = LogFactory.getLog(WfHistoryImporter.class);
@Autowired
private WfProcessExecutionRepository wfProcessExecutionRepository;
private WfHistoryEntryRepository wfHistoryEntryRepository;
public void load(final String path) throws Exception {
final ObjectMapper mapper = new ObjectMapper();
@ -40,7 +40,7 @@ public class WfHistoryImporter {
private void saveWf(final JsonNode node) {
final WfProcessExecution wf = new WfProcessExecution();
final WfHistoryEntry wf = new WfHistoryEntry();
wf.setProcessId(node.get("system:processId").asText());
wf.setName(node.get("system:wfName").asText());
wf.setFamily(node.get("system:profileFamily").asText());
@ -77,7 +77,7 @@ public class WfHistoryImporter {
wf.setDetails(details);
wfProcessExecutionRepository.save(wf);
wfHistoryEntryRepository.save(wf);
log.info("Wf saved with id: " + wf.getProcessId());

View File

@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.manager.history.WorkflowLogger;
import eu.dnetlib.manager.history.model.WfProcessExecution;
import eu.dnetlib.manager.history.model.WfHistoryEntry;
@RestController
@RequestMapping("/ajax/wf_history")
@ -21,7 +21,7 @@ public class WfHistoryAjaxController extends AbstractDnetController {
private WorkflowLogger logger;
@GetMapping("/")
public List<WfProcessExecution> history(
public List<WfHistoryEntry> history(
@RequestParam(required = true) final int total,
@RequestParam(required = false) final Long from,
@RequestParam(required = false) final Long to) {
@ -29,8 +29,8 @@ public class WfHistoryAjaxController extends AbstractDnetController {
}
@GetMapping("/{processId}")
public WfProcessExecution getProcessExecution(@PathVariable final String processId) {
return logger.getProcessExecution(processId);
public WfHistoryEntry getProcessExecution(@PathVariable final String processId) {
return logger.getLog(processId);
}
}

View File

@ -3,8 +3,10 @@ package eu.dnetlib.manager.wf;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@ -12,6 +14,9 @@ import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.is.info.KeyValue;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowSubscription;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
@RestController
@RequestMapping("/ajax/wf_instances")
@ -19,9 +24,11 @@ public class WfInstancesController extends AbstractDnetController {
private WorkflowManagerService wfManagerService;
@GetMapping("/instance/{id}")
public WorkflowInstance getWfInstance(@PathVariable final String id) throws Exception {
return wfManagerService.findWorkflowInstance(id);
@GetMapping("/sections")
public List<KeyValue<Long>> listWfFamilies() throws Exception {
return wfManagerService.streamSections()
.map(x -> new KeyValue<>(x.getValue(), x.getCount()))
.collect(Collectors.toList());
}
@GetMapping("/search")
@ -31,11 +38,43 @@ public class WfInstancesController extends AbstractDnetController {
.collect(Collectors.toList());
}
@GetMapping("/sections")
public List<KeyValue<Long>> listWfFamilies() throws Exception {
return wfManagerService.streamSections()
.map(x -> new KeyValue<>(x.getValue(), x.getCount()))
.collect(Collectors.toList());
@GetMapping("/instance/{id}")
public WorkflowInstance getWfInstance(@PathVariable final String id) throws Exception {
return wfManagerService.findWorkflowInstance(id);
}
@PostMapping("/instance")
public void saveWfInstance(@RequestBody final WorkflowInstance instance) throws Exception {
wfManagerService.saveWfInstance(instance);
}
@DeleteMapping("/instance/{id}")
public void deleteWfInstance(@PathVariable final String id) throws Exception {
wfManagerService.deleteWfInstance(id);
}
@GetMapping("/instance/{id}/start")
public ExecutionStatus startWorkflowInstance(@PathVariable final String id) throws Exception {
return wfManagerService.startWorkflowInstance(id, null, null);
}
@GetMapping("/process/{id}")
public ExecutionStatus findProcess(@PathVariable final String id) throws Exception {
return wfManagerService.findProcess(id);
}
@DeleteMapping("/process/{id}")
public void killProcess(@PathVariable final String id) throws Exception {
wfManagerService.killProcess(id);
}
@GetMapping("/instance/{id}/subscriptions")
public List<WorkflowSubscription> listWorkflowSubscriptions(@PathVariable final String id) throws Exception {
return wfManagerService.listSubscriptions(id);
}
@PostMapping("/instance/{id}/subscriptions")
public void saveWorkflowSubscriptions(@PathVariable final String id, @RequestBody final List<WorkflowSubscription> subscriptions) throws Exception {
wfManagerService.saveSubscriptions(id, subscriptions);
}
}

View File

@ -8,7 +8,7 @@ export interface ResourceType {
export interface KeyValue {
k: string;
v: string;
v: any;
}
export interface BrowseTerm {
@ -225,3 +225,11 @@ export interface WfInstance {
systemParams: Map<string, string>,
userParams: Map<string, string>
}
export interface WfInstanceSubscription {
}
export interface WfProcessStatus {
}

View File

@ -1,6 +1,6 @@
import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http';
import { Page, DsmConf, ResourceType, Protocol, WfHistoryEntry, SimpleResource, Context, ContextNode, Vocabulary, VocabularyTerm, KeyValue, BrowseTerm, Datasource, MDStore, MDStoreVersion, MDStoreRecord, EmailTemplate } from './is.model';
import { Page, DsmConf, ResourceType, Protocol, WfHistoryEntry, SimpleResource, Context, ContextNode, Vocabulary, VocabularyTerm, KeyValue, BrowseTerm, Datasource, MDStore, MDStoreVersion, MDStoreRecord, EmailTemplate, WfInstance, WfInstanceSubscription, WfProcessStatus } from './is.model';
import { FormGroup } from '@angular/forms';
import { MatSnackBar } from '@angular/material/snack-bar';
@ -330,7 +330,7 @@ export class ISService {
}
loadEmailTemplates(onSuccess: Function): void {
this.client.get<void>('./ajax/templates/email/').subscribe({
this.client.get<EmailTemplate[]>('./ajax/templates/email/').subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
@ -351,26 +351,75 @@ export class ISService {
}
loadWfIntancesSections(onSuccess: Function): void {
this.client.get<void>('./ajax/wf_instances/sections').subscribe({
this.client.get<KeyValue[]>('./ajax/wf_instances/sections').subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
loadWfIntances(section: string, onSuccess: Function): void {
this.client.get<void>('./ajax/wf_instances/search?section=' + encodeURIComponent(section)).subscribe({
this.client.get<KeyValue[]>('./ajax/wf_instances/search?section=' + encodeURIComponent(section)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
loadWfIntance(id: string, onSuccess: Function): void {
this.client.get<void>('./ajax/wf_instances/instance/' + encodeURIComponent(id)).subscribe({
this.client.get<WfInstance>('./ajax/wf_instances/instance/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
saveWfIntance(instance: WfInstance, onSuccess: Function, relatedForm?: FormGroup): void {
this.client.post<void>('./ajax/wf_instances/instance', instance).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error, relatedForm)
});
}
deleteWfIntance(id: string, onSuccess: Function): void {
this.client.delete<void>('./ajax/wf_instances/instance/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
startWfIntance(id: string, onSuccess: Function): void {
this.client.get<WfProcessStatus>('./ajax/wf_instances/instance/' + encodeURIComponent(id) + '/start').subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
findProcess(id: string, onSuccess: Function): void {
this.client.get<WfProcessStatus>('./ajax/wf_instances/process/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
killProcess(id: string, onSuccess: Function): void {
this.client.delete<void>('./ajax/wf_instances/process/' + encodeURIComponent(id)).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
findWfInstanceSubscriptions(id: string, onSuccess: Function): void {
this.client.get<WfInstanceSubscription[]>('./ajax/wf_instances/instance/' + encodeURIComponent(id) + '/subscriptions').subscribe({
next: data => onSuccess(data),
error: error => this.showError(error)
});
}
saveWfInstanceSubscriptions(id: string, subscriptions: WfInstanceSubscription[], onSuccess: Function, relatedForm?: FormGroup): void {
this.client.post<void>('./ajax/wf_instances/instance/' + encodeURIComponent(id) + '/subscriptions', subscriptions).subscribe({
next: data => onSuccess(data),
error: error => this.showError(error, relatedForm)
});
}
private showError(error: any, form?: FormGroup) {
console.log(error);

View File

@ -22,7 +22,7 @@ import com.vladmihalcea.hibernate.type.json.JsonStringType;
@TypeDef(name = "json", typeClass = JsonStringType.class),
@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
})
public class WfProcessExecution implements Serializable {
public class WfHistoryEntry implements Serializable {
private static final long serialVersionUID = -326994850248506828L;

View File

@ -11,18 +11,18 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import eu.dnetlib.manager.history.model.WfProcessExecution;
import eu.dnetlib.manager.history.repository.WfProcessExecutionRepository;
import eu.dnetlib.manager.history.model.WfHistoryEntry;
import eu.dnetlib.manager.history.repository.WfHistoryEntryRepository;
@Service
public class WorkflowLogger {
@Autowired
private WfProcessExecutionRepository wfProcessExecutionRepository;
private WfHistoryEntryRepository wfHistoryEntryRepository;
public List<WfProcessExecution> history(final int total, final Long from, final Long to) {
public List<WfHistoryEntry> history(final int total, final Long from, final Long to) {
if (from == null && to == null) {
return wfProcessExecutionRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList();
return wfHistoryEntryRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList();
} else {
final LocalDateTime fromTime = from != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone
@ -32,20 +32,20 @@ public class WorkflowLogger {
.getDefault()
.toZoneId()) : LocalDateTime.MAX;
return wfProcessExecutionRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime);
return wfHistoryEntryRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime);
}
}
public WfProcessExecution getProcessExecution(final String processId) {
return wfProcessExecutionRepository.findById(processId).get();
public WfHistoryEntry getLog(final String processId) {
return wfHistoryEntryRepository.findById(processId).get();
}
public void saveProcessExecution(final WfProcessExecution pe) {
wfProcessExecutionRepository.save(pe);
public void saveLog(final WfHistoryEntry pe) {
wfHistoryEntryRepository.save(pe);
}
public Optional<WfProcessExecution> getLastExecutionForInstance(final String id) {
return wfProcessExecutionRepository.findOneByWfInstanceIdOrderByEndDateAsc(id);
public Optional<WfHistoryEntry> getLastLogForInstance(final String id) {
return wfHistoryEntryRepository.findOneByWfInstanceIdOrderByEndDateAsc(id);
}
}

View File

@ -0,0 +1,16 @@
package eu.dnetlib.manager.history.repository;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import eu.dnetlib.manager.history.model.WfHistoryEntry;
public interface WfHistoryEntryRepository extends JpaRepository<WfHistoryEntry, String> {
List<WfHistoryEntry> findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end);
Optional<WfHistoryEntry> findOneByWfInstanceIdOrderByEndDateAsc(String id);
}

View File

@ -1,16 +0,0 @@
package eu.dnetlib.manager.history.repository;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import eu.dnetlib.manager.history.model.WfProcessExecution;
public interface WfProcessExecutionRepository extends JpaRepository<WfProcessExecution, String> {
List<WfProcessExecution> findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end);
Optional<WfProcessExecution> findOneByWfInstanceIdOrderByEndDateAsc(String id);
}

View File

@ -10,4 +10,6 @@ import eu.dnetlib.manager.wf.model.WorkflowSubscriptionPK;
public interface WorkflowSubscriptionRepository extends JpaRepository<WorkflowSubscription, WorkflowSubscriptionPK> {
List<WorkflowSubscription> findByWfInstanceId(String wfInstanceId);
void deleteByWfInstanceId(String wfInstanceId);
}

View File

@ -1,6 +1,7 @@
package eu.dnetlib.manager.wf;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
@ -8,6 +9,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -23,7 +25,10 @@ import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.model.WorkflowSubscription;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.repository.WorkflowSubscriptionRepository;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
@ -52,6 +57,8 @@ public class WorkflowManagerService implements Stoppable {
private SimpleResourceRepository simpleResourceRepository;
@Autowired
private WorkflowInstanceRepository workflowInstanceRepository;
@Autowired
private WorkflowSubscriptionRepository workflowSubscriptionRepository;
private boolean paused = false;
@ -73,7 +80,7 @@ public class WorkflowManagerService implements Stoppable {
return workflowInstanceRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + id));
}
public String startRepoHiWorkflow(final String wfId,
public ExecutionStatus startRepoHiWorkflow(final String wfId,
final String dsId,
final String apiId,
final ExecutionCallback<WorkflowProcess> callback)
@ -110,7 +117,7 @@ public class WorkflowManagerService implements Stoppable {
}
}
public String startWorkflowInstance(final String wfInstanceId,
public ExecutionStatus startWorkflowInstance(final String wfInstanceId,
final String parent,
final ExecutionCallback<WorkflowProcess> callback) throws Exception {
@ -124,7 +131,7 @@ public class WorkflowManagerService implements Stoppable {
return startWorkflowInstance(instance, callback);
}
public String startWorkflowInstance(final WorkflowInstance wfInstance,
public ExecutionStatus startWorkflowInstance(final WorkflowInstance wfInstance,
final ExecutionCallback<WorkflowProcess> callback)
throws WorkflowManagerException {
@ -152,7 +159,9 @@ public class WorkflowManagerService implements Stoppable {
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, callback);
return processRegistry.registerProcess(process, wfInstance);
processRegistry.registerProcess(process, wfInstance);
return process.getExecutionStatus();
}
@Override
@ -195,4 +204,33 @@ public class WorkflowManagerService implements Stoppable {
return workflowInstanceRepository.findBySection(section);
}
public void saveWfInstance(final WorkflowInstance instance) {
workflowInstanceRepository.save(instance);
}
@Transactional
public void deleteWfInstance(final String id) {
workflowSubscriptionRepository.deleteByWfInstanceId(id);
workflowInstanceRepository.deleteById(id);
}
public List<WorkflowSubscription> listSubscriptions(final String id) {
return workflowSubscriptionRepository.findByWfInstanceId(id);
}
@Transactional
public void saveSubscriptions(final String instanceId, final List<WorkflowSubscription> subscriptions) {
subscriptions.forEach(s -> s.setWfInstanceId(instanceId));
workflowSubscriptionRepository.deleteByWfInstanceId(instanceId);
workflowSubscriptionRepository.saveAll(subscriptions);
}
public ExecutionStatus findProcess(final String procId) {
return processRegistry.findProcess(procId).getExecutionStatus();
}
public void killProcess(final String procId) {
processRegistry.findProcess(procId).kill();
}
}

View File

@ -95,7 +95,7 @@ public class ScheduledWorkflowLauncher {
}
private LocalDateTime calculateLastExecutionDate(final String id) {
return logger.getLastExecutionForInstance(id)
return logger.getLastLogForInstance(id)
.map(e -> e.getEndDate())
.orElse(LocalDateTime.MIN);
}

View File

@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.WorkflowManagerService;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
@ -50,7 +51,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
instance.setSystemParams(process.getGlobalParams());
instance.setUserParams(new HashMap<>());
final String procId = wfManagerService.startWorkflowInstance(instance, new ExecutionCallback<WorkflowProcess>() {
final ExecutionStatus info = wfManagerService.startWorkflowInstance(instance, new ExecutionCallback<WorkflowProcess>() {
@Override
public void onSuccess(final WorkflowProcess t) {
@ -69,10 +70,10 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
});
if (log.isDebugEnabled()) {
log.debug("The child workflow [instance: " + instance.getId() + "] is starting with procId: " + procId);
log.debug("The child workflow [instance: " + instance.getId() + "] is starting with procId: " + info.getProcessId());
}
token.setProgressMessage("Launched sub workflow, proc: " + procId);
token.setProgressMessage("Launched sub workflow, proc: " + info.getProcessId());
} catch (final Throwable e) {
log.error("got exception while launching child workflow", e);

View File

@ -0,0 +1,21 @@
package eu.dnetlib.manager.wf.workflows.procs;
import java.io.Serializable;
public class ExecutionStatus implements Serializable {
private static final long serialVersionUID = 9033354768530468698L;
private String processId;
// TODO: complete with the required info
public String getProcessId() {
return processId;
}
public void setProcessId(final String processId) {
this.processId = processId;
}
}

View File

@ -130,7 +130,7 @@ public class ProcessEngine {
private void completeProcess(final WorkflowProcess process, final Token token) {
token.checkStatus();
process.complete(token);
wfLogger.saveProcessExecution(process.asLog());
wfLogger.saveLog(process.asLog());
emailSender.sendMails(process);
}

View File

@ -51,7 +51,7 @@ public class ProcessRegistry {
return this.byInstanceId.get(id);
}
public String registerProcess(final WorkflowProcess process, final WorkflowInstance wfInstance) throws WorkflowManagerException {
public void registerProcess(final WorkflowProcess process, final WorkflowInstance wfInstance) throws WorkflowManagerException {
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
log.error("Already registerd process: " + process);
throw new WorkflowManagerException("Already registerd process: " + process);
@ -73,8 +73,6 @@ public class ProcessRegistry {
log.info("WorkflowProcess [" + process + "] in queue, priority=" + process.getPriority());
}
return process.getId();
}
private void removeOldestProcess() {

View File

@ -10,7 +10,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.manager.history.model.WfProcessExecution;
import eu.dnetlib.manager.history.model.WfHistoryEntry;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
@ -230,7 +230,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public WfProcessExecution asLog() {
public WfHistoryEntry asLog() {
final Map<String, String> details = new LinkedHashMap<>();
details.putAll(getOutputParams());
details.put(WorkflowsConstants.LOG_WF_PRIORITY, "" + getPriority());
@ -242,7 +242,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, getErrorStacktrace());
}
final WfProcessExecution pe = new WfProcessExecution();
final WfHistoryEntry pe = new WfHistoryEntry();
pe.setProcessId(getId());
pe.setName(getName());
pe.setFamily(getFamily());
@ -260,4 +260,9 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
return pe;
}
public ExecutionStatus getExecutionStatus() {
// TODO Auto-generated method stub
return null;
}
}