This commit is contained in:
Michele Artini 2023-03-10 15:20:49 +01:00
parent 34f6e2977b
commit 630b4c282d
12 changed files with 96 additions and 21 deletions

View File

@ -20,6 +20,12 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-wf-service</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-data-services</artifactId> <artifactId>dnet-data-services</artifactId>

View File

@ -9,12 +9,13 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.manager.history.WorkflowLogger; import eu.dnetlib.manager.history.WorkflowLogger;
import eu.dnetlib.manager.history.model.WfProcessExecution; import eu.dnetlib.manager.history.model.WfProcessExecution;
@RestController @RestController
@RequestMapping("/ajax/wfs") @RequestMapping("/ajax/wf_history")
public class WfHistoryAjaxController { public class WfHistoryAjaxController extends AbstractDnetController {
@Autowired @Autowired
private WorkflowLogger logger; private WorkflowLogger logger;

View File

@ -0,0 +1,30 @@
package eu.dnetlib.manager.wf;
import java.util.List;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.utils.CountedValue;
@RestController
@RequestMapping("/ajax/wf_instances")
public class WfInstancesController extends AbstractDnetController {
private WorkflowManagerService wfManagerService;
@GetMapping("/instance/{id}")
public WorkflowInstance getWfInstance(@PathVariable final String id) throws Exception {
return wfManagerService.findWorkflowInstance(id);
}
@GetMapping("/families")
public List<CountedValue> listWfFamilies() throws Exception {
return wfManagerService.families();
}
}

View File

@ -99,7 +99,7 @@ export class ISService {
if (from && from > 0) { params = params.append('from', from); } if (from && from > 0) { params = params.append('from', from); }
if (to && to > 0) { params = params.append('to', to); } if (to && to > 0) { params = params.append('to', to); }
this.client.get<WfHistoryEntry[]>('/ajax/wfs/', { params: params }).subscribe({ this.client.get<WfHistoryEntry[]>('/ajax/wf_history/', { params: params }).subscribe({
next: data => onSuccess(data), next: data => onSuccess(data),
error: error => this.showError(error) error: error => this.showError(error)
}); });

View File

@ -32,7 +32,7 @@ public class WorkflowInstance implements Serializable {
private String id; private String id;
@Type(type = "jsonb") @Type(type = "jsonb")
@Column(name = "data", columnDefinition = "jsonb") @Column(name = "details", columnDefinition = "jsonb")
private Map<String, String> details = new LinkedHashMap<>(); private Map<String, String> details = new LinkedHashMap<>();
@Column(name = "priority") @Column(name = "priority")
@ -206,4 +206,5 @@ public class WorkflowInstance implements Serializable {
public void setParentId(final String parentId) { public void setParentId(final String parentId) {
this.parentId = parentId; this.parentId = parentId;
} }
} }

View File

@ -0,0 +1,13 @@
package eu.dnetlib.utils;
public interface CountedValue {
public String getValue();
public void setValue();
public long getCount();
public void setCount(final long count);
}

View File

@ -1,9 +1,19 @@
package eu.dnetlib.manager.wf.repository; package eu.dnetlib.manager.wf.repository;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.utils.CountedValue;
public interface WorkflowInstanceRepository extends JpaRepository<WorkflowInstance, String> { public interface WorkflowInstanceRepository extends JpaRepository<WorkflowInstance, String> {
@Query(value = "select r.subtype as value, count(*) as count "
+ "from workflow_instances i join resources r on (i.workflow = r.id) "
+ "group by r.subtype "
+ "order by count desc;", nativeQuery = true)
List<CountedValue> families();
} }

View File

@ -1,6 +1,7 @@
package eu.dnetlib.manager.wf.workflows.procs; package eu.dnetlib.manager.wf;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -24,14 +25,19 @@ import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.WorkflowGraph; import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.utils.CountedValue;
import eu.dnetlib.utils.Stoppable; import eu.dnetlib.utils.Stoppable;
import eu.dnetlib.utils.StoppableDetails; import eu.dnetlib.utils.StoppableDetails;
@Service @Service
public class WorkflowExecutor implements Stoppable { public class WorkflowManagerService implements Stoppable {
private static final Log log = LogFactory.getLog(WorkflowExecutor.class); private static final Log log = LogFactory.getLog(WorkflowManagerService.class);
@Autowired @Autowired
private ProcessRegistry processRegistry; private ProcessRegistry processRegistry;
@ -63,6 +69,10 @@ public class WorkflowExecutor implements Stoppable {
}, 10, 10, TimeUnit.SECONDS); }, 10, 10, TimeUnit.SECONDS);
} }
public WorkflowInstance findWorkflowInstance(final String id) throws WorkflowManagerException {
return workflowInstanceRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + id));
}
public String startRepoHiWorkflow(final String wfId, public String startRepoHiWorkflow(final String wfId,
final String dsId, final String dsId,
final String apiId, final String apiId,
@ -111,8 +121,8 @@ public class WorkflowExecutor implements Stoppable {
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown"); throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
} }
final WorkflowInstance instance = final WorkflowInstance instance = findWorkflowInstance(wfInstanceId);
workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId));
return startWorkflowInstance(instance, onSuccess, onFail); return startWorkflowInstance(instance, onSuccess, onFail);
} }
@ -179,4 +189,9 @@ public class WorkflowExecutor implements Stoppable {
public void setPaused(final boolean paused) { public void setPaused(final boolean paused) {
this.paused = paused; this.paused = paused;
} }
public List<CountedValue> families() {
return workflowInstanceRepository.families();
}
} }

