UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java

205 lines
11 KiB
Java

package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@RestController
@RequestMapping("/urls")
public class UrlsController {
private static final Logger logger = LoggerFactory.getLogger(UrlsController.class);
@Autowired
private UrlsService urlsService;
@Autowired
private FileUtils fileUtils;
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
@Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentLimit;
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
public static final AtomicInteger numOfWorkers = new AtomicInteger(0);
public static ExecutorService backgroundExecutor;
public static final List<Future<Boolean>> futuresOfBackgroundTasks = Collections.synchronizedList(new ArrayList<>());
private final String workerReportsDirPath;
public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath,
@Value("${services.pdfaggregation.controller.numOfBackgroundThreads}") int numOfBackgroundThreads)
{
if ( !workerReportsDirPath.endsWith("/") )
workerReportsDirPath += "/";
this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
if ( numOfBackgroundThreads <= 0 )
throw new RuntimeException("The given \"numOfBackgroundThreads\" is not a positive number: " + numOfBackgroundThreads);
logger.info("Will use " + numOfBackgroundThreads + " threads for background tasks, such as processing worker-reports or bulk-import procedures.");
backgroundExecutor = Executors.newFixedThreadPool(numOfBackgroundThreads); // At most < numOfBackgroundThreads > tasks will be running in parallel.
}
@GetMapping("")
public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit, HttpServletRequest request) {
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
}
logger.info("Worker with id: \"" + workerId + "\", requested up to " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit;
if ( assignmentsLimit == 0 ) {
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} else if ( assignmentsLimit > assignmentLimit ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
assignmentsLimit = assignmentLimit;
}
if ( ShutdownController.shouldShutdownService ) {
// There might be the case that the Controller has not sent shutDown requests to the Workers yet, or it has, BUT:
// 1) A worker requests for new assignments before the shutDown request in handled by its side.
// 2) A new Worker joins the Service (unexpected, but anyway).
String warnMsg = "The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted!";
logger.warn(warnMsg); // It's likely not an actual error, but still it's not accepted.
return ResponseEntity.status(HttpStatus.CONFLICT).body(warnMsg); // The worker will wait 15 mins and upon going to retry it will notice that it should not do a new request then or it may have already shutdown in the meantime.
}
if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!");
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
String remoteAddr = GenericUtils.getRequestorAddress(request);
WorkerInfo workerInfo = workersInfoMap.get(workerId);
if ( workerInfo != null ) { // This worker has already been identified.
String savedWorkerIp = workerInfo.getWorkerIP();
if ( !savedWorkerIp.equals(remoteAddr) ) {
logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\".");
workerInfo.setWorkerIP(remoteAddr); // Set the new IP. The update will be reflected in the map.
} // In this case, the worker may have previously informed the Controller it has shutdown or it may have crashed.
if ( workerInfo.getHasShutdown() ) {
logger.info("The worker with id \"" + workerId + "\" was restarted.");
workerInfo.setHasShutdown(false);
numOfWorkers.incrementAndGet();
}
} else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory.");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
numOfWorkers.incrementAndGet();
}
return urlsService.getAssignments(workerId, assignmentsLimit);
}
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport)
{
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
String curWorkerId = workerReport.getWorkerId();
if ( curWorkerId == null ) {
String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
}
int sizeOfUrlReports = 0;
List<UrlReport> urlReports = workerReport.getUrlReports();
if ( (urlReports == null) || ((sizeOfUrlReports = urlReports.size()) == 0) ) {
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
long curReportAssignmentsCounter = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignmentsCounter + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOfUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
// Make sure this worker's report directory is created.
Path currentWorkerReportLocationDir = Paths.get(this.workerReportsDirPath, curWorkerId);
try {
Files.createDirectories(currentWorkerReportLocationDir); // No-op if dir exists. It does not throw a "alreadyExistsException"
} catch (Exception e) {
String errorMsg = "Could not create the \"currentWorkerReportLocationDir\" for worker \"" + curWorkerId + "\" : " + currentWorkerReportLocationDir;
logger.error(errorMsg, e);
return ResponseEntity.internalServerError().body(errorMsg);
}
// Create the report file and write the worker-report to it.
String workerReportFile = currentWorkerReportLocationDir + "/" + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report.json";
// In case a file with the same name already exists (e.g. from a previous run of the Service), then it will be overwritten.
logger.debug("Going to write the worker report to json-file: \"" + workerReportFile + "\".");
fileUtils.writeToFile(workerReportFile, workerReport.getJsonReport(), false); // Only one thread is writing to this specific file.
// The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0).
int finalSizeOUrlReports = sizeOfUrlReports;
try {
UrlsController.futuresOfBackgroundTasks.add(
backgroundExecutor.submit(
() -> urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports)
)
);
} catch (RejectedExecutionException ree) {
String errorMsg = "The WorkerReport from worker \"" + curWorkerId + "\" and assignments_" + curReportAssignmentsCounter + " have failed to be executed!";
logger.error(errorMsg, ree);
return ResponseEntity.internalServerError().body(errorMsg);
}
String msg = "The 'addWorkerReport' request for worker with id: '" + curWorkerId + "' and assignments_" + curReportAssignmentsCounter + " , was accepted and will be scheduled for execution.";
logger.info(msg);
return ResponseEntity.ok().body(msg);
}
}