dnet-docker/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/procs/ProcessEngine.java

package eu.dnetlib.wfs.procs;

import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNode;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfHistoryJob;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.DefaultJobNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.nodes.SuccessNode;
import eu.dnetlib.wfs.repository.WfHistoryJobRepository;
import eu.dnetlib.wfs.repository.WfRunningJobRepository;
import eu.dnetlib.wfs.utils.EmailSender;
import eu.dnetlib.wfs.utils.GraphUtils;
import eu.dnetlib.wfs.utils.ProcessCallback;

import jakarta.annotation.PostConstruct;
import jakarta.transaction.Transactional;
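
/**
 * Engine that accepts newly created workflow jobs (announced via PostgreSQL LISTEN/NOTIFY),
 * assigns them to this executor and drives their node graphs to completion,
 * persisting the running state and archiving terminated jobs in the history.
 */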
@Service
public class ProcessEngine {

    private static final Log log = LogFactory.getLog(ProcessEngine.class);

    @Autowired
    private EmailSender emailSender;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private WfRunningJobRepository jobRepository;

    @Autowired
    private WfHistoryJobRepository historyJobRepository;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private ProcessRegistry processRegistry;

    private final Set<String> validNodeTypes = new HashSet<>();

    @Value("${dnet.wf.procs.size:20}")
    private int maxSize;

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
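
    /** Collects the valid node types and starts the background threads that listen for new workflows and consume the local queue. */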
    @PostConstruct
    public void init() {
        calculateValidNodeTypes();
        final Thread t1 = new Thread(this::notificationHandler, "wf-created-listener");
        t1.start();
        final Thread t2 = new Thread(this::consumeQueue, "wf-queue-consumer");
        t2.start();
    }
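
    /** Registers the Spring beans annotated with @WfNode; only beans that implement ProcessNode are accepted as node types. */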
    private void calculateValidNodeTypes() {
        log.info("************************************");
        log.info("* Checking workflow nodes:");
        this.applicationContext.getBeansWithAnnotation(WfNode.class).forEach((key, bean) -> {
            if (ProcessNode.class.isAssignableFrom(bean.getClass())) {
                log.info("* Type: " + key + " -> " + bean.getClass());
                this.validNodeTypes.add(key);
            } else {
                log.warn("* Type: " + key + " -> " + bean.getClass() + " (ERROR: it is not a ProcessNode)");
            }
        });
        if (this.validNodeTypes.isEmpty()) {
            log.warn("* 0 nodes available");
        }
        log.info("************************************");
    }
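
    /**
     * Blocks on a dedicated JDBC connection and forwards every NOTIFY payload
     * (a process id) received on the wf-created channel to the local queue.
     */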
    private void notificationHandler() {
        while ((ServiceStatusRegistry.getStatus() == null) || StringUtils.isBlank(ServiceStatusRegistry.getStatus().getName())) {
            try {
                Thread.sleep(1000);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        final ConnectionCallback<Integer> action = c -> {
            try (final Statement stm = c.createStatement()) {
                stm.execute("LISTEN " + WorkflowsConstants.WF_CREATED_NOTIFICATION_CHANNEL);
            }
            final PGConnection pgconn = c.unwrap(PGConnection.class);
            while (!Thread.currentThread().isInterrupted()) {
                final PGNotification[] nts = pgconn.getNotifications(10000);
                if (nts != null) {
                    for (final PGNotification n : nts) {
                        final String processId = n.getParameter();
                        log.debug("NOTIFICATION RECEIVED: " + processId);
                        this.queue.add(processId);
                    }
                }
            }
            return 0;
        };
        this.jdbcTemplate.execute(action);
    }
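
    /** Takes process ids from the local queue and tries to start each workflow on this executor. */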
    private void consumeQueue() {
        while ((ServiceStatusRegistry.getStatus() == null) || StringUtils.isBlank(ServiceStatusRegistry.getStatus().getName())) {
            try {
                Thread.sleep(1000);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        while (true) {
            try {
                tryToStartWf(this.queue.take(), ServiceStatusRegistry.getStatus().getName());
            } catch (final Throwable e) {
                log.error("Error accepting new wfs", e);
            }
        }
    }
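
    /** Attempts to claim the job for this executor via the repository, then starts it only if the assignment was actually won. */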
    @Transactional
    private void tryToStartWf(final String processId, final String executor) {
        log.debug("Trying to start " + processId);
        this.jobRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
        final Optional<WfRunningJob> job = this.jobRepository.findById(processId);
        if (job.isPresent() && (job.get().getStatus() == JobStatus.accepted) && job.get().getWfExecutor().equals(executor)) {
            startWorkflowJob(job.get(), null);
        }
    }
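
    /** Validates the job graph, registers the process and launches it; on any failure the error is recorded on the job. */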
    @Transactional
    public void startWorkflowJob(final WfRunningJob job, final ProcessCallback callback) {
        final WorkflowProcess process = new WorkflowProcess(job);
        try {
            GraphUtils.checkValidity(job.getGraph(), this.validNodeTypes);
            process.setCallback(callback);
            this.processRegistry.registerProcess(process);
            job.setStatus(JobStatus.running);
            job.setLastUpdate(LocalDateTime.now());
            this.jobRepository.save(job);
            startProcess(process);
        } catch (final Throwable e) {
            process.setError(e);
            updateRunningJob(process);
        }
    }
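
    /** Kills the running process registered under the given id. */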
    public final void killProcess(final String procId) {
        this.processRegistry.findProcess(procId).kill();
    }
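
    /** Marks the job as running and launches one execution token on each start node of the graph. */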
    public void startProcess(final WorkflowProcess process) {
        log.info("Starting process: " + process);
        log.debug(process.getJobDetails().getGraph());
        final LocalDateTime now = LocalDateTime.now();
        process.getJobDetails().setStatus(JobStatus.running);
        process.getJobDetails().setStartDate(now);
        updateRunningJob(process);
        try {
            for (final RuntimeNode graphNode : GraphUtils.startNodes(process.getJobDetails().getGraph())) {
                final RuntimeEnv env = process.newEnv(process.getJobDetails().getInputParams());
                executeNode(process, graphNode, env);
            }
        } catch (final Throwable e) {
            log.error("WorkflowProcess node instantiation failed", e);
            process.setError(e);
            updateRunningJob(process);
        }
    }
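
    /**
     * Moves an execution token from a completed node to its successors. Join and success
     * nodes are held until a token has arrived on every incoming arc and then run once
     * on the merged environments; a success node also completes the whole process.
     */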
    public void releaseToken(final WorkflowProcess process, final RuntimeNode oldGraphNode, final RuntimeEnv oldEnv) {
        try {
            for (final RuntimeNode graphNode : GraphUtils.nextNodes(process.getJobDetails().getGraph(), oldGraphNode, oldEnv)) {
                if (graphNode.isJoin() || graphNode.isSuccessNode()) {
                    final List<RuntimeEnv> list = process.getPausedJoinNodeTokens().computeIfAbsent(graphNode.getName(), k -> new ArrayList<>());
                    list.add(oldEnv);
                    if (list.size() == GraphUtils.getNumberOfIncomingArcs(process.getJobDetails().getGraph(), graphNode)) {
                        final RuntimeEnv env = process.newEnv(mergeEnvParams(list.toArray(new RuntimeEnv[0])));
                        executeNode(process, graphNode, env);
                        if (graphNode.isSuccessNode()) {
                            completeProcess(process, env);
                        }
                    }
                } else {
                    executeNode(process, graphNode, process.newEnv(oldEnv.getAttributes()));
                }
            }
        } catch (final Throwable e) {
            log.error("WorkflowProcess node instantiation failed", e);
            process.setError(e);
            updateRunningJob(process);
        }
    }
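
    /** Resolves the node parameters (static params plus values looked up in the environment) and executes the node. */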
    private void executeNode(final WorkflowProcess process, final RuntimeNode graphNode, final RuntimeEnv env) throws WorkflowManagerException {
        updateRunningJob(process);
        try {
            final Map<String, Object> params = new HashMap<>();
            if (graphNode.getParams() != null) {
                params.putAll(graphNode.getParams());
            }
            if (graphNode.getEnvParams() != null) {
                graphNode.getEnvParams().forEach((k, v) -> params.put(k, env.getAttribute(v.toString())));
            }
            final ProcessNode pNode = newProcessNode(graphNode, process);
            pNode.execute(env, params);
        } catch (final Throwable e) {
            throw new WorkflowManagerException("Error executing node " + graphNode.getName(), e);
        } finally {
            updateRunningJob(process);
        }
    }
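
    /**
     * Instantiates the ProcessNode for a graph node: a SuccessNode for terminal nodes,
     * a DefaultJobNode when no type is declared, otherwise the Spring bean named after the type.
     */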
    private ProcessNode newProcessNode(final RuntimeNode graphNode, final WorkflowProcess process) throws WorkflowManagerException {
        ProcessNode pnode;
        if (graphNode.isSuccessNode()) {
            pnode = new SuccessNode();
        } else if (StringUtils.isBlank(graphNode.getType())) {
            pnode = new DefaultJobNode(graphNode.getName());
        } else {
            pnode = (ProcessNode) this.applicationContext.getBean(graphNode.getType());
        }
        if (pnode == null) {
            log.error("cannot find bean of type " + graphNode.getType());
            throw new WorkflowManagerException("cannot find bean of type " + graphNode.getType());
        }
        pnode.setNodeName(graphNode.getName());
        pnode.setProcess(process);
        pnode.setGraphNode(graphNode);
        pnode.setEngine(this);
        return pnode;
    }
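
    /** Merges the attributes of several environments into a single map (later environments win on key conflicts). */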
    private Map<String, Object> mergeEnvParams(final RuntimeEnv... envs) {
        final Map<String, Object> map = new HashMap<>();
        Arrays.stream(envs).forEach(t -> map.putAll(t.getAttributes()));
        return map;
    }
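
    /** Finalizes a process: updates the running job, archives it in the history and sends notification mails if a wf configuration is set. */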
    public void completeProcess(final WorkflowProcess process, final RuntimeEnv env) {
        process.complete(env);
        updateRunningJob(process);
        saveHistoryJob(process);
        this.processRegistry.unregisterProcess(process.getId());
        final String wfConfId = process.getJobDetails().getWfConfId();
        if (StringUtils.isNotBlank(wfConfId)) {
            this.emailSender.sendMails(wfConfId, process);
        }
    }
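
    /** Copies the terminated job into the history table, recording the error details when the process failed. */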
    private void saveHistoryJob(final WorkflowProcess process) {
        log.debug("Process completed " + process.getId());
        final WfHistoryJob historyJob = new WfHistoryJob();
        historyJob.setProcessId(process.getId());
        historyJob.setName(process.getJobDetails().getName());
        historyJob.setFamily(process.getJobDetails().getFamily());
        historyJob.setWfConfId(process.getJobDetails().getWfConfId());
        historyJob.setDsId(process.getJobDetails().getDsId());
        historyJob.setDsName(process.getJobDetails().getDsName());
        historyJob.setApiId(process.getJobDetails().getApiId());
        historyJob.setStartDate(process.getJobDetails().getStartDate());
        historyJob.setEndDate(process.getJobDetails().getEndDate());
        final Map<String, Object> details = new LinkedHashMap<>();
        if (process.getError() != null) {
            details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
            details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
            historyJob.setStatus(JobStatus.failure);
        } else {
            historyJob.setStatus(JobStatus.success);
        }
        details.putAll(process.getOutputParams());
        historyJob.setOutputParams(details);
        this.historyJobRepository.save(historyJob);
    }
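
    /** Persists the current state of the running job; terminated processes also receive their final status, output params and end date. */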
    private void updateRunningJob(final WorkflowProcess process) {
        log.debug("UPDATING JOB");
        final LocalDateTime now = LocalDateTime.now();
        process.getJobDetails().setLastUpdate(now);
        if (process.isTerminated()) {
            final Map<String, Object> details = new LinkedHashMap<>();
            details.putAll(process.getOutputParams());
            if (process.getError() != null) {
                details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
                details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
                process.getJobDetails().setStatus(JobStatus.failure);
            } else {
                process.getJobDetails().setStatus(JobStatus.success);
            }
            process.getJobDetails().setOutputParams(details);
            process.getJobDetails().setEndDate(now);
        } else {
            process.getJobDetails().setStatus(JobStatus.running);
        }
        this.jobRepository.save(process.getJobDetails());
    }
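
    /** Returns the node type names accepted by this engine. */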
    public Set<String> getValidNodeTypes() {
        return this.validNodeTypes;
    }
}