refactoring

This commit is contained in:
Michele Artini 2024-01-12 16:07:03 +01:00
parent 694e114b2f
commit 0fe9b39a28
17 changed files with 183 additions and 259 deletions

View File

@ -21,9 +21,9 @@ CREATE TABLE wf_runtime (
ds_id text,
ds_name text,
ds_api text,
graph jsonb,
input jsonb,
output jsonb
graph jsonb NOT NULL DEFAULT '{}',
input jsonb NOT NULL DEFAULT '{}',
output jsonb NOT NULL DEFAULT '{}'
);
@ -38,8 +38,8 @@ CREATE TABLE wf_history (
ds_id text,
ds_name text,
ds_api text,
input jsonb,
output jsonb
input jsonb NOT NULL DEFAULT '{}',
output jsonb NOT NULL DEFAULT '{}'
);
CREATE VIEW wf_journal_view AS SELECT
@ -91,7 +91,7 @@ CREATE TABLE wf_configurations (
scheduling_enabled boolean NOT NULL DEFAULT false,
scheduling_cron text,
scheduling_min_interval int,
workflow text,
workflows text[] NOT NULL DEFAULT array[]::text[],
destroy_wf text,
system_params jsonb NOT NULL DEFAULT '{}',
user_params jsonb NOT NULL DEFAULT '{}'

View File

@ -7,6 +7,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
@ -153,7 +154,7 @@ public class ApiController extends DnetRestController {
final List<Node> graph = wfTmpl.getGraph();
// Prepare the parameters using their default values
final Map<String, Object> wfParams = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), null);
final Map<String, Object> wfParams = WfConfigurationUtils.allConfiguredParameters(Arrays.asList(new ImmutablePair<>(id, wfTmpl)), null);
final String wfName = String.format("Aggregation of '%s' (api: %s)", params.getDsName(), params.getApiId());

View File

