
422 lines
23 KiB
Raw Normal View History

2021-03-16 14:25:15 +01:00
package eu.openaire.urls_controller.components;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import eu.openaire.urls_controller.Application;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.ShutdownController;
import eu.openaire.urls_controller.controllers.StatsController;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import io.micrometer.core.instrument.MeterRegistry;
2021-03-16 14:25:15 +01:00
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Scheduled;
2021-03-16 14:25:15 +01:00
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
2021-03-16 14:25:15 +01:00
/**
 * Hosts the periodic ({@code @Scheduled}) jobs of the Controller which are visible in this file:
 * running the queued background tasks, checking whether the service may shut down,
 * re-processing or deleting leftover worker-report files and refreshing the Prometheus gauges.
 */
public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
// NOTE(review): never assigned in the visible code; presumably injected by the framework (the annotation seems to have been lost) — confirm.
FileUtils fileUtils;
2021-03-16 14:25:15 +01:00
// NOTE(review): the line above is a timestamp left over from the web view this file was copied from; it is not Java.
// Used to delete a batch of assignment-records. NOTE(review): never assigned in the visible code; presumably injected — confirm.
UrlsServiceImpl urlsService;
// Queried for the payload/record counts which feed the Prometheus gauges.
private final StatsController statsController;
// Receives the worker-reports which are loaded again from disk.
private final UrlsController urlsController;
// NOTE(review): has no initializer and is not assigned in the visible code, yet it is read inside the constructor — confirm where its value comes from.
private int assignmentsLimit;
// Base directory of the worker-reports. The constructor guarantees that it ends with "/".
private final String workerReportsDirPath;
// The following counters are registered as gauges in the constructor and are refreshed by "updatePrometheusMetrics()".
public static final AtomicInteger numOfAllPayloads = new AtomicInteger(0);
public static final AtomicInteger numOfPayloadsAggregatedByServiceThroughCrawling = new AtomicInteger(0);
public static final AtomicInteger numOfPayloadsAggregatedByServiceThroughBulkImport = new AtomicInteger(0);
public static final AtomicInteger numOfPayloadsAggregatedByService = new AtomicInteger(0);
public static final AtomicInteger numOfLegacyPayloads = new AtomicInteger(0);
public static final AtomicInteger numOfRecordsInspectedByServiceThroughCrawling = new AtomicInteger(0);
/**
 * Stores the collaborators, normalizes the worker-reports base path, allocates the shared json-buffer
 * and registers the static counters as gauges in the given registry.
 *
 * @param workerReportsDirPath base directory of the worker-reports; a trailing "/" is appended when it is missing.
 * @param statsController the controller which is later queried for the metric values.
 * @param urlsController the controller which later receives the re-loaded worker-reports.
 * @param registry the meter-registry in which the gauges are registered.
 */
public ScheduledTasks(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath, StatsController statsController, UrlsController urlsController, MeterRegistry registry)
{
    if ( !workerReportsDirPath.endsWith("/") )
        workerReportsDirPath += "/";

    this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
    this.statsController = statsController;
    this.urlsController = urlsController;

    // NOTE(review): "assignmentsLimit" is an instance field without an initializer and nothing assigns it before this line,
    // so it appears to still be 0 here, which gives the buffer an initial capacity of 0 (it will simply grow on demand).
    // If a pre-sized buffer is wanted, the limit probably has to become a constructor parameter — confirm.
    jsonStringBuilder = new StringBuilder(assignmentsLimit * 500);

    // Each counter is registered under the same name as its field.
    registry.gauge("numOfAllPayloads", numOfAllPayloads);
    registry.gauge("numOfPayloadsAggregatedByServiceThroughCrawling", numOfPayloadsAggregatedByServiceThroughCrawling);
    registry.gauge("numOfPayloadsAggregatedByServiceThroughBulkImport", numOfPayloadsAggregatedByServiceThroughBulkImport);
    registry.gauge("numOfPayloadsAggregatedByService", numOfPayloadsAggregatedByService);
    registry.gauge("numOfLegacyPayloads", numOfLegacyPayloads);
    registry.gauge("numOfRecordsInspectedByServiceThroughCrawling", numOfRecordsInspectedByServiceThroughCrawling);
}
/**
 * Takes a snapshot of the tasks queued in "UrlsController.backgroundCallableTasks", submits them all to
 * "UrlsController.backgroundExecutor" and waits for their results.
 * The result of each task is inspected on its own, so a failed or cancelled task is only logged.
 */
