From 798fa09d6898cd1c2c1c4a9e97bdaf9441a247d3 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Thu, 15 Jun 2023 23:19:36 +0300 Subject: [PATCH] - Identify and handle a possible Worker-crash, in "UrlsServiceImpl.postReportResultToWorker()". - Add/Improve some log messages. - Update and cleanup dependencies. - Code polishing. --- README.md | 4 ++-- build.gradle | 6 ++---- .../components/ScheduledTasks.java | 13 ++++++++---- .../controllers/ShutdownController.java | 14 ++++++++----- .../controllers/StatsController.java | 4 ++-- .../controllers/UrlsController.java | 4 ++-- .../services/ShutdownService.java | 4 +--- .../services/ShutdownServiceImpl.java | 10 +-------- .../services/UrlsServiceImpl.java | 21 ++++++++++++------- .../urls_controller/util/FileUtils.java | 2 +- 10 files changed, 43 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 30d8a68..e7c2fad 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # UrlsController -The Controller's Application receives requests coming from the [Workers](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) , constructs an assignments-list with data received from a database and returns the list to the workers.
+The Controller's Application receives requests coming from the [**Workers**](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) (deployed on the cloud), constructs an assignments-list with data received from a database and returns the list to the workers.
Then, it receives the "WorkerReports", it requests the full-texts from the workers, in batches, and uploads them on the S3-Object-Store. Finally, it writes the related reports, along with the updated file-locations into the database.

It can also process **Bulk-Import** requests, from compatible data sources, in which case it receives the full-text files immediately, without offloading crawling jobs to Workers.

