Add the "shutdownAllWorkersGracefully" and "cancelShutdownAllWorkersGracefully" endpoints, in order to be able to shut them down at once and update them, without shutting down the whole Service. So in this case the bulk-import procedures will continue to work.
This commit is contained in:
parent
d20c9a7d2e
commit
d90ad51609
|
@ -26,6 +26,7 @@ public class ShutdownController {
|
|||
ShutdownService shutdownService;
|
||||
|
||||
public static boolean shouldShutdownService = false;
|
||||
public static boolean shouldShutdownAllWorkers = false;
|
||||
|
||||
|
||||
@PostMapping("shutdownService")
|
||||
|
@ -40,21 +41,14 @@ public class ShutdownController {
|
|||
|
||||
String endingMsg;
|
||||
if ( shouldShutdownService ) {
|
||||
endingMsg = "The controller has already received a \"shutdownService\" (which was not canceled afterwards).";
|
||||
endingMsg = "The controller has already received a \"shutdownService\" request (which was not canceled afterwards).";
|
||||
logger.info(initMsg + endingMsg);
|
||||
} else {
|
||||
shouldShutdownService = true;
|
||||
endingMsg = "The service will shutdown, after finishing current work.";
|
||||
logger.info(initMsg + endingMsg);
|
||||
|
||||
// Send "shutdownWorker" requests to all active Workers.
|
||||
for ( String workerId : UrlsController.workersInfoMap.keySet() ) {
|
||||
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
|
||||
if ( ! workerInfo.getHasShutdown() ) // A worker may have shutdown on its own (by sending it a shutDown request manually), so it will have told the Controller when it shut down. In case of a Worker-crash, the Controller will not know about it.
|
||||
shutdownService.postShutdownOrCancelRequestToWorker(workerId, workerInfo.getWorkerIP(), false);
|
||||
else
|
||||
logger.warn("Will not post ShutdownRequest to Worker \"" + workerId + "\", since is it has already shutdown.");
|
||||
}
|
||||
shutdownService.postShutdownOrCancelRequestsToAllWorkers(false);
|
||||
|
||||
// That's it for now. The workers may take some hours to finish their work (including delivering the full-text files).
|
||||
// A scheduler monitors the shutdown of the workers. Once all worker have shutdown, the Controller shuts down as well.
|
||||
|
@ -78,19 +72,67 @@ public class ShutdownController {
|
|||
String endingMsg = "Any previous \"shutdownService\"-request is canceled.";
|
||||
logger.info(initMsg + endingMsg);
|
||||
|
||||
// Send "cancelShutdownWorker" requests to all active Workers.
|
||||
for ( String workerId : UrlsController.workersInfoMap.keySet() ) {
|
||||
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
|
||||
if ( ! workerInfo.getHasShutdown() ) // A worker may have shutdown on its own (by sending it a shutDown request manually), so it will have told the Controller when it shut down. In case of a Worker-crash, the Controller will not know about it.
|
||||
shutdownService.postShutdownOrCancelRequestToWorker(workerId, workerInfo.getWorkerIP(), true);
|
||||
else
|
||||
logger.warn("Will not post CancelShutdownRequest to Worker \"" + workerId + "\", since is it has already shutdown.");
|
||||
// Cancel the shutdown of the workers, if we are able to catch up with them before they have already shutdown..
|
||||
shutdownService.postShutdownOrCancelRequestsToAllWorkers(true);
|
||||
|
||||
return ResponseEntity.ok().body(endingMsg + GenericUtils.endOfLine);
|
||||
}
|
||||
|
||||
|
||||
// The "shutdownAllWorkers" and a "cancelShutdownAllWorkers" endpoints help when updating only the workers,
|
||||
// while keeping the Controller running and accepting bulk-import requests.
|
||||
|
||||
@PostMapping("shutdownAllWorkers")
|
||||
public ResponseEntity<?> shutdownAllWorkersGracefully(HttpServletRequest request)
|
||||
{
|
||||
String initMsg = "Received a \"shutdownAllWorkers\" request ";
|
||||
String remoteAddr = GenericUtils.getRequestorAddress(request);
|
||||
initMsg += "from [" + remoteAddr + "]. ";
|
||||
ResponseEntity<?> responseEntity = shutdownService.passSecurityChecks(remoteAddr, initMsg);
|
||||
if ( responseEntity != null )
|
||||
return responseEntity;
|
||||
|
||||
String endingMsg;
|
||||
if ( shouldShutdownAllWorkers ) {
|
||||
endingMsg = "The controller has already received a \"shutdownAllWorkers\" request (which was not canceled afterwards).";
|
||||
logger.info(initMsg + endingMsg);
|
||||
} else {
|
||||
shouldShutdownAllWorkers = true;
|
||||
endingMsg = "All workers will shutdown, after finishing current work.";
|
||||
logger.info(initMsg + endingMsg);
|
||||
|
||||
shutdownService.postShutdownOrCancelRequestsToAllWorkers(false);
|
||||
|
||||
// That's it for now. The workers may take some hours to finish their work (including delivering the full-text files).
|
||||
// The service will continue to run and handle bulk-import requests.
|
||||
// Once the workers are ready to work again, they can be started without any additional configuration.
|
||||
}
|
||||
|
||||
return ResponseEntity.ok().body(endingMsg + GenericUtils.endOfLine);
|
||||
}
|
||||
|
||||
|
||||
@PostMapping("cancelShutdownAllWorkers")
|
||||
public ResponseEntity<?> cancelShutdownAllWorkersGracefully(HttpServletRequest request)
|
||||
{
|
||||
String initMsg = "Received a \"cancelShutdownAllWorkers\" request ";
|
||||
String remoteAddr = GenericUtils.getRequestorAddress(request);
|
||||
initMsg += "from [" + remoteAddr + "]. ";
|
||||
ResponseEntity<?> responseEntity = shutdownService.passSecurityChecks(remoteAddr, initMsg);
|
||||
if ( responseEntity != null )
|
||||
return responseEntity;
|
||||
|
||||
shouldShutdownAllWorkers = false;
|
||||
String endingMsg = "Any previous \"shutdownAllWorkers\"-request is canceled.";
|
||||
logger.info(initMsg + endingMsg);
|
||||
|
||||
// Cancel the shutdown of the workers, if we are able to catch up with them before they have already shutdown..
|
||||
shutdownService.postShutdownOrCancelRequestsToAllWorkers(true);
|
||||
|
||||
return ResponseEntity.ok().body(endingMsg + GenericUtils.endOfLine);
|
||||
}
|
||||
|
||||
|
||||
@PostMapping("workerShutdownReport")
|
||||
public ResponseEntity<?> workerShutdownReport(@RequestParam String workerId, HttpServletRequest request)
|
||||
{
|
||||
|
|
|
@ -96,12 +96,13 @@ public class UrlsController {
|
|||
|
||||
if ( ShutdownController.shouldShutdownService ) {
|
||||
// There might be the case that the Controller has not sent shutDown requests to the Workers yet, or it has, BUT:
|
||||
// 1) A worker requests for new assignments before the shutDown request in handled by its side.
|
||||
// 1) A worker requests for new assignments before the shutDown request is handled by its side.
|
||||
// 2) A new Worker joins the Service (unexpected, but anyway).
|
||||
String warnMsg = "The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted!";
|
||||
logger.warn(warnMsg); // It's likely not an actual error, but still it's not accepted.
|
||||
return ResponseEntity.status(HttpStatus.CONFLICT).body(warnMsg); // The worker will wait 15 mins and upon going to retry it will notice that it should not do a new request then or it may have already shutdown in the meantime.
|
||||
}
|
||||
// Do not apply any check for the "ShutdownController.shouldShutdownAllWorkers", since then we have to also make sure it is set to false after all workers have been shutdown, in order for updated workers to ba able to request assignments after they are started again..
|
||||
|
||||
if ( request == null ) {
|
||||
logger.error("The \"HttpServletRequest\" is null!");
|
||||
|
|
|
@ -6,6 +6,8 @@ public interface ShutdownService {
|
|||
|
||||
ResponseEntity<?> passSecurityChecks(String remoteAddr, String initMsg);
|
||||
|
||||
void postShutdownOrCancelRequestsToAllWorkers(boolean shouldCancel);
|
||||
|
||||
boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel);
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package eu.openaire.urls_controller.services;
|
||||
|
||||
import eu.openaire.urls_controller.controllers.UrlsController;
|
||||
import eu.openaire.urls_controller.models.WorkerInfo;
|
||||
import eu.openaire.urls_controller.util.UriBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -36,6 +37,19 @@ public class ShutdownServiceImpl implements ShutdownService {
|
|||
}
|
||||
|
||||
|
||||
public void postShutdownOrCancelRequestsToAllWorkers(boolean shouldCancel)
|
||||
{
|
||||
// Send "shutdownWorker" requests to all active Workers.
|
||||
for ( String workerId : UrlsController.workersInfoMap.keySet() ) {
|
||||
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
|
||||
if ( ! workerInfo.getHasShutdown() ) // A worker may have shutdown on its own (by sending it a shutDown request manually), so it will have told the Controller when it shut down. In case of a Worker-crash, the Controller will not know about it.
|
||||
postShutdownOrCancelRequestToWorker(workerId, workerInfo.getWorkerIP(), shouldCancel);
|
||||
else
|
||||
logger.warn("Will not post " + (shouldCancel ? "Cancel-" : "") + " ShutdownRequest to Worker \"" + workerId + "\", since is it has already shutdown.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final RestTemplate restTemplate = new RestTemplate();
|
||||
|
||||
public boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel)
|
||||
|
|
Loading…
Reference in New Issue