UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java

package eu.openaire.urls_controller.controllers;

import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

@RestController
@RequestMapping("/urls")
public class UrlsController {

    private static final Logger logger = LoggerFactory.getLogger(UrlsController.class);

    @Autowired
    private UrlsService urlsService;

    @Autowired
    private ParquetFileUtils parquetFileUtils;

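    // This pattern matches any input containing single/double-quotes, semicolons or backticks,
    // all of which could be used to inject SQL-code into the (non-parameterized) Impala queries.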
    private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");

    @Value("${services.pdfaggregation.controller.assignmentLimit}")
    private int assignmentLimit;

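    // Keeps the info of each worker (its IP and shutdown-status), keyed by its workerId. The initial capacity of 6 presumably matches the expected number of workers.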
    public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);

    // TODO - Implement an endpoint in the Controller to request that the Controller shuts down everything.
    // The Controller will make sure that it has finished requesting the full-texts and has sent a "shutDownRequest" to each worker (as we will have its IP).
    // (Some shutdown-requests may fail, for any reason, but that should not halt the process.)
    // After the shutdown-requests have been sent, the endpoint returns a message stating that the shutdown process is in progress.

    // TODO - Make another endpoint in the Controller to accept POST requests from the workers about their shutdown-process.
    // This endpoint will set "hasShutdown=true" for that worker in the workers' hashmap and check whether all the workers have finished.
    // So, if we have the info that the currently shutting-down worker is the last one, then show a log-message and shutdown the Controller.
    // TODO - Will the "last one" be the "actual last one"? What if we had 7 workers, but one crashed, and now we have 6 workers to shutdown properly, while the 7th worker appears to be still working?
    // In that case, we can cross it out easily, as the Controller will get either a "connection refused" or a "connection timeout", depending on the state of that worker.

    // TODO - Make the Worker send a POST request to the Controller, to notify it that it has finished all work and is about to close.
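
    // Below is a minimal sketch of the worker-shutdown-report endpoint described in the TODOs above, kept as a comment since it is not implemented yet.
    // The endpoint-path, the method-name and the response-messages are assumptions, not a final API.
    /*
    @PostMapping("workerShutdownReport")
    public ResponseEntity<?> addWorkerShutdownReport(@RequestParam String workerId) {
        WorkerInfo workerInfo = workersInfoMap.get(workerId);
        if ( workerInfo == null )
            return ResponseEntity.badRequest().body("The worker with id \"" + workerId + "\" is unknown to the Controller!");
        workerInfo.setHasShutdown(true);
        // If every known worker has informed the Controller about its shutdown, the Controller may proceed with its own shutdown.
        if ( workersInfoMap.values().stream().allMatch(WorkerInfo::getHasShutdown) )
            logger.info("All workers have shutdown.");
        return ResponseEntity.ok().build();
    }
    */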

    @GetMapping("")
    public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit, HttpServletRequest request) {
        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        logger.info("The worker with id \"" + workerId + "\" requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the Controller is: " + assignmentLimit);

        // Sanitize the "assignmentsLimit". Do not let the Controller or the Impala server get overloaded.
        int assignmentsLimit = workerAssignmentsLimit;
        if ( assignmentsLimit == 0 ) {
            String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        } else if ( assignmentsLimit > assignmentLimit ) {
            logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
            assignmentsLimit = assignmentLimit;
        }

        if ( ShutdownController.shouldShutdownService ) {
            // It may be the case that the Controller has not sent shutdown-requests to the Workers yet, or that it has, BUT:
            // 1) A worker requests new assignments before the shutdown-request is handled on its side.
            // 2) A new Worker joins the Service (unexpected, but possible).
            String warnMsg = "The Service is about to shutdown, after all under-processing assignments are handled. No new requests are accepted!";
            logger.warn(warnMsg); // It's likely not an actual error, but the request still cannot be accepted.
            return ResponseEntity.status(HttpStatus.CONFLICT).body(warnMsg); // The worker will wait 15 minutes before retrying; by then, it will either notice that it should not make a new request, or it will have shutdown in the meantime.
        }

        if ( request == null ) {
            logger.error("The \"HttpServletRequest\" is null!");
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
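
        // Prefer the "X-FORWARDED-FOR" header, so that a worker connecting through a reverse-proxy or a load-balancer is identified by its real IP; otherwise, fall back to the direct remote address.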
        String remoteAddr = request.getHeader("X-FORWARDED-FOR");
        if ( (remoteAddr == null) || remoteAddr.isEmpty() )
            remoteAddr = request.getRemoteAddr();

        WorkerInfo workerInfo = workersInfoMap.get(workerId);
        if ( workerInfo != null ) { // This worker has already been identified.
            String savedWorkerIp = workerInfo.getWorkerIP();
            if ( !savedWorkerIp.equals(remoteAddr) ) {
                logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\".");
                workerInfo.setWorkerIP(remoteAddr); // Set the new IP. The update will be reflected in the map.
            }
            // In this case, the worker may have previously informed the Controller that it has shutdown, or it may have crashed.
            if ( workerInfo.getHasShutdown() ) {
                logger.info("The worker with id \"" + workerId + "\" was restarted.");
                workerInfo.setHasShutdown(false);
            }
        } else {
            logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP and create the remote parquet subdirectories (in HDFS).");
            workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
            // Create per-worker subdirectories inside the HDFS parquet-directories, so that a parquet-directory does not end up empty right before its data is "loaded" to the DB, in case another worker's operation has already loaded the data of multiple workers to the DB.
            String endingDirAndParams = workerId + "/" + parquetFileUtils.mkDirsAndParams;
            if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingDirAndParams)
                    || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingDirAndParams) )
            {
                String errorMsg = "Error when creating the HDFS sub-directories for worker with id: " + workerId;
                logger.error(errorMsg);
                return ResponseEntity.internalServerError().body(errorMsg);
            }
        }

        return urlsService.getAssignments(workerId, assignmentsLimit);
    }
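
    /**
     * Receives the report of a worker for a batch of assignments.
     * It validates the report's contents and then delegates to the "UrlsService",
     * which requests the full-texts from the worker and inserts the UrlReports into the database.
     */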
    @PostMapping("addWorkerReport")
    public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
        if ( workerReport == null ) {
            String errorMsg = "No \"WorkerReport\" was given!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        String curWorkerId = workerReport.getWorkerId();
        if ( curWorkerId == null ) {
            String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        int sizeOfUrlReports = 0;
        List<UrlReport> urlReports = workerReport.getUrlReports();
        if ( (urlReports == null) || ((sizeOfUrlReports = urlReports.size()) == 0) ) {
            String errorMsg = "The given \"WorkerReport\" from the worker with id \"" + curWorkerId + "\" was empty (without any UrlReports)!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        long curReportAssignments = workerReport.getAssignmentRequestCounter();
        logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOfUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");

        return urlsService.addWorkerReport(curWorkerId, curReportAssignments, urlReports, sizeOfUrlReports, request);
    }
}
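
// For reference, example requests against the above endpoints (the host, port and workerId-value are assumptions, depending on the deployment):
//   GET   http://<controller-host>:<port>/urls?workerId=worker_1&workerAssignmentsLimit=5000
//   POST  http://<controller-host>:<port>/urls/addWorkerReport   (with a JSON-serialized "WorkerReport" in the request-body)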