dnet-applications/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java

156 lines
5.3 KiB
Java

package eu.dnetlib.manager.wf.workflows.procs;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.errors.DsmException;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.model.WorkflowDbEntry;
import eu.dnetlib.manager.wf.model.WorkflowInstance;
import eu.dnetlib.manager.wf.repository.WorkflowDbEntryRepository;
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
import eu.dnetlib.manager.wf.workflows.graph.GraphLoader;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.utils.Stoppable;
import eu.dnetlib.utils.StoppableDetails;
@Service
public class WorkflowExecutor implements Stoppable {
private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
@Autowired
private GraphLoader graphLoader;
@Autowired
private ProcessRegistry processRegistry;
@Autowired
private ProcessFactory processFactory;
@Autowired
private ProcessEngine processEngine;
@Autowired
private DsmService dsmService;
@Autowired
private WorkflowDbEntryRepository workflowDbEntryRepository;
@Autowired
private WorkflowInstanceRepository workflowInstanceRepository;
private boolean paused = false;
public void init() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
if (isPaused() || processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE) { return; }
final WorkflowProcess process = processRegistry.nextProcessToStart();
if (process != null) {
processEngine.startProcess(process);
} else {
log.debug("WorkflowProcess queue is empty");
}
}, 10, 10, TimeUnit.SECONDS);
}
public String startRepoHiWorkflow(final String wfId, final String dsId, final String apiId, final ProcessCallback processCallback, final String parent)
throws WorkflowManagerException {
if (isPaused()) {
log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown");
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
}
try {
final String dsName = dsmService.getDs(dsId).getOfficialname();
final WorkflowInstance instance = new WorkflowInstance();
instance.setId("REPO_HI_" + UUID.randomUUID());
instance.setDetails(new HashMap<>());
instance.setPriority(100);
instance.setDsId(dsId);
instance.setDsName(dsName);
instance.setApiId(apiId);
instance.setEnabled(true);
instance.setConfigured(true);
instance.setSchedulingEnabled(false);
instance.setCronExpression("");
instance.setCronMinInterval(0);
instance.setWorkflow(wfId);
instance.setDestroyWf(null);
instance.setSystemParams(new HashMap<>());
instance.setUserParams(new HashMap<>());
return startWorkflowInstance(instance, processCallback, parent);
} catch (final DsmException e) {
throw new WorkflowManagerException("Invalid datasource: " + dsId, e);
}
}
public String startWorkflowInstance(final String wfInstanceId, final ProcessCallback processCallback, final String parent) throws Exception {
if (isPaused()) {
log.warn("Wf instance " + wfInstanceId + " not launched, because WorkflowExecutor is preparing for shutdown");
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
}
final WorkflowInstance instance =
workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId));
return startWorkflowInstance(instance, processCallback, parent);
}
public String startWorkflowInstance(final WorkflowInstance instance, final ProcessCallback processCallback, final String parent)
throws WorkflowManagerException {
final WorkflowDbEntry wf = workflowDbEntryRepository.findById(instance.getWorkflow())
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + instance.getWorkflow()));
if (!instance.isEnabled() || !instance.isConfigured()) {
log.warn("Wf instance " + instance.getId() + " not launched, because it is not ready to start or it is disabled");
throw new WorkflowManagerException("Workflow " + instance.getId() + " is not ready to start");
}
final WorkflowProcess process =
processFactory.newProcess(wf, instance, processCallback, parent);
return processRegistry.registerProcess(process, instance.getId());
}
@Override
public void stop() {
paused = true;
}
@Override
public void resume() {
paused = false;
}
@Override
public StoppableDetails getStopDetails() {
final int count = processRegistry.countRunningWfs();
final StoppableDetails.StopStatus status =
isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING;
return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
}
public ProcessRegistry getProcessRegistry() {
return processRegistry;
}
public boolean isPaused() {
return paused;
}
public void setPaused(final boolean paused) {
this.paused = paused;
}
}