- Identify and handle a possible Worker-crash, in "UrlsServiceImpl.postReportResultToWorker()".

- Add/Improve some log messages.
- Update and cleanup dependencies.
- Code polishing.
This commit is contained in:
Lampros Smyrnaios 2023-06-15 23:19:36 +03:00
parent 88a74b2c41
commit 798fa09d68
10 changed files with 43 additions and 39 deletions

View File

@ -1,11 +1,11 @@
# UrlsController # UrlsController
The Controller's Application receives requests coming from the [Workers](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) , constructs an assignments-list with data received from a database and returns the list to the workers.<br> The Controller's Application receives requests coming from the [**Workers**](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) (deployed on the cloud), constructs an assignments-list with data received from a database and returns the list to the workers.<br>
Then, it receives the "WorkerReports", it requests the full-texts from the workers, in batches, and uploads them on the S3-Object-Store. Finally, it writes the related reports, along with the updated file-locations into the database.<br> Then, it receives the "WorkerReports", it requests the full-texts from the workers, in batches, and uploads them on the S3-Object-Store. Finally, it writes the related reports, along with the updated file-locations into the database.<br>
<br> <br>
It can also process **Bulk-Import** requests, from compatible data sources, in which case it receives the full-text files immediately, without offloading crawling jobs to Workers.<br> It can also process **Bulk-Import** requests, from compatible data sources, in which case it receives the full-text files immediately, without offloading crawling jobs to Workers.<br>
<br> <br>
For interacting with the database we use [Impala](https://impala.apache.org/).<br> For interacting with the database we use [**Impala**](https://impala.apache.org/).<br>
<br> <br>
**BulkImport API**: **BulkImport API**:

View File

@ -40,10 +40,8 @@ dependencies {
// Enable the validation annotations. // Enable the validation annotations.
//implementation group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2' //implementation group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2'
implementation "org.projectlombok:lombok:1.18.28"
// https://mvnrepository.com/artifact/com.google.guava/guava // https://mvnrepository.com/artifact/com.google.guava/guava
implementation group: 'com.google.guava', name: 'guava', version: '32.0.0-jre' implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 // https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
@ -117,7 +115,7 @@ dependencies {
// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus // https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.11.0' runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.11.1'
testImplementation 'org.springframework.security:spring-security-test' testImplementation 'org.springframework.security:spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test" testImplementation "org.springframework.boot:spring-boot-starter-test"

View File

@ -16,6 +16,7 @@ import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -90,11 +91,15 @@ public class ScheduledTasks {
// If the workers have shutdown on their own, without been instructed to by the Controller, then the Controller will keep running. // If the workers have shutdown on their own, without been instructed to by the Controller, then the Controller will keep running.
for ( String workerId : UrlsController.workersInfoMap.keySet() ) Set<String> workerIds = UrlsController.workersInfoMap.keySet();
if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value. if ( workerIds.size() > 0 ) {
return; // If at least 1 worker is still active, then do not shut down the server. for ( String workerId : workerIds )
if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value.
return; // If at least 1 worker is still active, then do not shut down the Controller.
logger.info("All workers have already shutdown. Shutting down the Controller..");
} else
logger.info("No workers have participated in the service yet, so the Controller will shut-down immediately.");
logger.info("All workers have already shutdown. Shutting down the Controller..");
Application.gentleAppShutdown(); Application.gentleAppShutdown();
} }

View File

@ -31,8 +31,10 @@ public class ShutdownController {
@PostMapping("shutdownService") @PostMapping("shutdownService")
public ResponseEntity<?> shutdownServiceGracefully(HttpServletRequest request) public ResponseEntity<?> shutdownServiceGracefully(HttpServletRequest request)
{ {
String initMsg = "Received a \"shutdownService\" request. "; String initMsg = "Received a \"shutdownService\" request ";
ResponseEntity<?> responseEntity = shutdownService.passSecurityChecks(request, initMsg); String remoteAddr = GenericUtils.getRequestorAddress(request);
initMsg += "from [" + remoteAddr + "]. ";
ResponseEntity<?> responseEntity = shutdownService.passSecurityChecks(remoteAddr, initMsg);
if ( responseEntity != null ) if ( responseEntity != null )
return responseEntity; return responseEntity;
@ -65,8 +67,10 @@ public class ShutdownController {
@PostMapping("cancelShutdownService") @PostMapping("cancelShutdownService")
public ResponseEntity<?> cancelShutdownServiceGracefully(HttpServletRequest request) public ResponseEntity<?> cancelShutdownServiceGracefully(HttpServletRequest request)
{ {
String initMsg = "Received a \"cancelShutdownService\" request. "; String initMsg = "Received a \"cancelShutdownService\" request ";
ResponseEntity<?> responseEntity = shutdownService.passSecurityChecks(request, initMsg); String remoteAddr = GenericUtils.getRequestorAddress(request);
initMsg += "from [" + remoteAddr + "]. ";
ResponseEntity<?> responseEntity = shutdownService.passSecurityChecks(remoteAddr, initMsg);
if ( responseEntity != null ) if ( responseEntity != null )
return responseEntity; return responseEntity;
@ -93,7 +97,7 @@ public class ShutdownController {
String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\"."; String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\".";
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) { if ( workerInfo == null ) {
String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service"; String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service!";
logger.warn(initMsg + "\n" + errorMsg); logger.warn(initMsg + "\n" + errorMsg);
return ResponseEntity.badRequest().body(errorMsg); return ResponseEntity.badRequest().body(errorMsg);
} }

View File

@ -69,7 +69,7 @@ public class StatsController {
// TODO - Add an endpoint to get the publication year as a param and return the number of payloads for the publications of that year. // TODO - Add an endpoint to get the publication year as a param and return the number of payloads for the publications of that year.
// select count(p.id) from payload p // select count(p.id) from payload p
// join publication pu on pu.id=p.id and pu.pub_year=<GIVEN_YEAR> // join publication pu on pu.id=p.id and pu.year=<GIVEN_YEAR>
@ -79,7 +79,7 @@ public class StatsController {
/* /*
select d.id, d.name, d.type, d.allow_harvest, count(p.id) as payload_count from datasource d select d.id, d.name, d.type, d.allow_harvest, count(p.id) as payload_count from datasource d
join publication pu on pu.datasourceid=d.id join publication pu on pu.datasourceid=d.id
left join payload p on p.id=pu.id -- We want the datasources with 0 payloads too. left join payload p on p.id=pu.id -- We want the datasources with 0 payloads too, so we use "left join".
group by d.id, d.name, d.type, d.allow_harvest group by d.id, d.name, d.type, d.allow_harvest
order by payload_count desc order by payload_count desc
*/ */

View File

@ -112,14 +112,14 @@ public class UrlsController {
if ( !savedWorkerIp.equals(remoteAddr) ) { if ( !savedWorkerIp.equals(remoteAddr) ) {
logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\"."); logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\".");
workerInfo.setWorkerIP(remoteAddr); // Set the new IP. The update will be reflected in the map. workerInfo.setWorkerIP(remoteAddr); // Set the new IP. The update will be reflected in the map.
} // In this case, the worker may has previously informed the Controller it has shutdown or it may have crashed. } // In this case, the worker may have previously informed the Controller it has shutdown or it may have crashed.
if ( workerInfo.getHasShutdown() ) { if ( workerInfo.getHasShutdown() ) {
logger.info("The worker with id \"" + workerId + "\" was restarted."); logger.info("The worker with id \"" + workerId + "\" was restarted.");
workerInfo.setHasShutdown(false); workerInfo.setHasShutdown(false);
} }
} else { } else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "]."); logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory.");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false)); workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
} }

View File

@ -2,11 +2,9 @@ package eu.openaire.urls_controller.services;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import javax.servlet.http.HttpServletRequest;
public interface ShutdownService { public interface ShutdownService {
ResponseEntity<?> passSecurityChecks(HttpServletRequest request, String initMsg); ResponseEntity<?> passSecurityChecks(String remoteAddr, String initMsg);
boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel); boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel);

View File

@ -1,7 +1,6 @@
package eu.openaire.urls_controller.services; package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.UriBuilder; import eu.openaire.urls_controller.util.UriBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -11,7 +10,6 @@ import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import javax.servlet.http.HttpServletRequest;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -26,14 +24,8 @@ public class ShutdownServiceImpl implements ShutdownService {
private static final Pattern PRIVATE_IP_ADDRESSES_RFC_1918 = Pattern.compile("(?:10.|172.(?:1[6-9]|2[0-9]|3[0-1])|192.168.)[0-9.]+"); private static final Pattern PRIVATE_IP_ADDRESSES_RFC_1918 = Pattern.compile("(?:10.|172.(?:1[6-9]|2[0-9]|3[0-1])|192.168.)[0-9.]+");
public ResponseEntity<?> passSecurityChecks(HttpServletRequest request, String initMsg) public ResponseEntity<?> passSecurityChecks(String remoteAddr, String initMsg)
{ {
if ( request == null ) {
logger.error(initMsg + "The \"HttpServletRequest\" is null!");
return ResponseEntity.internalServerError().build();
}
String remoteAddr = GenericUtils.getRequestorAddress(request);
// In case the Controller is running inside a docker container, and we want to send the "shutdownServiceRequest" from the terminal (with curl), without entering inside the container, // In case the Controller is running inside a docker container, and we want to send the "shutdownServiceRequest" from the terminal (with curl), without entering inside the container,
// then the request will appear coming from a local (private) IP, instead of localhost. // then the request will appear coming from a local (private) IP, instead of localhost.
if ( ! (remoteAddr.equals("127.0.0.1") || remoteAddr.equals(UriBuilder.ip) || PRIVATE_IP_ADDRESSES_RFC_1918.matcher(remoteAddr).matches()) ) { if ( ! (remoteAddr.equals("127.0.0.1") || remoteAddr.equals(UriBuilder.ip) || PRIVATE_IP_ADDRESSES_RFC_1918.matcher(remoteAddr).matches()) ) {

View File

@ -119,10 +119,10 @@ public class UrlsServiceImpl implements UrlsService {
" left outer join (\n" + " left outer join (\n" +
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
" union all\n" + " union all\n" +
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables.
" ) as existing\n" + // Here we access the payload-VIEW which includes the three payload-tables. " ) as existing\n" +
" on existing.id=p.id and existing.original_url=pu.url\n" + " on existing.id=p.id and existing.original_url=pu.url\n" +
" where d.allow_harvest=true and existing.id is null\n" + " where d.allow_harvest=true and existing.id is null\n" + // For records not found on existing, the "existing.id" will be null.
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below. ((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
(" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") + (" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") +
" and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" + " and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" +
@ -324,10 +324,10 @@ public class UrlsServiceImpl implements UrlsService {
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging. // This is a very rare case. At the moment, we just move on with table-merging.
} catch (RuntimeException re) { } catch (RuntimeException re) {
String errorMsg = re.getMessage();
ImpalaConnector.databaseLock.lock(); ImpalaConnector.databaseLock.lock();
String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId); String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId);
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
String errorMsg = re.getMessage();
if ( assignmentErrorMsg != null ) if ( assignmentErrorMsg != null )
errorMsg += "\n" + assignmentErrorMsg; errorMsg += "\n" + assignmentErrorMsg;
logger.error(errorMsg); logger.error(errorMsg);
@ -400,8 +400,8 @@ public class UrlsServiceImpl implements UrlsService {
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery) private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
{ {
String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; final String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
try { try {
jdbcTemplate.execute(createCurrentAssignmentsQuery); jdbcTemplate.execute(createCurrentAssignmentsQuery);
@ -476,7 +476,14 @@ public class UrlsServiceImpl implements UrlsService {
logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage()); logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
return false; return false;
} catch (Exception e) { } catch (Exception e) {
logger.error("Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId, e); errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId;
Throwable cause = e.getCause();
String exMsg;
if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) {
logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!");
workerInfo.setHasShutdown(true); // Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future.
} else
logger.error(errorMsg, e);
return false; return false;
} }
} }

View File

@ -452,7 +452,7 @@ public class FileUtils {
String exMessage = e.getMessage(); String exMessage = e.getMessage();
logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + exMessage); logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + exMessage);
if ( exMessage.contains("Connection refused") ) { if ( exMessage.contains("Connection refused") ) {
logger.error("Since we received a \"Connection refused\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!"); logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!");
throw new RuntimeException(); throw new RuntimeException();
} }
return null; return null;