UrlsController/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java

269 lines
15 KiB
Java

package eu.openaire.urls_controller.controllers;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.BulkImportResponse;
import eu.openaire.urls_controller.services.BulkImportService;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.MissingServletRequestParameterException;
import org.springframework.web.bind.annotation.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.file.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * REST controller of the bulk-import API.
 * <p>
 * It accepts requests to import the full-text files of a directory (the import itself is submitted to run in the background)
 * and it serves the JSON reports which describe the progress of those procedures.
 */
@RestController
@RequestMapping("")
public class BulkImportController {

	private static final Logger logger = LoggerFactory.getLogger(BulkImportController.class);

	// Captures the last directory-segment (together with its trailing "/") of a path which ends with "/".
	private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+/)$");

	// Gson instances are thread-safe, so a single pretty-printer is shared by all report-requests, instead of building one per request.
	private static final Gson prettyGson = new GsonBuilder().setPrettyPrinting().create();

	@Autowired
	private FileUtils fileUtils;

	private final BulkImportService bulkImportService;

	private final String baseBulkImportLocation;

	private final String bulkImportReportLocation;	// Guaranteed (by the constructor) to end with "/".

	private final HashMap<String, BulkImport.BulkImportSource> bulkImportSources;

	// The (full) bulkImport-directories for which a request has been accepted and not yet released; used to reject concurrent requests for the same directory.
	public static final Set<String> bulkImportDirsUnderProcessing = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

	public static int numOfThreadsForBulkImportProcedures;

	public static ExecutorService bulkImportExecutor;


	/**
	 * Reads the bulk-import configuration and creates the thread-pool used by the bulk-import procedures.
	 *
	 * @param bulkImportService the service which performs the actual import of the files
	 * @param bulkImport the bulk-import configuration (locations, sources and number of threads)
	 * @throws RuntimeException if the configured "numOfThreadsForBulkImportProcedures" is not a positive number
	 */
	public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport)
	{
		this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation();
		this.bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());

		// Make sure the report-location ends with "/", so that the report-IDs can be directly appended to it.
		String bulkImportReportLocationTemp = bulkImport.getBulkImportReportLocation();
		if ( !bulkImportReportLocationTemp.endsWith("/") )
			bulkImportReportLocationTemp += "/";
		this.bulkImportReportLocation = bulkImportReportLocationTemp;

		this.bulkImportService = bulkImportService;

		numOfThreadsForBulkImportProcedures = bulkImport.getNumOfThreadsForBulkImportProcedures();
		if ( numOfThreadsForBulkImportProcedures <= 0 )
			throw new RuntimeException("The given \"numOfThreadsForBulkImportProcedures\" is not a positive number: " + numOfThreadsForBulkImportProcedures);

		logger.info("Will use " + numOfThreadsForBulkImportProcedures + " threads for the bulk-import procedures.");
		// A single fixed-size pool, shared through this static field. NOTE(review): confirm how BulkImportService splits each procedure's work across it.
		bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsForBulkImportProcedures);
	}


	/**
	 * Reports which parameter is missing from a request to this controller.
	 * Spring Boot does not show any specific messages to the user (like stacktraces), for security reasons.
	 */
	@ExceptionHandler(MissingServletRequestParameterException.class)
	public ResponseEntity<?> handleMissingParams(MissingServletRequestParameterException ex) {
		return ResponseEntity.badRequest().body(String.format("Missing parameter: %s\n", ex.getParameterName()));
	}


	/**
	 * Accepts a request to bulk-import the full-text files of a directory, for the given provenance.
	 * <p>
	 * The request is validated synchronously. Then the import is submitted to the background-executor and the caller immediately
	 * receives a "bulkImportReportID", which can be given to the "getBulkImportReport" endpoint to track the progress.
	 *
	 * @param provenance the source of the files; it must be one of the configured bulk-import sources
	 * @param bulkImportDir the directory of the files: either an absolute path, or a path relative to the "baseBulkImportLocation"
	 * @param shouldDeleteFilesOnFinish whether the successfully imported files should be deleted after the processing
	 * @return 200 with a {@link BulkImportResponse} when the request is accepted;
	 *         400 for an unsupported provenance, or a missing, invalid or empty directory;
	 *         409 if the Service is about to shut down;
	 *         429 if a request for the same directory is currently being handled;
	 *         500 if the directory could not be checked, the report could not be created or the job could not be scheduled
	 */
	@PostMapping("bulkImportFullTexts")
	public ResponseEntity<?> bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish)
	{
		BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance);
		if ( bulkImportSource == null ) {
			String errorMsg = "The provided provenance \"" + provenance + "\" is not in the list of the bulk-imported sources, so no configuration-rules are available!";
			logger.error(errorMsg);
			return ResponseEntity.badRequest().body(errorMsg);	// It's the user's fault that gave an unsupported provenance.
		}

		// Check if the given directory parameter exists.
		if ( bulkImportDir.isEmpty() ) {
			String errorMsg = "The \"bulkImportDir\" was missing from the request!";
			logger.error(errorMsg);
			return ResponseEntity.badRequest().body(errorMsg);
		}

		String givenBulkDir = bulkImportDir;	// Keep the given value here, to not expose the full-path, in case the user has not provided an absolute path.

		// Make sure the whole path ends with "/", so that we can easily append file-names later.
		if ( !bulkImportDir.endsWith("/") )
			bulkImportDir += "/";

		// The "relativeBulkImportDir" always ends with "/" (guaranteed by the above), otherwise the import-procedure would fail.
		String relativeBulkImportDir;
		if ( !bulkImportDir.startsWith("/") ) {
			// A relative path was given, so it is appended to the "baseBulkImportLocation".
			relativeBulkImportDir = bulkImportDir;
			bulkImportDir = baseBulkImportLocation + bulkImportDir;
		} else {
			// An absolute path was given; its last directory is used as the relative one (e.g. for the report-location).
			String errMsg = "The bulkImportDir \"" + bulkImportDir + "\" was problematic!";
			Matcher matcher = LAST_DIR_REGEX.matcher(bulkImportDir);
			if ( !matcher.matches() ) {
				logger.error(errMsg);
				return ResponseEntity.badRequest().body(errMsg);
			}
			relativeBulkImportDir = matcher.group(1);
			if ( (relativeBulkImportDir == null) || relativeBulkImportDir.isEmpty() ) {
				logger.error(errMsg);
				return ResponseEntity.badRequest().body(errMsg);
			}
		}

		logger.info("Received a \"bulkImportFullTexts\" request for \"" + provenance + "\" procedure and bulkImportDir: \"" + bulkImportDir + "\".");

		// Check whether the given directory is accessible.
		File givenDir = new File(bulkImportDir);
		if ( !givenDir.isDirectory() ) {
			String errorMsg = "The bulkImportDir \"" + bulkImportDir + "\" is invalid!";
			logger.error(errorMsg);
			return ResponseEntity.badRequest().body(errorMsg);
		}

		// Efficiently check if the dir is empty, without loading all the file-entries in memory.
		try ( DirectoryStream<Path> directory = Files.newDirectoryStream(givenDir.toPath()) ) {
			if ( !directory.iterator().hasNext() ) {
				String errorMsg = "The givenDir \"" + givenDir + "\" is empty!";
				logger.warn(errorMsg);
				return ResponseEntity.badRequest().body(errorMsg);
			}
			// The above check does not catch the case where the directory has at least one subdirectory, but no full-text files.
			// The "iterator()" will have a "next" entry, but no full-text file will exist. That case is rare and will be caught later on, after this procedure has been accepted.
		} catch (Exception e) {
			String errorMsg = "Error when checking if the givenDir \"" + givenDir + "\" is empty!";
			logger.error(errorMsg, e);	// Keep the cause, to be able to diagnose the failure.
			return ResponseEntity.internalServerError().body(errorMsg);
		}

		// The report-directory is built from the user-given path, so make sure it cannot escape the provenance's report-directory (e.g. with "../").
		Path provenanceReportDir = Paths.get(this.bulkImportReportLocation, provenance).toAbsolutePath().normalize();
		Path currentBulkImportReportLocationDir = provenanceReportDir.resolve(relativeBulkImportDir).normalize();
		if ( !currentBulkImportReportLocationDir.startsWith(provenanceReportDir) ) {
			String errorMsg = "The bulkImportDir \"" + givenBulkDir + "\" is invalid!";
			logger.error(errorMsg + " It leads to the report-location: " + currentBulkImportReportLocationDir);
			return ResponseEntity.badRequest().body(errorMsg);
		}

		// After applying all the logic-checks and informing the user of any mistake, then we check and inform if the Service is about to shut down.
		if ( ShutdownController.shouldShutdownService ) {
			String warnMsg = "The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted!";
			logger.warn(warnMsg);
			return ResponseEntity.status(HttpStatus.CONFLICT).body(warnMsg);
		}

		// Detect if the same directory is scheduled for being processed. In that case, return a 429.
		// From here on, every early return must release the directory from "bulkImportDirsUnderProcessing".
		if ( ! bulkImportDirsUnderProcessing.add(bulkImportDir) ) {
			// We allow multiple jobs for the same provenance, running at the same time, but not multiple jobs for the same bulkImportDirectory.
			String errorMsg = "There is a bulk-import request for the directory \"" + bulkImportDir + "\" that is being handled at the moment. Please wait until it's finished being processed, before making another request.";
			logger.error(errorMsg);
			return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(errorMsg);
		}

		try {
			Files.createDirectories(currentBulkImportReportLocationDir);	// No-op if dir exists. It does not throw a "alreadyExistsException"
		} catch (Exception e) {
			String errorMsg = "Could not create the \"bulkImportReportLocation\" for provenance \"" + provenance + "\" : " + currentBulkImportReportLocationDir;
			logger.error(errorMsg, e);
			bulkImportDirsUnderProcessing.remove(bulkImportDir);
			return ResponseEntity.internalServerError().body(errorMsg);
		}

		// Generate the "bulkImportReportID". The "relativeBulkImportDir" already ends with "/", so the report-name is appended directly to it.
		String bulkImportReportID = provenance + "/" + relativeBulkImportDir + "report_" + GenericUtils.getRandomNumber(10000, 99999);
		String bulkImportReportFullPath = this.bulkImportReportLocation + bulkImportReportID + ".json";

		String msg = "The bulkImportFullTexts request for " + provenance + " procedure and bulkImportDir: " + givenBulkDir + " was accepted and will be submitted for execution. "
				+ (shouldDeleteFilesOnFinish ? "The successfully imported files will be deleted." : "All files will remain inside the directory after processing.")
				+ " You can request a report at any moment, using the reportID.";

		BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID);
		bulkImportReport.addEvent(msg);

		String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport(), true);
		if ( errorMsg != null ) {
			bulkImportDirsUnderProcessing.remove(bulkImportDir);
			return ResponseEntity.internalServerError().body(errorMsg);
		}

		logger.info(msg + " \"bulkImportReportID\": " + bulkImportReportID);

		// Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
		String finalBulkImportDir = bulkImportDir;
		String finalRelativeBulkImportDir = relativeBulkImportDir;
		try {
			UrlsController.futuresOfBackgroundTasks.add(
				UrlsController.backgroundExecutor.submit(
					() -> bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
				)
			);
		} catch (RejectedExecutionException ree) {
			String rejectionMsg = "The bulkImport request for bulkImportDir \"" + givenBulkDir + "\" and provenance " + provenance + " has failed to be executed!";
			logger.error(rejectionMsg, ree);
			// Record the failure in the report-file, which currently says that the request was accepted.
			bulkImportReport.addEvent(rejectionMsg);
			String reportWriteErrorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport(), true);
			if ( reportWriteErrorMsg != null )
				logger.error("Could not record the rejection-event in the report-file: " + reportWriteErrorMsg);
			// No background job was submitted for this directory, so it has to be released here; otherwise it would stay blocked.
			bulkImportDirsUnderProcessing.remove(bulkImportDir);
			return ResponseEntity.internalServerError().body(rejectionMsg);
		}

		// This directory, will be removed from "bulkImportDirsUnderProcessing", when the background job finishes.
		return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID));	// The response is automatically serialized to json, and it has the type "application/json".
	}


	/**
	 * Returns the contents of the report-file with the given ID, optionally pretty-formatted.
	 * <p>
	 * Even if the Service is set to shut down soon, this endpoint returns the report up to the last minute,
	 * since the Service may be up for another hour, running a bulk-import procedure, for which we want to check its progress.
	 *
	 * @param bulkImportReportId the report-ID returned by the "bulkImportFullTexts" endpoint
	 * @param prettyFormatting whether to pretty-format the json; if the file is not valid json, it is returned as-is
	 * @return 200 with the report; 400 if the ID does not point inside the reports-directory; 404 if no such report exists; 500 on read-errors
	 */
	@GetMapping(value = "getBulkImportReport", produces = MediaType.APPLICATION_JSON_VALUE)
	public ResponseEntity<?> getBulkImportReport(@RequestParam("id") String bulkImportReportId, @RequestParam(name = "pretty", defaultValue = "false") boolean prettyFormatting)
	{
		logger.info("Received a \"getBulkImportReport\" request for \"bulkImportReportId\": \"" + bulkImportReportId + "\"." + (prettyFormatting ? " Will return the report pretty-formatted." : ""));

		Path reportFilePath = resolveReportFilePath(bulkImportReportId);
		if ( reportFilePath == null ) {
			String errorMsg = "The given \"bulkImportReportId\" is invalid: " + bulkImportReportId;
			logger.warn(errorMsg);
			return ResponseEntity.badRequest().body(errorMsg);
		}

		// Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response.
		final StringBuilder stringBuilder = new StringBuilder(25_000);
		String line;
		FileUtils.fileAccessLock.lock();	// Serialize the access to the report-file with the other file-operations which use this lock.
		try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(reportFilePath)), FileUtils.twentyFiveKb) ) {
			while ( (line = in.readLine()) != null )
				stringBuilder.append(line).append(GenericUtils.endOfLine);	// The "readLine()" does not return the line-term char.
		} catch (NoSuchFileException nsfe) {
			logger.warn("The requested report-file with ID: \"" + bulkImportReportId + "\" was not found!");
			return ResponseEntity.notFound().build();
		} catch (Exception e) {
			String errorMsg = "Failed to read the contents of report-file with ID: " + bulkImportReportId;
			logger.error(errorMsg, e);
			return ResponseEntity.internalServerError().body(errorMsg);
		} finally {
			FileUtils.fileAccessLock.unlock();
		}

		String json = stringBuilder.toString().trim();
		if ( prettyFormatting ) {
			try {
				json = prettyGson.toJson(JsonParser.parseString(json));
			} catch (JsonParseException jpe) {
				logger.error("Problem when parsing the json-string: " + jpe.getMessage() + "\nIt is not a valid json!\n" + json);
			}
		}
		return ResponseEntity.ok().body(json);
	}


	/**
	 * Resolves the report-file of the given (user-provided) report-ID, inside the "bulkImportReportLocation".
	 *
	 * @param bulkImportReportId the report-ID, without the ".json" extension
	 * @return the normalized path of the report-file, or null if the ID is not a valid path or points outside the reports-directory (e.g. using "../")
	 */
	private Path resolveReportFilePath(String bulkImportReportId)
	{
		try {
			Path reportsBaseDir = Paths.get(this.bulkImportReportLocation).toAbsolutePath().normalize();
			Path reportFilePath = reportsBaseDir.resolve(bulkImportReportId + ".json").normalize();
			return reportFilePath.startsWith(reportsBaseDir) ? reportFilePath : null;
		} catch (InvalidPathException ipe) {
			return null;	// The ID contains characters which cannot be part of a path.
		}
	}

}