fixed progress messages

Michele Artini 2024-01-22 12:38:53 +01:00
parent cea27b4612
commit dc1be1e9b0
12 changed files with 66 additions and 70 deletions

View File

@@ -1,5 +1,7 @@
package eu.dnetlib.wfs.nodes;
import static eu.dnetlib.wfs.nodes.ProcessNode.PROGRESS_MESSAGE_INTERVAL;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
@@ -24,7 +26,6 @@ import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.utils.XpathFilterFactory;
import jakarta.transaction.Transactional;
@WfNode("md_clean")
public class MdCleanerJobNode extends ProcessNode {
@@ -57,7 +58,6 @@ public class MdCleanerJobNode extends ProcessNode {
private MDStoreSqlBackend mdStoreSqlBackend;
@Override
@Transactional
protected void execute() throws Exception {
final Predicate<Document> filter = XpathFilterFactory.createFilter(this.filterXpath);
@@ -90,10 +90,7 @@ public class MdCleanerJobNode extends ProcessNode {
output.setDateOfTransformation(LocalDateTime.now());
return output;
})
.map(o -> {
updateProgressMessage(counter, total);
return o;
});
.map(o -> updateProgressMessage(o, counter, total, PROGRESS_MESSAGE_INTERVAL));
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);

View File

@@ -25,7 +25,6 @@ import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.collector.CollectorService;
import eu.dnetlib.wfs.utils.MdBuilderFactory;
import eu.dnetlib.wfs.utils.XpathFilterFactory;
import jakarta.transaction.Transactional;
@WfNode("md_collect_incremental")
public class MdCollectIncrementalJobNode extends ProcessNode {
@@ -58,7 +57,6 @@ public class MdCollectIncrementalJobNode extends ProcessNode {
private MDStoreSqlBackend mdStoreSqlBackend;
@Override
@Transactional
protected void execute() throws Exception {
final LocalDateTime fromDate = this.overrideFromDate != null ? this.overrideFromDate : findLastCollDate(this.api);
@@ -84,10 +82,7 @@
})
.filter(filter)
.map(mdBuilder)
.map(o -> {
updateProgressMessage(counter, null);
return o;
});
.map(o -> updateProgressMessage(o, counter, null, PROGRESS_MESSAGE_INTERVAL));
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);

View File

@@ -23,7 +23,6 @@ import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.collector.CollectorService;
import eu.dnetlib.wfs.utils.MdBuilderFactory;
import eu.dnetlib.wfs.utils.XpathFilterFactory;
import jakarta.transaction.Transactional;
@WfNode("md_collect_refresh")
public class MdCollectRefreshJobNode extends ProcessNode {
@@ -50,7 +49,6 @@ public class MdCollectRefreshJobNode extends ProcessNode {
private MDStoreSqlBackend mdStoreSqlBackend;
@Override
@Transactional
protected void execute() throws Exception {
final Predicate<Document> filter = XpathFilterFactory.createFilter(this.filterXpath);
@@ -73,10 +71,7 @@
})
.filter(filter)
.map(mdBuilder)
.map(o -> {
updateProgressMessage(counter, null);
return o;
});
.map(o -> updateProgressMessage(o, counter, null, PROGRESS_MESSAGE_INTERVAL));
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);

View File

@@ -21,7 +21,6 @@ import eu.dnetlib.errors.DnetException;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.output.oai.OaiBackend;
import jakarta.transaction.Transactional;
@WfNode("md_oai_export")
public class MdExportOaiJobNode extends ProcessNode {
@@ -51,40 +50,39 @@ public class MdExportOaiJobNode extends ProcessNode {
private OaiBackend oaiBackend;
@Override
@Transactional
protected void execute() throws Exception {
if (StringUtils.isBlank(oaiSetSpec)) {
oaiSetSpec = DigestUtils.md5Hex(ds.getId()).substring(0, 8);
if (StringUtils.isBlank(this.oaiSetSpec)) {
this.oaiSetSpec = DigestUtils.md5Hex(this.ds.getId()).substring(0, 8);
}
final OaiManagerClient oaiManager = clientFactory.getClient(OaiManagerClient.class);
final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class);
final OaiManagerClient oaiManager = this.clientFactory.getClient(OaiManagerClient.class);
final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class);
final OaiConfiguration conf = oaiManager.getOaiConfiguration();
final MDStoreWithInfo mdstore = mdstoreManager.findMDStore(inputMdId);
final MDStoreWithInfo mdstore = mdstoreManager.findMDStore(this.inputMdId);
if (mdstore.getFormat().equals(conf.getNativeFormat())) {
final boolean isSetRegistered = oaiManager.listSets()
.stream()
.map(OaiSet::getSetSpec)
.filter(s -> s.equals(oaiSetSpec))
.filter(s -> s.equals(this.oaiSetSpec))
.findFirst()
.isPresent();
if (!isSetRegistered) {
final OaiSet oaiSet = new OaiSet();
oaiSet.setSetSpec(oaiSetSpec);
oaiSet.setSetName(ds.getOfficialname());
oaiSet.setDescription("Publications of " + ds.getOfficialname());
oaiSet.setDsId(ds.getId());
oaiSet.setSetSpec(this.oaiSetSpec);
oaiSet.setSetName(this.ds.getOfficialname());
oaiSet.setDescription("Publications of " + this.ds.getOfficialname());
oaiSet.setDsId(this.ds.getId());
oaiManager.createOaiSet(oaiSet);
}
final MDStoreVersion inputVersion = mdstoreManager.startReading(inputMdId);
final MDStoreVersion inputVersion = mdstoreManager.startReading(this.inputMdId);
final Stream<MetadataRecord> records = mdStoreSqlBackend.streamEntries(inputVersion);
final Stream<MetadataRecord> records = this.mdStoreSqlBackend.streamEntries(inputVersion);
try {
oaiBackend.addRecords(records, oaiSetSpec, oaiRefresh);
this.oaiBackend.addRecords(records, this.oaiSetSpec, this.oaiRefresh);
} finally {
mdstoreManager.endReading(inputVersion);
}

View File

@@ -66,10 +66,7 @@ public class MdIndexJobNode extends ProcessNode {
this.solrService.indexRecords(conf, this.mdStoreSqlBackend.streamEntries(inputVersion)
.map(MetadataRecord::getBody)
.map(o -> {
updateProgressMessage(counter, total);
return o;
}));
.map(o -> updateProgressMessage(o, counter, total, PROGRESS_MESSAGE_INTERVAL)));
if (!this.incremental) {
this.solrService.deleteOldRecords(conf, start);

View File

@@ -26,7 +26,6 @@ import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.utils.XpathFilterFactory;
import jakarta.transaction.Transactional;
@WfNode("md_transform_xslt")
public class MdTransformJobNode extends ProcessNode {
@@ -59,7 +58,6 @@ public class MdTransformJobNode extends ProcessNode {
private MDStoreSqlBackend mdStoreSqlBackend;
@Override
@Transactional
protected void execute() throws Exception {
final Predicate<Document> filter = XpathFilterFactory.createFilter(this.filterXpath);
@@ -92,10 +90,7 @@ public class MdTransformJobNode extends ProcessNode {
throw new RuntimeException("Invalid record: " + record.getBody());
}
})
.map(o -> {
updateProgressMessage(counter, total);
return o;
});
.map(o -> updateProgressMessage(o, counter, total, PROGRESS_MESSAGE_INTERVAL));
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);

View File

@@ -33,5 +33,9 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jakarta-xmlbind-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
</dependencies>
</project>
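
Note: the new jackson-datatype-jsr310 dependency provides the JavaTimeModule used further down in ProcessEngine, so that java.time values such as LocalDateTime can be serialized. A minimal sketch of the effect, assuming only Jackson Databind plus this artifact on the classpath (the demo class name is made up):

import java.time.LocalDateTime;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class Jsr310Demo {

    public static void main(final String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        // Without this module, recent Jackson versions refuse to serialize java.time types
        // (InvalidDefinitionException); with it, LocalDateTime is handled out of the box.
        mapper.registerModule(new JavaTimeModule());
        System.out.println(mapper.writeValueAsString(LocalDateTime.now()));
    }
}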

View File

@@ -6,6 +6,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
@JsonInclude(Include.NON_NULL)
public class RuntimeGraph implements Serializable {
private static final long serialVersionUID = 6472378218385082376L;
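
Note: @JsonInclude(Include.NON_NULL) makes Jackson drop null properties when RuntimeGraph is serialized, keeping the JSON emitted by the engine compact. A small illustration of the behaviour (the Example bean is hypothetical, not part of the codebase):

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonIncludeDemo {

    @JsonInclude(Include.NON_NULL)
    static class Example {
        public String name = "node-1";
        public String progressMessage = null; // skipped by the serializer
    }

    public static void main(final String[] args) throws Exception {
        // Prints {"name":"node-1"}: the null field is omitted.
        System.out.println(new ObjectMapper().writeValueAsString(new Example()));
    }
}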

View File

@@ -16,7 +16,6 @@ import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.domain.mdstore.records.MetadataRecord.MetadataRecordEncoding;
import eu.dnetlib.domain.mdstore.records.Provenance;
import eu.dnetlib.errors.MDStoreManagerException;
import jakarta.transaction.Transactional;
public class MDStoreSqlBackend implements MDStoreBackend {
@@ -68,7 +67,6 @@ public class MDStoreSqlBackend implements MDStoreBackend {
}
@Override
@Transactional
public void saveRecords(final MDStoreVersion version, final Stream<MetadataRecord> inputStream) {
final String table = asTableName(version.getId());

View File

@@ -27,7 +27,7 @@ import eu.dnetlib.wfs.utils.NodeCallback;
public abstract class ProcessNode implements BeanNameAware {
private static final int PROGRESS_MESSAGE_INTERVAL = 100;
protected static final long PROGRESS_MESSAGE_INTERVAL = 100;
private static final Log log = LogFactory.getLog(ProcessNode.class);
@@ -264,15 +264,15 @@ public abstract class ProcessNode implements BeanNameAware {
}
protected void updateProgressMessage(final String message) {
log.debug("* PROGRESS MESSAGE: " + message);
this.graphNode.setProgressMessage(message);
this.engine.updateRunningJob(this.process);
}
protected void updateProgressMessage(final AtomicLong counter, final Long size) {
protected <T> T updateProgressMessage(final T res, final AtomicLong counter, final Long size, final Long interval) {
final long i = counter.incrementAndGet();
if ((i % PROGRESS_MESSAGE_INTERVAL) == 0) {
if ((interval == null) || ((i % interval) == 0)) {
updateProgressMessage(i + "/" + ((size != null) ? size : "-"));
}
return res;
}
}
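
Note: the reworked helper returns its argument, so a stream stage can update the progress message and still forward the element in a single map call (the pattern now used by the Md*JobNode classes above). A self-contained sketch of the same idea, with made-up names and plain System.out instead of the engine logging:

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ProgressDemo {

    // Pass-through helper: counts the element, reports every 'interval' items
    // (or every item when interval is null), and returns the element unchanged.
    static <T> T updateProgress(final T res, final AtomicLong counter, final Long size, final Long interval) {
        final long i = counter.incrementAndGet();
        if ((interval == null) || ((i % interval) == 0)) {
            System.out.println(i + "/" + ((size != null) ? size : "-"));
        }
        return res;
    }

    public static void main(final String[] args) {
        final AtomicLong counter = new AtomicLong(0);
        Stream.of("a", "b", "c", "d", "e", "f")
            .map(String::toUpperCase)
            // reports "2/6", "4/6", "6/6" while the elements keep flowing downstream
            .map(o -> updateProgress(o, counter, 6L, 2L))
            .forEach(o -> { /* downstream consumer, e.g. saveRecords(...) */ });
    }
}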

View File

@@ -2,30 +2,25 @@ package eu.dnetlib.wfs.nodes.test;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.ProcessNode;
@WfNode("test01")
public class Test01Node extends ProcessNode {
@WfInputParam
private String message;
@WfInputParam
private int times;
@WfOutputParam
private String response;
private long times;
@Override
protected void execute() throws Exception {
System.out.println("************************");
System.out.println("* Instance: " + toString());
for (int i = 0; i < times; i++) {
System.out.println("* " + i + " - " + message);
for (long i = 0; i < this.times; i++) {
System.out.println("* " + i);
updateProgressMessage(i + "/" + this.times);
Thread.sleep(2000);
}
System.out.println("************************");
response = "I printed the message [" + message + "]";
}
}

View File

@@ -10,6 +10,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
@@ -25,6 +27,10 @@ import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import eu.dnetlib.common.app.ServiceStatusRegistry;
import eu.dnetlib.domain.wfs.WorkflowsConstants;
import eu.dnetlib.domain.wfs.graph.runtime.RuntimeNode;
@@ -48,6 +54,8 @@ public class ProcessEngine {
private static final Log log = LogFactory.getLog(ProcessEngine.class);
private static final int MAX_RUNNING_WFS = 20;
@Autowired
private EmailSender emailSender;
@@ -68,6 +76,8 @@
private final Set<String> validNodeTypes = new HashSet<>();
private final ExecutorService wfsThreadPool = Executors.newFixedThreadPool(MAX_RUNNING_WFS);
@Value("${dnet.wf.procs.size:20}")
private int maxSize;
@@ -142,7 +152,10 @@
while (true) {
try {
tryToStartWf(this.queue.take(), ServiceStatusRegistry.getStatus().getName());
final String processId = this.queue.take();
if (tryToAccept(processId, ServiceStatusRegistry.getStatus().getName())) {
this.wfsThreadPool.execute(() -> startWorkflowJob(this.jobRepository.findById(processId).get()));
}
} catch (final Throwable e) {
log.error("Error accepting new wfs", e);
}
@@ -150,20 +163,16 @@
}
@Transactional
private void tryToStartWf(final String processId, final String executor) {
private boolean tryToAccept(final String processId, final String executor) {
log.debug("Trying to start " + processId);
this.jobRepository.tryAssegnment(processId, executor, JobStatus.accepted, LocalDateTime.now());
final Optional<WfRunningJob> job = this.jobRepository.findById(processId);
if (job.isPresent() && (job.get().getStatus() == JobStatus.accepted) && job.get().getWfExecutor().equals(executor)) {
startWorkflowJob(job.get());
}
return job.isPresent() && (job.get().getStatus() == JobStatus.accepted) && job.get().getWfExecutor().equals(executor);
}
@Transactional
public void startWorkflowJob(final WfRunningJob job) {
final WorkflowProcess process = new WorkflowProcess(job);
@@ -351,8 +360,6 @@
public void updateRunningJob(final WorkflowProcess process) {
log.debug("UPDATING JOB");
final LocalDateTime now = LocalDateTime.now();
process.getJobDetails().setLastUpdate(LocalDateTime.now());
@@ -372,6 +379,17 @@
process.getJobDetails().setStatus(JobStatus.running);
}
if (log.isDebugEnabled()) {
log.debug("UPDATING JOB: " + process.getId());
try {
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
log.debug(mapper.writeValueAsString(process.getJobDetails()));
} catch (final JsonProcessingException e) {
log.warn(e.getMessage(), e);
}
}
this.jobRepository.save(process.getJobDetails());
}
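
Note: the engine now separates job intake from job execution: the queue consumer only claims the process (tryToAccept, still transactional) and then hands it to a fixed thread pool, so a long-running workflow no longer blocks the take() loop. A stripped-down sketch of that dispatch pattern, with placeholder methods standing in for the repository calls and the real workflow start:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class DispatchDemo {

    private static final int MAX_RUNNING_WFS = 20;

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ExecutorService wfsThreadPool = Executors.newFixedThreadPool(MAX_RUNNING_WFS);

    // Intake loop: claim the job, then run it on the pool so take() is immediately free again.
    void consumeQueue() throws InterruptedException {
        while (true) {
            final String processId = this.queue.take();
            if (tryToAccept(processId, "local-executor")) {
                this.wfsThreadPool.execute(() -> startWorkflowJob(processId));
            }
        }
    }

    // Placeholder for the transactional assignment check performed via the job repository.
    boolean tryToAccept(final String processId, final String executor) {
        return true;
    }

    // Placeholder for the actual workflow execution.
    void startWorkflowJob(final String processId) {
        System.out.println("running " + processId);
    }

    public static void main(final String[] args) throws Exception {
        final DispatchDemo demo = new DispatchDemo();
        demo.queue.offer("proc-1");
        demo.queue.offer("proc-2");
        final Thread intake = new Thread(() -> {
            try {
                demo.consumeQueue();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        intake.setDaemon(true);
        intake.start();
        Thread.sleep(500); // give the two demo jobs time to run
        demo.wfsThreadPool.shutdown();
    }
}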