180 lines
6.1 KiB
Java
180 lines
6.1 KiB
Java
package eu.dnetlib.manager.wf.workflows.procs;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.UUID;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import com.fasterxml.jackson.core.type.TypeReference;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import eu.dnetlib.dsm.DsmService;
|
|
import eu.dnetlib.errors.DsmException;
|
|
import eu.dnetlib.errors.WorkflowManagerException;
|
|
import eu.dnetlib.is.model.resource.SimpleResource;
|
|
import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
|
|
import eu.dnetlib.manager.wf.model.GraphNode;
|
|
import eu.dnetlib.manager.wf.model.WorkflowInstance;
|
|
import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository;
|
|
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
|
|
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
|
|
import eu.dnetlib.utils.Stoppable;
|
|
import eu.dnetlib.utils.StoppableDetails;
|
|
|
|
@Service
|
|
public class WorkflowExecutor implements Stoppable {
|
|
|
|
private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
|
|
|
|
@Autowired
|
|
private ProcessRegistry processRegistry;
|
|
@Autowired
|
|
private ProcessFactory processFactory;
|
|
@Autowired
|
|
private ProcessEngine processEngine;
|
|
@Autowired
|
|
private DsmService dsmService;
|
|
|
|
@Autowired
|
|
private SimpleResourceRepository simpleResourceRepository;
|
|
@Autowired
|
|
private WorkflowInstanceRepository workflowInstanceRepository;
|
|
|
|
private boolean paused = false;
|
|
|
|
@PostConstruct
|
|
public void init() {
|
|
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
|
|
if (isPaused() || processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE) { return; }
|
|
|
|
final WorkflowProcess process = processRegistry.nextProcessToStart();
|
|
if (process != null) {
|
|
processEngine.startProcess(process);
|
|
} else {
|
|
log.debug("WorkflowProcess queue is empty");
|
|
}
|
|
}, 10, 10, TimeUnit.SECONDS);
|
|
}
|
|
|
|
public String startRepoHiWorkflow(final String wfId, final String dsId, final String apiId, final ProcessCallback processCallback, final String parent)
|
|
throws WorkflowManagerException {
|
|
|
|
if (isPaused()) {
|
|
log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown");
|
|
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
|
|
}
|
|
|
|
try {
|
|
final String dsName = dsmService.getDs(dsId).getOfficialname();
|
|
|
|
final WorkflowInstance instance = new WorkflowInstance();
|
|
instance.setId("REPO_HI_" + UUID.randomUUID());
|
|
instance.setDetails(new HashMap<>());
|
|
instance.setPriority(100);
|
|
instance.setDsId(dsId);
|
|
instance.setDsName(dsName);
|
|
instance.setApiId(apiId);
|
|
instance.setEnabled(true);
|
|
instance.setConfigured(true);
|
|
instance.setSchedulingEnabled(false);
|
|
instance.setCronExpression("");
|
|
instance.setCronMinInterval(0);
|
|
instance.setWorkflow(wfId);
|
|
instance.setDestroyWf(null);
|
|
instance.setSystemParams(new HashMap<>());
|
|
instance.setUserParams(new HashMap<>());
|
|
|
|
return startWorkflowInstance(instance, processCallback, parent);
|
|
} catch (final DsmException e) {
|
|
throw new WorkflowManagerException("Invalid datasource: " + dsId, e);
|
|
}
|
|
}
|
|
|
|
public String startWorkflowInstance(final String wfInstanceId, final ProcessCallback processCallback, final String parent) throws Exception {
|
|
|
|
if (isPaused()) {
|
|
log.warn("Wf instance " + wfInstanceId + " not launched, because WorkflowExecutor is preparing for shutdown");
|
|
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
|
|
}
|
|
|
|
final WorkflowInstance instance =
|
|
workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId));
|
|
return startWorkflowInstance(instance, processCallback, parent);
|
|
}
|
|
|
|
public String startWorkflowInstance(final WorkflowInstance instance, final ProcessCallback processCallback, final String parent)
|
|
throws WorkflowManagerException {
|
|
|
|
if (!instance.isEnabled() || !instance.isConfigured()) {
|
|
log.warn("Wf instance " + instance.getId() + " is not ready to start");
|
|
throw new WorkflowManagerException("Wf instance " + instance.getId() + " is not ready to start");
|
|
}
|
|
|
|
final SimpleResource wf = simpleResourceRepository
|
|
.findById(instance.getWorkflow())
|
|
.filter(r -> r.getType().equals("workflows"))
|
|
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + instance.getWorkflow()));
|
|
|
|
final List<GraphNode> graph = simpleResourceRepository.findContentById(wf.getId())
|
|
.map(s -> {
|
|
final ObjectMapper mapper = new ObjectMapper();
|
|
final TypeReference<List<GraphNode>> type = new TypeReference<List<GraphNode>>() {};
|
|
final List<GraphNode> list = new ArrayList<>();
|
|
try {
|
|
list.addAll(mapper.readValue(s, type));
|
|
} catch (final Exception e) {
|
|
log.error("Error parsing json", e);
|
|
}
|
|
return list;
|
|
})
|
|
.filter(list -> !list.isEmpty())
|
|
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wf.getId()));
|
|
|
|
final WorkflowProcess process =
|
|
processFactory.newProcess(wf.getId(), wf.getName(), wf.getSubtype(), graph, instance, processCallback, parent);
|
|
|
|
return processRegistry.registerProcess(process, instance.getId());
|
|
}
|
|
|
|
@Override
|
|
public void stop() {
|
|
paused = true;
|
|
}
|
|
|
|
@Override
|
|
public void resume() {
|
|
paused = false;
|
|
}
|
|
|
|
@Override
|
|
public StoppableDetails getStopDetails() {
|
|
final int count = processRegistry.countRunningWfs();
|
|
|
|
final StoppableDetails.StopStatus status =
|
|
isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING;
|
|
|
|
return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
|
|
}
|
|
|
|
public ProcessRegistry getProcessRegistry() {
|
|
return processRegistry;
|
|
}
|
|
|
|
public boolean isPaused() {
|
|
return paused;
|
|
}
|
|
|
|
public void setPaused(final boolean paused) {
|
|
this.paused = paused;
|
|
}
|
|
}
|