package eu.openaire.urls_controller.controllers;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.BulkImportResponse;
import eu.openaire.urls_controller.services.BulkImportService;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.MissingServletRequestParameterException;
import org.springframework.web.bind.annotation.*;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.file.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


@RestController
@RequestMapping("")
public class BulkImportController {

    private static final Logger logger = LoggerFactory.getLogger(BulkImportController.class);

    @Autowired
    private FileUtils fileUtils;

    private final BulkImportService bulkImportService;

    private final String baseBulkImportLocation;

    private final String bulkImportReportLocation;

    private final HashMap<String, BulkImport.BulkImportSource> bulkImportSources;

    public static final Set<String> bulkImportDirsUnderProcessing = Collections.newSetFromMap(new ConcurrentHashMap<>());

    public static int numOfThreadsForBulkImportProcedures;

    public static ExecutorService bulkImportExecutor;


    public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport)
    {
        String bulkImportReportLocationTemp;
        this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation();
        this.bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());

        bulkImportReportLocationTemp = bulkImport.getBulkImportReportLocation();
        if ( !bulkImportReportLocationTemp.endsWith("/") )
            bulkImportReportLocationTemp += "/";
        this.bulkImportReportLocation = bulkImportReportLocationTemp;

        this.bulkImportService = bulkImportService;

        numOfThreadsForBulkImportProcedures = bulkImport.getNumOfThreadsForBulkImportProcedures();
        if ( numOfThreadsForBulkImportProcedures <= 0 )
            throw new RuntimeException("The given \"numOfThreadsForBulkImportProcedures\" is not a positive number: " + numOfThreadsForBulkImportProcedures);

        logger.info("Will use " + numOfThreadsForBulkImportProcedures + " threads for the bulk-import procedures.");
        bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsForBulkImportProcedures);    // At most <numOfThreadsForBulkImportProcedures> threads will be used per bulk-import procedure.
    }


    private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+/)$");
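    // Illustrative (hypothetical) inputs for "LAST_DIR_REGEX" and the value captured in group(1):
    //   "/mnt/bulk_imports/batch_1/"  ->  "batch_1/"
    //   "/data/"                      ->  "data/"
    // Absolute paths without a trailing "/" do not match; the caller appends the "/" before matching.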

    // This exception-handler reports the missing parameters of the bulk-import API to the user.
    // Spring Boot does not show any specific messages to the user (like stacktraces), for security reasons.
    @ExceptionHandler(MissingServletRequestParameterException.class)
    public ResponseEntity<String> handleMissingParams(MissingServletRequestParameterException ex) {
        return ResponseEntity.badRequest().body(String.format("Missing parameter: %s\n", ex.getParameterName()));
    }


    @PostMapping("bulkImportFullTexts")
    public ResponseEntity<?> bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish)
    {
        BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance);
        if ( bulkImportSource == null ) {
            String errorMsg = "The provided provenance \"" + provenance + "\" is not in the list of the bulk-imported sources, so no configuration-rules are available!";
            logger.error(errorMsg);
            return ResponseEntity.badRequest().body(errorMsg);    // It's the user's fault for giving an unsupported provenance.
        }

        // Check if the given directory parameter exists.
        if ( bulkImportDir.isEmpty() ) {
            String errorMsg = "The \"bulkImportDir\" was missing from the request!";
            logger.error(errorMsg);
            return ResponseEntity.badRequest().body(errorMsg);
        }

        String givenBulkDir = bulkImportDir;    // Keep the given value here, in order not to expose the full path, in case the user has not provided an absolute path.

        // Make sure the whole path ends with "/", so that we can easily append file-names later.
        if ( !bulkImportDir.endsWith("/") )
            bulkImportDir += "/";

        String relativeBulkImportDir = null;

        // Check if we have a relative path, so that we can append it to the "baseBulkImportLocation".
        if ( !bulkImportDir.startsWith("/") ) {
            // A relative path was given.
            relativeBulkImportDir = bulkImportDir;
            bulkImportDir = baseBulkImportLocation + bulkImportDir;
        } else {
            String errMsg = "The bulkImportDir \"" + bulkImportDir + "\" was problematic!";
            Matcher matcher = LAST_DIR_REGEX.matcher(bulkImportDir);
            if ( !matcher.matches() ) {
                logger.error(errMsg);
                return ResponseEntity.badRequest().body(errMsg);
            }
            relativeBulkImportDir = matcher.group(1);
            if ( (relativeBulkImportDir == null) || relativeBulkImportDir.isEmpty() ) {
                logger.error(errMsg);
                return ResponseEntity.badRequest().body(errMsg);
            }
        }
        // The "relativeBulkImportDir" is guaranteed to end with "/"; otherwise, the import-procedure will fail.

        logger.info("Received a \"bulkImportFullTexts\" request for \"" + provenance + "\" procedure and bulkImportDir: \"" + bulkImportDir + "\".");

        // Check whether the given directory is accessible.
        File givenDir = new File(bulkImportDir);
        if ( !givenDir.isDirectory() ) {
            String errorMsg = "The bulkImportDir \"" + bulkImportDir + "\" is invalid!";
            logger.error(errorMsg);
            return ResponseEntity.badRequest().body(errorMsg);
        }

        // Efficiently check if the dir is empty, without loading all the file-entries in memory.
        try ( DirectoryStream<Path> directory = Files.newDirectoryStream(givenDir.toPath()) ) {
            if ( !directory.iterator().hasNext() ) {
                String errorMsg = "The givenDir \"" + givenDir + "\" is empty!";
                logger.warn(errorMsg);
                return ResponseEntity.badRequest().body(errorMsg);
            }
            // The above check does not catch the case where the directory has at least one subdirectory, but no full-text files.
            // The "iterator()" will have a "next" entry, but no full-text file will exist. However, that case will be rare and will be caught later on, after this procedure has been accepted.
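            // Illustrative (hypothetical) layout which passes the emptiness-check above, although no full-text files exist:
            //   <bulkImportDir>/
            //     └── some_empty_subdir/
            // In that rare case, the bulk-import procedure itself will report the missing full-texts, later on.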
        } catch (Exception e) {
            String errorMsg = "Error when checking if the givenDir \"" + givenDir + "\" is empty!";
            logger.error(errorMsg, e);
            return ResponseEntity.internalServerError().body(errorMsg);
        }

        // After applying all the logic-checks and informing the user of any mistake, check whether the Service is about to shut down and inform the user.
        if ( ShutdownController.shouldShutdownService ) {
            String warnMsg = "The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted!";
            logger.warn(warnMsg);
            return ResponseEntity.status(HttpStatus.CONFLICT).body(warnMsg);
        }

        // Detect whether the same directory is already scheduled for processing. In that case, return a 429.
        if ( !bulkImportDirsUnderProcessing.add(bulkImportDir) ) {    // We allow multiple jobs for the same provenance, running at the same time, but not multiple jobs for the same bulkImportDirectory.
            String errorMsg = "There is a bulk-import request for the directory \"" + bulkImportDir + "\" that is being handled at the moment. Please wait until it's finished being processed, before making another request.";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(errorMsg);
        }

        Path currentBulkImportReportLocationDir = Paths.get(this.bulkImportReportLocation, provenance, relativeBulkImportDir);
        try {
            Files.createDirectories(currentBulkImportReportLocationDir);    // No-op if the dir already exists; it does not throw a "FileAlreadyExistsException" in that case.
        } catch (Exception e) {
            String errorMsg = "Could not create the \"bulkImportReportLocation\" for provenance \"" + provenance + "\" : " + currentBulkImportReportLocationDir;
            logger.error(errorMsg, e);
            bulkImportDirsUnderProcessing.remove(bulkImportDir);
            return ResponseEntity.internalServerError().body(errorMsg);
        }

        // Generate the "bulkImportReportID". Since the "relativeBulkImportDir" ends with "/", the report-file is placed inside a sub-directory dedicated to the given bulkImportDir.
        String bulkImportReportID = provenance + "/" + relativeBulkImportDir + "report_" + GenericUtils.getRandomNumber(10000, 99999);
        String bulkImportReportFullPath = this.bulkImportReportLocation + bulkImportReportID + ".json";

        String msg = "The bulkImportFullTexts request for " + provenance + " procedure and bulkImportDir: " + givenBulkDir + " was accepted and will be submitted for execution. "
                + (shouldDeleteFilesOnFinish ? "The successfully imported files will be deleted." : "All files will remain inside the directory after processing.")
                + " You can request a report at any moment, using the reportID.";

        BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID);
        bulkImportReport.addEvent(msg);

        String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport(), true);
        if ( errorMsg != null ) {
            bulkImportDirsUnderProcessing.remove(bulkImportDir);
            return ResponseEntity.internalServerError().body(errorMsg);
        }

        logger.info(msg + " \"bulkImportReportID\": " + bulkImportReportID);

        // Submit this as a background job, since it will take a lot of time to complete; otherwise, the caller would get a "read-timeout" at least, or a socket-timeout (in case of a network failure during those hours).
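        // Copy the (re-assigned) local variables into new ones, since only "effectively final" locals can be captured by the lambda which is submitted below.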
        String finalBulkImportDir = bulkImportDir;
        String finalRelativeBulkImportDir = relativeBulkImportDir;
        try {
            UrlsController.futuresOfBackgroundTasks.add(
                    UrlsController.backgroundExecutor.submit(
                            () -> bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
                    )
            );
        } catch (RejectedExecutionException ree) {
            errorMsg = "The bulkImport request for bulkImportReportLocation \"" + bulkImportReportLocation + "\" and provenance " + provenance + " has failed to be executed!";
            bulkImportReport.addEvent(errorMsg);    // Record the failure-event in the report as well.
            logger.error(errorMsg, ree);
            bulkImportDirsUnderProcessing.remove(bulkImportDir);    // The background job will never run, so release the directory here, to allow future requests for it.
            return ResponseEntity.internalServerError().body(errorMsg);
        }

        // This directory will be removed from "bulkImportDirsUnderProcessing", when the background job finishes.

        return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID));    // The response is automatically serialized to json, and it has the type "application/json".
    }


    @GetMapping(value = "getBulkImportReport", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<?> getBulkImportReport(@RequestParam("id") String bulkImportReportId, @RequestParam(name = "pretty", defaultValue = "false") boolean prettyFormatting)
    {
        logger.info("Received a \"getBulkImportReport\" request for \"bulkImportReportId\": \"" + bulkImportReportId + "\"." + (prettyFormatting ? " Will return the report pretty-formatted." : ""));

        // Even if the Service is set to shut down soon, we allow this endpoint to return the report up to the last minute,
        // since the Service may be up for another hour, running a bulk-import procedure, for which we want to check its progress.

        // Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response.
        final StringBuilder stringBuilder = new StringBuilder(25_000);
        String line;
        FileUtils.fileAccessLock.lock();
        try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.twentyFiveKb) ) {
            while ( (line = in.readLine()) != null )
                stringBuilder.append(line).append(GenericUtils.endOfLine);    // The "readLine()" does not return the line-terminator char.
        } catch (NoSuchFileException nsfe) {
            logger.warn("The requested report-file with ID: \"" + bulkImportReportId + "\" was not found!");
            return ResponseEntity.notFound().build();
        } catch (Exception e) {
            String errorMsg = "Failed to read the contents of report-file with ID: " + bulkImportReportId;
            logger.error(errorMsg, e);
            return ResponseEntity.internalServerError().body(errorMsg);    // It's ok to give the file-path to the user, since the report already contains the file-path.
        } finally {
            FileUtils.fileAccessLock.unlock();
        }

        String json = stringBuilder.toString().trim();
        if ( prettyFormatting ) {
            final Gson gson = new GsonBuilder().setPrettyPrinting().create();
            try {
                json = gson.toJson(JsonParser.parseString(json));
            } catch (JsonParseException jpe) {
                logger.error("Problem when parsing the json-string: " + jpe.getMessage() + "\nIt is not a valid json!\n" + json);
            }
        }

        return ResponseEntity.ok().body(json);
    }

}
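/*
 * Example interaction (hypothetical host and parameter values; the provenance must match one of the configured "bulkImportSources"):
 *
 *   POST http://<host>:<port>/bulkImportFullTexts?provenance=exampleSource&bulkImportDir=batch_1/&shouldDeleteFilesOnFinish=false
 *     -> 200 OK, with a json body containing the acceptance-message and a "bulkImportReportID"
 *        such as "exampleSource/batch_1/report_12345" (the exact field-names depend on "BulkImportResponse").
 *
 *   GET  http://<host>:<port>/getBulkImportReport?id=exampleSource/batch_1/report_12345&pretty=true
 *     -> 200 OK, with the pretty-printed json report, or 404 if no report-file exists for that ID.
 */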