a wf history api

This commit is contained in:
Michele Artini 2023-10-20 11:45:24 +02:00
parent c464d37659
commit 1afd9eba5c
43 changed files with 120 additions and 51 deletions

View File

@ -19,13 +19,11 @@ public class MDStoreStreamReader {
private MDStoreService mdStoreService;
private enum Status {
PREPARED,
READING,
COMPLETED,
FAILED
PREPARED, READING, COMPLETED, FAILED
}
// TODO the failure could be throw consuming the stream, so it is necessary to perform a refactoring of this method
// TODO (MEDIUM PRIORITY): CONSIDER TO IMPLEMENT A COMMON MODULE WITH WF EXECUTOR
// the failure could be throw consuming the stream, so it is necessary to perform a refactoring of this method
public Stream<MetadataRecord> prepareMDStoreStream(final String mdstoreId) throws MDStoreManagerException {
final MDStoreWithInfo mdstore = mdStoreService.findMdStore(mdstoreId);
@ -37,15 +35,12 @@ public class MDStoreStreamReader {
@Override
public boolean hasNext() {
if (innerIterator.hasNext()) {
return true;
} else {
try {
complete();
return false;
} catch (final MDStoreManagerException e) {
throw new RuntimeException("Error reading mdstore", e);
}
if (innerIterator.hasNext()) { return true; }
try {
complete();
return false;
} catch (final MDStoreManagerException e) {
throw new RuntimeException("Error reading mdstore", e);
}
}
@ -72,14 +67,14 @@ public class MDStoreStreamReader {
}
private synchronized void complete() throws MDStoreManagerException {
if (status == Status.PREPARED || status == Status.READING) {
if ((status == Status.PREPARED) || (status == Status.READING)) {
status = Status.COMPLETED;
mdStoreService.endReading(mdstoreId);
}
}
private synchronized void fail() throws MDStoreManagerException {
if (status == Status.PREPARED || status == Status.READING) {
if ((status == Status.PREPARED) || (status == Status.READING)) {
status = Status.FAILED;
mdStoreService.endReading(mdstoreId);
}

View File

@ -36,6 +36,11 @@ public class ApiController extends DnetRestController {
return wfManagerService.findHistory(total, from, to);
}
@GetMapping("/history/byDsId/{dsId}")
public List<WfHistoryEntry> historyByDsId(@PathVariable final String dsId) {
return wfManagerService.findHistoryForDsId(dsId);
}
@GetMapping("/history/byConf/{wfConfId}")
public List<WfHistoryEntry> history(@PathVariable final String wfConfId) {
return wfManagerService.findHistoryForConfiguration(wfConfId);

View File

@ -22,6 +22,8 @@ import eu.dnetlib.wfs.utils.WorkflowfProcessUtils;
@Service
public class WorkflowManagerService extends AbstractWfService {
private static final int MAX_HISTORY_SIZE = 50;
@Autowired
private WfHistoryEntryRepository wfHistoryEntryRepository;
@ -49,8 +51,15 @@ public class WorkflowManagerService extends AbstractWfService {
return wfHistoryEntryRepository.save(job);
}
public List<WfHistoryEntry> findHistoryForDsId(final String dsId) {
return wfHistoryEntryRepository.findByDsIdOrderByEndDateDesc(dsId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public List<WfHistoryEntry> findHistory(final int total, final Long from, final Long to) {
if ((from == null) && (to == null)) { return wfHistoryEntryRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList(); }
final int max = ((total > 0) || (total < MAX_HISTORY_SIZE)) ? total : MAX_HISTORY_SIZE;
if ((from == null) && (to == null)) { return wfHistoryEntryRepository.findAll(PageRequest.of(0, max, Sort.by("endDate").descending())).toList(); }
final LocalDateTime fromTime = from != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone
.getDefault()
.toZoneId()) : LocalDateTime.MIN;
@ -58,11 +67,11 @@ public class WorkflowManagerService extends AbstractWfService {
.getDefault()
.toZoneId()) : LocalDateTime.MAX;
return wfHistoryEntryRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime);
return wfHistoryEntryRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime, PageRequest.of(0, max)).getContent();
}
public List<WfHistoryEntry> findHistoryForConfiguration(final String wfConfId) {
return wfHistoryEntryRepository.findByWfConfigurationIdOrderByEndDateDesc(wfConfId);
return wfHistoryEntryRepository.findByWfConfigurationIdOrderByEndDateDesc(wfConfId, PageRequest.of(0, MAX_HISTORY_SIZE)).getContent();
}
public WfHistoryEntry findProcessLog(final String processId) {

View File

@ -0,0 +1,81 @@
package eu.dnetlib.common.clients;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.domain.dsm.info.AggregationInfo;
import eu.dnetlib.domain.dsm.info.AggregationStage;
import eu.dnetlib.domain.dsm.info.CollectionInfo;
import eu.dnetlib.domain.dsm.info.CollectionMode;
import eu.dnetlib.domain.dsm.info.TransformationInfo;
import eu.dnetlib.domain.wfs.JobStatus;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.errors.DsmException;
public class WfManagerClient extends DnetServiceClient {
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
public List<AggregationInfo> getAggregationHistory(final String dsId) throws DsmException {
final WfHistoryEntry[] history = httpGet("/api/history/byDsId/" + URLEncoder.encode(dsId, StandardCharsets.UTF_8), WfHistoryEntry[].class);
return Arrays.stream(history)
.filter(job -> (job.getStatus() == JobStatus.success) || (job.getStatus() == JobStatus.failure))
.map(job -> {
final Map<String, String> details = job.getDetails();
final AggregationStage stage = AggregationStage.parse(details.get("system:wfName"));
final boolean success = job.getStatus() == JobStatus.success;
final String date = job.getEndDate().format(dateTimeFormatter);
final long total = success ? getNumberOfRecords(details) : 0;
return switch (stage) {
case COLLECT -> {
final CollectionInfo cInfo = new CollectionInfo();
cInfo.setAggregationStage(stage);
cInfo.setCollectionMode(getCollectionMode(details));
cInfo.setNumberOfRecords(total);
cInfo.setDate(date);
cInfo.setCompletedSuccessfully(success);
yield cInfo;
}
case TRANSFORM -> {
final TransformationInfo tInfo = new TransformationInfo();
tInfo.setAggregationStage(stage);
tInfo.setNumberOfRecords(total);
tInfo.setDate(date);
tInfo.setCompletedSuccessfully(success);
yield tInfo;
}
};
})
.collect(Collectors.toList());
}
private CollectionMode getCollectionMode(final Map<String, String> details) {
// TODO (LOW PRIORITY) : verify the terms in the history
return Optional.ofNullable(details.get("system:node:SELECT_MODE:selection"))
.map(CollectionMode::valueOf)
.orElseGet(() -> Optional.ofNullable(details.get("collectionMode"))
.map(CollectionMode::valueOf)
.orElse(null));
}
private Integer getNumberOfRecords(final Map<String, String> details) {
// TODO (LOW PRIORITY) : verify the terms in the history
final String sinkSize = details.get("mainlog:sinkSize");
final String total = details.get("mainlog:total");
if (StringUtils.isNotBlank(sinkSize)) { return NumberUtils.toInt(sinkSize); }
if (StringUtils.isNotBlank(total)) { return NumberUtils.toInt(total); }
return -1;
}
}

View File

@ -1,26 +0,0 @@
package eu.dnetlib.common.clients;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.domain.dsm.info.AggregationInfo;
import eu.dnetlib.errors.DsmException;
public class WfManagerClient extends DnetServiceClient {
private static final Log log = LogFactory.getLog(WfManagerClient.class);
private final static String LOADTIME = "loadtime";
public List<AggregationInfo> getAggregationHistory(final String dsId) throws DsmException {
// TODO (HIGH PRIORITY) Auto-generated method stub
return null;
}
public void dropCache() {
// TODO (HIGH PRIORITY) Auto-generated method stub
}
}

View File

@ -2,7 +2,7 @@ package eu.dnetlib.domain.dsm.info;
public abstract class AggregationInfo {
private int numberOfRecords;
private long numberOfRecords;
private String date;
@ -14,11 +14,11 @@ public abstract class AggregationInfo {
public AggregationInfo() {}
public int getNumberOfRecords() {
public long getNumberOfRecords() {
return numberOfRecords;
}
public void setNumberOfRecords(final int numberOfRecords) {
public void setNumberOfRecords(final long numberOfRecords) {
this.numberOfRecords = numberOfRecords;
}

View File

@ -4,6 +4,8 @@ import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
@ -13,15 +15,18 @@ import eu.dnetlib.domain.wfs.WfHistoryEntry;
public interface WfHistoryEntryRepository extends JpaRepository<WfHistoryEntry, String> {
List<WfHistoryEntry> findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end);
Page<WfHistoryEntry> findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end, Pageable pageable);
List<WfHistoryEntry> findByWfConfigurationIdOrderByEndDateDesc(String id);
Page<WfHistoryEntry> findByWfConfigurationIdOrderByEndDateDesc(String id, Pageable pageable);
Optional<WfHistoryEntry> findFirstByWfConfigurationIdOrderByEndDateDesc(String id);
Page<WfHistoryEntry> findByDsIdOrderByEndDateDesc(String dsId, Pageable pageable);
List<WfHistoryEntry> findByStatus(JobStatus status);
Optional<WfHistoryEntry> findFirstByWfConfigurationIdOrderByEndDateDesc(String id);
@Modifying
@Query(value = "update WfHistoryEntry set wfExecutor = ?2 where processId = ?1 and wfExecutor is NULL")
void tryAssegnment(String id, String workerId);
}