View File

@ -12,10 +12,10 @@ import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import eu.dnetlib.manager.history.WorkflowLogger; import eu.dnetlib.manager.history.WorkflowLogger;
import eu.dnetlib.manager.wf.WorkflowManagerService;
import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
@Service @Service
@ -24,7 +24,7 @@ public class ScheduledWorkflowLauncher {
private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class); private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
@Autowired @Autowired
private WorkflowExecutor workflowExecutor; private WorkflowManagerService wfManagerService;
@Autowired @Autowired
private ProcessRegistry processRegistry; private ProcessRegistry processRegistry;
@ -35,10 +35,10 @@ public class ScheduledWorkflowLauncher {
@Autowired @Autowired
private WorkflowLogger logger; private WorkflowLogger logger;
@Value("${'dnet.workflow.scheduler.windowSize'}") @Value("${dnet.workflow.scheduler.windowSize:1800000}")
private int windowSize; // 1800000 are 30 minutes private int windowSize; // 1800000 are 30 minutes
@Scheduled(cron = "${'dnet.workflow.scheduler.cron'}") @Scheduled(fixedRateString = "${dnet.workflow.scheduler.fixedRate:900000}") // 900000 are 5 minutes
public void verifySheduledWorkflows() { public void verifySheduledWorkflows() {
log.debug("Verifying scheduled workflows - START"); log.debug("Verifying scheduled workflows - START");
@ -51,7 +51,7 @@ public class ScheduledWorkflowLauncher {
.filter(this::isReady) .filter(this::isReady)
.forEach(instance -> { .forEach(instance -> {
try { try {
workflowExecutor.startWorkflowInstance(instance, null, null); wfManagerService.startWorkflowInstance(instance, null, null);
} catch (final Exception e) { } catch (final Exception e) {
log.error("Error launching scheduled wf instance: " + instance.getId(), e); log.error("Error launching scheduled wf instance: " + instance.getId(), e);
} }

View File

@ -7,10 +7,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.WorkflowManagerService;
import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.Token; import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
/** /**
@ -23,7 +23,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
private String wfId; private String wfId;
@Autowired @Autowired
private WorkflowExecutor executor; private WorkflowManagerService wfManagerService;
private WorkflowProcess process; private WorkflowProcess process;
@ -49,7 +49,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
instance.setSystemParams(process.getGlobalParams()); instance.setSystemParams(process.getGlobalParams());
instance.setUserParams(new HashMap<>()); instance.setUserParams(new HashMap<>());
final String procId = executor.startWorkflowInstance(instance, (proc) -> { final String procId = wfManagerService.startWorkflowInstance(instance, (proc) -> {
log.debug("Child workflow has been completed successfully"); log.debug("Child workflow has been completed successfully");
token.release(); token.release();
}, (proc) -> { }, (proc) -> {

View File

@ -25,7 +25,7 @@ public class ProcessRegistry {
private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<>(); private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<>();
@Value("${'dnet.wf.registry.size'}") @Value("${dnet.wf.registry.size:100}")
private int maxSize; private int maxSize;
synchronized public int countRunningWfs() { synchronized public int countRunningWfs() {

View File

@ -7,6 +7,7 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyAccessorFactory; import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import eu.dnetlib.errors.WorkflowManagerException; import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.nodes.DefaultJobNode; import eu.dnetlib.manager.wf.nodes.DefaultJobNode;
@ -17,9 +18,7 @@ import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
/** @Component
* Created by michele on 19/11/15.
*/
public class NodeHelper implements ApplicationContextAware { public class NodeHelper implements ApplicationContextAware {
public static final String beanNamePrefix = "wfNode"; public static final String beanNamePrefix = "wfNode";