dnet-applications/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WorkflowManagerService.java

251 lines
8.2 KiB
Java

package eu.dnetlib.manager.wf;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.errors.DsmException;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.is.model.resource.SimpleResource;
import eu.dnetlib.is.resource.repository.SimpleResourceRepository;
import eu.dnetlib.manager.wf.model.WorkflowConfiguration;
import eu.dnetlib.manager.wf.model.WorkflowSection;
import eu.dnetlib.manager.wf.model.WorkflowSubscription;
import eu.dnetlib.manager.wf.model.WorkflowTemplate;
import eu.dnetlib.manager.wf.repository.WorkflowConfigurationRepository;
import eu.dnetlib.manager.wf.repository.WorkflowSectionRepository;
import eu.dnetlib.manager.wf.repository.WorkflowSubscriptionRepository;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.ExecutionCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.utils.Stoppable;
import eu.dnetlib.utils.StoppableDetails;
@Service
public class WorkflowManagerService implements Stoppable {
private static final Log log = LogFactory.getLog(WorkflowManagerService.class);
@Autowired
private ProcessRegistry processRegistry;
@Autowired
private ProcessFactory processFactory;
@Autowired
private ProcessEngine processEngine;
@Autowired
private DsmService dsmService;
@Autowired
private SimpleResourceRepository simpleResourceRepository;
@Autowired
private WorkflowSectionRepository workflowSectionRepository;
@Autowired
private WorkflowConfigurationRepository workflowConfigurationRepository;
@Autowired
private WorkflowSubscriptionRepository workflowSubscriptionRepository;
private boolean paused = false;
@PostConstruct
public void init() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
if (isPaused() || processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE) { return; }
final WorkflowProcess process = processRegistry.nextProcessToStart();
if (process != null) {
processEngine.startProcess(process);
} else {
log.debug("WorkflowProcess queue is empty");
}
}, 10, 10, TimeUnit.SECONDS);
}
public WorkflowConfiguration findWorkflowConfiguration(final String id) throws WorkflowManagerException {
return workflowConfigurationRepository.findById(id).orElseThrow(() -> new WorkflowManagerException("WF configuration not found: " + id));
}
public ExecutionStatus startRepoHiWorkflow(final String wfId,
final String dsId,
final String apiId,
final ExecutionCallback<WorkflowProcess> callback)
throws WorkflowManagerException {
if (isPaused()) {
log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown");
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
}
try {
final String dsName = dsmService.getDs(dsId).getOfficialname();
final WorkflowConfiguration conf = new WorkflowConfiguration();
conf.setId("REPO_HI_" + UUID.randomUUID());
conf.setDetails(new HashMap<>());
conf.setPriority(100);
conf.setDsId(dsId);
conf.setDsName(dsName);
conf.setApiId(apiId);
conf.setEnabled(true);
conf.setConfigured(true);
conf.setSchedulingEnabled(false);
conf.setCronExpression("");
conf.setCronMinInterval(0);
conf.setWorkflow(wfId);
conf.setDestroyWf(null);
conf.setSystemParams(new HashMap<>());
conf.setUserParams(new HashMap<>());
return startWorkflowConfiguration(conf, callback);
} catch (final DsmException e) {
throw new WorkflowManagerException("Invalid datasource: " + dsId, e);
}
}
public ExecutionStatus startWorkflowConfiguration(final String wfConfId,
final String parent,
final ExecutionCallback<WorkflowProcess> callback) throws Exception {
if (isPaused()) {
log.warn("Wf configuration " + wfConfId + " not launched, because WorkflowExecutor is preparing for shutdown");
throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown");
}
final WorkflowConfiguration conf = findWorkflowConfiguration(wfConfId);
return startWorkflowConfiguration(conf, callback);
}
public ExecutionStatus startWorkflowConfiguration(final WorkflowConfiguration conf,
final ExecutionCallback<WorkflowProcess> callback)
throws WorkflowManagerException {
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
final SimpleResource wfMetadata = simpleResourceRepository
.findById(conf.getWorkflow())
.filter(r -> r.getType().equals("workflows"))
.orElseThrow(() -> new WorkflowManagerException("WF not found: " + conf.getWorkflow()));
final WorkflowTemplate wfTmpl = simpleResourceRepository.findContentById(wfMetadata.getId())
.map(s -> {
try {
return new ObjectMapper().readValue(s, WorkflowTemplate.class);
} catch (final Exception e) {
return (WorkflowTemplate) null;
}
})
.filter(Objects::nonNull)
.orElseThrow(() -> new WorkflowManagerException("Invalid wf: " + wfMetadata.getId()));
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfTmpl, conf, callback);
processRegistry.registerProcess(process, conf);
return process.getExecutionStatus();
}
@Override
public void stop() {
paused = true;
}
@Override
public void resume() {
paused = false;
}
@Override
public StoppableDetails getStopDetails() {
final int count = processRegistry.countRunningWfs();
final StoppableDetails.StopStatus status =
isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING;
return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
}
public ProcessRegistry getProcessRegistry() {
return processRegistry;
}
public boolean isPaused() {
return paused;
}
public void setPaused(final boolean paused) {
this.paused = paused;
}
public List<WorkflowSection> listSsections() {
return workflowSectionRepository.findAll();
}
public List<WorkflowConfiguration> listWfConfigurationsBySection(final String section) {
return workflowConfigurationRepository.findBySection(section);
}
public WorkflowConfiguration saveWfConfiguration(final WorkflowConfiguration conf) {
if (StringUtils.isBlank(conf.getId())) {
conf.setId("wfconf-" + UUID.randomUUID());
}
checkConfiguration(conf);
workflowConfigurationRepository.save(conf);
return conf;
}
private void checkConfiguration(final WorkflowConfiguration conf) {
// TODO Auto-generated method stub
}
@Transactional
public void deleteWfConfiguration(final String id) {
workflowSubscriptionRepository.deleteByWfConfigurationId(id);
workflowConfigurationRepository.deleteById(id);
}
public List<WorkflowSubscription> listSubscriptions(final String id) {
return workflowSubscriptionRepository.findByWfConfigurationId(id);
}
@Transactional
public void saveSubscriptions(final String wfConfId, final List<WorkflowSubscription> subscriptions) {
subscriptions.forEach(s -> s.setWfConfigurationId(wfConfId));
workflowSubscriptionRepository.deleteByWfConfigurationId(wfConfId);
workflowSubscriptionRepository.saveAll(subscriptions);
}
public ExecutionStatus findProcess(final String procId) {
return processRegistry.findProcess(procId).getExecutionStatus();
}
public void killProcess(final String procId) {
processRegistry.findProcess(procId).kill();
}
}