From fb2877dbe895c3f31b37969223f23645ebdb9934 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 9 Oct 2023 17:23:59 +0300 Subject: [PATCH] Upgrade the execution system for the backgroundTasks: - Submit each task immediately for execution, instead of waiting for a scheduling thread to send all gathered tasks (up to that point) to the ExecutorService (and block until they are finished, before it can start again). - Hold the Future of each submitted task to a synchronized-list to check the result of each task at a scheduled time. - Reduce the cpu-time to assure the Service can shut down, by checking if there are "actively" and "about-to-be-executed" tasks, at the same time. Instead of having to rely on the additional checking of the "shutdown"-status of each worker to verify that no active task exist. - Improve the threads' shutdown procedure. --- .../openaire/urls_controller/Application.java | 41 ++++++++-- .../components/ScheduledTasks.java | 79 +++++++++++-------- .../controllers/BulkImportController.java | 19 ++++- .../controllers/UrlsController.java | 21 +++-- src/main/resources/application.yml | 2 +- 5 files changed, 110 insertions(+), 52 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 8710641..d072610 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -1,5 +1,6 @@ package eu.openaire.urls_controller; +import com.zaxxer.hikari.HikariDataSource; import eu.openaire.urls_controller.controllers.BulkImportController; import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.services.UrlsServiceImpl; @@ -9,6 +10,7 @@ import io.micrometer.core.aop.TimedAspect; import io.micrometer.core.instrument.MeterRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -33,6 +35,9 @@ public class Application { private static final Logger logger = LoggerFactory.getLogger(Application.class); + @Autowired + HikariDataSource hikariDataSource; + private static ConfigurableApplicationContext context; @@ -53,8 +58,9 @@ public class Application { } - public static void gentleAppShutdown() + public void gentleAppShutdown() { + shutdownThreads(); int exitCode = 0; try { exitCode = SpringApplication.exit(context, () -> 0); // The "PreDestroy" method will be called. (the "context" will be closed automatically (I checked it)) @@ -65,34 +71,55 @@ public class Application { } + private boolean haveThreadsShutdown = false; + @PreDestroy - public void preDestroy() { + public void shutdownThreads() + { + // Normally this methods will have already been executed by "gentleAppShutdown()" just before the "PreDestroy" has called this method again. + // BUT, in case the service was shutdown from the OS, without the use of the "ShutdownService-API, then this will be the 1st time this method is called. + if ( haveThreadsShutdown ) + return; + logger.info("Shutting down the threads.."); - shutdownThreads(UrlsServiceImpl.insertsExecutor); - shutdownThreads(FileUtils.hashMatchingExecutor); - shutdownThreads(UrlsController.backgroundExecutor); - shutdownThreads(BulkImportController.bulkImportExecutor); + shutdownThreadsForExecutorService(UrlsServiceImpl.insertsExecutor); + shutdownThreadsForExecutorService(FileUtils.hashMatchingExecutor); + shutdownThreadsForExecutorService(BulkImportController.bulkImportExecutor); + shutdownThreadsForExecutorService(UrlsController.backgroundExecutor); + + haveThreadsShutdown = true; + + // For some reason the Hikari Datasource cannot close properly by Spring Boot, unless we explicitly call close here. 
+ hikariDataSource.close(); logger.info("Exiting.."); } - private void shutdownThreads(ExecutorService executorService) + private void shutdownThreadsForExecutorService(ExecutorService executorService) throws RuntimeException { executorService.shutdown(); // Define that no new tasks will be scheduled. try { if ( ! executorService.awaitTermination(2, TimeUnit.MINUTES) ) { logger.warn("The working threads did not finish on time! Stopping them immediately.."); executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled (thus terminated). + if ( ! executorService.awaitTermination(1, TimeUnit.MINUTES) ) + logger.warn("The executor " + executorService + " could not be terminated!"); } } catch (SecurityException se) { logger.error("Could not shutdown the threads in any way..!", se); } catch (InterruptedException ie) { try { executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled (thus terminated). + if ( ! executorService.awaitTermination(1, TimeUnit.MINUTES) ) + logger.warn("The executor " + executorService + " could not be terminated!"); } catch (SecurityException se) { logger.error("Could not shutdown the threads in any way..!", se); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } } diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index 6899143..489fb0e 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -24,11 +24,8 @@ import org.springframework.stereotype.Component; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; -import java.util.ArrayList; import java.util.Date; -import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -48,6 +45,9 @@ public class ScheduledTasks { @Autowired UrlsServiceImpl urlsService; + @Autowired + private Application application; + private final StatsController statsController; private final UrlsController urlsController; @@ -87,40 +87,48 @@ public class ScheduledTasks { @Scheduled(initialDelay = 1_800_000, fixedDelay = 900_000) // Execute this method 30 mins from the start and 15 mins after each last execution, in order for some tasks to have been gathered. //@Scheduled(initialDelay = 60_000, fixedDelay = 20_000) // Just for testing (every 20 secs). // The initial delay is larger, because we have to wait some time for at least one worker to finish retrieving the full-texts from thousands of publications, whereas, later we will have a lot of workerReports waiting to be processed. - public void executeBackgroundTasks() + public void checkResultsOfBackgroundTasks() { - List> tempList = new ArrayList<>(UrlsController.backgroundCallableTasks); // Deep-copy the list in order to know what was executed. - // So the items added while this execution happens, will remain in the global-list, while the others will have already been deleted. - // Also, the "Executor.invokeAll()" requires an "unchanged" list, hence the "tempList", otherwise there will be "undefined results". - int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed. - if ( numOfTasks == 0 ) + int sizeOfFutures = UrlsController.futuresOfBackgroundTasks.size(); + if ( sizeOfFutures == 0 ) return; - // Immediately delete the selected tasks form the global list, so that if these tasks are not finished before the scheduler runs again, they will not be re-processed. 
- for ( Callable selectedTask : tempList ) - UrlsController.backgroundCallableTasks.remove(selectedTask); + logger.debug("Going to check the results of " + sizeOfFutures + " background tasks."); - logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed."); - // Execute the tasks and wait for them to finish. - try { - List> futures = UrlsController.backgroundExecutor.invokeAll(tempList); - int sizeOfFutures = futures.size(); - for ( int i=0; i < sizeOfFutures; ++i ) { - try { - Boolean value = futures.get(i).get(); // Get and see if an exception is thrown.. - // Add check for the value, if wanted.. (we don't care at the moment) - } catch (ExecutionException ee) { - String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). - logger.error("Task_" + i + " failed with: " + ee.getMessage() + GenericUtils.endOfLine + stackTraceMessage); - } catch (CancellationException ce) { - logger.error("Task_" + i + " was cancelled: " + ce.getMessage()); - } catch (IndexOutOfBoundsException ioobe) { - logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage()); - } + // Calling ".get()" on each future will block the current scheduling thread until a result is returned (if the callable task succeeded or failed). + // So this scheduled-task will not be executed again until all current callableTasks have finished executing. + + int numFailedTasks = 0; + + for ( int i=0; i < sizeOfFutures; ++i ) { + Future future = null; + try { + future = UrlsController.futuresOfBackgroundTasks.get(i); + if ( ! future.get() ) // Get and see if an exception is thrown. This blocks the current thread, until the task of the future has finished. 
+ numFailedTasks ++; } catch (ExecutionException ee) { String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). logger.error("Task_" + i + " failed with: " + ee.getMessage() + GenericUtils.endOfLine + stackTraceMessage); + numFailedTasks ++; } catch (CancellationException ce) { logger.error("Task_" + i + " was cancelled: " + ce.getMessage()); + numFailedTasks ++; + } catch (InterruptedException ie) { + logger.error("Task_" + i + " was interrupted: " + ie.getMessage()); + numFailedTasks ++; } catch (IndexOutOfBoundsException ioobe) { logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage()); + // Only here, the "future" will be null. + } finally { + if ( future != null ) + UrlsController.futuresOfBackgroundTasks.remove(future); // We do not need it anymore. This is the safest way to delete them without removing newly added futures as well. } - } catch (Exception e) { - logger.error("", e); } + + if ( numFailedTasks > 0 ) + logger.warn(numFailedTasks + " out of " + sizeOfFutures + " tasks have failed!"); + else + logger.debug("All of the " + sizeOfFutures + " tasks have succeeded."); } @@ -132,9 +140,14 @@ public class ScheduledTasks { return; // Either the service was never instructed to shut down, or the user canceled the request. // Check whether there are still background tasks to be processed. Either workerReport or Bulk-import requests. - if ( UrlsController.backgroundCallableTasks.size() > 0 ) + if ( UrlsController.futuresOfBackgroundTasks.size() > 0 ) return; + // Here, the above may have given a result of < 0 >, but a new task may be submitted right afterwards and still be awaiting execution. + // The crawling-jobs can safely finish, because we avoid shutting down as long as at least one worker is still running (waiting for the Controller to verify that the assignments-batch is completed).
+ // The bulk-import procedures have their bulkImport DIR registered in the "bulkImportDirsUnderProcessing", before their task is submitted for execution. + // So the Controller will not shut down if either of the task-types has not finished. + // Check whether there are any active bulk-import procedures. if ( BulkImportController.bulkImportDirsUnderProcessing.size() > 0 ) return; @@ -153,7 +166,7 @@ public class ScheduledTasks { // Any left-over worker-reports are kept to be retried next time the Controller starts. - Application.gentleAppShutdown(); + application.gentleAppShutdown(); } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java index 41a672e..592e656 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -207,9 +208,18 @@ public class BulkImportController { // Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
String finalBulkImportDir = bulkImportDir; String finalRelativeBulkImportDir = relativeBulkImportDir; - UrlsController.backgroundCallableTasks.add(() -> - bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) - ); + try { + UrlsController.futuresOfBackgroundTasks.add( + UrlsController.backgroundExecutor.submit( + () -> bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) + ) + ); + } catch (RejectedExecutionException ree) { + errorMsg = "The bulkImport request for bulkImportReportLocation \"" + bulkImportReportLocation + "\" and provenance " + provenance + " has failed to be executed!"; + bulkImportReport.addEvent(msg); + logger.error(errorMsg, ree); + return ResponseEntity.internalServerError().body(errorMsg); + } // This directory, will be removed from "bulkImportDirsUnderProcessing", when the background job finishes. return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID)); // The response is automatically serialized to json, and it has the type "application/json". @@ -221,6 +231,9 @@ public class BulkImportController { { logger.info("Received a \"getBulkImportReport\" request for \"bulkImportReportId\": \"" + bulkImportReportId + "\"." + (prettyFormatting ? " Will return the report pretty-formatted." : "")); + // Even if the Service is set to shut down soon, we allow this endpoint to return the report up to the last minute, + // since the Service may be up for another hour, running a bulk-import procedure, for which we want to check its progress. + // Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response. 
final StringBuilder stringBuilder = new StringBuilder(25_000); String line; diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 5c05baf..61380a4 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -21,10 +21,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; @@ -53,7 +50,7 @@ public class UrlsController { public static ExecutorService backgroundExecutor; - public static final List> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>()); + public static final List> futuresOfBackgroundTasks = Collections.synchronizedList(new ArrayList<>()); private final String workerReportsDirPath; @@ -187,9 +184,17 @@ public class UrlsController { // The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0). 
int finalSizeOUrlReports = sizeOfUrlReports; - UrlsController.backgroundCallableTasks.add(() -> - urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports) - ); + try { + UrlsController.futuresOfBackgroundTasks.add( + backgroundExecutor.submit( + () -> urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports) + ) + ); + } catch (RejectedExecutionException ree) { + String errorMsg = "The WorkerReport from worker \"" + curWorkerId + "\" and assignments_" + curReportAssignmentsCounter + " have failed to be executed!"; + logger.error(errorMsg, ree); + return ResponseEntity.internalServerError().body(errorMsg); + } String msg = "The 'addWorkerReport' request for worker with id: '" + curWorkerId + "' and assignments_" + curReportAssignmentsCounter + " , was accepted and will be scheduled for execution."; logger.info(msg); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0abbd4e..4527b7f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -80,7 +80,7 @@ spring: ansi: enabled: always lifecycle: - timeout-per-shutdown-phase: 2m + timeout-per-shutdown-phase: 5m hdfs: