input and output params

Split the generic job params/details maps into typed inputParams and outputParams (persisted as the jsonb columns input and output), across the DB schema, the job entities, the workflow engine and the UI.

Michele Artini 2024-01-10 13:45:20 +01:00
parent c709559c06
commit 66437371fc
10 changed files with 57 additions and 52 deletions
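In short: each workflow job now carries two typed maps instead of one untyped one. A minimal usage sketch of the new AbstractJob accessors introduced below, assuming a default-constructed WfRunningJob; the key and value are illustrative only:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Seed a job with its input parameters before launch.
    final WfRunningJob job = new WfRunningJob();
    final Map<String, Object> input = new LinkedHashMap<>();
    input.put("collectionMode", "REFRESH"); // hypothetical key/value
    job.setInputParams(input);

    // After the run the engine fills the output side; read it back.
    final Map<String, Object> output = job.getOutputParams();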

View File

@@ -22,8 +22,8 @@ CREATE TABLE wf_runtime (
 	ds_name text,
 	ds_api text,
 	graph jsonb,
-	params jsonb,
-	details jsonb
+	input jsonb,
+	output jsonb
 );
@@ -38,7 +38,8 @@ CREATE TABLE wf_history (
 	ds_id text,
 	ds_name text,
 	ds_api text,
-	details jsonb
+	input jsonb,
+	output jsonb
 );

 CREATE VIEW wf_journal_view AS SELECT
@@ -53,7 +54,8 @@ CREATE VIEW wf_journal_view AS SELECT
 	coalesce(r.ds_id, h.ds_id) AS ds_id,
 	coalesce(r.ds_name, h.ds_name) AS ds_name,
 	coalesce(r.ds_api, h.ds_api) AS ds_api,
-	coalesce(r.details, h.details) AS details
+	coalesce(r.input, h.input) AS input,
+	coalesce(r.output, h.output) AS output
 FROM wf_runtime r FULL OUTER JOIN wf_history h ON (r.process_id = h.process_id);

 -- Workflows
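wf_journal_view returns one row per process, taking each field from the runtime table while the job runs and from the history table afterwards (FULL OUTER JOIN plus coalesce, preferring the runtime value). A usage sketch against the columns defined above; the ds_id literal is hypothetical:

    -- Read the merged input/output params of all jobs of one datasource.
    SELECT ds_name, input, output
    FROM wf_journal_view
    WHERE ds_id = 'ds::1234';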

View File

@@ -67,7 +67,7 @@ public class WfHistoryImporter {
 			wf.setStatus(JobStatus.failure);
 		}

-		final Map<String, String> details = new LinkedHashMap<>();
+		final Map<String, Object> details = new LinkedHashMap<>();
 		final Iterator<Entry<String, JsonNode>> fields = node.fields();
 		while (fields.hasNext()) {
 			final Entry<String, JsonNode> f = fields.next();
@@ -76,7 +76,7 @@ public class WfHistoryImporter {
 			}
 		}

-		wf.setDetails(details);
+		wf.setOutputParams(details);

 		this.historyJobRepository.save(wf);

View File

@@ -180,14 +180,14 @@ public class WorkflowManagerService {
 		job.setStatus(JobStatus.created);
-		job.setDetails(new LinkedHashMap<>());
+		job.setOutputParams(new LinkedHashMap<>());
 		job.setStartDate(null);
 		job.setEndDate(null);
 		job.setWfExecutor(null);
 		job.setLastUpdate(LocalDateTime.now());
 		job.setGraph(graph);
-		job.setParams(params);
+		job.setInputParams(params);

 		final WfRunningJob saved = this.jobRepository.save(job);

View File

@@ -240,7 +240,8 @@ export interface WfHistoryEntry {
 	dsId?: string,
 	dsName?: string,
 	dsApi?: string,
-	details: Map<string, string>
+	inputParams: Map<string, string>,
+	outputParams: Map<string, string>
 }

 export interface WfSubscription {

View File

@@ -161,17 +161,17 @@ export class WfHistoryDetailsDialog {
 	@Inject(MAT_DIALOG_DATA) public data: WfHistoryEntry,
 ) {
 	let list: KeyValue[] = [];
-	let map = new Map(Object.entries(data.details));
-	for (let [key, value] of map) {
-		list.push({ k: key, v: value });
+	for (let [key, value] of new Map(Object.entries(data.inputParams))) {
+		list.push({ k: '[INPUT] ' + key, v: value });
 	}
+	for (let [key, value] of new Map(Object.entries(data.outputParams))) {
+		list.push({ k: '[OUTPUT] ' + key, v: value });
+	}
 	this.wfDatasource.data = list;
 	this.startDate = data.startDate;
 	this.endDate = data.endDate;
-	this.duration = this.calculateDateDiff(
-		parseInt(map.get('system:startDate')),
-		parseInt(map.get('system:endDate'))
-	);
+	this.duration = this.calculateDateDiff(new Date(data.startDate), new Date(data.endDate));
 }

 applyFilter(event: Event) {
 	const filterValue = (event.target as HTMLInputElement).value.trim().toLowerCase();
@@ -185,7 +185,11 @@ export class WfHistoryDetailsDialog {
 	this.selectedElement = elem;
 }

-calculateDateDiff(start: number, end: number): string {
+calculateDateDiff(from: Date, to: Date): string {
+	let start = from.getTime();
+	let end = to.getTime();
+
 	if (start <= 0 || end <= 0) {
 		return '-';
 	}

View File

@@ -29,9 +29,9 @@ public class WfManagerClient extends DnetServiceClient {
 		return Arrays.stream(history)
 			.filter(job -> (job.getStatus() == JobStatus.success) || (job.getStatus() == JobStatus.failure))
 			.map(job -> {
-				final Map<String, String> details = job.getDetails();
-				final AggregationStage stage = AggregationStage.parse(details.get("system:wfName"));
+				final Map<String, Object> details = job.getOutputParams();
+				final AggregationStage stage = AggregationStage.parse((String) details.get("system:wfName"));
 				final boolean success = job.getStatus() == JobStatus.success;
 				final String date = job.getEndDate().format(dateTimeFormatter);
 				final long total = success ? getNumberOfRecords(details) : 0;
@@ -58,19 +58,19 @@ public class WfManagerClient extends DnetServiceClient {
 			.collect(Collectors.toList());
 	}

-	private CollectionMode getCollectionMode(final Map<String, String> details) {
+	private CollectionMode getCollectionMode(final Map<String, Object> details) {
 		// TODO (LOW PRIORITY) : verify the terms in the history
-		return Optional.ofNullable(details.get("system:node:SELECT_MODE:selection"))
+		return Optional.ofNullable((String) details.get("system:node:SELECT_MODE:selection"))
 			.map(CollectionMode::valueOf)
-			.orElseGet(() -> Optional.ofNullable(details.get("collectionMode"))
+			.orElseGet(() -> Optional.ofNullable((String) details.get("collectionMode"))
 				.map(CollectionMode::valueOf)
 				.orElse(null));
 	}

-	private Integer getNumberOfRecords(final Map<String, String> details) {
+	private Integer getNumberOfRecords(final Map<String, Object> details) {
 		// TODO (LOW PRIORITY) : verify the terms in the history
-		final String sinkSize = details.get("mainlog:sinkSize");
-		final String total = details.get("mainlog:total");
+		final String sinkSize = (String) details.get("mainlog:sinkSize");
+		final String total = (String) details.get("mainlog:total");
 		if (StringUtils.isNotBlank(sinkSize)) { return NumberUtils.toInt(sinkSize); }
 		if (StringUtils.isNotBlank(total)) { return NumberUtils.toInt(total); }
 		return -1;
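Since details is now a Map<String, Object>, every string read above requires an explicit (String) cast. A hypothetical helper, not part of this commit, could centralize those reads (using java.util.Map as above):

    // Hypothetical helper: null-safe string read from the Object-valued map;
    // returns null instead of throwing when the value is not a String.
    private static String getString(final Map<String, Object> details, final String key) {
        final Object value = details.get(key);
        return (value instanceof String) ? (String) value : null;
    }

    // e.g.: AggregationStage.parse(getString(details, "system:wfName"))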

View File

@@ -52,8 +52,12 @@ public abstract class AbstractJob implements Serializable {
 	private String apiId;

 	@Type(JsonType.class)
-	@Column(name = "details")
-	private Map<String, String> details = new HashMap<>();
+	@Column(name = "input")
+	private Map<String, Object> inputParams = new HashMap<>();
+
+	@Type(JsonType.class)
+	@Column(name = "output")
+	private Map<String, Object> outputParams = new HashMap<>();

 	public String getProcessId() {
 		return this.processId;
@@ -135,12 +139,20 @@ public abstract class AbstractJob implements Serializable {
 		this.apiId = apiId;
 	}

-	public Map<String, String> getDetails() {
-		return this.details;
+	public Map<String, Object> getInputParams() {
+		return this.inputParams;
 	}

-	public void setDetails(final Map<String, String> details) {
-		this.details = details;
+	public void setInputParams(final Map<String, Object> inputParams) {
+		this.inputParams = inputParams;
 	}
+
+	public Map<String, Object> getOutputParams() {
+		return this.outputParams;
+	}
+
+	public void setOutputParams(final Map<String, Object> outputParams) {
+		this.outputParams = outputParams;
+	}
 }

View File

@@ -2,9 +2,7 @@ package eu.dnetlib.domain.wfs.jobs;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;

 import org.hibernate.annotations.Type;
@@ -33,10 +31,6 @@ public class WfRunningJob extends AbstractJob {
 	@Column(name = "graph")
 	private List<WfTemplate.Node> graph = new ArrayList<>();

-	@Type(JsonType.class)
-	@Column(name = "params")
-	private Map<String, Object> params = new HashMap<>();
-
 	public String getWfExecutor() {
 		return this.wfExecutor;
 	}
@@ -69,12 +63,4 @@ public class WfRunningJob extends AbstractJob {
 		this.graph = graph;
 	}
-
-	public Map<String, Object> getParams() {
-		return this.params;
-	}
-
-	public void setParams(final Map<String, Object> params) {
-		this.params = params;
-	}
 }

View File

@@ -60,7 +60,7 @@ public class LaunchWorkflowJobNode extends ProcessNode {
 		job.setEndDate(null);
 		job.setGraph(tmpl.getGraph());
 		// TODO (MEDIUM PRIORITY): verify if all necessary parameters are correctly passed
-		job.setParams(process.getJobDetails().getParams());
+		job.setInputParams(process.getJobDetails().getInputParams());
 		job.setWfExecutor(ServiceStatusRegistry.getStatus().getName());
 		job.setStatus(JobStatus.accepted);

View File

@@ -148,7 +148,7 @@ public class ProcessEngine {
 		final WorkflowProcess process = new WorkflowProcess(job);

 		try {
-			process.setGraph(this.graphLoader.loadGraph(job.getGraph(), job.getParams()));
+			process.setGraph(this.graphLoader.loadGraph(job.getGraph(), job.getInputParams()));
 			process.setCallback(callback);

 			this.processRegistry.registerProcess(process);
@@ -183,7 +183,7 @@ public class ProcessEngine {
 		try {
 			for (final GraphNode graphNode : process.getGraph().startNodes()) {
-				final Token token = process.newToken(process.getJobDetails().getParams());
+				final Token token = process.newToken(process.getJobDetails().getInputParams());
 				executeNode(process, graphNode, token);
 			}
 		} catch (final Throwable e) {
@@ -307,7 +307,8 @@ public class ProcessEngine {
 		historyJob.setStartDate(process.getJobDetails().getStartDate());
 		historyJob.setEndDate(process.getJobDetails().getEndDate());

-		final Map<String, String> details = new LinkedHashMap<>();
+		final Map<String, Object> details = new LinkedHashMap<>();
 		if (process.getError() != null) {
 			details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
 			details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, ExceptionUtils.getStackTrace(process.getError()));
@@ -317,7 +317,7 @@ public class ProcessEngine {
 		}
 		details.putAll(process.getOutputParams());
-		historyJob.setDetails(details);
+		historyJob.setOutputParams(details);

 		this.historyJobRepository.save(historyJob);
 	}
@@ -330,7 +330,7 @@ public class ProcessEngine {
 		process.getJobDetails().setLastUpdate(LocalDateTime.now());

 		if (process.isTerminated()) {
-			final Map<String, String> details = new LinkedHashMap<>();
+			final Map<String, Object> details = new LinkedHashMap<>();
 			details.putAll(process.getOutputParams());
 			if (process.getError() != null) {
 				details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError().getMessage());
@@ -339,7 +339,7 @@ public class ProcessEngine {
 			} else {
 				process.getJobDetails().setStatus(JobStatus.success);
 			}
-			process.getJobDetails().setDetails(details);
+			process.getJobDetails().setOutputParams(details);
 			process.getJobDetails().setEndDate(now);
 		} else {
 			process.getJobDetails().setStatus(JobStatus.running);