some refactoring

This commit is contained in:
Michele Artini 2023-04-27 11:08:47 +02:00
parent 3517cbde4d
commit 52cde91251
13 changed files with 171 additions and 48 deletions

View File

@ -60,6 +60,9 @@ public class MDStoreService {
private static final Logger log = LoggerFactory.getLogger(MDStoreService.class);
public static final String REFRESH_MODE = "REFRESH";
public static final String INCREMENTAL_MODE = "INCREMENTAL";
public List<MDStoreWithInfo> listMdStores() {
return StreamSupport.stream(mdstoreWithInfoRepository.findAll().spliterator(), false)

View File

@ -1,14 +0,0 @@
package eu.dnetlib.manager.wf.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WfAsync {
}

View File

@ -16,5 +16,7 @@ import org.springframework.stereotype.Component;
@Scope("prototype")
public @interface WfNode {
boolean async = false;
String value();
}

View File

@ -1,11 +1,8 @@
package eu.dnetlib.manager.wf.nodes;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.annotations.WfAsync;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
@ -18,13 +15,8 @@ public abstract class AbstractJobNode extends ProcessNode {
try {
log.debug("START NODE: " + getBeanName());
token.setProgressMessage(getNodeName());
if (isAsync()) {
Executors.newSingleThreadExecutor().execute(() -> doExecute(token, callback));
} else {
doExecute(token, callback);
}
execute();
callback.onComplete(token);
log.debug("END NODE (SUCCESS): " + getBeanName());
} catch (final Throwable e) {
log.error("got exception while executing workflow node", e);
@ -34,15 +26,6 @@ public abstract class AbstractJobNode extends ProcessNode {
}
private final void doExecute(final Token token, final NodeCallback callback) {
execute();
callback.onComplete(token);
}
protected abstract void execute();
public final boolean isAsync() {
return this.getClass().isAnnotationPresent(WfAsync.class);
}
}

View File

@ -13,6 +13,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.manager.wf.workflows.procs.Token;
@ -28,6 +29,11 @@ public abstract class ProcessNode implements BeanNameAware {
public abstract void execute(final Token token, NodeCallback callback);
public final boolean isAsync() {
// return this.getClass().getAnnotation(WfNode.class).async;
return WfNode.async;
}
public final void initInputParams(final Map<String, Object> params) {
findFields(getClass(), WfInputParam.class).forEach(field -> {
final String annName = field.getAnnotation(WfInputParam.class).value();

View File

@ -0,0 +1,30 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.util.stream.Stream;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.stream.StreamConsumerNode;
@WfNode("feedMdStore")
public class MdStoreFeederNode extends StreamConsumerNode<MetadataRecord> {
@WfInputParam
private String mdId;
@WfInputParam
private String storingType;
@WfOutputParam
private long total;
@Override
protected void consumeStream(final Stream<MetadataRecord> stream) {
// TODO Auto-generated method stub
total = 0;
}
}

View File

@ -0,0 +1,22 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.util.stream.Stream;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode;
@WfNode("fetchMdStore")
public class MdStoreFetcherNode extends StreamSupplierNode<MetadataRecord> {
@WfInputParam
private String mdId;
@Override
protected Stream<MetadataRecord> prepareStream() {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -0,0 +1,38 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.util.stream.Stream;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode;
@WfNode("openaireMdBuilder")
public class OpenaireMdBuilderNode extends StreamMapperNode<MetadataRecord, MetadataRecord> {
@WfInputParam
private String dsId;
@WfInputParam
private String dsName;
@WfInputParam
private String apiId;
@WfInputParam
private boolean inferred;
@WfInputParam
private boolean deletedbyinference;
@WfInputParam
private String inferenceprovenance;
@WfInputParam
private float trust;
@WfInputParam
private String provenanceactionclassname;
@WfInputParam
private String provenanceactionclassid;
@Override
protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input) {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.time.LocalDateTime;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("updateApiExtraFields")
public class UpdateApiExtraFieldsNode extends AbstractJobNode {
@WfInputParam
private String dsId;
@WfInputParam
private String apiId;
@WfInputParam
private String infoType; // COLLECT or TRANSFORM
@WfInputParam
private String mdId;
@WfInputParam
private long total;
@Override
protected void execute() {
final LocalDateTime date = LocalDateTime.now();
// TODO Auto-generated method stub
}
}

View File

@ -5,12 +5,12 @@ import java.util.stream.Stream;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
public abstract class StreamConsumerNode<T, K> extends AbstractJobNode {
public abstract class StreamConsumerNode<T> extends AbstractJobNode {
@WfInputParam
private Stream<T> inputStream;
abstract protected Stream<T> consumeStream(Stream<T> stream);
abstract protected void consumeStream(Stream<T> stream);
@Override
protected final void execute() {

View File

@ -6,6 +6,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -130,7 +131,15 @@ public class ProcessEngine {
final ProcessNode pNode = newProcessNode(graphNode, process);
pNode.initInputParams(params);
pNode.execute(token, newNodeCallback(process, pNode, graphNode));
final NodeCallback callback = newNodeCallback(process, pNode, graphNode);
if (pNode.isAsync()) {
Executors.newSingleThreadExecutor().execute(() -> pNode.execute(token, callback));
} else {
pNode.execute(token, callback);
}
}
private ProcessNode newProcessNode(final GraphNode graphNode, final WorkflowProcess process) throws WorkflowManagerException {

View File

@ -176,10 +176,6 @@
"name":"trust",
"value":"0.9"
},
{
"name":"trust",
"value":"0.9"
},
{
"name":"provenanceactionclassname",
"value":"sysimport:crosswalk:repository"
@ -203,7 +199,7 @@
},
{
"name":"STORE",
"type":"StoreMDStoreRecords",
"type":"feedMdStore",
"input":[
{
"name":"mdId",
@ -220,7 +216,8 @@
],
"output":[
{
"total":"total"
"name":"total",
"env": "total"
}
],
"arcs":[
@ -231,8 +228,16 @@
},
{
"name":"UPDATE_INFO",
"type":"MDStoreToApiExtraField",
"type":"updateApiExtraFields",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
},
{
"name":"infoType",
"value":"Collect"

View File

@ -28,7 +28,7 @@
"graph":[
{
"name":"FETCH_MDSTORE",
"type":"FetchMdStoreRecords",
"type":"fetchMdStore",
"start": true,
"input":[
{
@ -50,7 +50,7 @@
},
{
"name":"TRANSFORM",
"type":"Transform",
"type":"applyXslt",
"input":[
{
"name":"ruleId",
@ -75,7 +75,7 @@
},
{
"name":"STORE",
"type":"StoreMDStoreRecords",
"type":"feedMdStore",
"input":[
{
"name":"mdId",
@ -92,7 +92,8 @@
],
"output":[
{
"total":"total"
"name":"total",
"env": "total"
}
],
"arcs":[
@ -103,8 +104,16 @@
},
{
"name":"UPDATE_INFO",
"type":"MDStoreToApiExtraField",
"type":"updateApiExtraFields",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
},
{
"name":"infoType",
"value":"Transform"