WfAsync annotation

This commit is contained in:
Michele Artini 2023-04-27 08:39:36 +02:00
parent 3737f5fa6c
commit 3517cbde4d
10 changed files with 16 additions and 33 deletions

View File

@ -0,0 +1,14 @@
package eu.dnetlib.manager.wf.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WfAsync {
}

View File

@ -5,6 +5,7 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.annotations.WfAsync;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
@ -12,12 +13,6 @@ public abstract class AbstractJobNode extends ProcessNode {
private static final Log log = LogFactory.getLog(AbstractJobNode.class);
private final boolean async;
public AbstractJobNode(final boolean async) {
this.async = async;
}
@Override
public final void execute(final Token token, final NodeCallback callback) {
try {
@ -47,7 +42,7 @@ public abstract class AbstractJobNode extends ProcessNode {
protected abstract void execute();
public final boolean isAsync() {
return async;
return this.getClass().isAnnotationPresent(WfAsync.class);
}
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.manager.wf.nodes;
public final class DefaultJobNode extends AbstractJobNode {
public DefaultJobNode(final String name) {
super(false);
setNodeName(name);
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.manager.wf.nodes;
public class SuccessNode extends AbstractJobNode {
public SuccessNode() {
super(false);
setNodeName("success");
}

View File

@ -10,10 +10,6 @@ import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("findDateRangeForIncrementalHarvesting")
public class FindDateRangeForIncrementalHarvestingNode extends AbstractJobNode {
public FindDateRangeForIncrementalHarvestingNode() {
super(false);
}
@WfInputParam
private LocalDateTime overrideFromDate;
@WfInputParam

View File

@ -10,10 +10,6 @@ public abstract class StreamConsumerNode<T, K> extends AbstractJobNode {
@WfInputParam
private Stream<T> inputStream;
public StreamConsumerNode() {
super(false);
}
abstract protected Stream<T> consumeStream(Stream<T> stream);
@Override

View File

@ -14,10 +14,6 @@ public abstract class StreamFilterNode<T> extends AbstractJobNode {
@WfOutputParam
private Stream<T> outputStream;
public StreamFilterNode() {
super(false);
}
abstract protected Stream<T> filterStream(Stream<T> input);
@Override

View File

@ -14,10 +14,6 @@ public abstract class StreamMapperNode<T, K> extends AbstractJobNode {
@WfOutputParam
private Stream<K> outputStream;
public StreamMapperNode() {
super(false);
}
abstract protected Stream<K> mapStream(Stream<T> input);
@Override

View File

@ -10,10 +10,6 @@ public abstract class StreamSupplierNode<T> extends AbstractJobNode {
@WfOutputParam
private Stream<T> outputStream;
public StreamSupplierNode() {
super(false);
}
abstract protected Stream<T> prepareStream();
@Override

View File

@ -17,10 +17,6 @@ public class Test01Node extends AbstractJobNode {
@WfOutputParam
private String response;
public Test01Node() {
super(false);
}
@Override
protected void execute() {
System.out.println("************************");