@Scheduled(initialDelay = 1_800_000, fixedDelay = 900_000) // Execute this method 30 mins from the start and 15 mins after each last execution, in order for some tasks to have been gathered.
//@Scheduled(initialDelay = 60_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
// The initial delay is larger, because we have to wait some time for at least one worker to finish retrieving the full-texts from thousands of publications, whereas, later we will have a lot of workerReports waiting to be processed.
public void executeBackgroundTasks()
2023-05-29 11:21:48 +02:00
// NOTE(review): the line above (and the similar ones below) is a timestamp left over from the web view this file was copied from; it is not Java. The method's braces are missing as well.
List<Callable<Boolean>> tempList = new ArrayList<>(UrlsController.backgroundCallableTasks); // Copy the list in order to know what was executed.
// So the items added while this execution happens, will remain in the global-list, while the others will have already been deleted.
// Also, the "Executor.invokeAll()" requires an "unchanged" list, otherwise there will be "undefined results".
int numOfTasks = tempList.size(); // Since the temp-list is a separate copy and not a reference to the global list, new tasks that are added will not be executed.
// NOTE(review): the statement guarded by this "if" is missing (presumably an early "return;") — restore it from the original file.
if ( numOfTasks == 0 )
// Immediately delete the selected tasks from the global list, so that if these tasks are not finished before the scheduler runs again, they will not be re-processed.
// NOTE(review): the body of this loop is missing (presumably the removal of "selectedTask" from the global list) — restore it from the original file.
for ( Callable<Boolean> selectedTask : tempList )
2023-05-29 11:21:48 +02:00
logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed.");
// Execute the tasks and wait for them to finish.
try {
2023-05-29 11:21:48 +02:00
List<Future<Boolean>> futures = UrlsController.backgroundExecutor.invokeAll(tempList);
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) {
try {
Boolean value = futures.get(i).get(); // Get and see if an exception is thrown..
// Add check for the value, if wanted.. (we don't care at the moment)
} catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage);
} catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
// NOTE(review): this message reports the zero-based "i", while the two messages above report "i+1" — confirm which numbering is intended.
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage());
} catch (Exception e) {
// Any other problem (e.g. an interruption while waiting) is logged with its full stack-trace.
logger.error("", e);
2021-03-16 14:25:15 +01:00
/**
 * When a shutdown has been requested ("ShutdownController.shouldShutdownService"), checks whether background tasks are still
 * queued and whether every known worker has reported its own shutdown, before the Controller itself goes down.
 * NOTE(review): the statement which actually performs the shutdown is not visible in this copy of the file.
 */
@Scheduled(initialDelay = 600_000, fixedDelay = 7_200_000) // Check every 2 hours. The initial delay is 10 minutes, to allow to shut down quickly in case of problem when starting, but also account for the initial communication with the Workers, where a problem may appear.
//@Scheduled(initialDelay = 60_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
public void checkIfServiceIsReadyForShutdown()
if ( ! ShutdownController.shouldShutdownService )
return; // Either the service was never instructed to shut down, or the user canceled the request.
// Check whether there are still background tasks to be processed. Either workerReport or Bulk-import requests.
// NOTE(review): the statement guarded by this "if" is missing (presumably an early "return;") — restore it from the original file.
if ( UrlsController.backgroundCallableTasks.size() > 0 )
Set<String> workerIds = UrlsController.workersInfoMap.keySet();
if ( workerIds.size() > 0 ) {
for ( String workerId : workerIds )
if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value.
return; // If at least 1 worker is still active, then do not shut down the Controller.
logger.info("All workers have already shutdown. Shutting down the Controller..");
} else
logger.info("No workers have participated in the service yet, so the Controller will shut-down immediately.");
// If one worker has crashed, then it will have not informed the Controller. So the controller will think that it is still running and will not shut down.
// Any left-over worker-reports are kept to be retried next time the Controller starts.
// Selects the shorter wait-period below. NOTE(review): never assigned in the visible code; presumably injected from the configuration — confirm.
private boolean isTestEnvironment;
/**
 * Runs once, long after startup, in order to handle the failed worker-reports which were left over from the previous run.
 * When no worker has participated yet, it waits one more period and checks again; if there is still no worker, it gives up.
 * NOTE(review): the statement which starts the actual processing is not visible in this copy of the file.
 */
@Scheduled(initialDelay = 43_200_000, fixedDelay = Long.MAX_VALUE) // Run 12 hours after startup (in order for all workers to be back online) and then never again.
//@Scheduled(initialDelay = 420_000, fixedDelay = Long.MAX_VALUE) // Just for testing (7 mins after startup, only once).
public void checkAndProcessOldAndUnsuccessfulWorkerReports()
// We make sure an initial delay of some minutes is in place before this is executed, since we have to make sure all workers are up and running in order for them to be able to answer the full-texts-requests.
if ( UrlsController.numOfWorkers.get() == 0 ) {
long timeToWait = (isTestEnvironment ? 1_200_000 : 43_200_000); // 20 mins | 12 hours
logger.warn("None of the workers have participated in the service yet. Will wait " + ((timeToWait /1000) /60) + " minutes and try again..");
// NOTE(review): the body of this "try" is missing (presumably the wait itself, since "InterruptedException" is caught) — restore it from the original file.
try {
} catch (InterruptedException ie) {
logger.warn("The wait-period was interrupted! Will try either way.");
if ( UrlsController.numOfWorkers.get() == 0 ) {
logger.error("None of the workers have participated in the service yet again. Will not process any leftover workerReports!");
// NOTE(review): presumably the method returns here; the statement is not visible.
// In case the Controller processes older, failed workerReports, it may be the case that the worker to which the report comes from, is not currently online.
// So its IP won't be stored in the map. Even if we have its IP in the report itself, it's of no use if the worker is offline, since we cannot get the full-texts.
// We do not care for attempts, if the payloads are not there.
// In case a worker-report among those "left-overs" fails again, then it will just be removed by the scheduler, when 7 days pass, and it's still there.
/**
 * Daily job for re-trying the worker-reports which failed recently.
 * NOTE(review): the method's body is not visible in this copy of the file; presumably it delegates to
 * "inspectWorkerReportsAndTakeAction()" with "ActionForWorkerReports.process_current_failed" — confirm.
 */
@Scheduled(initialDelay = 86_400_000, fixedDelay = 86_400_000) // Run after one day, every day.
//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
public void checkAndProcessRecentUnsuccessfulWorkerReports()
// Some hours after their failure and before the workers delete the full-texts as obsolete, we re-try these workerReports,
// hoping that any previous error was temporary.
/**
 * Weekly job for removing the worker-reports which are too old to be useful.
 * NOTE(review): the method's body is not visible in this copy of the file; presumably it delegates to
 * "inspectWorkerReportsAndTakeAction()" with "ActionForWorkerReports.delete_old" — confirm.
 */
@Scheduled(initialDelay = 604_800_000, fixedDelay = 604_800_000) // Run every 7 days.
//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
public void checkAndDeleteOldWorkerReports()
// The failed workerReports are kept for 7 days, for manual debugging purposes. Even if the service does not make more than 2 attempts to import them.
// Scheduled Metrics for Prometheus.
// Prometheus scrapes for metrics usually every 15 seconds, but that is an extremely short time-period for DB-statistics.
@Scheduled(fixedDelay = 21_600_000) // Every 6 hours run the following queries to the database and register the metric.
//@Scheduled(initialDelay = 60_000, fixedDelay = 1_200_000) // For general testing only.
//@Scheduled(initialDelay = 60_000, fixedDelay = 120_000) // For debug testing only.
public void updatePrometheusMetrics()
ResponseEntity<?> responseEntity = statsController.getNumberOfAllPayloads(true);
if ( responseEntity.getStatusCode().value() == 200 ) {
numOfAllPayloads.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
} // Any error is already logged.
responseEntity = statsController.getNumberOfPayloadsAggregatedByServiceThroughCrawling(true);
if ( responseEntity.getStatusCode().value() == 200 ) {
numOfPayloadsAggregatedByServiceThroughCrawling.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
} // Any error is already logged.
responseEntity = statsController.getNumberOfPayloadsAggregatedByServiceThroughBulkImport(true);
if ( responseEntity.getStatusCode().value() == 200 ) {
numOfPayloadsAggregatedByServiceThroughBulkImport.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
} // Any error is already logged.
responseEntity = statsController.getNumberOfPayloadsAggregatedByService(true);
if ( responseEntity.getStatusCode().value() == 200 ) {
numOfPayloadsAggregatedByService.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
} // Any error is already logged.
responseEntity = statsController.getNumberOfLegacyPayloads(true);
if ( responseEntity.getStatusCode().value() == 200 ) {
numOfLegacyPayloads.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
} // Any error is already logged.
responseEntity = statsController.getNumberOfRecordsInspectedByServiceThroughCrawling(true);
if ( responseEntity.getStatusCode().value() == 200 ) {
numOfRecordsInspectedByServiceThroughCrawling.set(Integer.parseInt(responseEntity.getBody().toString())); // (any other cast method fails)
} // Any error is already logged.
// TODO - Export more complex data; <numOfAllPayloadsPerDatasource>, <numOfAllPayloadsPerYear>,
// <numOfAggregatedPayloadsPerDatasource>, ..., <numOfBulkImportedPayloadsPerDatasource>, ...
// The actions which "inspectWorkerReportsAndTakeAction()" can apply to the stored worker-reports:
// re-process the failed ones of the previous run, re-process the recently failed ones, or delete the old ones.
enum ActionForWorkerReports {process_previous_failed, process_current_failed, delete_old}
// TODO - Maybe make these numbers configurable from the "application.yml" file.
// A worker-report which was last modified more than this many days ago is selected by the "delete_old" action.
private static final double daysToWaitBeforeDeletion = 7.0;
// Lower and upper age-limit (in days, both inclusive) for a failed worker-report to be selected by the "process_current_failed" action.
private static final double daysToWaitBeforeProcessing = 0.5; // 12 hours
private static final double maxDaysToAllowProcessing = 1.9; // 45.6 hours
// The Workers wait at most 48 hours before deleting the full-text files. So there is no point to try and process the report after that time-frame.
// These reports will have to wait a bit for the scheduler to assign them to threads, before actually being processed.
private static final Gson gson = new Gson(); // This is "transient" by default.
// Shared buffer for the json-text of the worker-report being loaded. It is allocated in the constructor and emptied (not re-allocated) after a report is handled.
// NOTE(review): it is static and unsynchronized, so it is only safe if the jobs which load worker-reports never run at the same time — confirm.
private static StringBuilder jsonStringBuilder = null;
private static final int daysDivisor = (1000 * 60 * 60 * 24); // In order to get the time-difference in days. We divide with: /1000 to get seconds, /60 to get minutes, /60 to get hours and /24 to get days.
/**
 * Walks through the files of every worker sub-directory and applies the requested action to the worker-reports which qualify for it.
 * For "process_previous_failed" every report whose name contains "failed" is processed, regardless of its age.
 * For the other two actions the age of the report is computed from its last-modified time.
 * Any exception is caught and logged here.
 * NOTE(review): several statements of this method (braces, early exits and the deletion itself) are not visible in this copy of the file.
 *
 * @param actionForWorkerReports the action to apply.
 */
private void inspectWorkerReportsAndTakeAction(ActionForWorkerReports actionForWorkerReports)
if ( actionForWorkerReports == ActionForWorkerReports.process_previous_failed )
logger.debug("Going to check and process any unsuccessful workerReports from the previous run.");
else if ( actionForWorkerReports == ActionForWorkerReports.process_current_failed )
// NOTE(review): this message names "daysToWaitBeforeDeletion" as the upper limit, but the filter further below uses "maxDaysToAllowProcessing" — confirm which one is intended.
logger.debug("Going to check and process any unsuccessful workerReports which are between " + daysToWaitBeforeProcessing + " and " + daysToWaitBeforeDeletion + " days old (inclusive).");
// NOTE(review): presumably an "else" precedes the next statement (the "delete_old" case); it is not visible.
logger.debug("Going to check and remove any leftover workerReports, which are more than " + daysToWaitBeforeDeletion + " days old.");
// The failed workerReports are kept for 7 days, for manual debugging purposes. Even if the service does not make more than 2 attempts to import them.
try {
File[] workerReportSubDirs = getWorkerReportSubDirs();
// NOTE(review): the statement guarded by this "if" is missing (presumably an early "return;", since the helper has already logged the reason).
if ( workerReportSubDirs == null )
// The current time is only needed by the age-based actions.
long currentTime = 0L;
if ( actionForWorkerReports != ActionForWorkerReports.process_previous_failed )
currentTime = System.currentTimeMillis();
int numFilesHandled = 0;
for ( File workerReportSubDir : workerReportSubDirs )
File[] workerReportFiles = workerReportSubDir.listFiles(File::isFile);
// NOTE(review): presumably both of the following branches skip to the next sub-dir; the statements are not visible.
if (workerReportFiles == null) {
logger.error("There was an error when getting the workerReports of \"workerReportSubDir\": " + workerReportSubDir);
} else if (workerReportFiles.length == 0) {
logger.debug("The \"workerReportsDir\" is empty, so there is nothing to take action on.");
for ( File workerReportFile : workerReportFiles )
String workerReportName = workerReportFile.getName();
if ( actionForWorkerReports == ActionForWorkerReports.process_previous_failed ) {
if ( workerReportName.contains("failed") ) {
processWorkerReport(workerReportFile, workerReportName);
numFilesHandled ++;
} else {
// NOTE(review): the value is the last-MODIFIED time, although the messages below call it "accessed".
long lastModified = workerReportFile.lastModified();
if ( logger.isTraceEnabled() )
logger.trace("The workerReport \"" + workerReportName + "\" was last accessed in: " + new Date(lastModified));
double elapsedDays = (double) (currentTime - lastModified) / daysDivisor;
if ( actionForWorkerReports == ActionForWorkerReports.process_current_failed ) {
// Only the failed reports inside the [daysToWaitBeforeProcessing, maxDaysToAllowProcessing] window are re-tried.
if ( (elapsedDays >= daysToWaitBeforeProcessing) && (elapsedDays <= maxDaysToAllowProcessing)
&& workerReportName.contains("failed") ) {
processWorkerReport(workerReportFile, workerReportName);
numFilesHandled ++;
} else { // Deletion..
if ( elapsedDays > daysToWaitBeforeDeletion ) {
// Enough time has passed, the directory should be deleted immediately.
logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
numFilesHandled ++;
// NOTE(review): the deletion of the file and the statement guarded by the next "if" are not visible — restore them from the original file.
if ( workerReportName.contains("failed") ) // (For the successful, they have already been deleted)
}// end reports loop
}// end sub-dirs loop
logger.debug("The action \"" + actionForWorkerReports.toString() + "\" was imposed to " + numFilesHandled + " workerReports.");
} catch (Exception e) {
logger.error("", e);
private File[] getWorkerReportSubDirs() throws Exception
File workerReportsDir = new File(workerReportsDirPath);
if ( !workerReportsDir.isDirectory() ) {
logger.error("The \"workerReportsDir\" (" + workerReportsDirPath + ") does not exist!"); // This base dir should always exist!
return null;
// The worker reports are inside "worker_X" sub-dirs.
File[] workerReportSubDirs = workerReportsDir.listFiles(File::isDirectory);
if ( workerReportSubDirs == null ) {
logger.error("There was an error when getting the subDirs of \"workerReportsDir\": " + workerReportsDir);
return null;
} else if ( workerReportSubDirs.length == 0 ) { // The worker_X sub-dirs do not normally get deleted. So this is a warning that either no workers are running (wasting time) or that something bad happened to thios directories.
logger.warn("The \"workerReportsDir\" is empty. None of the workers returned a workerReport, so there is nothing to process.");
return null;
} else
return workerReportSubDirs;
/**
 * Loads the json-content of the given worker-report file, parses it into a "WorkerReport" object
 * and hands it over to the "UrlsController", so that it is processed again.
 * NOTE(review): the statements which read the file and the early exits after the error-logs are not visible in this copy of the file.
 *
 * @param workerReportFile the file which holds the worker-report in json format.
 * @param workerReportName the name of that file, used in the log-messages.
 */
private void processWorkerReport(File workerReportFile, String workerReportName)
logger.debug("Going to load and parse the workerReport: " + workerReportName);
// Load the file's json content into a "WorkerReport" object.
// NOTE(review): "FileReader" is created without an explicit charset, so on older JDKs the platform-default encoding is used — confirm that the reports are always written with that encoding.
// NOTE(review): the body of this "try" is missing (presumably the loop which appends the file's lines to "jsonStringBuilder") — restore it from the original file.
try ( BufferedReader bfRead = new BufferedReader(new FileReader(workerReportFile)) ) { // The default size is sufficient here.
} catch (Exception e) {
logger.error("Problem when acquiring the contents of workerReport \"" + workerReportName + "\"");
// NOTE(review): presumably the method returns here; also verify that the shared buffer is emptied on this path, otherwise partial content would leak into the next report.
WorkerReport workerReport = null;
try {
workerReport = gson.fromJson(jsonStringBuilder.toString(), WorkerReport.class);
} catch (JsonSyntaxException jse) {
logger.error("Problem when parsing the workerReport \"" + workerReportName + "\": " + jse.getMessage());
// NOTE(review): presumably the method returns here (otherwise a null report would be passed on below); the statement is not visible.
this.urlsController.addWorkerReport(workerReport); // This will check and add the workerReport to the background jobs' scheduler.
jsonStringBuilder.setLength(0); // Reset the StringBuilder without de-allocating.
private static final Pattern ASSIGNMENTS_COUNTER_REPORT_FILTER = Pattern.compile(".*([\\d]+)_report[\\w]*.json$");
private void extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(String workerReportName)
// We need to delete the records from the "assignment" table, in order for them to be retried in the future.
// Take the counter from workerReportName.
Matcher matcher = ASSIGNMENTS_COUNTER_REPORT_FILTER.matcher(workerReportName);
if ( ! matcher.matches() ) {
logger.error("Could not match the report \"" + workerReportName + "\" with regex: " + ASSIGNMENTS_COUNTER_REPORT_FILTER);
String counterString = matcher.group(1);
if ( (counterString == null) || counterString.isEmpty() ) {
logger.error("Could not extract the \"assignmentCounter\" from report: " + workerReportName);
int curReportAssignmentsCounter;
try {
curReportAssignmentsCounter = Integer.parseInt(counterString);
} catch (NumberFormatException nfe) {
logger.error("Could not parse the \"curReportAssignmentsCounter\" (" + counterString + ") which was extracted from report: " + workerReportName);
urlsService.deleteAssignmentsBatch(curReportAssignmentsCounter); // Any error-log is written inside.