-For interacting with the database we use [Impala](https://impala.apache.org/).
+For interacting with the database we use [**Impala**](https://impala.apache.org/).

**BulkImport API**: diff --git a/build.gradle b/build.gradle index 445aacf..7fcad05 100644 --- a/build.gradle +++ b/build.gradle @@ -40,10 +40,8 @@ dependencies { // Enable the validation annotations. //implementation group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2' - implementation "org.projectlombok:lombok:1.18.28" - // https://mvnrepository.com/artifact/com.google.guava/guava - implementation group: 'com.google.guava', name: 'guava', version: '32.0.0-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' // https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' @@ -117,7 +115,7 @@ dependencies { // https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus - runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.11.0' + runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.11.1' testImplementation 'org.springframework.security:spring-security-test' testImplementation "org.springframework.boot:spring-boot-starter-test" diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index 2bd029f..f785ec4 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -16,6 +16,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -90,11 +91,15 @@ public class ScheduledTasks { // If the workers have shutdown on their own, without been instructed to by the Controller, then the Controller will keep running. - for ( String workerId : UrlsController.workersInfoMap.keySet() ) - if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value. - return; // If at least 1 worker is still active, then do not shut down the server. + Set workerIds = UrlsController.workersInfoMap.keySet(); + if ( workerIds.size() > 0 ) { + for ( String workerId : workerIds ) + if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value. + return; // If at least 1 worker is still active, then do not shut down the Controller. + logger.info("All workers have already shutdown. Shutting down the Controller.."); + } else + logger.info("No workers have participated in the service yet, so the Controller will shut-down immediately."); - logger.info("All workers have already shutdown. Shutting down the Controller.."); Application.gentleAppShutdown(); } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java index 50bcfae..b8be1c8 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java @@ -31,8 +31,10 @@ public class ShutdownController { @PostMapping("shutdownService") public ResponseEntity shutdownServiceGracefully(HttpServletRequest request) { - String initMsg = "Received a \"shutdownService\" request. "; - ResponseEntity responseEntity = shutdownService.passSecurityChecks(request, initMsg); + String initMsg = "Received a \"shutdownService\" request "; + String remoteAddr = GenericUtils.getRequestorAddress(request); + initMsg += "from [" + remoteAddr + "]. "; + ResponseEntity responseEntity = shutdownService.passSecurityChecks(remoteAddr, initMsg); if ( responseEntity != null ) return responseEntity; @@ -65,8 +67,10 @@ public class ShutdownController { @PostMapping("cancelShutdownService") public ResponseEntity cancelShutdownServiceGracefully(HttpServletRequest request) { - String initMsg = "Received a \"cancelShutdownService\" request. "; - ResponseEntity responseEntity = shutdownService.passSecurityChecks(request, initMsg); + String initMsg = "Received a \"cancelShutdownService\" request "; + String remoteAddr = GenericUtils.getRequestorAddress(request); + initMsg += "from [" + remoteAddr + "]. "; + ResponseEntity responseEntity = shutdownService.passSecurityChecks(remoteAddr, initMsg); if ( responseEntity != null ) return responseEntity; @@ -93,7 +97,7 @@ public class ShutdownController { String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\"."; WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); if ( workerInfo == null ) { - String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service"; + String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service!"; logger.warn(initMsg + "\n" + errorMsg); return ResponseEntity.badRequest().body(errorMsg); } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java index 902ef6f..7ab1c71 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java @@ -69,7 +69,7 @@ public class StatsController { // TODO - Add an endpoint to get the publication year as a param and return the number of payloads for the publications of that year. // select count(p.id) from payload p - // join publication pu on pu.id=p.id and pu.pub_year= + // join publication pu on pu.id=p.id and pu.year= @@ -79,7 +79,7 @@ public class StatsController { /* select d.id, d.name, d.type, d.allow_harvest, count(p.id) as payload_count from datasource d join publication pu on pu.datasourceid=d.id - left join payload p on p.id=pu.id -- We want the datasources with 0 payloads too. + left join payload p on p.id=pu.id -- We want the datasources with 0 payloads too, so we use "left join". group by d.id, d.name, d.type, d.allow_harvest order by payload_count desc */ diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 7aef602..2034bac 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -112,14 +112,14 @@ public class UrlsController { if ( !savedWorkerIp.equals(remoteAddr) ) { logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\"."); workerInfo.setWorkerIP(remoteAddr); // Set the new IP. The update will be reflected in the map. - } // In this case, the worker may has previously informed the Controller it has shutdown or it may have crashed. + } // In this case, the worker may have previously informed the Controller it has shutdown or it may have crashed. if ( workerInfo.getHasShutdown() ) { logger.info("The worker with id \"" + workerId + "\" was restarted."); workerInfo.setHasShutdown(false); } } else { - logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "]."); + logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory."); workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false)); } diff --git a/src/main/java/eu/openaire/urls_controller/services/ShutdownService.java b/src/main/java/eu/openaire/urls_controller/services/ShutdownService.java index 0fa8817..cdb0095 100644 --- a/src/main/java/eu/openaire/urls_controller/services/ShutdownService.java +++ b/src/main/java/eu/openaire/urls_controller/services/ShutdownService.java @@ -2,11 +2,9 @@ package eu.openaire.urls_controller.services; import org.springframework.http.ResponseEntity; -import javax.servlet.http.HttpServletRequest; - public interface ShutdownService { - ResponseEntity passSecurityChecks(HttpServletRequest request, String initMsg); + ResponseEntity passSecurityChecks(String remoteAddr, String initMsg); boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel); diff --git a/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java index 62d5bcc..eff8ee7 100644 --- a/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java @@ -1,7 +1,6 @@ package eu.openaire.urls_controller.services; import eu.openaire.urls_controller.controllers.UrlsController; -import eu.openaire.urls_controller.util.GenericUtils; import eu.openaire.urls_controller.util.UriBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,7 +10,6 @@ import org.springframework.stereotype.Service; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.client.RestTemplate; -import javax.servlet.http.HttpServletRequest; import java.net.ConnectException; import java.util.regex.Pattern; @@ -26,14 +24,8 @@ public class ShutdownServiceImpl implements ShutdownService { private static final Pattern PRIVATE_IP_ADDRESSES_RFC_1918 = Pattern.compile("(?:10.|172.(?:1[6-9]|2[0-9]|3[0-1])|192.168.)[0-9.]+"); - public ResponseEntity passSecurityChecks(HttpServletRequest request, String initMsg) + public ResponseEntity passSecurityChecks(String remoteAddr, String initMsg) { - if ( request == null ) { - logger.error(initMsg + "The \"HttpServletRequest\" is null!"); - return ResponseEntity.internalServerError().build(); - } - String remoteAddr = GenericUtils.getRequestorAddress(request); - // In case the Controller is running inside a docker container, and we want to send the "shutdownServiceRequest" from the terminal (with curl), without entering inside the container, // then the request will appear coming from a local (private) IP, instead of localhost. if ( ! (remoteAddr.equals("127.0.0.1") || remoteAddr.equals(UriBuilder.ip) || PRIVATE_IP_ADDRESSES_RFC_1918.matcher(remoteAddr).matches()) ) { diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index b3565ee..0b1b970 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -119,10 +119,10 @@ public class UrlsServiceImpl implements UrlsService { " left outer join (\n" + " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + " union all\n" + - " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + - " ) as existing\n" + // Here we access the payload-VIEW which includes the three payload-tables. + " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables. + " ) as existing\n" + " on existing.id=p.id and existing.original_url=pu.url\n" + - " where d.allow_harvest=true and existing.id is null\n" + + " where d.allow_harvest=true and existing.id is null\n" + // For records not found on existing, the "existing.id" will be null. ((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below. (" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") + " and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" + @@ -324,10 +324,10 @@ public class UrlsServiceImpl implements UrlsService { logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); // This is a very rare case. At the moment, we just move on with table-merging. } catch (RuntimeException re) { - String errorMsg = re.getMessage(); ImpalaConnector.databaseLock.lock(); String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId); ImpalaConnector.databaseLock.unlock(); + String errorMsg = re.getMessage(); if ( assignmentErrorMsg != null ) errorMsg += "\n" + assignmentErrorMsg; logger.error(errorMsg); @@ -400,8 +400,8 @@ public class UrlsServiceImpl implements UrlsService { private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery) { - String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; - String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; + final String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; + final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; try { jdbcTemplate.execute(createCurrentAssignmentsQuery); @@ -476,7 +476,14 @@ public class UrlsServiceImpl implements UrlsService { logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage()); return false; } catch (Exception e) { - logger.error("Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId, e); + errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId; + Throwable cause = e.getCause(); + String exMsg; + if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) { + logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!"); + workerInfo.setHasShutdown(true); // Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future. + } else + logger.error(errorMsg, e); return false; } } diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index de5edf6..6e4e594 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -452,7 +452,7 @@ public class FileUtils { String exMessage = e.getMessage(); logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + exMessage); if ( exMessage.contains("Connection refused") ) { - logger.error("Since we received a \"Connection refused\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!"); + logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!"); throw new RuntimeException(); } return null;