diff --git a/apps/dnet-is-application/pom.xml b/apps/dnet-is-application/pom.xml index 27cb2137..92bde558 100644 --- a/apps/dnet-is-application/pom.xml +++ b/apps/dnet-is-application/pom.xml @@ -20,6 +20,12 @@ ${project.version} + + eu.dnetlib.dhp + dnet-wf-service + ${project.version} + + eu.dnetlib.dhp dnet-data-services diff --git a/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfHistoryAjaxController.java b/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfHistoryAjaxController.java index dc4b18f5..0993426b 100644 --- a/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfHistoryAjaxController.java +++ b/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfHistoryAjaxController.java @@ -9,12 +9,13 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import eu.dnetlib.common.controller.AbstractDnetController; import eu.dnetlib.manager.history.WorkflowLogger; import eu.dnetlib.manager.history.model.WfProcessExecution; @RestController -@RequestMapping("/ajax/wfs") -public class WfHistoryAjaxController { +@RequestMapping("/ajax/wf_history") +public class WfHistoryAjaxController extends AbstractDnetController { @Autowired private WorkflowLogger logger; diff --git a/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfInstancesController.java b/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfInstancesController.java new file mode 100644 index 00000000..d8fc4096 --- /dev/null +++ b/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfInstancesController.java @@ -0,0 +1,30 @@ +package eu.dnetlib.manager.wf; + +import java.util.List; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import eu.dnetlib.common.controller.AbstractDnetController; +import eu.dnetlib.manager.wf.model.WorkflowInstance; +import eu.dnetlib.utils.CountedValue; + +@RestController +@RequestMapping("/ajax/wf_instances") +public class WfInstancesController extends AbstractDnetController { + + private WorkflowManagerService wfManagerService; + + @GetMapping("/instance/{id}") + public WorkflowInstance getWfInstance(@PathVariable final String id) throws Exception { + return wfManagerService.findWorkflowInstance(id); + } + + @GetMapping("/families") + public List listWfFamilies() throws Exception { + return wfManagerService.families(); + } + +} diff --git a/frontends/dnet-is-application/src/app/common/is.service.ts b/frontends/dnet-is-application/src/app/common/is.service.ts index ed6297dc..339da869 100644 --- a/frontends/dnet-is-application/src/app/common/is.service.ts +++ b/frontends/dnet-is-application/src/app/common/is.service.ts @@ -99,7 +99,7 @@ export class ISService { if (from && from > 0) { params = params.append('from', from); } if (to && to > 0) { params = params.append('to', to); } - this.client.get('/ajax/wfs/', { params: params }).subscribe({ + this.client.get('/ajax/wf_history/', { params: params }).subscribe({ next: data => onSuccess(data), error: error => this.showError(error) }); diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowInstance.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowInstance.java index dca1ef1c..0b6ff317 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowInstance.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowInstance.java @@ -32,7 +32,7 @@ public class WorkflowInstance implements Serializable { private String id; @Type(type = "jsonb") - @Column(name = "data", columnDefinition = "jsonb") + @Column(name = "details", columnDefinition = "jsonb") private Map details = new LinkedHashMap<>(); @Column(name = "priority") @@ -206,4 +206,5 @@ public class WorkflowInstance implements Serializable { public void setParentId(final String parentId) { this.parentId = parentId; } + } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/CountedValue.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/CountedValue.java new file mode 100644 index 00000000..046e6c6d --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/CountedValue.java @@ -0,0 +1,13 @@ +package eu.dnetlib.utils; + +public interface CountedValue { + + public String getValue(); + + public void setValue(); + + public long getCount(); + + public void setCount(final long count); + +} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowInstanceRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowInstanceRepository.java index cb477905..e30934d5 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowInstanceRepository.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowInstanceRepository.java @@ -1,9 +1,19 @@ package eu.dnetlib.manager.wf.repository; +import java.util.List; + import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; import eu.dnetlib.manager.wf.model.WorkflowInstance; +import eu.dnetlib.utils.CountedValue; public interface WorkflowInstanceRepository extends JpaRepository { + @Query(value = "select r.subtype as value, count(*) as count " + + "from workflow_instances i join resources r on (i.workflow = r.id) " + + "group by r.subtype " + + "order by count desc;", nativeQuery = true) + List families(); + } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java similarity index 86% rename from libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java rename to libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java index 996abe97..82d7964c 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java @@ -1,6 +1,7 @@ -package eu.dnetlib.manager.wf.workflows.procs; +package eu.dnetlib.manager.wf; import java.util.HashMap; +import java.util.List; import java.util.Objects; import java.util.UUID; import java.util.concurrent.Executors; @@ -24,14 +25,19 @@ import eu.dnetlib.is.resource.repository.SimpleResourceRepository; import eu.dnetlib.manager.wf.model.WorkflowGraph; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; +import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine; +import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory; +import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; +import eu.dnetlib.utils.CountedValue; import eu.dnetlib.utils.Stoppable; import eu.dnetlib.utils.StoppableDetails; @Service -public class WorkflowExecutor implements Stoppable { +public class WorkflowManagerService implements Stoppable { - private static final Log log = LogFactory.getLog(WorkflowExecutor.class); + private static final Log log = LogFactory.getLog(WorkflowManagerService.class); @Autowired private ProcessRegistry processRegistry; @@ -63,6 +69,10 @@ public class WorkflowExecutor implements Stoppable { }, 10, 10, TimeUnit.SECONDS); } + public WorkflowInstance findWorkflowInstance(final String id) throws WorkflowManagerException { + return workflowInstanceRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + id)); + } + public String startRepoHiWorkflow(final String wfId, final String dsId, final String apiId, @@ -111,8 +121,8 @@ public class WorkflowExecutor implements Stoppable { throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown"); } - final WorkflowInstance instance = - workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId)); + final WorkflowInstance instance = findWorkflowInstance(wfInstanceId); + return startWorkflowInstance(instance, onSuccess, onFail); } @@ -179,4 +189,9 @@ public class WorkflowExecutor implements Stoppable { public void setPaused(final boolean paused) { this.paused = paused; } + + public List families() { + return workflowInstanceRepository.families(); + } + } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java index 28f927f0..11c02092 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java @@ -12,10 +12,10 @@ import org.springframework.scheduling.support.CronExpression; import org.springframework.stereotype.Service; import eu.dnetlib.manager.history.WorkflowLogger; +import eu.dnetlib.manager.wf.WorkflowManagerService; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; -import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; @Service @@ -24,7 +24,7 @@ public class ScheduledWorkflowLauncher { private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class); @Autowired - private WorkflowExecutor workflowExecutor; + private WorkflowManagerService wfManagerService; @Autowired private ProcessRegistry processRegistry; @@ -35,10 +35,10 @@ public class ScheduledWorkflowLauncher { @Autowired private WorkflowLogger logger; - @Value("${'dnet.workflow.scheduler.windowSize'}") + @Value("${dnet.workflow.scheduler.windowSize:1800000}") private int windowSize; // 1800000 are 30 minutes - @Scheduled(cron = "${'dnet.workflow.scheduler.cron'}") + @Scheduled(fixedRateString = "${dnet.workflow.scheduler.fixedRate:900000}") // 900000 are 5 minutes public void verifySheduledWorkflows() { log.debug("Verifying scheduled workflows - START"); @@ -51,7 +51,7 @@ public class ScheduledWorkflowLauncher { .filter(this::isReady) .forEach(instance -> { try { - workflowExecutor.startWorkflowInstance(instance, null, null); + wfManagerService.startWorkflowInstance(instance, null, null); } catch (final Exception e) { log.error("Error launching scheduled wf instance: " + instance.getId(), e); } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java index 663da422..0261f917 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java @@ -7,10 +7,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; +import eu.dnetlib.manager.wf.WorkflowManagerService; import eu.dnetlib.manager.wf.model.WorkflowInstance; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.Token; -import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; /** @@ -23,7 +23,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware { private String wfId; @Autowired - private WorkflowExecutor executor; + private WorkflowManagerService wfManagerService; private WorkflowProcess process; @@ -49,7 +49,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware { instance.setSystemParams(process.getGlobalParams()); instance.setUserParams(new HashMap<>()); - final String procId = executor.startWorkflowInstance(instance, (proc) -> { + final String procId = wfManagerService.startWorkflowInstance(instance, (proc) -> { log.debug("Child workflow has been completed successfully"); token.release(); }, (proc) -> { diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java index fd1180b6..c9e0dbf2 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java @@ -25,7 +25,7 @@ public class ProcessRegistry { private final PriorityBlockingQueue pendingProcs = new PriorityBlockingQueue<>(); - @Value("${'dnet.wf.registry.size'}") + @Value("${dnet.wf.registry.size:100}") private int maxSize; synchronized public int countRunningWfs() { diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java index 35cf72ce..f50ffc00 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java @@ -7,6 +7,7 @@ import org.springframework.beans.BeansException; import org.springframework.beans.PropertyAccessorFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; import eu.dnetlib.errors.WorkflowManagerException; import eu.dnetlib.manager.wf.nodes.DefaultJobNode; @@ -17,9 +18,7 @@ import eu.dnetlib.manager.wf.workflows.procs.Env; import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; -/** - * Created by michele on 19/11/15. - */ +@Component public class NodeHelper implements ApplicationContextAware { public static final String beanNamePrefix = "wfNode";