progress counter

This commit is contained in:
Michele Artini 2024-01-18 14:35:03 +01:00
parent 2520e51c3b
commit acba980ea4
9 changed files with 97 additions and 36 deletions

View File

@ -1,6 +1,7 @@
package eu.dnetlib.wfs.nodes;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -69,6 +70,10 @@ public class MdCleanerJobNode extends ProcessNode {
final MDStoreVersion outputVersion = mdstoreManager.newVersion(this.outputMdId);
try {
final AtomicLong counter = new AtomicLong(0);
final long total = inputVersion.getSize();
final Stream<MetadataRecord> stream = this.mdStoreSqlBackend.streamEntries(inputVersion)
.filter(record -> {
try {
@ -84,6 +89,10 @@ public class MdCleanerJobNode extends ProcessNode {
output.setBody(cleaner.transform(input.getBody()));
output.setDateOfTransformation(LocalDateTime.now());
return output;
})
.map(o -> {
updateProgressMessage(counter, total);
return o;
});
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);
@ -92,6 +101,8 @@ public class MdCleanerJobNode extends ProcessNode {
mdstoreManager.commitVersion(outputVersion.getId(), size);
updateProgressMessage(size + "/" + size);
this.clientFactory.getClient(DsmClient.class).updateApiAggregationInfo(this.api.getId(), this.outputMdId, size);
} catch (final Throwable e) {
mdstoreManager.abortVersion(outputVersion);

View File

@ -1,6 +1,7 @@
package eu.dnetlib.wfs.nodes;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -71,6 +72,8 @@ public class MdCollectIncrementalJobNode extends ProcessNode {
final MDStoreWithInfo outputMDStore = mdstoreManager.findMDStore(this.mdId);
final MDStoreVersion outputVersion = mdstoreManager.findVersion(outputMDStore.getCurrentVersion());
try {
final AtomicLong counter = new AtomicLong(0);
final Stream<MetadataRecord> stream = this.collectorService.collect(this.api, fromDate, untilDate)
.map(xml -> {
try {
@ -80,7 +83,11 @@ public class MdCollectIncrementalJobNode extends ProcessNode {
}
})
.filter(filter)
.map(mdBuilder);
.map(mdBuilder)
.map(o -> {
updateProgressMessage(counter, null);
return o;
});
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);
@ -88,6 +95,8 @@ public class MdCollectIncrementalJobNode extends ProcessNode {
mdstoreManager.commitVersion(outputVersion.getId(), size);
updateProgressMessage(size + "/" + size);
this.clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(this.api.getId(), this.mdId, size);
} catch (final Throwable e) {
mdstoreManager.abortVersion(outputVersion);

View File

@ -1,5 +1,6 @@
package eu.dnetlib.wfs.nodes;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -60,6 +61,8 @@ public class MdCollectRefreshJobNode extends ProcessNode {
final MDStoreVersion outputVersion = mdstoreManager.newVersion(this.mdId);
try {
final AtomicLong counter = new AtomicLong(0);
final Stream<MetadataRecord> stream = this.collectorService.collect(this.api)
.map(xml -> {
try {
@ -69,7 +72,11 @@ public class MdCollectRefreshJobNode extends ProcessNode {
}
})
.filter(filter)
.map(mdBuilder);
.map(mdBuilder)
.map(o -> {
updateProgressMessage(counter, null);
return o;
});
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);
@ -77,6 +84,8 @@ public class MdCollectRefreshJobNode extends ProcessNode {
mdstoreManager.commitVersion(outputVersion.getId(), size);
updateProgressMessage(size + "/" + size);
this.clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(this.api.getId(), this.mdId, size);
} catch (final Throwable e) {
System.err.println("****************** ABORTED");

View File

@ -1,5 +1,7 @@
package eu.dnetlib.wfs.nodes;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
@ -44,26 +46,35 @@ public class MdIndexJobNode extends ProcessNode {
@Override
protected void execute() throws Exception {
final IndexManagerClient indexManager = clientFactory.getClient(IndexManagerClient.class);
final IndexManagerClient indexManager = this.clientFactory.getClient(IndexManagerClient.class);
final IndexConfiguration conf = indexManager.getIndexConfiguration(indexConfId);
final IndexConfiguration conf = indexManager.getIndexConfiguration(this.indexConfId);
if (!solrService.existsIndex(conf.getId())) {
solrService.createIndex(conf);
if (!this.solrService.existsIndex(conf.getId())) {
this.solrService.createIndex(conf);
}
final long start = System.currentTimeMillis();
final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class);
final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class);
final MDStoreVersion inputVersion = mdstoreManager.startReading(inputMdId);
final MDStoreVersion inputVersion = mdstoreManager.startReading(this.inputMdId);
try {
solrService.indexRecords(conf, mdStoreSqlBackend.streamEntries(inputVersion).map(MetadataRecord::getBody));
final AtomicLong counter = new AtomicLong(0);
final long total = inputVersion.getSize();
if (!incremental) {
solrService.deleteOldRecords(conf, start);
this.solrService.indexRecords(conf, this.mdStoreSqlBackend.streamEntries(inputVersion)
.map(MetadataRecord::getBody)
.map(o -> {
updateProgressMessage(counter, total);
return o;
}));
if (!this.incremental) {
this.solrService.deleteOldRecords(conf, start);
}
} finally {
mdstoreManager.endReading(inputVersion);
}

View File

@ -3,6 +3,7 @@ package eu.dnetlib.wfs.nodes;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -72,6 +73,9 @@ public class MdTransformJobNode extends ProcessNode {
final MDStoreVersion outputVersion = mdstoreManager.newVersion(this.outputMdId);
try {
final AtomicLong counter = new AtomicLong(0);
final long total = inputVersion.getSize();
final Stream<MetadataRecord> stream = this.mdStoreSqlBackend.streamEntries(inputVersion)
.map(input -> {
final MetadataRecord output = new MetadataRecord();
@ -87,6 +91,10 @@ public class MdTransformJobNode extends ProcessNode {
} catch (final DocumentException e) {
throw new RuntimeException("Invalid record: " + record.getBody());
}
})
.map(o -> {
updateProgressMessage(counter, total);
return o;
});
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);
@ -95,6 +103,8 @@ public class MdTransformJobNode extends ProcessNode {
mdstoreManager.commitVersion(outputVersion.getId(), size);
updateProgressMessage(size + "/" + size);
this.clientFactory.getClient(DsmClient.class).updateApiAggregationInfo(this.api.getId(), this.outputMdId, size);
} catch (final Throwable e) {
mdstoreManager.abortVersion(outputVersion);

View File

@ -321,7 +321,7 @@ export class WfRuntimeGraphDialog implements AfterViewInit {
code += 'flowchart TD\n';
code += "\tstart([" + this.graphNodeText('start', undefined) + "])\n";
code += "\tstart([" + this.graphNodeText('start') + "])\n";
code += "\tclass start graphStartNode\n";
for (const [key, value] of Object.entries(this.wf.graph.nodes)) {
@ -332,10 +332,10 @@ export class WfRuntimeGraphDialog implements AfterViewInit {
code += "\t" + from;
if (node.join && node.name != 'success') {
code += '{{' + this.graphNodeText(from, node.type) + '}}';
code += '{{' + this.graphNodeText(from, node.type, node.progressMessage) + '}}';
code += "\n\tclass " + from + " " + cssJoinMap[node.status];
} else {
code += '(' + this.graphNodeText(from, node.type) + ')';
code += '(' + this.graphNodeText(from, node.type, node.progressMessage) + ')';
code += "\n\tclass " + from + " " + cssMap[node.status];
}
@ -360,9 +360,10 @@ export class WfRuntimeGraphDialog implements AfterViewInit {
mermaid.run();
}
graphNodeText(name: string, type?: string) {
graphNodeText(name: string, type?: string, message?: string) {
let res = '"`' + name;
if (type) { res += '\n**(' + type + ')**'; }
if (message) { res += '\n**' + message + '**'; }
res += '`"';
return res;
}

View File

@ -6,6 +6,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@ -26,6 +27,8 @@ import eu.dnetlib.wfs.utils.NodeCallback;
public abstract class ProcessNode implements BeanNameAware {
private static final int PROGRESS_MESSAGE_INTERVAL = 100;
private static final Log log = LogFactory.getLog(ProcessNode.class);
private String beanName;
@ -42,6 +45,7 @@ public abstract class ProcessNode implements BeanNameAware {
this.graphNode.setnExecutions(this.graphNode.getnExecutions() + 1);
this.graphNode.setStatus(RuntimeNodeStatus.running);
this.engine.updateRunningJob(this.process);
try {
initInputParams(params);
@ -54,14 +58,13 @@ public abstract class ProcessNode implements BeanNameAware {
saveOutputParams(env);
ProcessNode.this.engine.releaseEnv(ProcessNode.this.process, ProcessNode.this.graphNode, env);
} catch (final WorkflowManagerException e) {
onFail(env, e);
fail(env, e);
}
}
@Override
public void onFail(final RuntimeEnv env, final Throwable e) {
log.debug("FAILURE IN NODE " + getNodeName());
ProcessNode.this.engine.completeProcess(ProcessNode.this.process, env, e);
fail(env, e);
}
};
@ -71,11 +74,16 @@ public abstract class ProcessNode implements BeanNameAware {
execute(env, callback);
}
} catch (final Throwable e) {
this.graphNode.setStatus(RuntimeNodeStatus.failed);
this.engine.completeProcess(this.process, env, e);
fail(env, e);
}
}
private final void fail(final RuntimeEnv env, final Throwable e) {
log.debug("FAILURE IN NODE " + getNodeName());
this.graphNode.setStatus(RuntimeNodeStatus.failed);
this.engine.completeProcess(this.process, env, e);
}
private final void execute(final RuntimeEnv env, final NodeCallback callback) {
try {
log.debug("START NODE: " + getBeanName());
@ -255,8 +263,15 @@ public abstract class ProcessNode implements BeanNameAware {
return fields;
}
protected void updateProcessMessage(final String message) {
protected void updateProgressMessage(final String message) {
this.graphNode.setProgressMessage(message);
this.engine.updateRunningJob(this.process);
}
protected void updateProgressMessage(final AtomicLong counter, final Long size) {
final long i = counter.incrementAndGet();
if ((i % PROGRESS_MESSAGE_INTERVAL) == 0) {
updateProgressMessage(i + "/" + ((size != null) ? size : "-"));
}
}
}

View File

@ -181,11 +181,15 @@ public class ProcessEngine {
startProcess(process);
} catch (final Throwable e) {
process.setError(e);
updateRunningJob(process);
processFail(process, e);
}
}
private void processFail(final WorkflowProcess process, final Throwable e) {
process.setError(e);
updateRunningJob(process);
}
public final void killProcess(final String procId) {
this.processRegistry.findProcess(procId).kill();
}
@ -208,8 +212,7 @@ public class ProcessEngine {
}
} catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e);
process.setError(e);
updateRunningJob(process);
processFail(process, e);
}
}
@ -242,8 +245,7 @@ public class ProcessEngine {
}
} catch (final Throwable e) {
log.error("WorkflowProcess node instantiation failed", e);
process.setError(e);
updateRunningJob(process);
processFail(process, e);
}
}
@ -347,7 +349,7 @@ public class ProcessEngine {
this.historyJobRepository.save(historyJob);
}
private void updateRunningJob(final WorkflowProcess process) {
public void updateRunningJob(final WorkflowProcess process) {
log.debug("UPDATING JOB");

View File

@ -25,7 +25,6 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
private final String id;
private final WfRunningJob jobDetails;
private List<RuntimeEnv> envs = new CopyOnWriteArrayList<>();
private final Map<String, List<RuntimeEnv>> pausedJoinNodeEnvs = new HashMap<>();
private final Map<String, String> outputParams = new HashMap<>();
@ -97,16 +96,10 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
public void complete(final RuntimeEnv env, final Throwable err) {
final LocalDateTime now = LocalDateTime.now();
this.jobDetails.setLastUpdate(now);
this.jobDetails.setEndDate(now);
this.jobDetails.setStatus(err != null ? JobStatus.failure : JobStatus.success);
if (err != null) {
setError(err);
}
this.error = err; // NULL value is permitted
}
public void setEnvs(final List<RuntimeEnv> envs) {