check node parameters
This commit is contained in:
parent
e9e843e7f7
commit
e462cf804e
|
@ -13,4 +13,6 @@ public @interface WfInputParam {
|
|||
|
||||
String value() default "";
|
||||
|
||||
boolean optional() default false;
|
||||
|
||||
}
|
||||
|
|
|
@ -12,4 +12,7 @@ import java.lang.annotation.Target;
|
|||
public @interface WfOutputParam {
|
||||
|
||||
String value() default "";
|
||||
|
||||
boolean optional() default false;
|
||||
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.BeanNameAware;
|
||||
|
||||
import eu.dnetlib.errors.WorkflowManagerException;
|
||||
import eu.dnetlib.wfs.annotations.WfInputParam;
|
||||
import eu.dnetlib.wfs.annotations.WfNode;
|
||||
import eu.dnetlib.wfs.annotations.WfOutputParam;
|
||||
|
@ -34,26 +35,40 @@ public abstract class ProcessNode implements BeanNameAware {
|
|||
return WfNode.async;
|
||||
}
|
||||
|
||||
public final void initInputParams(final Map<String, Object> params) {
|
||||
findFields(getClass(), WfInputParam.class).forEach(field -> {
|
||||
public final void initInputParams(final Map<String, Object> params) throws WorkflowManagerException {
|
||||
|
||||
for (final Field field : findFields(getClass(), WfInputParam.class)) {
|
||||
final String annName = field.getAnnotation(WfInputParam.class).value();
|
||||
if (StringUtils.isNotBlank(annName)) {
|
||||
fieldValue(field, params.get(annName));
|
||||
} else {
|
||||
fieldValue(field, params.get(field.getName()));
|
||||
final boolean optional = field.getAnnotation(WfInputParam.class).optional();
|
||||
|
||||
final Object value = params.get(StringUtils.isNotBlank(annName) ? annName : field.getName());
|
||||
|
||||
if (!optional && (value == null)) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("* NULL value in INPUT parameter: " + field.getName());
|
||||
log.debug("* AVAILABLE PARAMS:");
|
||||
params.entrySet().forEach(e -> log.debug("* * " + e.getKey() + ": " + e.getValue()));
|
||||
}
|
||||
|
||||
throw new WorkflowManagerException("NULL value in INPUT parameter: " + field.getName());
|
||||
}
|
||||
});
|
||||
|
||||
fieldValue(field, value);
|
||||
}
|
||||
}
|
||||
|
||||
public final void saveOutputParams(final Env env) {
|
||||
findFields(getClass(), WfOutputParam.class).forEach(field -> {
|
||||
public final void saveOutputParams(final Env env) throws WorkflowManagerException {
|
||||
|
||||
for (final Field field : findFields(getClass(), WfOutputParam.class)) {
|
||||
final String annName = field.getAnnotation(WfOutputParam.class).value();
|
||||
if (StringUtils.isNotBlank(annName)) {
|
||||
env.setAttribute(annName, fieldValue(field));
|
||||
} else {
|
||||
env.setAttribute(field.getName(), fieldValue(field));
|
||||
}
|
||||
});
|
||||
final boolean optional = field.getAnnotation(WfInputParam.class).optional();
|
||||
|
||||
final Object value = fieldValue(field);
|
||||
|
||||
if (!optional && (value == null)) { throw new WorkflowManagerException("NULL value in OUTPUT parameter: " + field.getName()); }
|
||||
|
||||
env.setAttribute(StringUtils.isNotBlank(annName) ? annName : field.getName(), value);
|
||||
}
|
||||
}
|
||||
|
||||
private final void fieldValue(final Field field, final Object value) {
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.springframework.context.ApplicationContext;
|
|||
import org.springframework.stereotype.Service;
|
||||
|
||||
import eu.dnetlib.domain.wfs.JobStatus;
|
||||
import eu.dnetlib.domain.wfs.WfJournalEntry;
|
||||
import eu.dnetlib.errors.WorkflowManagerException;
|
||||
import eu.dnetlib.wfs.graph.GraphNode;
|
||||
import eu.dnetlib.wfs.nodes.DefaultJobNode;
|
||||
|
@ -49,6 +50,9 @@ public class ProcessEngine {
|
|||
process.setStatus(JobStatus.running);
|
||||
process.setStartDate(now);
|
||||
process.setLastActivityDate(now);
|
||||
process.getEnv().setAttribute("dsId", process.getDsId());
|
||||
process.getEnv().setAttribute("dsName", process.getDsName());
|
||||
process.getEnv().setAttribute("apiId", process.getDsInterface());
|
||||
|
||||
try {
|
||||
for (final GraphNode graphNode : process.getGraph().startNodes()) {
|
||||
|
@ -66,13 +70,22 @@ public class ProcessEngine {
|
|||
|
||||
@Override
|
||||
public void onSuccess(final Token t) {
|
||||
pNode.saveOutputParams(t.getEnv());
|
||||
releaseToken(process, graphNode, t);
|
||||
try {
|
||||
pNode.saveOutputParams(t.getEnv());
|
||||
releaseToken(process, graphNode, t);
|
||||
} catch (final WorkflowManagerException e) {
|
||||
onFail(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFail(final Token t) {
|
||||
pNode.saveOutputParams(t.getEnv());
|
||||
try {
|
||||
pNode.saveOutputParams(t.getEnv());
|
||||
} catch (final WorkflowManagerException e) {
|
||||
// Probably the LOG LEVEL should be DEBUG
|
||||
log.warn("Output parameters not correctly saved");
|
||||
}
|
||||
completeProcess(process, t);
|
||||
}
|
||||
};
|
||||
|
@ -167,7 +180,10 @@ public class ProcessEngine {
|
|||
private void completeProcess(final WorkflowProcess process, final Token token) {
|
||||
token.checkStatus();
|
||||
process.complete(token);
|
||||
wfJournalEntryRepository.save(process.asLog());
|
||||
|
||||
final WfJournalEntry job = process.asLog();
|
||||
wfJournalEntryRepository.save(job);
|
||||
|
||||
emailSender.sendMails(process);
|
||||
}
|
||||
|
||||
|
|
|
@ -178,7 +178,6 @@ public class WfExecutorService implements Stoppable {
|
|||
final WorkflowProcess process = new WorkflowProcess(processId, wfMetadata, conf, graph, globalParams, callback);
|
||||
|
||||
processRegistry.registerProcess(process, conf);
|
||||
wfJournalEntryRepository.save(process.asLog());
|
||||
|
||||
return process;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue