dnet-docker/dnet-app/libs/dnet-wf-executor-common/src/main/java/eu/dnetlib/wfs/nodes/ProcessNode.java

263 lines
8.1 KiB
Java

package eu.dnetlib.wfs.nodes;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNode;
import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNodeStatus;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.RuntimeEnv;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.utils.NodeCallback;
/**
 * Base class of the executable workflow nodes.
 * <p>
 * A run of a node consists of: copying the incoming parameters into the fields annotated with
 * {@link WfInputParam}, invoking the subclass logic ({@link #execute()}), copying the fields
 * annotated with {@link WfOutputParam} into the {@link RuntimeEnv} and finally notifying the
 * {@link ProcessEngine} (either {@code releaseEnv} on success or {@code completeProcess} with the
 * error on failure).
 */
public abstract class ProcessNode implements BeanNameAware {

    private static final Log log = LogFactory.getLog(ProcessNode.class);

    private String beanName;

    private String nodeName;

    private RuntimeNode graphNode;

    private WorkflowProcess process;

    private ProcessEngine engine;

    /**
     * Runs this node: increments the execution counter of the graph node, marks it as running,
     * binds the input parameters and then executes the node logic, on a separate thread when
     * {@link #isAsync()} is true and on the calling thread otherwise.
     * <p>
     * No exception is propagated to the caller: any failure marks the graph node as failed and is
     * reported through {@code ProcessEngine.completeProcess}.
     *
     * @param env
     *            the runtime environment that receives the output parameters
     * @param params
     *            the values of the input fields, looked up by the annotation value or, when that is
     *            blank, by the field name
     */
    public void execute(final RuntimeEnv env, final Map<String, Object> params) {
        this.graphNode.setnExecutions(this.graphNode.getnExecutions() + 1);
        this.graphNode.setStatus(RuntimeNodeStatus.running);

        try {
            initInputParams(params);

            final NodeCallback callback = new NodeCallback() {

                @Override
                public void onSuccess(final RuntimeEnv env) {
                    try {
                        saveOutputParams(env);
                        ProcessNode.this.engine.releaseEnv(ProcessNode.this.process, ProcessNode.this.graphNode, env);
                    } catch (final WorkflowManagerException e) {
                        // The node was already marked as completed by the caller: align its status with
                        // the other failure paths before the process is closed with the error.
                        ProcessNode.this.graphNode.setStatus(RuntimeNodeStatus.failed);
                        onFail(env, e);
                    }
                }

                @Override
                public void onFail(final RuntimeEnv env, final Throwable e) {
                    log.debug("FAILURE IN NODE " + getNodeName());
                    ProcessNode.this.engine.completeProcess(ProcessNode.this.process, env, e);
                }
            };

            if (isAsync()) {
                final ExecutorService executor = Executors.newSingleThreadExecutor();
                try {
                    executor.execute(() -> execute(env, callback));
                } finally {
                    // shutdown() still runs the task submitted above, but lets the worker thread
                    // terminate afterwards instead of staying parked forever.
                    executor.shutdown();
                }
            } else {
                execute(env, callback);
            }
        } catch (final Throwable e) {
            this.graphNode.setStatus(RuntimeNodeStatus.failed);
            this.engine.completeProcess(this.process, env, e);
        }
    }

    /**
     * Invokes the node logic and reports the outcome to the callback. Every {@link Throwable},
     * including the ones raised by {@code callback.onSuccess}, is logged, marks the graph node as
     * failed and is forwarded to {@code callback.onFail}.
     *
     * @param env
     *            the runtime environment handed over to the callback
     * @param callback
     *            the receiver of the success/failure notification
     */
    private void execute(final RuntimeEnv env, final NodeCallback callback) {
        try {
            log.debug("START NODE: " + getBeanName());
            execute();
            this.graphNode.setStatus(RuntimeNodeStatus.completed);
            callback.onSuccess(env);
            log.debug("END NODE (SUCCESS): " + getBeanName());
        } catch (final Throwable e) {
            log.error("got exception while executing workflow node", e);
            log.debug("END NODE (FAILED): " + getBeanName());
            this.graphNode.setStatus(RuntimeNodeStatus.failed);
            callback.onFail(env, e);
        }
    }

