From 66a5b3c7dabdce3a69e830a1345bd065cf65c7f4 Mon Sep 17 00:00:00 2001
From: LSmyrnaios
Date: Tue, 25 Jul 2023 11:59:47 +0300
Subject: [PATCH] Update Bulk-Import API:

- Increase the "numOfThreadsPerBulkImportProcedure" to 6.
- Fix bulk-import not working from a second-level subdirectory; the report-subDirectory was not created.
- Fix not returning the bulk-import-report as "application/json".
- Add useful messages for missing parameters.
- Change the HTTP-method for the "bulkImportFullTexts" endpoint to "POST".
- Show a structured json-response for the "bulkImportFullTexts" endpoint.
- Fix uncommon date-format.
- Remove single quotes from the json-report, since they are returned as bytes, not characters.
- Optimize the generation of the json-bulkImport-report.
---
 .../controllers/BulkImportController.java     | 39 +++++++++-----
 .../models/BulkImportReport.java              |  2 +-
 .../models/BulkImportResponse.java            | 51 +++++++++++++++++++
 .../services/BulkImportServiceImpl.java       | 34 ++++++-------
 .../urls_controller/util/GenericUtils.java    |  2 +-
 src/main/resources/application.yml            |  2 +-
 6 files changed, 96 insertions(+), 34 deletions(-)
 create mode 100644 src/main/java/eu/openaire/urls_controller/models/BulkImportResponse.java

diff --git a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
index b0e4ce5..f904d0a 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
@@ -2,6 +2,7 @@ package eu.openaire.urls_controller.controllers;
 
 import eu.openaire.urls_controller.components.BulkImport;
 import eu.openaire.urls_controller.models.BulkImportReport;
+import eu.openaire.urls_controller.models.BulkImportResponse;
 import eu.openaire.urls_controller.services.BulkImportService;
 import eu.openaire.urls_controller.util.FileUtils;
 import eu.openaire.urls_controller.util.GenericUtils;
@@ -9,11 +10,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.MissingServletRequestParameterException;
+import org.springframework.web.bind.annotation.*;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -71,9 +71,18 @@ public class BulkImportController {
     }
 
-    private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+[/]?)$");
+    private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+/)$");
 
-    @GetMapping("bulkImportFullTexts")
+
+    // This method reports which parameters are missing from a bulk-import API request.
+    // For security reasons, Spring Boot does not show any specific messages (like stacktraces) to the user.
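+    // For example, a request that omits the "provenance" parameter receives an HTTP-400 response with the body "Missing parameter: provenance".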
+    @ExceptionHandler(MissingServletRequestParameterException.class)
+    public ResponseEntity handleMissingParams(MissingServletRequestParameterException ex) {
+        return ResponseEntity.badRequest().body(String.format("Missing parameter: %s\n", ex.getParameterName()));
+    }
+
+
+    @PostMapping("bulkImportFullTexts")
     public ResponseEntity bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish) {
 
         BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance);
@@ -90,7 +99,7 @@ public class BulkImportController {
             return ResponseEntity.badRequest().body(errorMsg);
         }
 
-        String givenBulkDir = bulkImportDir; // Keep the given value here, to not expose the full-path, in case the user has not provided an absolut path.
+        String givenBulkDir = bulkImportDir; // Keep the given value here, to not expose the full-path, in case the user has not provided an absolute path.
 
         // Make sure the whole path ends with "/", so that we can easily append file-names later.
         if ( !bulkImportDir.endsWith("/") )
@@ -118,6 +127,7 @@ public class BulkImportController {
         }
         // The "relativeBulkImportDir" should always be guaranteed to end with "/"! Otherwise, the import-procedure will fail.
+
         logger.info("Received a \"bulkImportFullTexts\" request for \"" + provenance + "\" procedure and bulkImportDir: \"" + bulkImportDir + "\".");
 
         // Check whether the given directory is accessible.
@@ -136,7 +146,7 @@ public class BulkImportController {
                 return ResponseEntity.badRequest().body(errorMsg);
             }
             // The above check does not catch the case where the directory has at least one subdirectory, but no full-text files.
-            // The "iterator()" will have a "next" entry, but no full-text file will exist. Although, that case will be rare.
+            // The "iterator()" will have a "next" entry, but no full-text file will exist. That case will be rare, though, and it will be caught later on, after this procedure has been accepted.
         } catch (Exception e) {
             String errorMsg = "Error when checking if the givenDir \"" + givenDir + "\" is empty!";
             logger.error(errorMsg);
@@ -158,7 +168,7 @@ public class BulkImportController {
             return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(errorMsg);
         }
 
-        Path currentBulkImportReportLocationDir = Paths.get(this.bulkImportReportLocation, provenance);
+        Path currentBulkImportReportLocationDir = Paths.get(this.bulkImportReportLocation, provenance, relativeBulkImportDir);
         try {
             Files.createDirectories(currentBulkImportReportLocationDir); // No-op if the dir exists. It does not throw an "alreadyExistsException".
         } catch (Exception e) {
@@ -167,12 +177,13 @@ public class BulkImportController {
             return ResponseEntity.internalServerError().body(errorMsg);
         }
 
+        // Generate the "bulkImportReportID", removing the trailing slash ("/") from the "relativeBulkImportDir".
         String bulkImportReportID = provenance + "/" + relativeBulkImportDir.substring(0, (relativeBulkImportDir.length() -1)) + "_report_" + GenericUtils.getRandomNumber(10000, 99999);
         String bulkImportReportFullPath = this.bulkImportReportLocation + bulkImportReportID + ".json";
 
-        String msg = "The 'bulkImportFullTexts' request for '" + provenance + "' procedure and bulkImportDir: '" + givenBulkDir + "' was accepted and will be scheduled for execution. "
+        String msg = "The bulkImportFullTexts request for " + provenance + " procedure and bulkImportDir: " + givenBulkDir + " was accepted and will be scheduled for execution. "
                 + (shouldDeleteFilesOnFinish ?
"The successfully imported files will be deleted." : "All files will remain inside the directory after processing.") - + " You can request a report at any moment, using this reportFileID: " + bulkImportReportID; + + " You can request a report at any moment, using the reportID."; BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID); bulkImportReport.addEvent(msg); @@ -181,7 +192,7 @@ public class BulkImportController { if ( errorMsg != null ) return ResponseEntity.internalServerError().body(errorMsg); - logger.info(msg); + logger.info(msg + " \"bulkImportReportID\": " + bulkImportReportID); // Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours). String finalBulkImportDir = bulkImportDir; @@ -190,11 +201,11 @@ public class BulkImportController { bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) ); - return ResponseEntity.ok().body(msg); + return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID)); } - @GetMapping("getBulkImportReport") + @GetMapping(value = "getBulkImportReport", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity getBulkImportReport(@RequestParam("id") String bulkImportReportId) { // Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response. diff --git a/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java index b2b0855..b175425 100644 --- a/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java +++ b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java @@ -50,7 +50,7 @@ public class BulkImportReport { { //Convert the LinkedHashMultiMap to Map>, since Gson cannot serialize Multimaps. 
         eventsMap = eventsMultimap.asMap();
-        return gson.toJson(this);
+        return gson.toJson(this, BulkImportReport.class);
     }
 
     public String getProvenance() {
diff --git a/src/main/java/eu/openaire/urls_controller/models/BulkImportResponse.java b/src/main/java/eu/openaire/urls_controller/models/BulkImportResponse.java
new file mode 100644
index 0000000..7303a04
--- /dev/null
+++ b/src/main/java/eu/openaire/urls_controller/models/BulkImportResponse.java
@@ -0,0 +1,51 @@
+package eu.openaire.urls_controller.models;
+
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({
+        "message",
+        "reportID"
+})
+public class BulkImportResponse {
+
+    @JsonProperty("message")
+    String message;
+
+    @JsonProperty("reportID")
+    String reportID;
+
+    public BulkImportResponse() {}
+
+    public BulkImportResponse(String message, String bulkImportReportID) {
+        this.message = message;
+        this.reportID = bulkImportReportID;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getReportID() {
+        return reportID;
+    }
+
+    public void setReportID(String reportID) {
+        this.reportID = reportID;
+    }
+
+    @Override
+    public String toString() {
+        return "BulkImportResponse{" +
+                "message='" + message + '\'' +
+                ", reportID='" + reportID + '\'' +
+                '}';
+    }
+}
diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
index 16f115b..3c929f7 100644
--- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
@@ -64,12 +64,12 @@ public class BulkImportServiceImpl implements BulkImportService {
         String bulkImportReportLocation = bulkImportReport.getReportLocation();
 
         // Write to bulkImport-report file.
-        bulkImportReport.addEvent("Initializing the bulkImport '" + provenance + "' procedure with bulkImportDir '" + bulkImportDirName + "'.");
+        bulkImportReport.addEvent("Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".");
         // Do not write immediately to the file, wait for the following checks.
 
         if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
                 && ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) {
-            String errorMsg = "The 'payloadsSchema' could not be parsed!";
+            String errorMsg = "The payloadsSchema could not be parsed!";
             logger.error(errorMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
@@ -113,7 +113,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         // Create a new directory on HDFS with this bulkImportDir name, so that no "load data" operation will fail because another thread loaded that base-dir right before.
         String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
         if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // No-op if it already exists. It is very quick.
- String errorMsg = "Could not create the hdfs-directory: " + currentBulkImportHdfsDir; + String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir; logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); @@ -123,8 +123,8 @@ public class BulkImportServiceImpl implements BulkImportService { long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records. - List> callables = new ArrayList<>(numOfFiles); - List> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files. + List> callableTasksForFileSegments = new ArrayList<>(numOfFiles); + List> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfThreadsPerBulkImportProcedure" subLists. The last one may have marginally fewer files. int subListsSize = subLists.size(); bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel."); @@ -132,7 +132,7 @@ public class BulkImportServiceImpl implements BulkImportService { for ( int i = 0; i < subListsSize; ++i ) { int finalI = i; - callables.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries. + callableTasksForFileSegments.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries. return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish); }); } @@ -140,7 +140,7 @@ public class BulkImportServiceImpl implements BulkImportService { int numFailedSegments = 0; int numFailedFiles = 0; try { - List> futures = BulkImportController.bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish. + List> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish. int sizeOfFutures = futures.size(); for ( int i = 0; i < sizeOfFutures; ++i ) { try { @@ -174,7 +174,7 @@ public class BulkImportServiceImpl implements BulkImportService { // Check the results. String msg; if ( numFailedFiles == numOfFiles ) { - String errorMsg = "None of the files inside the bulkImportDir '" + bulkImportDirName + "' were imported!"; + String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!"; logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); @@ -184,7 +184,7 @@ public class BulkImportServiceImpl implements BulkImportService { msg = numFailedFiles + " files" + (numFailedSegments > 0 ? 
(" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName; logger.warn(msg); } else { - msg = "All " + numOfFiles + " files, from bulkImportDir '" + bulkImportDirName + "' were bulkImported successfully."; + msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulkImported successfully."; logger.info(msg); } bulkImportReport.addEvent(msg); @@ -202,7 +202,7 @@ public class BulkImportServiceImpl implements BulkImportService { } ImpalaConnector.databaseLock.unlock(); - String successMsg = "Finished the bulk-import procedure for '" + provenance + "' and bulkImportDir: " + bulkImportDirName; + String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName; logger.info(successMsg); bulkImportReport.addEvent(successMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); @@ -222,7 +222,7 @@ public class BulkImportServiceImpl implements BulkImportService { String bulkImportReportLocation = bulkImportReport.getReportLocation(); int numOfFilesInSegment = fileLocationsSegment.size(); - String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure '" + provenance + "' | dir: '" + bulkImportDirName + "'.."; + String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure: " + provenance + " | dir: " + bulkImportDirName; logger.debug(msg); bulkImportReport.addEvent(msg); @@ -238,7 +238,7 @@ public class BulkImportServiceImpl implements BulkImportService { if ( record != null ) payloadRecords.add(record); else { - bulkImportReport.addEvent("An error caused the file: '" + fileLocation + "' to not be imported!"); + bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!"); failedFiles.add(fileLocation); } @@ -258,7 +258,7 @@ public class BulkImportServiceImpl implements BulkImportService { return numOfFilesInSegment; } else if ( numOfPayloadRecords != numOfFilesInSegment ) { // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exists early. - String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + "!"; + String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + " !"; logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); @@ -272,7 +272,7 @@ public class BulkImportServiceImpl implements BulkImportService { logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath); // DEBUG! if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { - bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!"); + bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: " + parquetFileName + " !"); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. 
             return numOfFilesInSegment;
         }
 
@@ -284,7 +284,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         // Upload and insert the data to the "payload" Impala table. (no database-locking is required)
         String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
         if ( errorMsg != null ) { // The possible error-message returned is already logged by the Controller.
-            bulkImportReport.addEvent("Could not upload the parquet-file '" + parquetFileName + "' to HDFS!");
+            bulkImportReport.addEvent("Could not upload the parquet-file " + parquetFileName + " to HDFS!");
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
             return numOfFilesInSegment;
@@ -308,7 +308,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         bulkImportReport.addEvent(segmentSuccessMsg);
 
         if ( shouldDeleteFilesOnFinish ) {
-            segmentSuccessMsg = "As the user requested, the successfully imported files of '" + provenance + "' procedure, of bulk-import segment-" + segmentCounter + ", from directory '" + bulkImportDirName + "', will be deleted.";
+            segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted.";
             logger.info(segmentSuccessMsg);
             bulkImportReport.addEvent(segmentSuccessMsg);
 
@@ -316,7 +316,7 @@ public class BulkImportServiceImpl implements BulkImportService {
             for ( String fileLocation : fileLocationsSegment ) {
                 if ( !failedFiles.contains(fileLocation) )
                     if ( !fileUtils.deleteFile(fileLocation) )
-                        bulkImportReport.addEvent("The file '" + fileLocation + "' could not be deleted! Please make sure you have provided the WRITE-permission.");
+                        bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission.");
             }
         }
 
diff --git a/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java b/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java
index 9e6cc85..ce9ac65 100644
--- a/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java
@@ -6,7 +6,7 @@ import java.util.Date;
 
 public class GenericUtils {
 
-    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss.SSS z");
+    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
 
     public static String getReadableCurrentTimeAndZone() {
         return (simpleDateFormat.format(new Date(System.currentTimeMillis())));
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 2855f0d..4c383f5 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -39,7 +39,7 @@ services:
   bulk-import:
     baseBulkImportLocation: /mnt/bulk_import/
     bulkImportReportLocation: /reports/bulkImportReports/
-    numOfThreadsPerBulkImportProcedure: 4
+    numOfThreadsPerBulkImportProcedure: 6
    bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling.
      arxivImport:
        datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23
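
Usage sketch (reviewer note, not part of the patch): the snippet below shows how a client could exercise the updated API with a Java 11+ HttpClient. The host and port ("http://localhost:1880"), the absence of a servlet base-path, and the directory and reportID values are illustrative assumptions, not something this patch defines; the parameter names, the POST method, the JSON response shape, and the "id" query parameter of "getBulkImportReport" do come from the patch.

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class BulkImportClientSketch {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:1880"; // Assumed host and port.

        // Trigger the bulk-import. The endpoint is now a POST; all three parameters are required,
        // and a missing one is answered with an HTTP-400 "Missing parameter: <name>" by the new @ExceptionHandler.
        String importUrl = base + "/bulkImportFullTexts"
                + "?provenance=arxivImport"
                + "&bulkImportDir=" + URLEncoder.encode("/mnt/bulk_import/arxiv/batch_1/", StandardCharsets.UTF_8)
                + "&shouldDeleteFilesOnFinish=false";
        HttpRequest importRequest = HttpRequest.newBuilder(URI.create(importUrl))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> importResponse = client.send(importRequest, HttpResponse.BodyHandlers.ofString());
        // The structured JSON body now looks like: {"message": "...", "reportID": "arxivImport/batch_1_report_12345"}
        System.out.println(importResponse.body());

        // Later, fetch the report; it is now served with Content-Type "application/json".
        String reportID = "arxivImport/batch_1_report_12345"; // Hypothetical value, taken from the response above.
        HttpRequest reportRequest = HttpRequest.newBuilder(
                        URI.create(base + "/getBulkImportReport?id=" + URLEncoder.encode(reportID, StandardCharsets.UTF_8)))
                .build();
        HttpResponse<String> reportResponse = client.send(reportRequest, HttpResponse.BodyHandlers.ofString());
        System.out.println(reportResponse.body());
    }
}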