From 0ea3e2de24f1d37d78948b9dfbfdd3655b6aa2b2 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Wed, 24 May 2023 13:42:29 +0300 Subject: [PATCH] Add the "shutdownService" and "cancelShutdownService" endpoints. The Controller sends the related requests to the Workers and shutdowns gracefully, after all workers have shutdown. --- .../openaire/urls_controller/Application.java | 18 ++- .../components/ScheduledTasks.java | 20 ++++ .../controllers/ShutdownController.java | 106 ++++++++++++++++++ .../controllers/UrlsController.java | 9 ++ .../services/ShutdownService.java | 13 +++ .../services/ShutdownServiceImpl.java | 75 +++++++++++++ src/main/resources/application.yml | 4 + 7 files changed, 244 insertions(+), 1 deletion(-) create mode 100644 src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java create mode 100644 src/main/java/eu/openaire/urls_controller/services/ShutdownService.java create mode 100644 src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 5fc82bd..20579e7 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -10,6 +10,7 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.core.env.Environment; import org.springframework.scheduling.annotation.EnableScheduling; @@ -29,8 +30,11 @@ public class Application { private static final Logger logger = LoggerFactory.getLogger(Application.class); + private static ConfigurableApplicationContext context; + + public static void main(String[] args) { - SpringApplication.run(Application.class, args); + context = SpringApplication.run(Application.class, args); } @Bean @@ -46,6 +50,18 @@ public class Application { } + public static void gentleAppShutdown() + { + int exitCode = 0; + try { + exitCode = SpringApplication.exit(context, () -> 0); // The "PreDestroy" method will be called. (the "context" will be closed automatically (I checked it)) + } catch (IllegalArgumentException iae) { + logger.error(iae.getMessage()); // This will say "Context must not be null", in case the "gentleAppShutdown()" was called too early in the app's lifetime. But it's ok. + } + System.exit(exitCode); + } + + @PreDestroy public void preDestroy() { logger.info("Shutting down the threads.."); diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index b3bd0bb..65bf762 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -1,5 +1,8 @@ package eu.openaire.urls_controller.components; +import eu.openaire.urls_controller.Application; +import eu.openaire.urls_controller.controllers.ShutdownController; +import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.util.GenericUtils; import org.slf4j.Logger; @@ -56,4 +59,21 @@ public class ScheduledTasks { } } + + @Scheduled(fixedDelay = 7_200_000) // Check every 2 hours. + //@Scheduled(fixedDelay = 20_000) // Just for testing (every 20 secs). + public void checkIfServiceIsReadyForShutdown() + { + if ( ! ShutdownController.shouldShutdownService ) + return; // Either the service was never instructed to shut down, or the user canceled the request. + + for ( String workerId : UrlsController.workersInfoMap.keySet() ) { + if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value. + return; // If at least 1 worker is still active, then do not shut down the server. + } + + logger.info("All workers have already shutdown. Shutting down the Controller.."); + Application.gentleAppShutdown(); + } + } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java new file mode 100644 index 0000000..6276dcf --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java @@ -0,0 +1,106 @@ +package eu.openaire.urls_controller.controllers; + +import eu.openaire.urls_controller.models.WorkerInfo; +import eu.openaire.urls_controller.services.ShutdownService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import javax.servlet.http.HttpServletRequest; + + +@RestController +@RequestMapping("") +public class ShutdownController { + + private static final Logger logger = LoggerFactory.getLogger(ShutdownController.class); + + @Autowired + ShutdownService shutdownService; + + public static boolean shouldShutdownService = false; + + + @PostMapping("shutdownService") + public ResponseEntity shutdownServiceGracefully(HttpServletRequest request) + { + String initMsg = "Received a \"shutdownService\" request. "; + ResponseEntity responseEntity = shutdownService.passSecurityChecks(request, initMsg); + if ( responseEntity != null ) + return responseEntity; + + String finalMsg = ""; + if ( shouldShutdownService ) + finalMsg = "The controller has already received a \"shutdownService\" (which was not canceled afterwards)."; + else { + shouldShutdownService = true; + // Send "shutdownWorker" requests to all Workers. + for ( String workerId : UrlsController.workersInfoMap.keySet() ) + shutdownService.postShutdownOrCancelRequestToWorker(workerId, UrlsController.workersInfoMap.get(workerId).getWorkerIP(), false); + + // That's it for now. The workers may take some hours to finish their work (including delivering the full-text files). + // TODO - Add a scheduler to monitor the "HasShutdown" values for all workers. + // TODO - Once all have the value "true", gently shutdown the Controller, just like we do for the worker. + // TODO - Each worker, upon "shutdown" should send a "workerShutdownReport" to the Controller. + } + + finalMsg += "The service will shutdown, after finishing current work."; + logger.info(initMsg + finalMsg); + return ResponseEntity.ok().body(finalMsg + "\n"); + } + + + @PostMapping("cancelShutdownService") + public ResponseEntity cancelShutdownServiceGracefully(HttpServletRequest request) + { + String initMsg = "Received a \"cancelShutdownService\" request. "; + ResponseEntity responseEntity = shutdownService.passSecurityChecks(request, initMsg); + if ( responseEntity != null ) + return responseEntity; + + shouldShutdownService = false; + // Send "cancelShutdownWorker" requests to all Workers. + for ( String workerId : UrlsController.workersInfoMap.keySet() ) + shutdownService.postShutdownOrCancelRequestToWorker(workerId, UrlsController.workersInfoMap.get(workerId).getWorkerIP(), true); + + String finalMsg = "Any previous \"shutdownService\"-request is canceled."; + logger.info(initMsg + finalMsg); + return ResponseEntity.ok().body(finalMsg + "\n"); + } + + + @PostMapping("workerShutdownReport") + public ResponseEntity workerShutdownReport(@RequestParam String workerId, HttpServletRequest request) + { + String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\"."; + WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); + if ( workerInfo == null ) { + String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service"; + logger.warn(initMsg + "\n" + errorMsg); + return ResponseEntity.badRequest().body(errorMsg); + } + + String remoteAddr = request.getHeader("X-FORWARDED-FOR"); + if ( remoteAddr == null || remoteAddr.isEmpty() ) + remoteAddr = request.getRemoteAddr(); + + if ( ! remoteAddr.equals(workerInfo.getWorkerIP()) ) { + logger.error(initMsg + " The request came from another IP: " + remoteAddr + " | while this worker was registered with the IP: " + workerInfo.getWorkerIP()); + return ResponseEntity.status(HttpStatus.FORBIDDEN).build(); + } + + logger.info(initMsg); + + workerInfo.setHasShutdown(true); // This will update the map. + + // Return "HTTP-OK" to this worker and wait for the scheduler to check and shutdown the service. + return ResponseEntity.ok().build(); + } + +} diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 2853380..2200e26 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -76,6 +76,15 @@ public class UrlsController { assignmentsLimit = assignmentLimit; } + if ( ShutdownController.shouldShutdownService ) { + // There might be the case that the Controller has not sent shutDown requests to the Workers yet, or it has, BUT: + // 1) A worker requests for new assignments before the shutDown request in handled by its side. + // 2) A new Worker joins the Service (unexpected, but anyway). + String warnMsg = "The Service is about to shutdown, after all under-processing assignments are handled. No new requests are accepted!"; + logger.warn(warnMsg); // It's likely not an actual error, but still it's not accepted. + return ResponseEntity.status(HttpStatus.CONFLICT).body(warnMsg); // The worker will wait 15 mins and upon going to retry it will notice that it should not do a new request or it may have shutdown in the meantime. + } + if ( request == null ) { logger.error("The \"HttpServletRequest\" is null!"); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); diff --git a/src/main/java/eu/openaire/urls_controller/services/ShutdownService.java b/src/main/java/eu/openaire/urls_controller/services/ShutdownService.java new file mode 100644 index 0000000..0fa8817 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/services/ShutdownService.java @@ -0,0 +1,13 @@ +package eu.openaire.urls_controller.services; + +import org.springframework.http.ResponseEntity; + +import javax.servlet.http.HttpServletRequest; + +public interface ShutdownService { + + ResponseEntity passSecurityChecks(HttpServletRequest request, String initMsg); + + boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel); + +} diff --git a/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java new file mode 100644 index 0000000..2e86977 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java @@ -0,0 +1,75 @@ +package eu.openaire.urls_controller.services; + +import eu.openaire.urls_controller.controllers.UrlsController; +import eu.openaire.urls_controller.util.UriBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.web.client.HttpServerErrorException; +import org.springframework.web.client.RestTemplate; + +import javax.servlet.http.HttpServletRequest; +import java.net.ConnectException; + + +@Service +public class ShutdownServiceImpl implements ShutdownService { + + + private static final Logger logger = LoggerFactory.getLogger(ShutdownServiceImpl.class); + + + public ResponseEntity passSecurityChecks(HttpServletRequest request, String initMsg) + { + if ( request == null ) { + logger.error(initMsg + "The \"HttpServletRequest\" is null!"); + return ResponseEntity.internalServerError().build(); + } + String remoteAddr = request.getHeader("X-FORWARDED-FOR"); + if ( remoteAddr == null || remoteAddr.isEmpty() ) + remoteAddr = request.getRemoteAddr(); + + // In case the Controller is running inside a docker container, and we want to send the "shutdownServiceRequest" from the terminal (with curl), without entering inside the container, + // then the request will appear coming from a local IP (192.168.X.Y), instead of localhost. + if ( ! (remoteAddr.equals("127.0.0.1") || remoteAddr.startsWith("192.168.") || remoteAddr.equals(UriBuilder.ip) ) ) { + logger.error(initMsg + "The request came from another IP: " + remoteAddr + " | while the Controller has the IP: " + UriBuilder.ip); + return ResponseEntity.status(HttpStatus.FORBIDDEN).build(); + } + + return null; // The checks are passing. + } + + + public boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel) + { + String url = "http://" + workerIp + ":1881/api/" + (shouldCancel ? "cancelShutdownWorker" : "shutdownWorker"); + try { + ResponseEntity responseEntity = new RestTemplate().postForEntity(url, null, String.class); + int responseCode = responseEntity.getStatusCodeValue(); + if ( responseCode != HttpStatus.OK.value() ) { + logger.error("HTTP-Connection problem with the submission of the \"postShutdownOrCancelRequestToWorker\" of worker \"" + workerId + "\"! Error-code was: " + responseCode); + return false; + } else + return true; + } catch (HttpServerErrorException hsee) { + logger.error("The Worker \"" + workerId + "\" failed to handle the \"postShutdownOrCancelRequestToWorker\": " + hsee.getMessage()); + return false; + } catch (Exception e) { + // The Spring RestTemplate may return a "ResourceAccessException", but the actual cause will has to be identified, in order to set the worker as shutdown. + Throwable cause = e.getCause(); // No need to check explicitly for null. + if ( cause instanceof ConnectException ) { // This includes the "ConnectException". + logger.error("Got a \"ConnectException\" when doing a \"postShutdownOrCancelRequestToWorker\", to the Worker: \"" + workerId + "\". | Will register this worker as \"shutdown\".\n" + cause.getMessage()); + UrlsController.workersInfoMap.get(workerId).setHasShutdown(true); + } else { + logger.error("Error for \"postShutdownOrCancelRequestToWorker\", to the Worker: " + workerId, e); + // TODO - What should we do? If there was some error from the Controller, side, it does not mean that the worker has shutdown.. + // For now, let's handle that case manually, by check with that specific worker and sending it a shutdownRequest from inside its VM. + // Then the Worker will automatically send a "shutdownReport" to the Controller, causing it to shutdown (when all other workers have shutdown as well). + } + return false; + } + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 6920bfa..d071076 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,6 +2,7 @@ server: port: 1880 servlet: context-path: /api + shutdown: graceful services: pdfaggregation: @@ -64,6 +65,9 @@ spring: output: ansi: enabled: always + lifecycle: + timeout-per-shutdown-phase: 2m + hdfs: baseUrl: XA