UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java

package eu.openaire.urls_controller.controllers;

import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

@RestController
@RequestMapping("/urls")
public class UrlsController {

    private static final Logger logger = LoggerFactory.getLogger(UrlsController.class);

    @Autowired
    private UrlsService urlsService;

    @Autowired
    private ParquetFileUtils parquetFileUtils;

    private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");

    @Value("${services.pdfaggregation.controller.assignmentLimit}")
    private int assignmentLimit;

    public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
    // TODO - Implement an endpoint in the Controller to request that the Controller shuts everything down.
    //  The Controller will make sure that it has finished requesting the full-texts and has sent a "shutDownRequest" to each worker (as we will have its IP).
    //  (Some shutdown-requests may fail (for any reason), but that should not halt the process?)
    //  After the shutdown-requests have been sent, the endpoint returns a message that the shutdown process is in progress.
    // TODO - Make another endpoint in the Controller to take POST requests from the workers about their shutdown-process.
    //  This endpoint will set "hasShutdown=true" in the worker's entry of the workers-map and check whether all the workers have finished.
    //  So, if the worker which is currently shutting down is the last one, show a log-message and shut down the Controller. (A minimal sketch of such an endpoint follows below.)
    //  TODO - Will the "last one" be the "actual last one"? What if we had 7 workers, but one crashed, and now we have 6 workers to shut down properly, while the 7th worker seems to still be working?
    //   In that case, we can cross it out easily, as the Controller will get either a "Connection refused" or a "connection timeout", depending on the state of the worker.
    // TODO - Make the Worker send a POST request to the Controller to notify it that it has finished all work and is about to close.
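
    // The following is only a minimal sketch of the worker-shutdown-report endpoint described in the TODOs above.
    // It is NOT the project's actual implementation: the endpoint-path ("workerShutdownReport"), the messages and the
    // overall flow are illustrative assumptions, built only on members which already exist in this class (workersInfoMap, WorkerInfo).
    @PostMapping("workerShutdownReport")
    public ResponseEntity<?> workerShutdownReport(@RequestParam String workerId) {
        WorkerInfo workerInfo = workersInfoMap.get(workerId);
        if ( workerInfo == null )
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("The worker with id \"" + workerId + "\" has not been registered!");

        workerInfo.setHasShutdown(true);    // Mark this worker as "shut down", inside the workers-map.

        // If every registered worker has now shut down, the Controller itself could proceed with its own shutdown.
        boolean allWorkersHaveShutdown = true;
        for ( WorkerInfo info : workersInfoMap.values() ) {
            if ( !info.getHasShutdown() ) {
                allWorkersHaveShutdown = false;
                break;
            }
        }
        if ( allWorkersHaveShutdown )
            logger.info("All workers have shut down. The Controller may shut down as well.");

        return ResponseEntity.ok().build();
    }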

    @GetMapping("")
    public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit, HttpServletRequest request) {

        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);

        // Sanitize the "assignmentsLimit". Do not let an overload happen on the Controller's or the Impala's server.
        int assignmentsLimit = workerAssignmentsLimit;
        if ( assignmentsLimit == 0 ) {
            String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        } else if ( assignmentsLimit > assignmentLimit ) {
            logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
            assignmentsLimit = assignmentLimit;
        }

        if ( request == null ) {
            logger.error("The \"HttpServletRequest\" is null!");
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }

        String remoteAddr = request.getHeader("X-FORWARDED-FOR");
        if ( (remoteAddr == null) || remoteAddr.isEmpty() )
            remoteAddr = request.getRemoteAddr();

        WorkerInfo workerInfo = workersInfoMap.get(workerId);
        if ( workerInfo != null ) {    // This worker has already been identified.
            String savedWorkerIp = workerInfo.getWorkerIP();
            if ( !savedWorkerIp.equals(remoteAddr) ) {
                logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\".");
                workerInfo.setWorkerIP(remoteAddr);    // Set the new IP. The update will be reflected in the map.
            }
            // In this case, the worker may have previously informed the Controller that it has shut down, or it may have crashed.
            if ( workerInfo.getHasShutdown() ) {
                logger.info("The worker with id \"" + workerId + "\" was restarted.");
                workerInfo.setHasShutdown(false);
            }
        } else {
            logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP and create the remote parquet subdirectories (in HDFS).");
            workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
            // Create extra subdirectories in the HDFS parquet-directories, so that a parquet directory does not become empty right before "loading" the data to the DB, in case another worker loaded multiple workers' data to the DB.
            String endingDirAndParams = workerId + "/" + parquetFileUtils.mkDirsAndParams;
            if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingDirAndParams)
                    || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingDirAndParams) )
            {
                String errorMsg = "Error when creating the HDFS sub-directories for worker with id: " + workerId;
                logger.error(errorMsg);
                return ResponseEntity.internalServerError().body(errorMsg);
            }
        }

        return urlsService.getAssignments(workerId, assignmentsLimit);
    }

    @PostMapping("addWorkerReport")
    public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {

        if ( workerReport == null ) {
            String errorMsg = "No \"WorkerReport\" was given!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        String curWorkerId = workerReport.getWorkerId();
        if ( curWorkerId == null ) {
            String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        int sizeOfUrlReports = 0;
        List<UrlReport> urlReports = workerReport.getUrlReports();
        if ( (urlReports == null) || ((sizeOfUrlReports = urlReports.size()) == 0) ) {
            String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        long curReportAssignments = workerReport.getAssignmentRequestCounter();
        logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOfUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");

        return urlsService.addWorkerReport(curWorkerId, curReportAssignments, urlReports, sizeOfUrlReports, request);
    }
}