    /**
     * The actual logic of the node. It is called after the input fields have been populated and
     * before the output fields are read.
     *
     * @throws Exception
     *             any error; it makes the node (and the process) fail
     */
    protected abstract void execute() throws Exception;

    /**
     * Tells whether the node logic must run on a separate thread.
     *
     * @return the value of {@code WfNode.async}
     */
    public final boolean isAsync() {
        // NOTE(review): this is a static reference, so the value is the same for every node class.
        // An earlier version read it from the annotation of the concrete class:
        // this.getClass().getAnnotation(WfNode.class).async - confirm which one is intended.
        return WfNode.async;
    }

    /**
     * Populates the fields annotated with {@link WfInputParam} (declared in this class or in any
     * of its superclasses) with the values found in the given map.
     *
     * @param params
     *            the available values
     * @throws WorkflowManagerException
     *             if a non optional parameter has no value
     */
    private void initInputParams(final Map<String, Object> params) throws WorkflowManagerException {
        for (final Field field : findFields(getClass(), WfInputParam.class)) {
            final WfInputParam annotation = field.getAnnotation(WfInputParam.class);
            final String annName = annotation.value();
            final boolean optional = annotation.optional();

            // The explicit name of the annotation wins over the name of the field
            final Object value = params.get(StringUtils.isNotBlank(annName) ? annName : field.getName());

            if (!optional && (value == null)) {
                if (log.isDebugEnabled()) {
                    log.debug("* NULL value in INPUT parameter: " + field.getName());
                    log.debug("* AVAILABLE PARAMS:");
                    params.entrySet().forEach(e -> log.debug("* * " + e.getKey() + ": " + e.getValue()));
                }
                log.warn("NULL value in INPUT parameter: " + field.getName());
                throw new WorkflowManagerException("NULL value in INPUT parameter: " + field.getName());
            }

            fieldValue(field, value);
        }
    }

    /**
     * Copies the fields annotated with {@link WfOutputParam} into the runtime environment, using
     * the output-to-env mapping of the graph node to choose the name of each env attribute.
     *
     * @param env
     *            the environment to update
     * @throws WorkflowManagerException
     *             if an output field is not present in the mapping, or if a non optional output
     *             field is null
     */
    private void saveOutputParams(final RuntimeEnv env) throws WorkflowManagerException {
        final Map<String, String> outputToEnv = this.graphNode.getOutputEnvMap();

        if (log.isDebugEnabled()) {
            log.debug("MAP (OUTPUT->ENV):");
            outputToEnv.forEach((k, v) -> log.debug("* " + k + "->" + v));
            log.debug("END MAP");
        }

        for (final Field field : findFields(getClass(), WfOutputParam.class)) {
            final WfOutputParam annotation = field.getAnnotation(WfOutputParam.class);
            final String annName = annotation.value();
            final boolean optional = annotation.optional();
            final String outputParam = StringUtils.isNotBlank(annName) ? annName : field.getName();

            if (!outputToEnv.containsKey(outputParam)) {
                throw new WorkflowManagerException("Not mapped output field: " + outputParam);
            }

            final String envParam = outputToEnv.get(outputParam);
            final Object value = fieldValue(field);

            if (!optional && (value == null)) {
                throw new WorkflowManagerException("NULL value in OUTPUT parameter: " + field.getName());
            }

            env.setAttribute(envParam, value);
            log.debug("SAVED ENV PARAMETER " + envParam + "=" + value);
        }

        if (log.isDebugEnabled()) {
            env.getAttributes().forEach((k, v) -> log.debug("ENV (END NODE EXECUTION) " + k + ": " + v));
        }
    }

    /**
     * Writes a value into a field of this node.
     * <p>
     * A value that is already an instance of the field type is stored as it is; otherwise its
     * {@code toString()} form is stored in String fields or parsed for long, int, double and float
     * fields (primitive or boxed). The parsing relies on {@link NumberUtils}, whose
     * {@code toLong}/{@code toInt}/{@code toDouble}/{@code toFloat} methods return zero for text
     * they cannot parse.
     *
     * @param field
     *            the field to set
     * @param value
     *            the new value, possibly null
     * @throws RuntimeException
     *             if the field type is not supported or if the reflective access fails
     */
    private void fieldValue(final Field field, final Object value) {
        try {
            field.setAccessible(true);

            final Class<?> fieldClass = field.getType();

            if (value == null) {
                field.set(this, null);
            } else if (fieldClass.isInstance(value)) {
                // The value can be stored directly (same class, subclass or implementation of the field type)
                field.set(this, value);
            } else if (String.class.isAssignableFrom(fieldClass)) {
                field.set(this, value.toString());
            } else if (long.class.isAssignableFrom(fieldClass) || Long.class.isAssignableFrom(fieldClass)) {
                field.set(this, NumberUtils.toLong(value.toString()));
            } else if (int.class.isAssignableFrom(fieldClass) || Integer.class.isAssignableFrom(fieldClass)) {
                field.set(this, NumberUtils.toInt(value.toString()));
            } else if (double.class.isAssignableFrom(fieldClass) || Double.class.isAssignableFrom(fieldClass)) {
                field.set(this, NumberUtils.toDouble(value.toString()));
            } else if (float.class.isAssignableFrom(fieldClass) || Float.class.isAssignableFrom(fieldClass)) {
                field.set(this, NumberUtils.toFloat(value.toString()));
            } else {
                log.error("Not Mapped Type, Field class: " + fieldClass + ", Value: " + value + " (" + value.getClass() + ")");
                throw new RuntimeException("Not Mapped Type: " + fieldClass);
            }
        } catch (IllegalArgumentException | IllegalAccessException e) {
            log.error("Error setting field " + field.getName(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the current value of a field of this node.
     *
     * @param field
     *            the field to read
     * @return the value of the field
     * @throws RuntimeException
     *             if the reflective access fails
     */
    private Object fieldValue(final Field field) {
        try {
            field.setAccessible(true);
            return field.get(this);
        } catch (IllegalArgumentException | IllegalAccessException e) {
            log.error("Error getting field " + field.getName(), e);
            throw new RuntimeException(e);
        }
    }

    /** @return the bean name received through {@link #setBeanName(String)} */
    public String getBeanName() {
        return this.beanName;
    }

    /** Stores the bean name ({@link BeanNameAware} callback). */
    @Override
    public void setBeanName(final String beanName) {
        this.beanName = beanName;
    }

    /** @return the name of this node */
    public String getNodeName() {
        return this.nodeName;
    }

    /** @param nodeName the name of this node */
    public void setNodeName(final String nodeName) {
        this.nodeName = nodeName;
    }

    /** @return the workflow process passed to the engine notifications */
    public WorkflowProcess getProcess() {
        return this.process;
    }

    /** @param process the workflow process passed to the engine notifications */
    public void setProcess(final WorkflowProcess process) {
        this.process = process;
    }

    /** @return the graph node holding status, execution counter, progress message and output mapping */
    public RuntimeNode getGraphNode() {
        return this.graphNode;
    }

    /** @param graphNode the graph node holding status, execution counter, progress message and output mapping */
    public void setGraphNode(final RuntimeNode graphNode) {
        this.graphNode = graphNode;
    }

    /** @return the engine notified when the node ends */
    public ProcessEngine getEngine() {
        return this.engine;
    }

    /** @param engine the engine notified when the node ends */
    public void setEngine(final ProcessEngine engine) {
        this.engine = engine;
    }

    @Override
    public String toString() {
        return String.format("[node beanName=%s, name=%s, object: %s]", this.beanName, this.nodeName, super.toString());
    }

    /**
     * Collects the fields carrying the given annotation, declared by the given class or by any of
     * its superclasses.
     *
     * @param clazz
     *            the class to inspect; null ends the recursion
     * @param ann
     *            the annotation to look for
     * @return the annotated fields (never null, no defined order)
     */
    private Set<Field> findFields(final Class<?> clazz, final Class<? extends Annotation> ann) {
        final Set<Field> fields = new HashSet<>();
        if (clazz != null) {
            fields.addAll(findFields(clazz.getSuperclass(), ann));
            for (final Field field : clazz.getDeclaredFields()) {
                if (field.isAnnotationPresent(ann)) {
                    fields.add(field);
                }
            }
        }
        return fields;
    }

    /**
     * Sets the progress message of the graph node.
     *
     * @param message
     *            the new progress message
     */
    protected void updateProcessMessage(final String message) {
        this.graphNode.setProgressMessage(message);
    }
}