- Expose the "numOfAllPayloads" and "numOfInspectedRecords" DB-stats to Prometheus, by using a scheduled task which requests the numbers from the DB every 6 hours.

- Update the "StatsController.getNumberOfPayloadsAggregatedByService()" to use the new table "payload_aggregated", instead of casting and checking the date of the records.
- Code polishing.
This commit is contained in:
Lampros Smyrnaios 2023-06-19 14:42:00 +03:00
parent 798fa09d68
commit b9712bed85
3 changed files with 55 additions and 7 deletions

View File

@ -2,13 +2,16 @@ package eu.openaire.urls_controller.components;
import eu.openaire.urls_controller.Application; import eu.openaire.urls_controller.Application;
import eu.openaire.urls_controller.controllers.ShutdownController; import eu.openaire.urls_controller.controllers.ShutdownController;
import eu.openaire.urls_controller.controllers.StatsController;
import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils; import eu.openaire.urls_controller.util.GenericUtils;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -21,6 +24,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@Component @Component
@ -31,16 +35,26 @@ public class ScheduledTasks {
@Autowired @Autowired
FileUtils fileUtils; FileUtils fileUtils;
private StatsController statsController;
private final String workerReportsDirPath; private final String workerReportsDirPath;
AtomicInteger numOfAllPayloads = new AtomicInteger(0);
AtomicInteger numOfInspectedRecords = new AtomicInteger(0);
public ScheduledTasks(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath)
public ScheduledTasks(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath, StatsController statsController, MeterRegistry registry)
{ {
if ( !workerReportsDirPath.endsWith("/") ) if ( !workerReportsDirPath.endsWith("/") )
workerReportsDirPath += "/"; workerReportsDirPath += "/";
this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later. this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
this.statsController = statsController;
registry.gauge("numOfAllPayloads", numOfAllPayloads);
registry.gauge("numOfInspectedRecords", numOfInspectedRecords);
} }
@ -152,4 +166,27 @@ public class ScheduledTasks {
} }
} }
// Scheduled Metrics for Prometheus.
// Prometheus usually scrapes for metrics every 15 seconds, but that is an extremely short time-period for DB-statistics.
// So, the DB is queried on a much longer interval and the metric-holders keep the latest retrieved values in between.

/**
 * Refreshes the "numOfAllPayloads" and "numOfInspectedRecords" values, by requesting the numbers from the {@code statsController}.
 * Each response is processed independently, so a missing or malformed body in the first one does not prevent the second metric from being updated.
 * A metric which cannot be refreshed keeps its previous value.
 */
@Scheduled(fixedDelay = 21_600_000)	// Every 6 hours, run the following queries to the database and update the metrics.
//@Scheduled(initialDelay = 60_000, fixedDelay = 120_000)	// For testing only.
public void updatePrometheusMetrics()
{
	updateMetricFromResponse(statsController.getNumberOfAllPayloads(true), numOfAllPayloads, "numOfAllPayloads");
	updateMetricFromResponse(statsController.getNumberOfRecordsInspected(true), numOfInspectedRecords, "numOfInspectedRecords");

	// TODO - Export more complex data; <numOfAllPayloadsPerDatasource>, <numOfAllPayloadsPerYear>,
	//  <numOfAggregatedPayloadsPerDatasource>, ..., <numOfBulkImportedPayloadsPerDatasource>, ...
}


/**
 * Stores the number carried by the given response into the given metric-holder.
 * The metric is updated only when the response has the status-code 200 and its body can be parsed as an integer.
 *
 * @param responseEntity the response returned by a stats-endpoint
 * @param metric the holder of the metric's value
 * @param metricName the name of the metric, used only in the log-messages
 */
private void updateMetricFromResponse(ResponseEntity<?> responseEntity, AtomicInteger metric, String metricName)
{
	if ( responseEntity == null ) {
		logger.warn("No response was given for the metric \"{}\". The previous value is kept.", metricName);
		return;
	}

	if ( responseEntity.getStatusCode().value() != 200 )
		return;	// Any error is already logged.

	Object body = responseEntity.getBody();
	if ( body == null ) {	// A "200" response without a body would otherwise cause a NullPointerException.
		logger.warn("The response for the metric \"{}\" had no body. The previous value is kept.", metricName);
		return;
	}

	try {
		// The body is converted through its text-representation (any other cast method fails).
		metric.set(Integer.parseInt(body.toString().trim()));
	} catch (NumberFormatException nfe) {
		logger.error("The response-body for the metric \"{}\" was not a valid integer: \"{}\". The previous value is kept.", metricName, body);
	}
}
} }

View File

@ -30,8 +30,11 @@ public class StatsController {
* This includes the payloads created by other pieces of software, before the PDF-Aggregation-Service was created. * This includes the payloads created by other pieces of software, before the PDF-Aggregation-Service was created.
* */ * */
@GetMapping("getNumberOfAllPayloads") @GetMapping("getNumberOfAllPayloads")
public ResponseEntity<?> getNumberOfAllPayloads() { public ResponseEntity<?> getNumberOfAllPayloads(boolean isCalledFromScheduler)
logger.info("Received a \"getNumberOfAllPayloads\" request."); {
if ( ! isCalledFromScheduler )
logger.info("Received a \"getNumberOfAllPayloads\" request.");
final String getPayloadsNumberQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload"; final String getPayloadsNumberQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload";
return statsService.getNumberOfPayloads(getPayloadsNumberQuery, "payloads"); return statsService.getNumberOfPayloads(getPayloadsNumberQuery, "payloads");
} }
@ -44,7 +47,7 @@ public class StatsController {
@GetMapping("getNumberOfPayloadsAggregatedByService") @GetMapping("getNumberOfPayloadsAggregatedByService")
public ResponseEntity<?> getNumberOfPayloadsAggregatedByService() { public ResponseEntity<?> getNumberOfPayloadsAggregatedByService() {
logger.info("Received a \"getNumberOfPayloadsAggregatedByService\" request."); logger.info("Received a \"getNumberOfPayloadsAggregatedByService\" request.");
String getPayloadsAggregatedQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload where `date` >= cast(cast('2021-01-01' as timestamp) as bigint)"; String getPayloadsAggregatedQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_aggregated";
return statsService.getNumberOfPayloads(getPayloadsAggregatedQuery, "payloads retrieved by the PDF Aggregation Service"); return statsService.getNumberOfPayloads(getPayloadsAggregatedQuery, "payloads retrieved by the PDF Aggregation Service");
} }
@ -86,6 +89,13 @@ public class StatsController {
// TODO - Add an endpoint to return the number of payloads found for each publication-year, in descending order.
// For example, the number of payloads for publications published in 2016 is <number>,
// --//-- the number for 2017 is <number>.
// Add a "limit" parameter, so that the user can request only the most recent years (e.g. the last 5 years: 2019-2023).
/** /**
* This endpoint returns the total number of distinct full-text files existing in the database. * This endpoint returns the total number of distinct full-text files existing in the database.
* */ * */
@ -101,8 +111,11 @@ public class StatsController {
* This endpoint returns the number of records inspected by the PDF-Aggregation-Service. * This endpoint returns the number of records inspected by the PDF-Aggregation-Service.
* */ * */
@GetMapping("getNumberOfRecordsInspected") @GetMapping("getNumberOfRecordsInspected")
public ResponseEntity<?> getNumberOfRecordsInspected() public ResponseEntity<?> getNumberOfRecordsInspected(boolean isCalledFromScheduler)
{ {
if ( ! isCalledFromScheduler )
logger.info("Received a \"getNumberOfRecordsInspected\" request.");
return statsService.getNumberOfRecordsInspected(); return statsService.getNumberOfRecordsInspected();
} }

View File

@ -48,8 +48,6 @@ public class StatsServiceImpl implements StatsService {
// So in order to get the number of inspected records, we want the distinct number, which at some point it will remain stable, even though the Service will try again and again some records. // So in order to get the number of inspected records, we want the distinct number, which at some point it will remain stable, even though the Service will try again and again some records.
// Before all the records are inspected, this endpoint will report all the inspected records MINUS the duplicate records which come straight from the "publication" table. // Before all the records are inspected, this endpoint will report all the inspected records MINUS the duplicate records which come straight from the "publication" table.
logger.info("Received a \"getNumberOfRecordsInspected\" request.");
final String getInspectedRecordsNumberQuery = "select count(dist.id) from (select distinct id, original_url from " + ImpalaConnector.databaseName + ".attempt) as dist"; final String getInspectedRecordsNumberQuery = "select count(dist.id) from (select distinct id, original_url from " + ImpalaConnector.databaseName + ".attempt) as dist";
try { try {
Object result = jdbcTemplate.queryForObject(getInspectedRecordsNumberQuery, Integer.class); Object result = jdbcTemplate.queryForObject(getInspectedRecordsNumberQuery, Integer.class);