// Source file: UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java

package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
/**
 * REST endpoints (under "/urls") through which the Workers request new assignments and submit their reports.
 * <p>
 * It also holds the shared, static bookkeeping of the known workers and of the background tasks
 * which process the submitted worker-reports.
 */
@RestController
@RequestMapping("/urls")
public class UrlsController {

    private static final Logger logger = LoggerFactory.getLogger(UrlsController.class);

    @Autowired
    private UrlsService urlsService;

    @Autowired
    private FileUtils fileUtils;

    // Characters which could be used to break out of a string-literal inside a hand-built SQL query.
    // It is used with "find()", so a single occurrence anywhere in the input is detected. A ".*"-wrapped pattern used with "matches()"
    // would NOT match inputs containing line-terminators (since "." does not match them by default), letting such inputs bypass the check.
    private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile("[';`\"]");

    @Value("${services.pdfaggregation.controller.assignmentLimit}")
    private int assignmentLimit;

    // Info about every worker which has requested assignments, keyed by its "workerId".
    public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);

    // Number of workers considered active. A worker is counted when it first requests assignments, or when it returns after a shutdown.
    public static final AtomicInteger numOfActiveWorkers = new AtomicInteger(0);

    // Runs the background tasks (e.g. the processing of the worker-reports). It is initialized in the constructor.
    public static ExecutorService backgroundExecutor;

    // Futures of the submitted background tasks. NOTE(review): entries are only added in this class; presumably they are awaited/removed elsewhere — confirm.
    public static final List<Future<Boolean>> futuresOfBackgroundTasks = Collections.synchronizedList(new ArrayList<>());

    // Absolute, normalized base-directory. Each worker gets its own reports-subdirectory directly under it.
    private final Path workerReportsBaseDir;


    /**
     * @param workerReportsDirPath   the directory under which the worker-reports are stored. It is created later, when a report arrives.
     * @param numOfBackgroundThreads the number of threads of the {@link #backgroundExecutor}; it must be positive.
     * @throws IllegalArgumentException if "numOfBackgroundThreads" is not positive.
     */
    public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath,
                          @Value("${services.pdfaggregation.controller.numOfBackgroundThreads}") int numOfBackgroundThreads)
    {
        // Resolved once, so that every per-worker directory can be checked against it (see "resolveWorkerReportDir()").
        this.workerReportsBaseDir = Paths.get(workerReportsDirPath).toAbsolutePath().normalize();    // This dir will be created later.

        if ( numOfBackgroundThreads <= 0 )
            throw new IllegalArgumentException("The given \"numOfBackgroundThreads\" is not a positive number: " + numOfBackgroundThreads);

        logger.info("Will use " + numOfBackgroundThreads + " threads for background tasks, such as processing worker-reports or bulk-import procedures.");
        backgroundExecutor = Executors.newFixedThreadPool(numOfBackgroundThreads);    // At most < numOfBackgroundThreads > tasks will be running in parallel.
    }


    /**
     * Hands out a new batch of assignments to the requesting worker.
     * <p>
     * Before the assignments are retrieved from the {@link UrlsService}, the worker is registered in the {@link #workersInfoMap}
     * (or re-activated, if it had shut down) and its IP is updated, if it changed.
     *
     * @param workerId               the id of the requesting worker. It must be non-blank and must not contain any of: ' ; ` "
     * @param workerAssignmentsLimit the max number of assignments the worker wants. It must be positive and it is capped to the Controller's limit.
     * @param request                the HTTP request, used to find the worker's address.
     * @return 403 for a possibly malicious "workerId", 400 for a blank "workerId" or a non-positive limit, 409 while the Service is shutting down,
     *         500 if the request is missing, otherwise the response of {@link UrlsService#getAssignments}.
     */
    @GetMapping("")
    public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit, HttpServletRequest request) {

        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( isPossiblyMalicious(workerId) ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        if ( workerId.trim().isEmpty() ) {
            String errorMsg = "The given \"workerId\" was empty!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        logger.info("Worker with id: \"" + workerId + "\", requested up to " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);

        // Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
        int assignmentsLimit = workerAssignmentsLimit;
        if ( assignmentsLimit <= 0 ) {
            // A negative limit is as meaningless as a zero one, so it is rejected too, instead of being passed on to the "UrlsService".
            String errorMsg = (assignmentsLimit == 0)
                    ? "The given \"workerAssignmentsLimit\" was ZERO!"
                    : "The given \"workerAssignmentsLimit\" was negative: " + assignmentsLimit;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        } else if ( assignmentsLimit > assignmentLimit ) {
            logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
            assignmentsLimit = assignmentLimit;
        }

        if ( ShutdownController.shouldShutdownService ) {
            // There might be the case that the Controller has not sent shutDown requests to the Workers yet, or it has, BUT:
            // 1) A worker requests for new assignments, before it can handle the shutDown request given to it.
            // 2) A new Worker joins the Service (unexpected, but anyway).
            String warnMsg = "The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted!";
            logger.warn(warnMsg);    // It's likely not an actual error, but still it's not accepted.
            return ResponseEntity.status(HttpStatus.CONFLICT).body(warnMsg);    // The worker will wait 15 mins and upon going to retry it will notice that it should not do a new request then or it may have already shutdown in the meantime.
        }
        // Do not apply any check for the "ShutdownController.shouldShutdownAllWorkers", since then we have to also make sure it is set to false after all workers have been shutdown, in order for updated workers to be able to request assignments after they are started again.

        if ( request == null ) {
            logger.error("The \"HttpServletRequest\" is null!");
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        String remoteAddr = GenericUtils.getRequestorAddress(request);

        // "putIfAbsent()" registers the worker atomically, so two concurrent first-requests of the same worker cannot both count it as a new active worker.
        WorkerInfo workerInfo = workersInfoMap.putIfAbsent(workerId, new WorkerInfo(remoteAddr, false));
        if ( workerInfo == null ) {
            logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory.");
            numOfActiveWorkers.incrementAndGet();
        } else {    // This worker has already been identified.
            String savedWorkerIp = workerInfo.getWorkerIP();
            if ( !Objects.equals(savedWorkerIp, remoteAddr) ) {
                // In this case, the worker may have previously informed the Controller it has shutdown or it may have crashed.
                logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\".");
                workerInfo.setWorkerIP(remoteAddr);    // Set the new IP. The update will be reflected in the map.
            }
            if ( workerInfo.getHasShutdown() ) {
                logger.info("The worker with id \"" + workerId + "\" was restarted.");
                workerInfo.setHasShutdown(false);
                numOfActiveWorkers.incrementAndGet();
            }
        }

        return urlsService.getAssignments(workerId, assignmentsLimit);
    }


    /**
     * Accepts a worker's report: the JSON report is written to the worker's report-directory and the processing of the
     * UrlReports is submitted to the {@link #backgroundExecutor}.
     *
     * @param workerReport the report sent by the worker. It must carry a valid "workerId" and at least one UrlReport.
     * @return 200 when the processing was submitted; 400 for a missing report, a missing/blank "workerId" or no UrlReports;
     *         403 for a "workerId" which is possibly malicious or cannot be used as a directory-name; 500 if the directory
     *         could not be created or the task was rejected by the executor.
     */
    @PostMapping("addWorkerReport")
    public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport)
    {
        if ( workerReport == null ) {
            String errorMsg = "No \"WorkerReport\" was given!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        String curWorkerId = workerReport.getWorkerId();
        if ( (curWorkerId == null) || curWorkerId.trim().isEmpty() ) {
            String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( isPossiblyMalicious(curWorkerId) ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        List<UrlReport> urlReports = workerReport.getUrlReports();
        if ( (urlReports == null) || urlReports.isEmpty() ) {
            String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }
        int sizeOfUrlReports = urlReports.size();

        long curReportAssignmentsCounter = workerReport.getAssignmentRequestCounter();
        logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignmentsCounter + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOfUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");

        // The "workerId" comes from the network, so it must not be able to point outside the reports' base-directory (e.g. "..", "../other", "a/b").
        Path currentWorkerReportLocationDir = resolveWorkerReportDir(curWorkerId);
        if ( currentWorkerReportLocationDir == null ) {
            String errorMsg = "The \"workerId\" received cannot be used as a directory-name: " + curWorkerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        // Make sure this worker's report directory is created.
        try {
            Files.createDirectories(currentWorkerReportLocationDir);    // No-op if dir exists. It does not throw a "alreadyExistsException"
        } catch (Exception e) {
            String errorMsg = "Could not create the \"currentWorkerReportLocationDir\" for worker \"" + curWorkerId + "\" : " + currentWorkerReportLocationDir;
            logger.error(errorMsg, e);
            return ResponseEntity.internalServerError().body(errorMsg);
        }

        // Create the report file and write the worker-report to it.
        String workerReportFile = currentWorkerReportLocationDir.resolve(curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report.json").toString();
        // In case a file with the same name already exists (e.g. from a previous run of the Service), then it will be overwritten.
        logger.debug("Going to write the worker report to json-file: \"" + workerReportFile + "\".");
        fileUtils.writeToFile(workerReportFile, workerReport.getJsonReport(), false);    // Only one thread is writing to this specific file.
        // The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0).

        try {
            UrlsController.futuresOfBackgroundTasks.add(
                    backgroundExecutor.submit(
                            () -> urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, sizeOfUrlReports)
                    )
            );
        } catch (RejectedExecutionException ree) {
            String errorMsg = "The WorkerReport from worker \"" + curWorkerId + "\" and assignments_" + curReportAssignmentsCounter + " have failed to be executed!";
            logger.error(errorMsg, ree);
            return ResponseEntity.internalServerError().body(errorMsg);
        }

        String msg = "The 'addWorkerReport' request for worker with id: '" + curWorkerId + "' and assignments_" + curReportAssignmentsCounter + " , was accepted and submitted for execution.";
        logger.info(msg);
        return ResponseEntity.ok().body(msg);
    }


    /**
     * Tells whether the given input contains any character matched by {@link #MALICIOUS_INPUT_STRING}, anywhere (also across line-terminators).
     */
    private static boolean isPossiblyMalicious(String input) {
        return MALICIOUS_INPUT_STRING.matcher(input).find();
    }


    /**
     * Resolves the report-directory of the given worker, which has to be a direct child of {@link #workerReportsBaseDir}.
     *
     * @return the normalized directory, or null if the "workerId" is not a valid path on this file-system,
     *         or if it resolves to any location other than a direct child of the base-directory.
     */
    private Path resolveWorkerReportDir(String workerId) {
        Path workerDir;
        try {
            workerDir = workerReportsBaseDir.resolve(workerId).normalize();
        } catch (InvalidPathException ipe) {
            return null;
        }
        return workerReportsBaseDir.equals(workerDir.getParent()) ? workerDir : null;
    }
}