@ -2,6 +2,7 @@ package eu.dnetlib.wfs.manager.service;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@ -10,6 +11,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.domain.PageRequest;
@ -28,7 +30,6 @@ import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfJournalEntry;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.domain.wfs.subscriptions.WfSubscription;
import eu.dnetlib.domain.wfs.templates.WfParam;
import eu.dnetlib.domain.wfs.templates.WfRepoHiDesc;
import eu.dnetlib.domain.wfs.templates.WfTemplate;
import eu.dnetlib.domain.wfs.templates.WfTemplateDesc;
@ -93,13 +94,14 @@ public class WorkflowManagerService {
conf.setId("wfconf-" + UUID.randomUUID());
}
final List<WfParam> wfParams = this.clientFactory.getClient(SimpleResourceClient.class)
.findResourceContent(WorkflowsConstants.WF_TEMPLATE, conf.getWorkflow(), WfTemplate.class)
.getParameters();
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
conf.setConfigured(WfConfigurationUtils.isConfigured(wfParams, conf));
final List<ImmutablePair<String, WfTemplate>> wfTemplates = conf.getWorkflows()
.stream()
.map(wf -> new ImmutablePair<>(wf, client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, wf, WfTemplate.class)))
.toList();
this.wfConfigurationRepository.save(conf);
conf.setConfigured(WfConfigurationUtils.isConfigured(wfTemplates, conf));
return conf;
}
@ -120,10 +122,6 @@ public class WorkflowManagerService {
return res;
}
public String findWfTemplateId(final String name) {
return this.clientFactory.getClient(SimpleResourceClient.class).findResource(WorkflowsConstants.WF_TEMPLATE, name).getId();
}
@Transactional
public void deleteWfConfiguration(final String id) {
this.wfSubscriptionRepository.deleteByWfConfigurationId(id);
@ -143,7 +141,6 @@ public class WorkflowManagerService {
}
public WfRunningJob prepareNewJob(final WfConfiguration conf, final boolean destroy) throws WorkflowManagerException {
final String wfTemplateId = findWfTemplateId(destroy ? conf.getDestroyWf() : conf.getWorkflow());
final String wfConfId = conf.getId();
final String name = conf.getName();
@ -171,11 +168,16 @@ public class WorkflowManagerService {
final String apiId = conf.getApiId();
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
final WfTemplate wfTmpl = client.findResourceContent(wfTemplateId, WfTemplate.class);
final Map<String, Object> params = WfConfigurationUtils.allConfiguredParameters(wfTmpl.getParameters(), conf);
final List<ImmutablePair<String, WfTemplate>> wfTemplates = (destroy ? Arrays.asList(conf.getDestroyWf()) : conf.getWorkflows())
.stream()
.map(wf -> new ImmutablePair<>(wf, client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, wf, WfTemplate.class)))
.toList();
return prepareNewJob(wfConfId, name, family, dsId, dsName, apiId, wfTmpl.getGraph(), params);
final List<Node> graph = WfConfigurationUtils.compositeGraph(wfTemplates);
final Map<String, Object> params = WfConfigurationUtils.allConfiguredParameters(wfTemplates, conf);
return prepareNewJob(wfConfId, name, family, dsId, dsName, apiId, graph, params);
}
@Transactional

View File

@ -1,40 +0,0 @@
{
"parameters": [
{
"name":"nativeMdStoreId",
"description":"Native Metadata Store ID"
},
{
"name":"cleanedMdStoreId",
"description":"Cleaned Metadata Store ID"
}
],
"graph": [
{
"name": "COLLECT",
"type": "launchWorkflow",
"start": true,
"arcs": [
{
"to": "TRANSFORM"
}
],
"input": [
{
"name": "wfId",
"value": ""
}
]
},
{
"name": "TRANSFORM",
"type": "launchWorkflow",
"input": [
{
"name": "wfId",
"value": ""
}
]
}
]
}

View File

@ -1,14 +1,14 @@
{
"parameters":[
{
"name":"nativeMdStoreId",
"description":"Native Metadata Store ID"
},
{
"name":"mode",
"description":"Collection Mode",
"defaultValue":"REFRESH"
},
{
"name":"mdStoreId",
"description":"Metadata Store ID"
},
{
"name":"overrideFromDate",
"description":"Override the default fromDate (history)",
@ -207,7 +207,7 @@
"input":[
{
"name":"mdId",
"ref":"mdStoreId"
"ref":"nativeMdStoreId"
},
{
"name":"inputStream",
@ -244,7 +244,7 @@
},
{
"name":"mdId",
"ref":"mdStoreId"
"ref":"nativeMdStoreId"
},
{
"name":"total",

View File

@ -1,12 +1,12 @@
{
"parameters":[
{
"name":"wfId",
"description":"the workflow to be configured",
"defaultValue": "wf-aggr-dc2dmf"
"name":"workflows",
"description":"the workflows to be configured",
"defaultValue": "wf-collection-dc, wf-transform-dc2dmf"
},
{
"name":"destroyWfId",
"name":"destroyWf",
"description":"the workflow to remove the configuration",
"defaultValue": "wf-destroy-dc2dmf"
},
@ -164,12 +164,12 @@
"join": true,
"input":[
{
"name":"wfId",
"ref":"wfId"
"name":"workflows",
"ref":"workflows"
},
{
"name":"destroyWfId",
"ref":"destroyWfId"
"name":"destroyWf",
"ref":"destroyWf"
},
{
"name":"ds",

View File

@ -1,12 +1,12 @@
{
"parameters":[
{
"name":"inputMdStoreId",
"description":"Input Metadata Store ID"
"name":"nativeMdStoreId",
"description":"Native Metadata Store ID"
},
{
"name":"outputMdStoreId",
"description":"Input Metadata Store ID"
"name":"cleanedMdStoreId",
"description":"Cleaned Metadata Store ID"
}
],
"graph":[
@ -37,7 +37,7 @@
"input":[
{
"name":"mdId",
"ref":"inputMdStoreId"
"ref":"nativeMdStoreId"
}
],
"output":[
@ -83,7 +83,7 @@
"input":[
{
"name":"mdId",
"ref":"outputMdStoreId"
"ref":"cleanedMdStoreId"
},
{
"name":"inputStream",
@ -120,7 +120,7 @@
},
{
"name":"mdId",
"ref":"outputMdStoreId"
"ref":"cleanedMdStoreId"
},
{
"name":"total",

View File

@ -215,7 +215,7 @@ export interface WfConf {
schedulingEnabled: boolean,
cronExpression?: string,
cronMinInterval?: number,
workflow: string,
workflows: string[],
destroyWf?: string,
systemParams: Map<string, string>,
userParams: Map<string, string>

View File

@ -1,5 +1,7 @@
<h2>Datasource Manager - API Managent</h2>
TODO : FIX WF CONFIGURATIONS
<p>
<a mat-stroked-button color="primary" routerLink="/dsm/search">
<mat-icon fontIcon="search"></mat-icon>
@ -177,8 +179,9 @@
<mat-icon fontIcon="edit"></mat-icon>
config
</button>
<button mat-stroked-button (click)="showGraphModal(wf.workflow)">
<mat-icon fontIcon="polyline"></mat-icon> graph
<button mat-stroked-button (click)="showGraphModal(wfId)" *ngFor="let wfId of wf.workflows">
<mat-icon fontIcon="polyline"></mat-icon> graph ({{wfId}})
</button>
<button mat-stroked-button (click)="showGraphModal(wf.destroyWf)" *ngIf="wf.destroyWf">
<mat-icon fontIcon="polyline"></mat-icon> graph (destroy)

View File

@ -9,9 +9,9 @@
<ng-template matStepLabel>Choose Workflow</ng-template>
<mat-form-field appearance="fill" floatLabel="always" style="width: 100%;">
<mat-label>Workflow</mat-label>
<mat-label>Workflow(s)</mat-label>
<input matInput disabled="disabled" value="{{data.workflow}}" *ngIf="editMode" />
<mat-select matInput formControlName="workflow" (selectionChange)="onChangeWfTemplate($event)"
<mat-select matInput formControlName="workflows" (selectionChange)="onChangeWfTemplate($event)" multiple
*ngIf="!editMode">
<mat-option *ngFor="let i of wfTemplates" [value]="i.name">{{i.name}} ({{i.subtype}})</mat-option>
</mat-select>

View File

@ -14,10 +14,7 @@
<mat-icon fontIcon="edit"></mat-icon>
configure
</button>
<a href="./api/resources/{{conf.workflow}}/content" mat-stroked-button color="link" target="_blank">
<mat-icon fontIcon="code"></mat-icon>
raw workflow
</a>
<button mat-stroked-button color="warn" (click)="deleteConf()">
<mat-icon fontIcon="delete"></mat-icon>
delete

View File

@ -107,7 +107,7 @@ export class WfConfSingle implements OnInit, OnChanges {
}
launchWfConf() {
if (this.conf?.id && this.conf?.workflow) {
if (this.conf?.id && this.conf?.workflows) {
this.client.startWfConfiguration(this.conf?.id, (data: WfHistoryEntry) => this.snackBar.open('Workflow launched !!!', 'INFO', { duration: 5000 }));
}
}
@ -121,7 +121,7 @@ export class WfConfSingle implements OnInit, OnChanges {
deleteConf() {
if (this.conf?.destroyWf) {
if (this.conf?.id && this.conf?.workflow) {
if (this.conf?.id && this.conf?.workflows) {
this.client.startDestroyWfConfiguration(this.conf?.id, (data: WfHistoryEntry) => this.snackBar.open('Destroy Workflow launched, PLEASE WAIT !!!', 'INFO', { duration: 5000 }));
}
} else if (this.conf?.id) {

View File

@ -2,10 +2,12 @@ package eu.dnetlib.domain.wfs.conf;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.hibernate.annotations.Type;
import io.hypersistence.utils.hibernate.type.array.ListArrayType;
import io.hypersistence.utils.hibernate.type.json.JsonType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
@ -60,8 +62,9 @@ public class WfConfiguration implements Serializable {
@Column(name = "scheduling_min_interval")
private int cronMinInterval;
@Column(name = "workflow")
private String workflow;
@Type(ListArrayType.class)
@Column(name = "workflows", columnDefinition = "text[]")
private List<String> workflows;
@Column(name = "destroy_wf")
private String destroyWf;
@ -78,7 +81,7 @@ public class WfConfiguration implements Serializable {
private String parentId;
public String getId() {
return id;
return this.id;
}
public void setId(final String id) {
@ -86,7 +89,7 @@ public class WfConfiguration implements Serializable {
}
public String getName() {
return name;
return this.name;
}
public void setName(final String name) {
@ -94,7 +97,7 @@ public class WfConfiguration implements Serializable {
}
public String getSection() {
return section;
return this.section;
}
public void setSection(final String section) {
@ -102,7 +105,7 @@ public class WfConfiguration implements Serializable {
}
public Map<String, String> getDetails() {
return details;
return this.details;
}
public void setDetails(final Map<String, String> details) {
@ -110,7 +113,7 @@ public class WfConfiguration implements Serializable {
}
public Integer getPriority() {
return priority;
return this.priority;
}
public void setPriority(final Integer priority) {
@ -118,7 +121,7 @@ public class WfConfiguration implements Serializable {
}
public String getDsId() {
return dsId;
return this.dsId;
}
public void setDsId(final String dsId) {
@ -126,7 +129,7 @@ public class WfConfiguration implements Serializable {
}
public String getDsName() {
return dsName;
return this.dsName;
}
public void setDsName(final String dsName) {
@ -134,7 +137,7 @@ public class WfConfiguration implements Serializable {
}
public String getApiId() {
return apiId;
return this.apiId;
}
public void setApiId(final String apiId) {
@ -142,7 +145,7 @@ public class WfConfiguration implements Serializable {
}
public boolean isEnabled() {
return enabled;
return this.enabled;
}
public void setEnabled(final boolean enabled) {
@ -150,7 +153,7 @@ public class WfConfiguration implements Serializable {
}
public boolean isConfigured() {
return configured;
return this.configured;
}
public void setConfigured(final boolean configured) {
@ -158,7 +161,7 @@ public class WfConfiguration implements Serializable {
}
public boolean isSchedulingEnabled() {
return schedulingEnabled;
return this.schedulingEnabled;
}
public void setSchedulingEnabled(final boolean schedulingEnabled) {
@ -166,7 +169,7 @@ public class WfConfiguration implements Serializable {
}
public String getCronExpression() {
return cronExpression;
return this.cronExpression;
}
public void setCronExpression(final String cronExpression) {
@ -174,23 +177,23 @@ public class WfConfiguration implements Serializable {
}
public int getCronMinInterval() {
return cronMinInterval;
return this.cronMinInterval;
}
public void setCronMinInterval(final int cronMinInterval) {
this.cronMinInterval = cronMinInterval;
}
public String getWorkflow() {
return workflow;
public List<String> getWorkflows() {
return this.workflows;
}
public void setWorkflow(final String workflow) {
this.workflow = workflow;
public void setWorkflows(final List<String> workflows) {
this.workflows = workflows;
}
public String getDestroyWf() {
return destroyWf;
return this.destroyWf;
}
public void setDestroyWf(final String destroyWf) {
@ -198,7 +201,7 @@ public class WfConfiguration implements Serializable {
}
public Map<String, Object> getSystemParams() {
return systemParams;
return this.systemParams;
}
public void setSystemParams(final Map<String, Object> systemParams) {
@ -206,7 +209,7 @@ public class WfConfiguration implements Serializable {
}
public Map<String, Object> getUserParams() {
return userParams;
return this.userParams;
}
public void setUserParams(final Map<String, Object> userParams) {
@ -214,7 +217,7 @@ public class WfConfiguration implements Serializable {
}
public String getParentId() {
return parentId;
return this.parentId;
}
public void setParentId(final String parentId) {

View File

@ -1,25 +1,34 @@
package eu.dnetlib.wfs.utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.conf.WfConfiguration;
import eu.dnetlib.domain.wfs.graph.Arc;
import eu.dnetlib.domain.wfs.graph.Node;
import eu.dnetlib.domain.wfs.templates.WfParam;
import eu.dnetlib.domain.wfs.templates.WfTemplate;
public class WfConfigurationUtils {
private static final Log log = LogFactory.getLog(WfConfigurationUtils.class);
public static boolean isConfigured(final List<WfParam> wfTemplateParams, final WfConfiguration conf) {
public static boolean isConfigured(final List<ImmutablePair<String, WfTemplate>> wfTemplates, final WfConfiguration conf) {
final Map<String, Object> confParams = allConfiguredParameters(wfTemplateParams, conf);
final Map<String, Object> confParams = allConfiguredParameters(wfTemplates, conf);
final List<String> invalids = wfTemplateParams.stream()
final List<String> invalids = wfTemplates.stream()
.map(ImmutablePair::getValue)
.map(WfTemplate::getParameters)
.flatMap(List::stream)
.filter(WfParam::isRequired)
.filter(p -> StringUtils.isBlank(p.getDefaultValue()))
.map(WfParam::getName)
@ -34,32 +43,85 @@ public class WfConfigurationUtils {
return invalids.isEmpty();
}
public static Map<String, Object> allConfiguredParameters(final List<WfParam> wfTemplateParams, final WfConfiguration conf) {
public static Map<String, Object> allConfiguredParameters(final List<ImmutablePair<String, WfTemplate>> wfTemplates, final WfConfiguration conf) {
final Map<String, Object> all = new HashMap<>();
if (wfTemplateParams != null) {
wfTemplateParams.stream()
.filter(p -> StringUtils.isNotBlank(p.getDefaultValue()))
.forEach(p -> all.put(p.getName(), p.getDefaultValue()));
}
for (final ImmutablePair<String, WfTemplate> pair : wfTemplates) {
final WfTemplate tmpl = pair.getValue();
if (conf != null) {
if (conf.getUserParams() != null) {
all.putAll(conf.getSystemParams());
if (tmpl.getParameters() != null) {
tmpl.getParameters()
.stream()
.filter(p -> StringUtils.isNotBlank(p.getDefaultValue()))
.forEach(p -> all.put(p.getName(), p.getDefaultValue()));
}
if (conf.getSystemParams() != null) {
all.putAll(conf.getUserParams());
if (conf != null) {
if (conf.getUserParams() != null) {
all.putAll(conf.getSystemParams());
}
if (conf.getSystemParams() != null) {
all.putAll(conf.getUserParams());
}
}
}
if (log.isDebugEnabled()) {
log.debug("** CONFIGURED INPUT WF PARAMETERS **");
all.forEach((k, v) -> log.debug("* " + k + ": " + v));
log.debug("************************************");
if (log.isDebugEnabled()) {
log.debug("** CONFIGURED INPUT WF PARAMETERS **");
all.forEach((k, v) -> log.debug("* " + k + ": " + v));
log.debug("************************************");
}
}
return all;
}
public static List<Node> compositeGraph(final List<ImmutablePair<String, WfTemplate>> wfTemplates) {
if (wfTemplates.size() == 0) { return new ArrayList<>(); }
if (wfTemplates.size() == 1) { return wfTemplates.get(0).getValue().getGraph(); }
final List<Node> res = new ArrayList<>();
for (int i = 0; i < wfTemplates.size(); i++) {
final String wfName = wfTemplates.get(i).getKey();
final List<Node> nodes = wfTemplates.get(i).getValue().getGraph();
final Node beginNode = new Node(wfName + ".BEGIN");
beginNode.setStart(i == 0);
final Node endNode = new Node(wfName + ".END");
endNode.setJoin(true);
if (i < (wfTemplates.size() - 1)) {
endNode.getArcs().add(new Arc(wfTemplates.get(i + 1).getKey() + ".BEGIN"));
} else {
endNode.getArcs().add(new Arc(WorkflowsConstants.SUCCESS_NODE));
}
for (final Node n : nodes) {
final String newName = wfName + "." + n.getName();
n.setName(newName);
if (n.isStart()) {
n.setStart(false);
beginNode.getArcs().add(new Arc(newName));
}
for (final Arc arc : n.getArcs()) {
if (WorkflowsConstants.SUCCESS_NODE.equals(arc.getTo())) {
arc.setTo(endNode.getName());
} else {
arc.setTo(wfName + "." + arc.getTo());
}
}
res.add(n);
}
res.add(beginNode);
res.add(endNode);
}
return res;
}
}

View File

@ -1,11 +1,14 @@
package eu.dnetlib.wfs.nodes.conf;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
@ -14,7 +17,6 @@ import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.conf.WfConfiguration;
import eu.dnetlib.domain.wfs.templates.WfParam;
import eu.dnetlib.domain.wfs.templates.WfTemplate;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
@ -29,10 +31,10 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
private static final int DEFAULT_AGGR_PRIORITY = 75;
@WfInputParam
private String wfId;
private String workflows;
@WfInputParam
private String destroyWfId;
private String destroyWf;
@WfInputParam
private Datasource ds;
@ -60,6 +62,11 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
this.wfConfId = "wf-aggr-" + UUID.randomUUID();
final List<String> wfList = Arrays.stream(StringUtils.split(this.workflows, ","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.toList();
final WfConfiguration conf = new WfConfiguration();
conf.setId(this.wfConfId);
conf.setSection(WorkflowsConstants.AGGREGATION_WF_CONFS_SECTION);
@ -72,8 +79,8 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
conf.setDsName(this.ds.getOfficialname());
conf.setApiId(this.api.getId());
conf.setWorkflow(this.wfId);
conf.setDestroyWf(this.destroyWfId);
conf.setWorkflows(wfList);
conf.setDestroyWf(this.destroyWf);
conf.setEnabled(true);
conf.setPriority(DEFAULT_AGGR_PRIORITY);
@ -83,13 +90,16 @@ public class RegisterWfConfigurationNode extends AbstractJobNode {
conf.setSystemParams(prepareSystemParams());
final List<WfParam> wfParams = this.clientFactory.getClient(SimpleResourceClient.class)
.findResourceContent(WorkflowsConstants.WF_TEMPLATE, conf.getWorkflow(), WfTemplate.class)
.getParameters();
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
conf.setConfigured(WfConfigurationUtils.isConfigured(wfParams, conf));
final List<ImmutablePair<String, WfTemplate>> wfTemplates = wfList.stream()
.map(wf -> new ImmutablePair<>(wf, client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, wf, WfTemplate.class)))
.toList();
conf.setConfigured(WfConfigurationUtils.isConfigured(wfTemplates, conf));
this.wfConfigurationRepository.save(conf);
}
private Map<String, Object> prepareSystemParams() {

View File

@ -1,114 +0,0 @@
package eu.dnetlib.wfs.nodes.launch;
import java.time.LocalDateTime;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.SimpleResourceClient;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.graph.runtime.RuntimeGraph;
import eu.dnetlib.domain.wfs.jobs.JobStatus;
import eu.dnetlib.domain.wfs.jobs.WfRunningJob;
import eu.dnetlib.domain.wfs.templates.WfTemplate;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.utils.GraphUtils;
import eu.dnetlib.wfs.utils.NodeCallback;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WfProcessUtils;
@WfNode("launchWorkflow")
public class LaunchWorkflowJobNode extends ProcessNode {
private static final Log log = LogFactory.getLog(LaunchWorkflowJobNode.class);
@WfInputParam
private String wfId;
@WfInputParam(optional = true)
private Datasource ds;
@WfInputParam(optional = true)
private Api api;
@Autowired
private DnetServiceClientFactory clientFactory;
// NB: the properties should be declared in all the containers (manager and workers)
@Autowired
private Environment environment;
@Override
public final void execute(final Token token, final NodeCallback nodeCallback) {
try {
final SimpleResourceClient client = this.clientFactory.getClient(SimpleResourceClient.class);
final WfTemplate tmpl = client.findResourceContent(WorkflowsConstants.WF_TEMPLATE, this.wfId, WfTemplate.class);
final WorkflowProcess process = getProcess();
final WfRunningJob job = new WfRunningJob();
final RuntimeGraph graph = GraphUtils.prepareRuntineGraph(tmpl.getGraph(), process.getJobDetails().getInputParams(), this.environment);
job.setProcessId(WfProcessUtils.generateProcessId());
job.setPriority(process.getJobDetails().getPriority());
job.setStartDate(null);
job.setLastUpdate(LocalDateTime.now());
job.setEndDate(null);
job.setGraph(graph);
// TODO (MEDIUM PRIORITY): verify if all necessary parameters are correctly passed
job.setInputParams(process.getJobDetails().getInputParams());
job.setWfExecutor(ServiceStatusRegistry.getStatus().getName());
job.setStatus(JobStatus.accepted);
// TODO (MEDIUM PRIORITY): verify if the job should be saved in the DB
getEngine().startWorkflowJob(job, new ProcessCallback() {
@Override
public void onSuccess(final WorkflowProcess t) {
log.debug("Child workflow has been completed successfully");
nodeCallback.onSuccess(token);
}
@Override
public void onFail(final WorkflowProcess t, final Throwable e) {
log.error("Child workflow is failed");
nodeCallback.onFail(token, e);
}
});
if (log.isDebugEnabled()) {
log.debug("The child workflow [conf: " + process.getJobDetails().getWfConfId() + "] is starting with procId: " + job.getProcessId());
}
token.setProgressMessage("Launched sub workflow, proc: " + job.getProcessId());
} catch (final Throwable e) {
log.error("got exception while launching child workflow", e);
nodeCallback.onFail(token, e);
}
}
public String getWfId() {
return this.wfId;
}
public void setWfId(final String wfId) {
this.wfId = wfId;
}
}

View File

@ -4,7 +4,7 @@
"section": "GC",
"enabled": true,
"priority": 75,
"workflow": "09991db4-79e6-4e7c-a388-5063955bf9d8",
"workflows": ["wf01", "wf02"],
"schedulingEnabled": true,
"cronExpression": "0 30 12 1/1 * ?",
"cronMinInterval": 9600,