Upgrade the execution system for the backgroundTasks:

- Submit each task immediately for execution, instead of waiting for a scheduling thread to send all gathered tasks (up to that point) to the ExecutorService (and block until they are finished, before it can start again).
- Hold the Future of each submitted task to a synchronized-list to check the result of each task at a scheduled time.
- Reduce the cpu-time needed to verify that the Service can shut down, by checking for "active" and "about-to-be-executed" tasks at the same time, instead of having to additionally check the "shutdown"-status of each worker to verify that no active task exists.
- Improve the threads' shutdown procedure.
This commit is contained in:
Lampros Smyrnaios 2023-10-09 17:23:59 +03:00
parent a354da763d
commit fb2877dbe8
5 changed files with 110 additions and 52 deletions

View File

@ -1,5 +1,6 @@
package eu.openaire.urls_controller; package eu.openaire.urls_controller;
import com.zaxxer.hikari.HikariDataSource;
import eu.openaire.urls_controller.controllers.BulkImportController; import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.services.UrlsServiceImpl; import eu.openaire.urls_controller.services.UrlsServiceImpl;
@ -9,6 +10,7 @@ import io.micrometer.core.aop.TimedAspect;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -33,6 +35,9 @@ public class Application {
private static final Logger logger = LoggerFactory.getLogger(Application.class); private static final Logger logger = LoggerFactory.getLogger(Application.class);
@Autowired
HikariDataSource hikariDataSource;
private static ConfigurableApplicationContext context; private static ConfigurableApplicationContext context;
@ -53,8 +58,9 @@ public class Application {
} }
public static void gentleAppShutdown() public void gentleAppShutdown()
{ {
shutdownThreads();
int exitCode = 0; int exitCode = 0;
try { try {
exitCode = SpringApplication.exit(context, () -> 0); // The "PreDestroy" method will be called. (the "context" will be closed automatically (I checked it)) exitCode = SpringApplication.exit(context, () -> 0); // The "PreDestroy" method will be called. (the "context" will be closed automatically (I checked it))
@ -65,34 +71,55 @@ public class Application {
} }
private boolean haveThreadsShutdown = false;
@PreDestroy @PreDestroy
public void preDestroy() { public void shutdownThreads()
{
// Normally, this method will have already been executed by "gentleAppShutdown()", just before the "PreDestroy" hook calls this method again.
// BUT, in case the service was shut down by the OS, without the use of the "ShutdownService"-API, then this will be the 1st time this method is called.
if ( haveThreadsShutdown )
return;
logger.info("Shutting down the threads.."); logger.info("Shutting down the threads..");
shutdownThreads(UrlsServiceImpl.insertsExecutor); shutdownThreadsForExecutorService(UrlsServiceImpl.insertsExecutor);
shutdownThreads(FileUtils.hashMatchingExecutor); shutdownThreadsForExecutorService(FileUtils.hashMatchingExecutor);
shutdownThreads(UrlsController.backgroundExecutor); shutdownThreadsForExecutorService(BulkImportController.bulkImportExecutor);
shutdownThreads(BulkImportController.bulkImportExecutor); shutdownThreadsForExecutorService(UrlsController.backgroundExecutor);
haveThreadsShutdown = true;
// For some reason the Hikari Datasource cannot close properly by Spring Boot, unless we explicitly call close here.
hikariDataSource.close();
logger.info("Exiting.."); logger.info("Exiting..");
} }
private void shutdownThreads(ExecutorService executorService) private void shutdownThreadsForExecutorService(ExecutorService executorService) throws RuntimeException
{ {
executorService.shutdown(); // Define that no new tasks will be scheduled. executorService.shutdown(); // Define that no new tasks will be scheduled.
try { try {
if ( ! executorService.awaitTermination(2, TimeUnit.MINUTES) ) { if ( ! executorService.awaitTermination(2, TimeUnit.MINUTES) ) {
logger.warn("The working threads did not finish on time! Stopping them immediately.."); logger.warn("The working threads did not finish on time! Stopping them immediately..");
executorService.shutdownNow(); executorService.shutdownNow();
// Wait a while for tasks to respond to being cancelled (thus terminated).
if ( ! executorService.awaitTermination(1, TimeUnit.MINUTES) )
logger.warn("The executor " + executorService + " could not be terminated!");
} }
} catch (SecurityException se) { } catch (SecurityException se) {
logger.error("Could not shutdown the threads in any way..!", se); logger.error("Could not shutdown the threads in any way..!", se);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
try { try {
executorService.shutdownNow(); executorService.shutdownNow();
// Wait a while for tasks to respond to being cancelled (thus terminated).
if ( ! executorService.awaitTermination(1, TimeUnit.MINUTES) )
logger.warn("The executor " + executorService + " could not be terminated!");
} catch (SecurityException se) { } catch (SecurityException se) {
logger.error("Could not shutdown the threads in any way..!", se); logger.error("Could not shutdown the threads in any way..!", se);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
} }
} }

View File

@ -24,11 +24,8 @@ import org.springframework.stereotype.Component;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileReader; import java.io.FileReader;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -48,6 +45,9 @@ public class ScheduledTasks {
@Autowired @Autowired
UrlsServiceImpl urlsService; UrlsServiceImpl urlsService;
@Autowired
private Application application;
private final StatsController statsController; private final StatsController statsController;
private final UrlsController urlsController; private final UrlsController urlsController;
@ -87,40 +87,48 @@ public class ScheduledTasks {
@Scheduled(initialDelay = 1_800_000, fixedDelay = 900_000) // Execute this method 30 mins from the start and 15 mins after each last execution, in order for some tasks to have been gathered. @Scheduled(initialDelay = 1_800_000, fixedDelay = 900_000) // Execute this method 30 mins from the start and 15 mins after each last execution, in order for some tasks to have been gathered.
//@Scheduled(initialDelay = 60_000, fixedDelay = 20_000) // Just for testing (every 20 secs). //@Scheduled(initialDelay = 60_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
// The initial delay is larger, because we have to wait some time for at least one worker to finish retrieving the full-texts from thousands of publications, whereas, later we will have a lot of workerReports waiting to be processed. // The initial delay is larger, because we have to wait some time for at least one worker to finish retrieving the full-texts from thousands of publications, whereas, later we will have a lot of workerReports waiting to be processed.
public void executeBackgroundTasks() public void checkResultsOfBackgroundTasks()
{ {
List<Callable<Boolean>> tempList = new ArrayList<>(UrlsController.backgroundCallableTasks); // Deep-copy the list in order to know what was executed. int sizeOfFutures = UrlsController.futuresOfBackgroundTasks.size();
// So the items added while this execution happens, will remain in the global-list, while the others will have already been deleted. if ( sizeOfFutures == 0 )
// Also, the "Executor.invokeAll()" requires an "unchanged" list, hence the "tempList", otherwise there will be "undefined results".
int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed.
if ( numOfTasks == 0 )
return; return;
// Immediately delete the selected tasks form the global list, so that if these tasks are not finished before the scheduler runs again, they will not be re-processed. logger.debug("Going to check the results of " + sizeOfFutures + " background tasks.");
for ( Callable<Boolean> selectedTask : tempList )
UrlsController.backgroundCallableTasks.remove(selectedTask);
logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed."); // Calling ".get()" on each future will block the current scheduling thread until a result is returned (if the callable task succeeded or failed).
// Execute the tasks and wait for them to finish. // So this scheduled-task will not be executed again until all current callableTasks have finished executing.
try {
List<Future<Boolean>> futures = UrlsController.backgroundExecutor.invokeAll(tempList); int numFailedTasks = 0;
int sizeOfFutures = futures.size();
for ( int i=0; i < sizeOfFutures; ++i ) { for ( int i=0; i < sizeOfFutures; ++i ) {
try { Future<Boolean> future = null;
Boolean value = futures.get(i).get(); // Get and see if an exception is thrown.. try {
// Add check for the value, if wanted.. (we don't care at the moment) future = UrlsController.futuresOfBackgroundTasks.get(i);
} catch (ExecutionException ee) { if ( ! future.get() ) // Get and see if an exception is thrown. This blocks the current thread, until the task of the future has finished.
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). numFailedTasks ++;
logger.error("Task_" + i + " failed with: " + ee.getMessage() + GenericUtils.endOfLine + stackTraceMessage); } catch (ExecutionException ee) {
} catch (CancellationException ce) { String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + i + " was cancelled: " + ce.getMessage()); logger.error("Task_" + i + " failed with: " + ee.getMessage() + GenericUtils.endOfLine + stackTraceMessage);
} catch (IndexOutOfBoundsException ioobe) { numFailedTasks ++;
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage()); } catch (CancellationException ce) {
} logger.error("Task_" + i + " was cancelled: " + ce.getMessage());
numFailedTasks ++;
} catch (InterruptedException ie) {
logger.error("Task_" + i + " was interrupted: " + ie.getMessage());
numFailedTasks ++;
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage());
// Only here, the "future" will be null.
} finally {
if ( future != null )
UrlsController.futuresOfBackgroundTasks.remove(future); // We do not need it anymore. This is the safest way to delete them without removing newly added futures as well.
} }
} catch (Exception e) {
logger.error("", e);
} }
if ( numFailedTasks > 0 )
logger.warn(numFailedTasks + " out of " + sizeOfFutures + " tasks have failed!");
else
logger.debug("All of the " + sizeOfFutures + " tasks have succeeded.");
} }
@ -132,9 +140,14 @@ public class ScheduledTasks {
return; // Either the service was never instructed to shut down, or the user canceled the request. return; // Either the service was never instructed to shut down, or the user canceled the request.
// Check whether there are still background tasks to be processed. Either workerReport or Bulk-import requests. // Check whether there are still background tasks to be processed. Either workerReport or Bulk-import requests.
if ( UrlsController.backgroundCallableTasks.size() > 0 ) if ( UrlsController.futuresOfBackgroundTasks.size() > 0 )
return; return;
// Here, the above check may have found 0 pending tasks, but a new task may be submitted for execution right afterwards and still be awaiting execution..
// The crawling-jobs can safely finish, since we avoid shutting down as long as at least one worker is still running (waiting for the Controller to verify that the assignments-batch is completed).
// The bulk-import procedures have their bulkImport DIR registered in the "bulkImportDirsUnderProcessing", before their task is submitted for execution.
// So the Controller will not shut down if either of the task-types has not finished.
// Check whether there are any active bulk-import procedures. // Check whether there are any active bulk-import procedures.
if ( BulkImportController.bulkImportDirsUnderProcessing.size() > 0 ) if ( BulkImportController.bulkImportDirsUnderProcessing.size() > 0 )
return; return;
@ -153,7 +166,7 @@ public class ScheduledTasks {
// Any left-over worker-reports are kept to be retried next time the Controller starts. // Any left-over worker-reports are kept to be retried next time the Controller starts.
Application.gentleAppShutdown(); application.gentleAppShutdown();
} }

View File

@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -207,9 +208,18 @@ public class BulkImportController {
// Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours). // Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
String finalBulkImportDir = bulkImportDir; String finalBulkImportDir = bulkImportDir;
String finalRelativeBulkImportDir = relativeBulkImportDir; String finalRelativeBulkImportDir = relativeBulkImportDir;
UrlsController.backgroundCallableTasks.add(() -> try {
bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) UrlsController.futuresOfBackgroundTasks.add(
); UrlsController.backgroundExecutor.submit(
() -> bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
)
);
} catch (RejectedExecutionException ree) {
errorMsg = "The bulkImport request for bulkImportReportLocation \"" + bulkImportReportLocation + "\" and provenance " + provenance + " has failed to be executed!";
bulkImportReport.addEvent(msg);
logger.error(errorMsg, ree);
return ResponseEntity.internalServerError().body(errorMsg);
}
// This directory, will be removed from "bulkImportDirsUnderProcessing", when the background job finishes. // This directory, will be removed from "bulkImportDirsUnderProcessing", when the background job finishes.
return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID)); // The response is automatically serialized to json, and it has the type "application/json". return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID)); // The response is automatically serialized to json, and it has the type "application/json".
@ -221,6 +231,9 @@ public class BulkImportController {
{ {
logger.info("Received a \"getBulkImportReport\" request for \"bulkImportReportId\": \"" + bulkImportReportId + "\"." + (prettyFormatting ? " Will return the report pretty-formatted." : "")); logger.info("Received a \"getBulkImportReport\" request for \"bulkImportReportId\": \"" + bulkImportReportId + "\"." + (prettyFormatting ? " Will return the report pretty-formatted." : ""));
// Even if the Service is set to shut down soon, we allow this endpoint to return the report up to the last minute,
// since the Service may be up for another hour, running a bulk-import procedure, for which we want to check its progress.
// Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response. // Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response.
final StringBuilder stringBuilder = new StringBuilder(25_000); final StringBuilder stringBuilder = new StringBuilder(25_000);
String line; String line;

View File

@ -21,10 +21,7 @@ import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -53,7 +50,7 @@ public class UrlsController {
public static ExecutorService backgroundExecutor; public static ExecutorService backgroundExecutor;
public static final List<Callable<Boolean>> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>()); public static final List<Future<Boolean>> futuresOfBackgroundTasks = Collections.synchronizedList(new ArrayList<>());
private final String workerReportsDirPath; private final String workerReportsDirPath;
@ -187,9 +184,17 @@ public class UrlsController {
// The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0). // The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0).
int finalSizeOUrlReports = sizeOfUrlReports; int finalSizeOUrlReports = sizeOfUrlReports;
UrlsController.backgroundCallableTasks.add(() -> try {
urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports) UrlsController.futuresOfBackgroundTasks.add(
); backgroundExecutor.submit(
() -> urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports)
)
);
} catch (RejectedExecutionException ree) {
String errorMsg = "The WorkerReport from worker \"" + curWorkerId + "\" and assignments_" + curReportAssignmentsCounter + " have failed to be executed!";
logger.error(errorMsg, ree);
return ResponseEntity.internalServerError().body(errorMsg);
}
String msg = "The 'addWorkerReport' request for worker with id: '" + curWorkerId + "' and assignments_" + curReportAssignmentsCounter + " , was accepted and will be scheduled for execution."; String msg = "The 'addWorkerReport' request for worker with id: '" + curWorkerId + "' and assignments_" + curReportAssignmentsCounter + " , was accepted and will be scheduled for execution.";
logger.info(msg); logger.info(msg);

View File

@ -80,7 +80,7 @@ spring:
ansi: ansi:
enabled: always enabled: always
lifecycle: lifecycle:
timeout-per-shutdown-phase: 2m timeout-per-shutdown-phase: 5m
hdfs: hdfs: