dnet-applications/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java

185 lines
5.9 KiB
Java

package eu.dnetlib.manager.wf.workflows.procs;
import java.util.HashMap;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.errors.DsmException;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.WorkflowGraph;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.utils.Stoppable;
import eu.dnetlib.utils.StoppableDetails;
@Service
public class WorkflowExecutor implements Stoppable {
private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
@Autowired
private ProcessRegistry processRegistry;
@Autowired
private ProcessFactory processFactory;
@Autowired
private ProcessEngine processEngine;
@Autowired
private DsmService dsmService;
@Autowired
private SimpleResourceRepository simpleResourceRepository;
@Autowired
private WorkflowInstanceRepository workflowInstanceRepository;
private boolean paused = false;
@PostConstruct
public void init() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
if (isPaused() || processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE) { return; }
final WorkflowProcess process = processRegistry.nextProcessToStart();
if (process != null) {
processEngine.startProcess(process);
} else {
log.debug("WorkflowProcess queue is empty");
}
}, 10, 10, TimeUnit.SECONDS);
}
public String startRepoHiWorkflow(final String wfId,
final String dsId,
final String apiId,
final String parent,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail)
throws WorkflowManagerException {
if (isPaused()) {
log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown");
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
}
try {
final String dsName = dsmService.getDs(dsId).getOfficialname();
final WorkflowInstance instance = new WorkflowInstance();
instance.setId("REPO_HI_" + UUID.randomUUID());
instance.setDetails(new HashMap<>());
instance.setPriority(100);
instance.setDsId(dsId);
instance.setDsName(dsName);
instance.setApiId(apiId);
instance.setEnabled(true);
instance.setConfigured(true);
instance.setSchedulingEnabled(false);
instance.setCronExpression("");
instance.setCronMinInterval(0);
instance.setWorkflow(wfId);
instance.setDestroyWf(null);
instance.setSystemParams(new HashMap<>());
instance.setUserParams(new HashMap<>());
return startWorkflowInstance(instance, parent, onSuccess, onFail);
} catch (final DsmException e) {
throw new WorkflowManagerException("Invalid datasource: " + dsId, e);
}
}
public String startWorkflowInstance(final String wfInstanceId,
final String parent,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail) throws Exception {
if (isPaused()) {
log.warn("Wf instance " + wfInstanceId + " not launched, because WorkflowExecutor is preparing for shutdown");
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
}
final WorkflowInstance instance =
workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId));
return startWorkflowInstance(instance, parent, onSuccess, onFail);
}
public String startWorkflowInstance(final WorkflowInstance wfInstance,
final String parent,
final Consumer<WorkflowProcess> onSuccess,
final Consumer<WorkflowProcess> onFail)
throws WorkflowManagerException {
if (!wfInstance.isEnabled() || !wfInstance.isConfigured()) {
log.warn("Wf instance " + wfInstance.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf instance " + wfInstance.getId() + " is not ready to start");
}
final SimpleResource wfMetadata = simpleResourceRepository
.findById(wfInstance.getWorkflow())
.filter(r -> r.getType().equals("workflows"))
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + wfInstance.getWorkflow()));
final WorkflowGraph wfGraph = simpleResourceRepository.findContentById(wfMetadata.getId())
.map(s -> {
try {
return new ObjectMapper().readValue(s, WorkflowGraph.class);
} catch (final Exception e) {
return (WorkflowGraph) null;
}
})
.filter(Objects::nonNull)
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId()));
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfGraph, wfInstance, parent, onSuccess, onFail);
return processRegistry.registerProcess(process, wfInstance);
}
@Override
public void stop() {
paused = true;
}
@Override
public void resume() {
paused = false;
}
@Override
public StoppableDetails getStopDetails() {
final int count = processRegistry.countRunningWfs();
final StoppableDetails.StopStatus status =
isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING;
return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
}
public ProcessRegistry getProcessRegistry() {
return processRegistry;
}
public boolean isPaused() {
return paused;
}
public void setPaused(final boolean paused) {
this.paused = paused;
}
}